其实 Kubernetes 官方提供了各种语言的客户端类库,但是由于 golang 在云原生领域的先天优势,client-go 相对来说是使用较多的类库。但是要把 client-go 在一片天文章中讲清楚实属困难,所以本文不可能覆盖到所有的细节,尽量追求将主要框架描述清楚,并且配合代码片段探讨 client-go 常用的接口以及使用方法。

client-go 主体框架

其实要了解 client-go 的主体功能模块以及各个模块的功能,对于 Kubernetes API 的设计理念是基础,特别是对于 Kubernetes API 分组与版本转换。还不清楚的同学强烈建议先去阅读我之前的笔记:

我们先来瞄一眼 client-go 的主要代码结构,我会给出各个主要部分的核心功能让大家有一个感性的认识:

$ tree -L 2 client-go
client-go
├── discovery    # 包含dicoveryClient,用于发现k8s所支持GVR(Group/Version,/Resource),'kubectl api-resources'命令正是使用它来列出cluster中的各种资源。
├── dynamic  # 包含dynamicClient,它封装了 RESTClient,可以动态的指定api资源的GVR,结合unstructured.Unstructured类型来访问各种类型的k8s资源(如: Pod,Deploy...),也可以访问用户自定义资源(CRD)。
├── informers # 为了减少client对于apiserver的频繁访问,需要informer来缓存apiserver中资源,只有当api资源对象发生变化的时候才收到通知。每种api资源会有自己的informer实现,也是按照api分组与版本来区分。
├── kubernetes # 主要定义ClientSet,它也对restClient进行了封装,并且包含对各种k8s资源和版本的管理方法。每个api资源有单独的client,而ClientSet则是多个客户端的集合。ClientSet以及每种k8s内置资源的client的所有请求最终还是由restClient发出的;在typed目录包括具体每种k8s内置资源的client实现,也是按照api分组与版本来区分。
│   ├── clientset.go
│   └── typed
├── listers # 包含各种k8s内置资源的只读客户端。每种lister都有Get()和List()方法,并且结果都是从缓存中读取的。
├── rest # 包含真正给apiserver发请求的client,实现了Restful的API,同时支持Protobuf和JSON格式数据。
├── scale # 只要包含scalClient用于Deploy, RS等的扩/缩容。
├── tools # 各种类型的工具包,常见的比如获取kubeconfig的方法,以SharedInformer、Reflector、DealtFIFO和Indexer等工具,这些工具主要用于实现client查询和缓存机制,减轻apiserver的负载等。

Note: 为了简化,不重要的文件与目录没有列出来。

总的来说 client-go 主要功能模块以及各模块的依赖关系大致如下面这张图所示:

k8s-client-go-arch.jpg

可以看到,不管是各种静态类型的客户端 ClientSet ,动态客户端 DynamicClient 还是资源发现客户端 DiscoveryClient 都封装了 restClient,也就是说最后请求的发送都是有 restClient 发送给 kube-apiserver 的。而 k8s.io/api-machinery 中 API 资源的分组与版本是所有类型客户端的基础,具体每种 API 资源的定义则是包含在 k8s.io/api 模块。

ClientSet

ClientSet 是各种 API 资源的静态类型的客户端的集合,它使用预生成的 API 对象来与 kube-apiserver 进行交互,类似于 RPC 的变成体验,好处是类型化的客户端使用程序编译来强制执行数据安全性和一些验证,但同时也带来了版本与类型强耦合的问题。

type Clientset struct {
    *authenticationv1beta1.AuthenticationV1beta1Client
    *authorizationv1.AuthorizationV1Client
    // ...
    *corev1.CoreV1Client
}

使用静态类型客户端的典型例子:

import (
    //...
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    kubeconfig := filepath.Join(
         os.Getenv("HOME"), ".kube", "config",
    )
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    // use the following one if running in cluster
    // config, err := rest.InClusterConfig()
    if err != nil {
        log.Fatal(err)
    }

    // create the typed ClientSet
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Fatal(err)
    }

    // get the typed client for resources in CoreV1
    coreV1Client := clientset.CoreV1()

    // setup list options
    listOptions := metav1.ListOptions{
        LabelSelector: label, 
        FieldSelector: field,
    }

    // list PVCs with typed client
    pvcs, err := coreV1Client.PersistentVolumeClaims(ns).List(listOptions)
    if err != nil {
        log.Fatal(err)
    }
    printPVCs(pvcs)
}

在几乎所有情况下,除了 Server Side Apply(SSA) 之外,我们都应优先使用静态类型化的客户端而不是动态客户端。关于 SSA 以后有机会探讨一下。

DynamicClient

DynamicClient 也就是动态客户端,它没有使用 k8s.io/api 中定义的 的各种 API 资源的 Go 结构体,而是使用 unstructured.Unstructured 表示所有资源对象。unstructured.Unstructured 类型使用一个嵌套的 map[string]inferface{} 值来表示 API 资源的内部结构,该结构和服务端的 REST 负载非常相似。

