tokers commented on a change in pull request #284:
URL: 
https://github.com/apache/apisix-ingress-controller/pull/284#discussion_r588981629



##########
File path: pkg/ingress/controller/secret.go
##########
@@ -0,0 +1,203 @@
+package controller
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "go.uber.org/zap"
+       corev1 "k8s.io/api/core/v1"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
+       "k8s.io/apimachinery/pkg/util/runtime"
+       "k8s.io/client-go/tools/cache"
+       "k8s.io/client-go/util/workqueue"
+
+       "github.com/apache/apisix-ingress-controller/pkg/log"
+       "github.com/apache/apisix-ingress-controller/pkg/seven/state"
+       "github.com/apache/apisix-ingress-controller/pkg/types"
+       apisixv1 
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+)
+
+type secretController struct {
+       controller *Controller
+       workqueue  workqueue.RateLimitingInterface
+       workers    int
+}
+
+func (c *Controller) newSecretController() *secretController {
+       ctl := &secretController{
+               controller: c,
+               workqueue:  
workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second,
 60*time.Second, 5), "Secrets"),
+               workers:    1,
+       }
+
+       ctl.controller.secretInformer.AddEventHandler(
+               cache.ResourceEventHandlerFuncs{
+                       AddFunc:    ctl.onAdd,
+                       UpdateFunc: ctl.onUpdate,
+                       DeleteFunc: ctl.onDelete,
+               },
+       )
+
+       return ctl
+}
+
+func (c *secretController) run(ctx context.Context) {
+       log.Info("secret controller started")
+       defer log.Info("secret controller exited")
+
+       if ok := cache.WaitForCacheSync(ctx.Done(), 
c.controller.secretInformer.HasSynced); !ok {
+               log.Error("informers sync failed")
+               return
+       }
+
+       handler := func() {
+               for {
+                       obj, shutdown := c.workqueue.Get()
+                       if shutdown {
+                               return
+                       }
+                       err := func(obj interface{}) error {
+                               defer c.workqueue.Done(obj)
+                               var key string
+                               event := obj.(*types.Event)
+                               if secret, ok := event.Object.(*corev1.Secret); 
!ok {
+                                       c.workqueue.Forget(obj)
+                                       return fmt.Errorf("expected Secret in 
workqueue but got %#v", obj)
+                               } else {
+                                       if err := c.sync(ctx, 
obj.(*types.Event)); err != nil {
+                                               c.workqueue.AddRateLimited(obj)
+                                               log.Errorf("sync secret with 
ssl %s failed", secret.Namespace+"_"+secret.Name)
+                                               return fmt.Errorf("error 
syncing '%s': %s", key, err.Error())
+                                       }
+                                       c.workqueue.Forget(obj)
+                                       return nil
+                               }
+                       }(obj)
+                       if err != nil {
+                               runtime.HandleError(err)
+                       }
+               }
+       }
+
+       for i := 0; i < c.workers; i++ {
+               go handler()
+       }
+
+       <-ctx.Done()
+       c.workqueue.ShutDown()
+}
+
+func (c *secretController) sync(ctx context.Context, ev *types.Event) error {
+       obj := ev.Object.(*corev1.Secret)
+       sec, err := 
c.controller.secretLister.Secrets(obj.Namespace).Get(obj.Name)
+
+       key := obj.Namespace + "_" + obj.Name
+       if err != nil {
+               if !k8serrors.IsNotFound(err) {
+                       log.Errorw("failed to get Secret",
+                               zap.String("version", obj.ResourceVersion),
+                               zap.String("key", key),
+                               zap.Error(err),
+                       )
+                       return err
+               }
+
+               if ev.Type != types.EventDelete {
+                       log.Warnw("Secret was deleted before it can be 
delivered",
+                               zap.String("key", key),
+                               zap.String("version", obj.ResourceVersion),
+                       )
+                       return nil
+               }
+       }
+       if ev.Type == types.EventDelete {
+               if sec != nil {
+                       // We still find the resource while we are processing 
the DELETE event,
+                       // that means object with same namespace and name was 
created, discarding
+                       // this stale DELETE event.
+                       log.Warnw("discard the stale secret delete event since 
the resource still exists",
+                               zap.String("key", key),
+                       )
+                       return nil
+               }
+               sec = ev.Tombstone.(*corev1.Secret)
+       }
+       // sync SSL in APISIX which is store in secretSSLMap
+       // FixMe Need to update the status of CRD ApisixTls
+       ssls, ok := secretSSLMap.Load(key)
+       if ok {
+               sslMap := ssls.(sync.Map)
+               sslMap.Range(func(_, v interface{}) bool {
+                       ssl := v.(*apisixv1.Ssl)
+                       err = state.SyncSsl(ssl, ev.Type.String())
+                       if err != nil {
+                               return false
+                       }
+                       return true
+               })
+       }
+       return err
+}
+
+func (c *secretController) onAdd(obj interface{}) {
+       key, err := cache.MetaNamespaceKeyFunc(obj)
+       if err != nil {
+               log.Errorf("found secret object with bad namespace/name: %s, 
ignore it", err)
+               return
+       }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
+
+       c.workqueue.AddRateLimited(&types.Event{
+               Type:   types.EventAdd,
+               Object: obj,

Review comment:
       It would be better to pass `key` as the object, because there is a time 
window between when we put the event into the queue and when we process it; 
during that period, the object mapped by `key` might, in theory, change again.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to