hacomono TECH BLOG

フィットネスクラブやスクールなどの顧客管理・予約・決済を行う、業界特化型SaaS「hacomono」を提供する会社のテックブログです!

K8sのControllerに入門してみた

この記事は hacomono Advent Calendar 2024 9日目の記事です。

こんにちは!基盤本部プラットフォームグループのかいかいです。
今回初めてアドベントカレンダーなるものに参加してみました。
どんな話題でも OK とハードルを低くしてもらっているので、今回は Kubernetes 周りを学習したことを話したいと思います。
組織での取り組みではなく、個人の取り組みになりますのでご了承ください!

経緯

私はお家 k8s などを行っており、Kubernetes でできることや利用方法はなんとなくわかってきたのですが、Kubernetes の内部で起きていることまでは理解できていないなと感じていました。
また、今後 Kubernetes を採用しても運用できるように、有識者による Istio 勉強会が最近始まり、Kubernetes の内部詳細を知る必要性も出てきたと感じています。
そこで、Kubernetes の内部詳細を知ることができると噂のProgramming Kubernetesや公式のサンプル、Kubernetes 本体のコードなどを読んでみることにしました。
多くの学びがありましたので、今回はその中でも Controller にフォーカスして学んだ内容を共有したいと思います。

Controller の概要

以下は今回知ることができたものの概要です。

  • kube-apiserver は リソース(Pod,Deployment など)のバージョン(v1,v1beta1 など)ごとに CRUD な API や リソースの状態を監視できる watch クエリなどを提供している
  • リソースの状態は etcd に保存されており、CRUD な API が叩かれると kube-apiserver によって etcd に状態が保存・更新される
  • Controller は kube-apiserver の watch を使って対象リソースの状態を監視し、状態変更を検知すると何かしらのロジック実行し、あるべき状態へ調整を行う
  • client-goは Kubernetes の API を使うためのライブラリで Kubernetes のコードベースにも使われており、Controller 内のロジックにも使われていることが多い

上の説明を簡単に図にすると以下のようになります。

図のように kube-apiserver がリクエストを受け、リソースの状態を変更することができます。
すると Controller は kube-apiserver からリソースの状態変更を検知します。
そして何かしらのロジックを実行し、kube-apiserver に状態変更のリクエストを送ります。
するとまた kube-apiserver がリソースの状態を変更し、関係のある Controller がそれを検知...というループが続いていきます。
このようにして Kubernetes はリソースの状態を管理しているということがわかりました。
また、Deployment 作成による Pod 起動/更新の流れが Programming Kubernetes に書かれてあり、それが大変参考になったので紹介させていただきます。

  1. 開発者などが Deployment を作成する旨を kube-apiserver にリクエストする
  2. deployment-controller が kube-apiserver から Deployment 作成のイベントを検知する
  3. deployment-controller 内のロジックで Replicaset を作成する旨を kube-apiserver にリクエストする
  4. replicaset-controller が kube-apiserver から Replicaset 作成のイベントを検知する
  5. replicaset-controller 内のロジックで Pod を作成する旨を kube-apiserver にリクエストする
  6. Pod 作成のイベントを複数のコンポーネントが検知する
    • kube-scheduler が kube-apiserver から Pod 作成のイベントを検知する。初期の Pod は Node 指定が空文字になっているので、kube-scheduler のロジックで Pod をどの Node に配置するか決定する
    • kubelet も kube-apiserver から Pod 作成のイベントを検知するが、初期の Pod は Node 指定が空文字になっているので、kubelet はその Pod を無視する
  7. kube-scheduler が Pod の Node 指定を決定したら、kube-apiserver に Pod の Node 指定を更新する旨のリクエストを送る
  8. kubelet が Pod の更新イベントを検知
  9. kubelet は Pod の Node 指定を確認し、自 Node であれば Pod を起動し、Pod のステータスを kube-apiserver に更新する
  10. Replicaset は Pod ステータスの更新を検知するが、特に何もしない
  11. もし、Pod が停止した場合、Pod の管理をしている kubelet が Pod のステータスを更新し、kube-apiserver に通知する
  12. Replicaset は Pod ステータスの更新を確認し、Pod が停止していることを検知すると、停止した Pod を削除し、新たな Pod を作成する旨のリクエストを kube-apiserver に送る
  13. などなど。。。

