controller manager运行控制器程序,是集群中处理常规任务的进程。逻辑上,每个控制器都是一个单独的执行进程,为了降低复杂性,内置控制器被编译成一个二进制,在单个进程中运行。

Informer架构

对于自定义的API对象,需要编写types.go定义该对象,使用k8s自带的code-generator工具生成client-go使用的代码,里面就包含了对API对象变化进行处理的的informer框架代码。

Informer实现了持续获取集群的所有资源对象、监听集群的资源对象变化功能,并在本 地维护了全量资源对象的内存缓存,以减少对apiserver、对etcd的请求压⼒。Informers在启动的时候会⾸先在客户端调⽤List接⼝来获取全量的对象集合,然后通过Watch接⼝来获取增量的对象,然后更新本地缓存。

  • Informer也有很强的健壮性,当⻓期运⾏的watch连接中断时,informer会尝试拉起⼀个新的watch请求来恢复连接,在不丢失任何事件的情况下恢复事件流。
  • Informer还可以配置⼀个重新同步的周期参数,每间隔该周期,informers就会重新List全量数据。

在Informer的使⽤上,通常每个GroupVersionResource(GVR)都实例化⼀个informer,但有时候我们在⼀个应⽤中往往会在多个地⽅对同⼀种资源对象都有 informer的需求,所以就有了共享informer,即SharedInformerFactory。所以可以通过使⽤SharedInformerFactory来实例化informers,这样本地内存缓存就只 有⼀份,通知机制也只有⼀套,⼤⼤提⾼了效率,减少了资源浪费。

k8s client-go informer主要包括以下部件:

  1. Reflector:Reflector从kube-apiserver中list&watch资源对象, 然后调⽤DeltaFIFO的Add/Update/Delete/Replace⽅法将资源对象及其变化包装成Delta并将其丢到DeltaFIFO中;
  2. DeltaFIFO:DeltaFIFO中存储着⼀个map和⼀个queue,即 map[object key]Deltas以及object key的queue,Deltas为Delta的切⽚ 类型,Delta装有对象及对象的变化类型(Added/Updated/Deleted/ Sync) ,Reflector负责DeltaFIFO的输⼊,Controller负责处理 DeltaFIFO的输出;
  3. Controller:Controller从DeltaFIFO的queue中pop⼀个object key出来,并获取其关联的 Deltas出来进⾏处理,遍历Deltas,根据对象的变化更新Indexer中的本地内存缓存,并通知Processor,相关对象有变化事件发⽣;
  4. Processor:Processor根据对象的变化事件类型,调⽤相应的 ResourceEventHandler来处理对象的变化;
  5. Indexer:Indexer中有informer维护的指定资源对象的相对于etcd 数据的⼀份本地内存缓存,可通过该缓存获取资源对象,以减少对 apiserver、对etcd的请求压⼒;
  6. ResourceEventHandler:⽤户根据⾃身处理逻辑需要,注册⾃定 义的的ResourceEventHandler,当对象发⽣变化时,将触发调⽤对应 类型的ResourceEventHandler来做处理。

示例监听内置对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main

