tokers commented on a change in pull request #284: URL: https://github.com/apache/apisix-ingress-controller/pull/284#discussion_r589118124
########## File path: pkg/ingress/controller/secret.go ########## @@ -0,0 +1,203 @@ +package controller + +import ( + "context" + "fmt" + "sync" + "time" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + "github.com/apache/apisix-ingress-controller/pkg/log" + "github.com/apache/apisix-ingress-controller/pkg/seven/state" + "github.com/apache/apisix-ingress-controller/pkg/types" + apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" +) + +type secretController struct { + controller *Controller + workqueue workqueue.RateLimitingInterface + workers int +} + +func (c *Controller) newSecretController() *secretController { + ctl := &secretController{ + controller: c, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "Secrets"), + workers: 1, + } + + ctl.controller.secretInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: ctl.onAdd, + UpdateFunc: ctl.onUpdate, + DeleteFunc: ctl.onDelete, + }, + ) + + return ctl +} + +func (c *secretController) run(ctx context.Context) { + log.Info("secret controller started") + defer log.Info("secret controller exited") + + if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.secretInformer.HasSynced); !ok { + log.Error("informers sync failed") + return + } + + handler := func() { + for { + obj, shutdown := c.workqueue.Get() + if shutdown { + return + } + err := func(obj interface{}) error { + defer c.workqueue.Done(obj) + var key string + event := obj.(*types.Event) + if secret, ok := event.Object.(*corev1.Secret); !ok { + c.workqueue.Forget(obj) + return fmt.Errorf("expected Secret in workqueue but got %#v", obj) + } else { + if err := c.sync(ctx, obj.(*types.Event)); err != nil { + c.workqueue.AddRateLimited(obj) + log.Errorf("sync secret with ssl %s failed", secret.Namespace+"_"+secret.Name) + return fmt.Errorf("error syncing '%s': %s", key, err.Error()) + } + c.workqueue.Forget(obj) + return nil + } + }(obj) + if err != nil { + runtime.HandleError(err) + } + } + } + + for i := 0; i < c.workers; i++ { + go handler() + } + + <-ctx.Done() + c.workqueue.ShutDown() +} + +func (c *secretController) sync(ctx context.Context, ev *types.Event) error { + obj := ev.Object.(*corev1.Secret) + sec, err := c.controller.secretLister.Secrets(obj.Namespace).Get(obj.Name) + + key := obj.Namespace + "_" + obj.Name + if err != nil { + if !k8serrors.IsNotFound(err) { + log.Errorw("failed to get Secret", + zap.String("version", obj.ResourceVersion), + zap.String("key", key), + zap.Error(err), + ) + return err + } + + if ev.Type != types.EventDelete { + log.Warnw("Secret was deleted before it can be delivered", + zap.String("key", key), + zap.String("version", obj.ResourceVersion), + ) + return nil + } + } + if ev.Type == types.EventDelete { + if sec != nil { + // We still find the resource while we are processing the DELETE event, + // that means object with same namespace and name was created, discarding + // this stale DELETE event. + log.Warnw("discard the stale secret delete event since the resource still exists", + zap.String("key", key), + ) + return nil + } + sec = ev.Tombstone.(*corev1.Secret) + } + // sync SSL in APISIX which is store in secretSSLMap + // FixMe Need to update the status of CRD ApisixTls + ssls, ok := secretSSLMap.Load(key) + if ok { + sslMap := ssls.(sync.Map) + sslMap.Range(func(_, v interface{}) bool { + ssl := v.(*apisixv1.Ssl) + err = state.SyncSsl(ssl, ev.Type.String()) + if err != nil { + return false + } + return true + }) + } + return err +} + +func (c *secretController) onAdd(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorf("found secret object with bad namespace/name: %s, ignore it", err) + return + } + if !c.controller.namespaceWatching(key) { + return + } + + c.workqueue.AddRateLimited(&types.Event{ + Type: types.EventAdd, + Object: obj, Review comment: If data is not available, why we should still process on it? It always means the Kubernetes Control Plane is in trouble, or the networking is partitioned, or the object was deleted, in such a case, just return error and use the retry mechanism is OK. And, I didn't see other objects fields referred. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org