以下の図は 1〜9 の Pod 起動までを図にしたものです。

概要レベルですが、個人的には Kubernetes 内の処理が分かりよかったです。
ここからは実際にsample-controller という 公式のサンプル Controller のコードを見ていきながら Controller 内部の仕組みを学んでいきます。
sample-controller は Foo というカスタムリソースを管理する Controller で、Foo を作成すると nginx を動かす Deployment を作成し、その Deployment が想定通りに動いているかを管理するものです。
個人的に Controller の挙動を知っておくと実際の Kubernetes のコードを読む際にも役立つと感じるようになりましたのでお勧めです。

sample-controller を見てみる

sample-controller の中でも今回はController 実装部分を読んでいきます。
主な Controller の主要ロジックは以下の client-go ライブラリのコンポーネントを使って実装されています。

  • clientset
    • clientset は kube-apiserver に対して CRUD なリクエストを送るためのクライアント
    • sample-controller では kubeclientset と sampleclientset の 2 つのクライアントが利用されている
      • kubeclientset は Kubernetes の定義済みリソース(Pod や Deployment など)に対して CRUD 操作が可能
      • sampleclientset は sample-controller で管理する Foo リソースの CRUD 操作が可能
  • informer
    • informer は Kubernetes からリソースのリストを取得し、キャッシュしたり、Kubernetes の watch API を利用して監視し、リソースの変更を通知するコンポーネント
    • Lister メソッドによりリソースのリストを取得するためのコンポーネントを生成可能
    • sample-controller では deploymentsLister と foosLister が利用されている
    • clientset を使って kube-apiserver へ直接リストを取得するのではなく、informer を使ってリストを取得することで、kube-apiserver への負荷を軽減することができる
    • AddEventHandler メソッドにより、リソースの変更を検知した際に実行するロジックを登録可能
  • workqueue
    • workqueue は Controller が処理するリソースをキューイングするためのコンポーネント
    • rate limit などのユースケースに合わせた様々なキューイングが可能

sample-controller の NewController 関数を見てみると Controller の初期化には上記コンポーネントが使われていることがわかります。

func NewController(
    ctx context.Context,
    kubeclientset kubernetes.Interface,
    sampleclientset clientset.Interface,
    deploymentInformer appsinformers.DeploymentInformer,
    fooInformer informers.FooInformer) *Controller {
  ...
  controller := &Controller{
        kubeclientset:     kubeclientset,
        sampleclientset:   sampleclientset,
        deploymentsLister: deploymentInformer.Lister(),
        deploymentsSynced: deploymentInformer.Informer().HasSynced,
        foosLister:        fooInformer.Lister(),
        foosSynced:        fooInformer.Informer().HasSynced,
        workqueue:         workqueue.NewTypedRateLimitingQueue(ratelimiter),
        recorder:          recorder,
    }
  ...
    fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueFoo,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueFoo(new)
        },
    })
  ...
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.handleObject,
        UpdateFunc: func(old, new interface{}) {
      ...
            controller.handleObject(new)
        },
        DeleteFunc: controller.handleObject,
    })
    return controller
}

そして、sample-controller 含め主な Controller では clientset,informer,workqueue を使って以下の 1,2 をループして動作しています。

  1. informer にリソースの変更を検知した後に動作するロジックを登録
  2. informer がリソースの変更を検知すると、イベントに合わせて登録されたロジックを実行
    • 様々なコントローラを見ると、変更されたリソースを workqueue にキューイングする処理が多い印象
  3. workqueue からリソースを取り出して、リソースの状態を確認し、必要な処理を行う
    • 例えば、リソースをあるべき状態に変更して、clientset を使って リソース更新を kube-apiserver にリクエストするなど

図で表すと以下のようになります。

sample-controller では fooInformer.Informer().AddEventHandler と deploymentInformer.Informer().AddEventHandler にて、Foo リソースと Deployment リソースの変更に対してそれぞれのロジックを登録しています。
sample-controller の主なロジックは、Run メソッド内で実行されている以下の wait.UntilWithContext メソッドです。wait.UntilWithContext は指定した時間ごとに引数の c.runWorker メソッドを実行する処理です。