动态客户端将所有数据绑定推迟到运行时,这意味着程序运行之前,使用动态客户端的程序将执行类型验证。对于某些需要强数据类型检查和验证的应用程序来说,这可能是一个问题。随之而来的好处是,松耦合意味着当客户端 API 资源对象发生变化时,使用动态客户端的程序不需要重新编译。客户端程序在处理 API 表面更新时具有更大的灵活性,而无需提前知道这些更改是什么。

import (
    //...
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
)

func main() {
    kubeconfig := filepath.Join(
         os.Getenv("HOME"), ".kube", "config",
    )
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    // use the following one if running in cluster
    // config, err := rest.InClusterConfig()
    if err != nil {
        log.Fatal(err)
    }

    // create the dynamic client
    dymClient, _ := dynamic.NewForConfig(config)
    podGVR := schema.GroupVersionResource{Group: "core", Version: "v1", Resource: "pods"}

    // api resource object with type unstructured.Unstructured, also can be unmarshaled from JSON/YAML
    pod := &unstructured.Unstructured{
        Object: map[string]interface{}{
            "apiVersion": "v1",
            "kind":       "Pod",
            "metadata": map[string]interface{}{
                "name": "web",
            },
            "spec": map[string]interface{}{
                "serviceAccount": "default",
                ...
            }
        }
    }

    // create the resource with dynamic client
    dymClient.Resource(podGVR).Namespace(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
}

此外,使用动态客户端实现 Server Side Apply(SSA) 非常方便,感兴趣的读者可以看看这篇文章

DiscoveryClient

上面介绍的静态客户端和动态客户端都是面向资源对象的,例如创建 Pod,查看 PVC 等,DiscoveryClient 则是关注于发现资源,例如查看当前集群里面注册了哪些资源各个资源的版本信息等。DiscoveryClient 也封装了 restClient 并由它来真正地与 kube-apiserver 进行交互。

使用方式基本与静态客户端和动态客户端类似,我们来看一个使用 DiscoveryClient 的例子:

package main

import (
	// ...
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
    kubeconfig := filepath.Join(
         os.Getenv("HOME"), ".kube", "config",
    )
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    // use the following one if running in cluster
    // config, err := rest.InClusterConfig()
    if err != nil {
        log.Fatal(err)
    }

    // create dynamic client from config
	dc, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		log.Fatal(err)
	}

    // get all the api groups
	apiGroup, apiResourceList, err := discoveryClient.ServerGroupsAndResources()
	if err != nil {
		log.Fatal(err)
	}

	log.Printf("APIGroup :\n %v\n\n\n", apiGroup)

    // apiResourceList is slice that contains item of GVR
	for _, apiRes := range apiResourceList {
		// GroupVersion is string of 'G/V' 
		gvStr := apiRes.GroupVersion
		gv, err := schema.ParseGroupVersion(gvStr)
		if err != nil {
			log.Fatal(err)
		}

		log.Printf("GroupVersion: %v\n", gvStr)
		// APIResources contains all the reosurces in current GroupVersion
		for _, res := range apiRes.APIResources {
			log.Printf("%v\n", res.Name)
		}
	}
}

需要说明的是,kubectl api-resources 命令正是使用 DiscoveryClient 来发现集群中的资源以及分组版本信息。

Informer

Informer 对于实现 kubernetes 的 controller 模式非常重要。我们知道 controller 的主要工作模式就是查看集群中资源的当前状态并且对比资源期望的状态,如果不相同则去执行一些列的指令来让两者的状态一致。但问题是,不停地向 kube-apiserver 检索当前资源的状态是个很耗费资源的操作,如果有一种缓存机制能让 controller 只有在资源状态发生变化的时候得到通知,这时候 controller 再执行指令就能减轻 kube-apisever 的负载。这种机制正是 client-go 中 informer 提供的核心功能。实际上,Informer 在初始化的时候会从 kube-apisever 中获取对应资源的所有对象,之后只会通过 Watch 机制接收 kube-apisever 推送过来的数据,不会再主动拉取数据,直接使用本地缓存中的数据以减少对 kube-apisever 的压力。

要创建一个 informer 很简单:

store, controller := cache.NewInformer {
	&cache.ListWatch{},
	&v1.Pod{},
	resyncPeriod, // how often the controller goes through all items remaining in the cache and fires the UpdateFunc again
	cache.ResourceEventHandlerFuncs{},

可以看到,创建 informer 第一个参数是一个 ListWatch 的接口类型,它需要知道如何 list/watch 资源,最简单的 ListWatch 如下:

lw := cache.NewListWatchFromClient(
      client,
      &v1.Pod{},
      api.NamespaceAll,
      fieldSelector)

当然,要创建自己的 ListWatch 也不复杂,主要是得清楚如何 list/watch 资源:

cache.ListWatch {
	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
		return client.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			FieldsSelectorParam(fieldSelector).
			Do().
			Get()
	}
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		options.Watch = true
		return client.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			FieldsSelectorParam(fieldSelector).
			Watch()
	}
}