import (
	"log"
	"time"

	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// 构建与kube-apiserver通信的config配置
	cfg, err := clientcmd.BuildConfigFromFlags("", "kube-config")
	if err != nil {
		log.Printf("read kubeconfig error %v", err)
		return
	}
	//rest.RESTClientFor()
	// 初始化与apiserver通信的clientset
	// 这里只是构造对象,并没有进行连接
	clientSet, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Printf("get clientSet error %v", err)
		return
	}
	//clientSet.CoreV1().Pods("").Get()
	log.Println("cluster connected")

	stopCh := make(chan struct{})
	defer close(stopCh)

	// 利用clientset初始化shared informer factory以及pod informer
	// 每隔1分钟重新list,会触发UpdateFunc
	sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
	// 可以使用NewSharedInformerFactoryWithOptions 来指定namespace等
	// k8s的每个资源上都实现了Informer机制。每一个Informer上都会实现Informer/Lister方法
	// factory为k8s所有内置资源提供了创建对应Informer实例的方法,调用具体informer实例的lister/informer方法
	// 就完成了将informer注册到factory的过程
	podInformer := sharedInformers.Core().V1().Pods()
	informer := podInformer.Informer() // G.V.K 从factory中获取pod的informer
	lister := podInformer.Lister()

	// 注册informer的自定义ResourceEventHandler
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			o := obj.(v1.Object)
			log.Printf("New Pod added, %s/%s", o.GetName(), o.GetNamespace())
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldO := oldObj.(v1.Object)
			newO := newObj.(v1.Object)
			if oldO.GetResourceVersion() == newO.GetResourceVersion() {
				return // 每分钟重新list,过滤没有变化的对象
			}
			log.Printf("Pod %s/%s change from %s to %s", oldO.GetName(), oldO.GetNamespace(),
				oldO.GetResourceVersion(), newO.GetResourceVersion())
		},
		DeleteFunc: func(obj interface{}) {
			o := obj.(v1.Object)
			log.Printf("Pod %s/%s deleted", o.GetName(), o.GetNamespace())
		},
	})

	go func() {
		// 等待informer从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中
		if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
			log.Printf("Timeout for caches to sync")
			return
		}
		// 从informer中的indexer本地缓存中获取对象
		podList, err := lister.List(labels.Everything())
		if err != nil {
			log.Printf("lister err %v", err)
			return
		}
		log.Printf("list %d pods", len(podList))
	}()

	// 开始informer的list & watch操作
	// sharedInformers.Start(stopCh) // 整个sharedInformer的异步调用
	informer.Run(stopCh) // 同步
}

workqueue的使用

client-go中的util下提供了⼀个workqueue数据结构,从informer获取到的事件可以先放到这个queue中,然后⼯作线程去处理。

我们可以通过监听对象的变化,将资源对象写⼊到事件处理器的回调函数中。 但是如果我们直接在回调函数中处理这些数据会⽐较慢,对于这种情况往往我们就会使⽤队列来接收这些数据,然后再通过其他协程去处理这些数据,可 以⼤⼤加快数据的处理速度。这个其实和 channel 有点类似,但是 channel 功能过于单⼀,⽆法满⾜各类场景的需求,⽐如限制数据队列的写⼊速度。

为此在 client-go 中单独提供了⼀个 workqueue 的组件来实现队列的功能,由于 Kubernetes 很多模块都有队列的需求,所以统⼀实现在了 client-go 中,不仅可以⽤于 Kubernetes 内部,也可以供调⽤ client-go 的模块使⽤。

client-go 中抽象了⼏种队列,包括通⽤队列、限速队列、延时队列等。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package main

import (
	"flag"
	"fmt"
	"path/filepath"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
)

type Controller struct {
	indexer  cache.Indexer
	queue    workqueue.RateLimitingInterface
	informer cache.Controller
}

func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
	return &Controller{
		informer: informer,
		indexer:  indexer,
		queue:    queue,
	}
}

func (c *Controller) processNextItem() bool {
	// 等到工作队列中有一个新元素
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	// 告诉队列我们已经完成了处理此 key 的操作
	// 这将为其他 worker 解锁该 key
	// 这将确保安全的并行处理,因为永远不会并行处理具有相同 key 的两个Pod
	defer c.queue.Done(key)

	// 调用包含业务逻辑的方法
	err := c.syncToStdout(key.(string))
	// 如果在执行业务逻辑期间出现错误,则处理错误
	c.handleErr(err, key)
	return true
}

// syncToStdout 是控制器的业务逻辑实现
// 在此控制器中,它只是将有关 Pod 的信息打印到 stdout
// 如果发生错误,则简单地返回错误
// 此外重试逻辑不应成为业务逻辑的一部分。
func (c *Controller) syncToStdout(key string) error {
	// 从本地存储中获取 key 对应的对象
	obj, exists, err := c.indexer.GetByKey(key)
	if err != nil {
		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
		return err
	}
	if !exists {
		fmt.Printf("Pod %s does not exists anymore\\n", key)
	} else {
		fmt.Printf("Sync/Add/Update for Pod %s\\n", obj.(*v1.Pod).GetName())
	}
	return nil
}