func (c *Controller)Run(ctx context.Context, workers int) {
  ...
    // Launch two workers to process Foo resources
    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, c.runWorker, time.Second)
    }
  ...
}

そして runWorker メソッドは以下のようになっています。

func (c *Controller) runWorker(ctx context.Context) {
    for c.processNextWorkItem(ctx) {
    }
}

runWorker 内でループ実行されている processNextWorkItem メソッドは以下のように workqueue からリソースを取り出して、syncHandler メソッドを実行しています。

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    objRef, shutdown := c.workqueue.Get()
    ...
    defer c.workqueue.Done(objRef)
    // Run the syncHandler, passing it the structured reference to the object to be synced.
    err := c.syncHandler(ctx, objRef)
    if err == nil {
        // If no error occurs then we Forget this item so it does not
        // get queued again until another change happens.
        c.workqueue.Forget(objRef)
        logger.Info("Successfully synced", "objectName", objRef)
        return true
    }
  ...
    return true
}

syncHandler メソッドはリソースの状態を確認し、必要な処理を行うメソッドで、sample-controller では以下のようになっています。

func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
  ...
    // Get the Foo resource with this namespace/name
    foo, err := c.foosLister.Foos(objectRef.Namespace).Get(objectRef.Name)
  ...
    deploymentName := foo.Spec.DeploymentName
  ...
    // Get the deployment with the name specified in Foo.spec
    deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
    // If the resource doesn't exist, we'll create it
    if errors.IsNotFound(err) {
        deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(ctx, newDeployment(foo), metav1.CreateOptions{FieldManager: FieldManager})
    }
  ...
    // If this number of the replicas on the Foo resource is specified, and the
    // number does not equal the current desired replicas on the Deployment, we
    // should update the Deployment resource.
    if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
        logger.V(4).Info("Update deployment resource", "currentReplicas", *foo.Spec.Replicas, "desiredReplicas", *deployment.Spec.Replicas)
        deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(ctx, newDeployment(foo), metav1.UpdateOptions{FieldManager: FieldManager})
    }
    // Finally, we update the status block of the Foo resource to reflect the
    // current state of the world
    err = c.updateFooStatus(ctx, foo, deployment)
  ...
    return nil
}

ところどころ省略していますが、概ねやっていることは、

  1. objectRef(processNextWorkItem メソッド内で workqueue から取り出したリソース)を元に Foo リソースの状態を Lister から取得
  2. Foo リソースの Deployment 名を取得して、その情報を基に Deployment リソースの状態を Lister から取得
  3. Lister から対象の Deployment が取得できなかった場合は Deployment を作成
  4. Foo リソースの Replicas が指定されている場合、Deployment の Replicas と比較して異なる場合は Deployment を更新
  5. Foo リソースのステータスを更新

といった感じです。
最後の Foo リソースのステータス更新は以下のように現在稼働している Deployment の Replicas を Foo リソースのステータスに反映しています。

func (c *Controller) updateFooStatus(ctx context.Context, foo *samplev1alpha1.Foo, deployment *appsv1.Deployment) error {
    ...
    fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
    ...
    _, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).UpdateStatus(ctx, fooCopy, metav1.UpdateOptions{FieldManager: FieldManager})
    return err
}

ちなみに deployment-controller など他の Controller も少し見てみましたが、sample-controller と同じような構成で、各々の syncHandler をループして処理を行っている感じでした。

おわりに

今回は Kubernetes の Controller に入門してみた話を書いてみました。
まだまだ理解が浅い部分も多いですが、なんとなく Kubernetes の内部の仕組みがわかってきた気がします。
また、実際に内部コードを読むことも大変刺激になり、よかったです。
最初は訳わからないコードも、Programming Kubernetes のおかげでなんとなくですが、理解できるようになりました。
内部コードを追っていくのは相当大変ですが、今後も続けていければなと思います。
また、今度は自分独自の Controller を作成してみようと思います!
ここまで読んでいたただきありがとうございました!


株式会社hacomonoでは一緒に働く仲間を募集しています。
エンジニア採用サイトや採用ウィッシュリストもぜひご覧ください!