可以看到,创建 Informer 最后一个参数是一个回调函数 ResourceEventHandlerFuncs, 它知道如何处理资源对象发生变化的事件。具体来说,它必须包含 AddFuncUpdateFunc 以及 DeleteFunc

type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

实际上,Informer 很少被使用,用的比较多的是 SharedInformer,但是理解 Informer 对于开发 controller 大有裨益。

SharedInformer

Informer 会创建一些列的缓存,这种情况在一个程序中存在多个 controller 同时创建访问资源的 Informer 的时候会变得糟糕,不同 controller 创建的 Informer 的缓存会存在重叠,导致资源的消耗不必要的增加。好在有 SharedInformer,它对于多个 controller 维持一份 informer 需要的缓存,除此之外,SharedInformer 对于上游的服务器端也只维护一份 watch ,不管下游的 consumer 有多少。这种情况在 kube-controller-manager 中很常见。

lw := cache.NewListWatchFromClient()
sharedInformer := cache.NewSharedInformer(lw, &api.Pod{}, resyncPeriod)

Workqueue

前面说明的 ShareInformer 存在一个问题,它没有办法跟踪每个 controller 的状态,所以 controller 必须自己提供队列与重试机制。client-go 提供了 workqueue 来实现这一功能,SharedInformer只需要把 Watch 到的资源变化事件放到 workqueue 中,controller 作为消费者会从其中取出事件来处理。

初始化一个 带有限流的 workqueue :

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

关于 controller 的 worker 何时开始从 workqueue 取出事件来工作,最佳实践如下所示:

controller.informer = cache.NewSharedInformer(...)
controller.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

controller.informer.Run(stopCh)

if !cache.WaitForCacheSync(stopCh, controller.HasSynched)
{
	log.Errorf("Timed out waiting for caches to sync"))
}

// Now start processing
controller.runWorker()

完整示例

我们来看一个完整的使用 SharedInformer 与 workqueue 的 controller 的示例:

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apimachinery/pkg/util/wait"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/client-go/kubernetes"
)

// Controller
type Controller struct {
      clientset    kubernetes.Interface
      queue        workqueue.RateLimitingInterface
      informer     cache.SharedIndexInformer
      eventHandler handlers.Handler
}

func Start(kubeClient kubernetes.Interface, conf *config.Config, eventHandler handlers.Handler) {
    c := newController(kubeClient, eventHandler)
    stopCh := make(chan struct{})
    defer close(stopCh)

    go c.Run(stopCh) // start the controller

    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGTERM)
    signal.Notify(sigterm, syscall.SIGINT)
    <-sigterm // wait for system term signal
}

// newController creates the controller
func newController(client kubernetes.Interface, eventHandler handlers.Handler) *Controller {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	informer := cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				return client.CoreV1().Pods(metav1.NamespaceAll).List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return client.CoreV1().Pods(metav1.NamespaceAll).Watch(options)
			},
		},
		&corev1.Pod{},
		0, // no resync
		cache.Indexers{},
	)

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		UpdateFunc: func(old, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	})

	return &Controller{
		clientset:    client,
		informer:     informer,
		queue:        queue,
		eventHandler: eventHandler,
	}
}

// Run starts the controller
func (c *Controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()
	go c.informer.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, c.HasSynced) {
        utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }

    // runWorker will loop until "something bad" happens. The .Until will then rekick the worker after one second
	wait.Until(c.runWorker, time.Second, stopCh)
}

// HasSynced is required for the cache.Controller interface.
func (c *Controller) HasSynced() bool {
    return c.informer.HasSynced()
}

// LastSyncResourceVersion is required for the cache.Controller interface.
func (c *Controller) LastSyncResourceVersion() string {
    return c.informer.LastSyncResourceVersion()
}

func (c *Controller) runWorker() {
    // processNextWorkItem will automatically wait until there's work available
    for c.processNextItem() {
        // continue looping
    }
}

// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (c *Controller) processNextItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	err := c.processItem(key.(string))
	if err == nil {
		// No error, reset the ratelimit counters
		c.queue.Forget(key)
	} else if c.queue.NumRequeues(key) < maxRetries {
        // err != nil and retry
		c.queue.AddRateLimited(key)
	} else {
		// err != nil and too many retries
		c.queue.Forget(key)
		utilruntime.HandleError(err)
	}
	return true
}

// processItem processs change of object
func (c *Controller) processItem(key string) error {
	obj, exists, err := c.informer.GetIndexer().GetByKey(key)
	if err != nil {
		return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)
	}
	if !exists {
		c.eventHandler.ObjectDeleted(obj)
		return nil
	}
	c.eventHandler.ObjectCreated(obj)
	return nil
}

最后,我们再来看看关于 controller 模式的经典图示:

client-go-controller-interaction