tokers commented on a change in pull request #284:
URL: 
https://github.com/apache/apisix-ingress-controller/pull/284#discussion_r589118124



##########
File path: pkg/ingress/controller/secret.go
##########
@@ -0,0 +1,203 @@
+package controller
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "go.uber.org/zap"
+       corev1 "k8s.io/api/core/v1"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
+       "k8s.io/apimachinery/pkg/util/runtime"
+       "k8s.io/client-go/tools/cache"
+       "k8s.io/client-go/util/workqueue"
+
+       "github.com/apache/apisix-ingress-controller/pkg/log"
+       "github.com/apache/apisix-ingress-controller/pkg/seven/state"
+       "github.com/apache/apisix-ingress-controller/pkg/types"
+       apisixv1 
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+)
+
+type secretController struct {
+       controller *Controller
+       workqueue  workqueue.RateLimitingInterface
+       workers    int
+}
+
+func (c *Controller) newSecretController() *secretController {
+       ctl := &secretController{
+               controller: c,
+               workqueue:  
workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second,
 60*time.Second, 5), "Secrets"),
+               workers:    1,
+       }
+
+       ctl.controller.secretInformer.AddEventHandler(
+               cache.ResourceEventHandlerFuncs{
+                       AddFunc:    ctl.onAdd,
+                       UpdateFunc: ctl.onUpdate,
+                       DeleteFunc: ctl.onDelete,
+               },
+       )
+
+       return ctl
+}
+
+func (c *secretController) run(ctx context.Context) {
+       log.Info("secret controller started")
+       defer log.Info("secret controller exited")
+
+       if ok := cache.WaitForCacheSync(ctx.Done(), 
c.controller.secretInformer.HasSynced); !ok {
+               log.Error("informers sync failed")
+               return
+       }
+
+       handler := func() {
+               for {
+                       obj, shutdown := c.workqueue.Get()
+                       if shutdown {
+                               return
+                       }
+                       err := func(obj interface{}) error {
+                               defer c.workqueue.Done(obj)
+                               var key string
+                               event := obj.(*types.Event)
+                               if secret, ok := event.Object.(*corev1.Secret); 
!ok {
+                                       c.workqueue.Forget(obj)
+                                       return fmt.Errorf("expected Secret in 
workqueue but got %#v", obj)
+                               } else {
+                                       if err := c.sync(ctx, 
obj.(*types.Event)); err != nil {
+                                               c.workqueue.AddRateLimited(obj)
+                                               log.Errorf("sync secret with 
ssl %s failed", secret.Namespace+"_"+secret.Name)
+                                               return fmt.Errorf("error 
syncing '%s': %s", key, err.Error())
+                                       }
+                                       c.workqueue.Forget(obj)
+                                       return nil
+                               }
+                       }(obj)
+                       if err != nil {
+                               runtime.HandleError(err)
+                       }
+               }
+       }
+
+       for i := 0; i < c.workers; i++ {
+               go handler()
+       }
+
+       <-ctx.Done()
+       c.workqueue.ShutDown()
+}
+
+func (c *secretController) sync(ctx context.Context, ev *types.Event) error {
+       obj := ev.Object.(*corev1.Secret)
+       sec, err := 
c.controller.secretLister.Secrets(obj.Namespace).Get(obj.Name)
+
+       key := obj.Namespace + "_" + obj.Name
+       if err != nil {
+               if !k8serrors.IsNotFound(err) {
+                       log.Errorw("failed to get Secret",
+                               zap.String("version", obj.ResourceVersion),
+                               zap.String("key", key),
+                               zap.Error(err),
+                       )
+                       return err
+               }
+
+               if ev.Type != types.EventDelete {
+                       log.Warnw("Secret was deleted before it can be 
delivered",
+                               zap.String("key", key),
+                               zap.String("version", obj.ResourceVersion),
+                       )
+                       return nil
+               }
+       }
+       if ev.Type == types.EventDelete {
+               if sec != nil {
+                       // We still find the resource while we are processing 
the DELETE event,
+                       // that means object with same namespace and name was 
created, discarding
+                       // this stale DELETE event.
+                       log.Warnw("discard the stale secret delete event since 
the resource still exists",
+                               zap.String("key", key),
+                       )
+                       return nil
+               }
+               sec = ev.Tombstone.(*corev1.Secret)
+       }
+       // sync SSL in APISIX which is store in secretSSLMap
+       // FixMe Need to update the status of CRD ApisixTls
+       ssls, ok := secretSSLMap.Load(key)
+       if ok {
+               sslMap := ssls.(sync.Map)
+               sslMap.Range(func(_, v interface{}) bool {
+                       ssl := v.(*apisixv1.Ssl)
+                       err = state.SyncSsl(ssl, ev.Type.String())
+                       if err != nil {
+                               return false
+                       }
+                       return true
+               })
+       }
+       return err
+}
+
+func (c *secretController) onAdd(obj interface{}) {
+       key, err := cache.MetaNamespaceKeyFunc(obj)
+       if err != nil {
+               log.Errorf("found secret object with bad namespace/name: %s, 
ignore it", err)
+               return
+       }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
+
+       c.workqueue.AddRateLimited(&types.Event{
+               Type:   types.EventAdd,
+               Object: obj,

Review comment:
       If the data is not available, why should we still process it? It always 
means the Kubernetes control plane is in trouble, or the network is 
partitioned, or the object was deleted. In such a case, just returning an error 
and relying on the retry mechanism is OK.
   
   Also, I didn't see any other fields of the object being referenced.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to