1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
|
package main
import (
"flag"
"fmt"
"path/filepath"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)
type Controller struct {
indexer cache.Indexer
queue workqueue.RateLimitingInterface
informer cache.Controller
}
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
return &Controller{
informer: informer,
indexer: indexer,
queue: queue,
}
}
func (c *Controller) processNextItem() bool {
// 等到工作队列中有一个新元素
key, quit := c.queue.Get()
if quit {
return false
}
// 告诉队列我们已经完成了处理此 key 的操作
// 这将为其他 worker 解锁该 key
// 这将确保安全的并行处理,因为永远不会并行处理具有相同 key 的两个Pod
defer c.queue.Done(key)
// 调用包含业务逻辑的方法
err := c.syncToStdout(key.(string))
// 如果在执行业务逻辑期间出现错误,则处理错误
c.handleErr(err, key)
return true
}
// syncToStdout 是控制器的业务逻辑实现
// 在此控制器中,它只是将有关 Pod 的信息打印到 stdout
// 如果发生错误,则简单地返回错误
// 此外重试逻辑不应成为业务逻辑的一部分。
func (c *Controller) syncToStdout(key string) error {
// 从本地存储中获取 key 对应的对象
obj, exists, err := c.indexer.GetByKey(key)
if err != nil {
klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
return err
}
if !exists {
fmt.Printf("Pod %s does not exists anymore\\n", key)
} else {
fmt.Printf("Sync/Add/Update for Pod %s\\n", obj.(*v1.Pod).GetName())
}
return nil
}
// 检查是否发生错误,并确保我们稍后重试
func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
// 忘记每次成功同步时 key 的#AddRateLimited历史记录。
// 这样可以确保不会因过时的错误历史记录而延迟此 key 更新的以后处理。
c.queue.Forget(key)
return
}
//如果出现问题,此控制器将重试5次
if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing pod %v: %v", key, err)
// 重新加入 key 到限速队列
// 根据队列上的速率限制器和重新入队历史记录,稍后将再次处理该 key
c.queue.AddRateLimited(key)
return
}
c.queue.Forget(key)
// 多次重试,我们也无法成功处理该key
runtime.HandleError(err)
klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}
// Run 开始 watch 和同步
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
defer runtime.HandleCrash()
// 停止控制器后关掉队列
defer c.queue.ShutDown()
klog.Info("Starting Pod controller")
// 启动
go c.informer.Run(stopCh)
// 等待所有相关的缓存同步,然后再开始处理队列中的项目
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
klog.Info("Stopping Pod controller")
}
func (c *Controller) runWorker() {
for c.processNextItem() {
}
}
func initClient() (*kubernetes.Clientset, error) {
var err error
var config *rest.Config
// inCluster(Pod)、KubeConfig(kubectl)
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(可选) kubeconfig 文件的绝对路径")
} else {
kubeconfig = flag.String("kubeconfig", "", "kubeconfig 文件的绝对路径")
}
flag.Parse()
// 首先使用 inCluster 模式(需要去配置对应的 RBAC 权限,默认的sa是default->是没有获取deployments的List权限)
if config, err = rest.InClusterConfig(); err != nil {
// 使用 KubeConfig 文件创建集群配置 Config 对象
if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
panic(err.Error())
}
}
// 已经获得了 rest.Config 对象
// 创建 Clientset 对象
return kubernetes.NewForConfig(config)
}
func main() {
clientset, err := initClient()
if err != nil {
klog.Fatal(err)
}
// 创建 Pod ListWatcher
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
// 创建队列
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// 在 informer 的帮助下,将工作队列绑定到缓存
// 这样,我们确保无论何时更新缓存,都将 pod key 添加到工作队列中
// 注意,当我们最终从工作队列中处理元素时,我们可能会看到 Pod 的版本比响应触发更新的版本新
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
}, cache.Indexers{})
controller := NewController(queue, indexer, informer)
indexer.Add(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "mypod",
Namespace: v1.NamespaceDefault,
},
})
// start controller
stopCh := make(chan struct{})
defer close(stopCh)
go controller.Run(1, stopCh)
select {}
|