// 检查是否发生错误,并确保我们稍后重试
func (c *Controller) handleErr(err error, key interface{}) {
	if err == nil {
		// 忘记每次成功同步时 key 的#AddRateLimited历史记录。
		// 这样可以确保不会因过时的错误历史记录而延迟此 key 更新的以后处理。
		c.queue.Forget(key)
		return
	}
	//如果出现问题,此控制器将重试5次
	if c.queue.NumRequeues(key) < 5 {
		klog.Infof("Error syncing pod %v: %v", key, err)
		// 重新加入 key 到限速队列
		// 根据队列上的速率限制器和重新入队历史记录,稍后将再次处理该 key
		c.queue.AddRateLimited(key)
		return
	}
	c.queue.Forget(key)
	// 多次重试,我们也无法成功处理该key
	runtime.HandleError(err)
	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}

// Run 开始 watch 和同步
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	defer runtime.HandleCrash()

	// 停止控制器后关掉队列
	defer c.queue.ShutDown()
	klog.Info("Starting Pod controller")

	// 启动
	go c.informer.Run(stopCh)

	// 等待所有相关的缓存同步,然后再开始处理队列中的项目
	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}

	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
	klog.Info("Stopping Pod controller")
}

func (c *Controller) runWorker() {
	for c.processNextItem() {
	}
}

func initClient() (*kubernetes.Clientset, error) {
	var err error
	var config *rest.Config
	// inCluster(Pod)、KubeConfig(kubectl)
	var kubeconfig *string

	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(可选) kubeconfig 文件的绝对路径")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "kubeconfig 文件的绝对路径")
	}
	flag.Parse()

	// 首先使用 inCluster 模式(需要去配置对应的 RBAC 权限,默认的sa是default->是没有获取deployments的List权限)
	if config, err = rest.InClusterConfig(); err != nil {
		// 使用 KubeConfig 文件创建集群配置 Config 对象
		if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
			panic(err.Error())
		}
	}

	// 已经获得了 rest.Config 对象
	// 创建 Clientset 对象
	return kubernetes.NewForConfig(config)
}

func main() {
	clientset, err := initClient()
	if err != nil {
		klog.Fatal(err)
	}

	// 创建 Pod ListWatcher
	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

	// 创建队列
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// 在 informer 的帮助下,将工作队列绑定到缓存
	// 这样,我们确保无论何时更新缓存,都将 pod key 添加到工作队列中
	// 注意,当我们最终从工作队列中处理元素时,我们可能会看到 Pod 的版本比响应触发更新的版本新
	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})

	controller := NewController(queue, indexer, informer)

	indexer.Add(&v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "mypod",
			Namespace: v1.NamespaceDefault,
		},
	})

	// start controller
	stopCh := make(chan struct{})
	defer close(stopCh)
	go controller.Run(1, stopCh)

	select {}

内置控制器

使用以下命令,查看–feature-gates参数给出的支持的控制器列表及是否默认开启

1
kubectl -n kube-system exec -it kube-controller-manager --kube-controller-manager -h

常见的内置控制器:

  • Job Controller
  • CronJob Controller
  • Pod AutoScaler
  • ReplicaSet
  • Service Controller
  • ServiceAccount Controller
  • StatefulSet Controller
  • Volume Controller,根据PV spec创建volume
  • Resource quota Controller,用户使用资源后,更新状态
  • Namespace Controller
  • Node Controller
  • Daemon Controller,根据daemonset创建pod
  • Deployment Controller
  • Endpoint Controller,根据service更新Endpoint
  • Garbage Controller,处理级联删除

Cloud Controller Manager

用于管理K8s使用的特定于某个云的资源的生命周期。如对接企业云底座OpenStack,它带了一些获取节点信息的API,那么k8s获取Node信息时,直接调用这个API即可。