This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 396cae2  feat: controller leader election (#173)
396cae2 is described below

commit 396cae2a5d4502a41f8683c472a1e533d82abb24
Author: Alex Zhang <zchao1...@gmail.com>
AuthorDate: Thu Jan 14 14:32:15 2021 +0800

    feat: controller leader election (#173)
    
    * feat: controller leader election
    
    * test: add e2e cases
    
    * chore: optimize e2e cases
---
 charts/ingress-apisix/templates/deployment.yaml   |   9 ++
 cmd/ingress/ingress.go                            |   1 +
 conf/config-default.yaml                          |  17 ++--
 pkg/config/config.go                              |   5 +
 pkg/config/config_test.go                         |   2 +
 pkg/ingress/controller/controller.go              | 119 ++++++++++++++++++----
 samples/deploy/deployment/ingress-controller.yaml |   9 ++
 test/e2e/ingress/namespace.go                     |   5 +
 test/e2e/ingress/resourcepushing.go               |  14 ++-
 test/e2e/ingress/sanity.go                        |  64 ++++++++++++
 test/e2e/scaffold/httpbin.go                      |  32 ++++++
 test/e2e/scaffold/ingress.go                      |  49 ++++++++-
 test/e2e/scaffold/scaffold.go                     |  29 ++++--
 13 files changed, 315 insertions(+), 40 deletions(-)

diff --git a/charts/ingress-apisix/templates/deployment.yaml b/charts/ingress-apisix/templates/deployment.yaml
index b96e811..b103ebd 100644
--- a/charts/ingress-apisix/templates/deployment.yaml
+++ b/charts/ingress-apisix/templates/deployment.yaml
@@ -70,6 +70,15 @@ spec:
           volumeMounts:
             - mountPath: /ingress-apisix/conf
               name: configuration
+          env:
+          - name: POD_NAMESPACE
+            valueFrom:
+              fieldRef:
+                fieldPath: metadata.namespace
+          - name: POD_NAME
+            valueFrom:
+              fieldRef:
+                fieldPath: metadata.name
       {{- with .Values.ingressController.nodeSelector }}
       nodeSelector:
         {{- toYaml . | nindent 8 }}
diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index 66a7748..dcaf894 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -126,6 +126,7 @@ the apisix cluster and others are created`,
        cmd.PersistentFlags().StringVar(&cfg.Kubernetes.Kubeconfig, "kubeconfig", "", "Kubernetes configuration file (by default in-cluster configuration will be used)")
        cmd.PersistentFlags().DurationVar(&cfg.Kubernetes.ResyncInterval.Duration, "resync-interval", time.Minute, "the controller resync (with Kubernetes) interval, the minimum resync interval is 30s")
        cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.AppNamespaces, "app-namespace", []string{config.NamespaceAll}, "namespaces that controller will watch for resources")
+       cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for the controller leader campaign")
        cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api")
        cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api")
 
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 5e34911..419338a 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -33,13 +33,16 @@ enable_profiling: true # enable profileing via web interfaces
 
 # Kubernetes related configurations.
 kubernetes:
-  kubeconfig: ""         # the Kubernetes configuration file path, default is
-                         # "", so the in-cluster configuration will be used.
-  resync_interval: "6h" # how long should apisix-ingress-controller
-                         # re-synchronizes with Kubernetes, default is 6h,
-                         # and the minimal resync interval is 30s.
-  app_namespaces: ["*"]  # namespace list that controller will watch for resources,
-                         # by default all namespaces (represented by "*") are watched.
+  kubeconfig: ""                       # the Kubernetes configuration file path, default is
+                                       # "", so the in-cluster configuration will be used.
+  resync_interval: "6h"                # how long should apisix-ingress-controller
+                                       # re-synchronizes with Kubernetes, default is 6h,
+                                       # and the minimal resync interval is 30s.
+  app_namespaces: ["*"]                # namespace list that controller will watch for resources,
+                                       # by default all namespaces (represented by "*") are watched.
+  election_id: "ingress-apisix-leader" # the election id for the controller leader campaign,
+                                       # only the leader will watch and deliver resource changes,
+                                       # other instances (as candidates) stand by.
 
 # APISIX related configurations.
 apisix:
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 4a00383..7627989 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -30,6 +30,9 @@ import (
 const (
        // NamespaceAll represents all namespaces.
        NamespaceAll = "*"
+       // IngressAPISIXLeader is the default election id for the controller
+       // leader election.
+       IngressAPISIXLeader = "ingress-apisix-leader"
 
        _minimalResyncInterval = 30 * time.Second
 )
@@ -50,6 +53,7 @@ type KubernetesConfig struct {
        Kubeconfig     string             `json:"kubeconfig" yaml:"kubeconfig"`
        ResyncInterval types.TimeDuration `json:"resync_interval" yaml:"resync_interval"`
        AppNamespaces  []string           `json:"app_namespaces" yaml:"app_namespaces"`
+       ElectionID     string             `json:"election_id" yaml:"election_id"`
 }
 
 // APISIXConfig contains all APISIX related config items.
@@ -71,6 +75,7 @@ func NewDefaultConfig() *Config {
                        Kubeconfig:     "", // Use in-cluster configurations.
                        ResyncInterval: types.TimeDuration{Duration: 6 * time.Hour},
                        AppNamespaces:  []string{v1.NamespaceAll},
+                       ElectionID:     IngressAPISIXLeader,
                },
        }
 }
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index d220b38..808ed88 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -36,6 +36,7 @@ func TestNewConfigFromFile(t *testing.T) {
                        ResyncInterval: types.TimeDuration{time.Hour},
                        Kubeconfig:     "/path/to/foo/baz",
                        AppNamespaces:  []string{""},
+                       ElectionID:     "my-election-id",
                },
                APISIX: APISIXConfig{
                        BaseURL:  "http://127.0.0.1:8080/apisix",
@@ -72,6 +73,7 @@ enable_profiling: true
 kubernetes:
   kubeconfig: /path/to/foo/baz
   resync_interval: 1h0m0s
+  election_id: my-election-id
 apisix:
   base_url: http://127.0.0.1:8080/apisix
   admin_key: "123456"
diff --git a/pkg/ingress/controller/controller.go b/pkg/ingress/controller/controller.go
index 2114ec2..14fef47 100644
--- a/pkg/ingress/controller/controller.go
+++ b/pkg/ingress/controller/controller.go
@@ -15,22 +15,26 @@
 package controller
 
 import (
+       "context"
        "os"
        "sync"
-
-       v1 "k8s.io/api/core/v1"
-
-       "k8s.io/client-go/tools/cache"
-
-       "github.com/api7/ingress-controller/pkg/apisix"
+       "time"
 
        clientSet "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
        crdclientset "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
        "github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions"
+       "go.uber.org/zap"
+       v1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
+       "k8s.io/client-go/tools/cache"
+       "k8s.io/client-go/tools/leaderelection"
+       "k8s.io/client-go/tools/leaderelection/resourcelock"
 
        "github.com/api7/ingress-controller/pkg/api"
+       "github.com/api7/ingress-controller/pkg/apisix"
        "github.com/api7/ingress-controller/pkg/config"
        "github.com/api7/ingress-controller/pkg/kube"
        "github.com/api7/ingress-controller/pkg/log"
@@ -47,6 +51,9 @@ func recoverException() {
 
 // Controller is the ingress apisix controller object.
 type Controller struct {
+       name               string
+       namespace          string
+       cfg                *config.Config
        wg                 sync.WaitGroup
        watchingNamespace  map[string]struct{}
        apiServer          *api.Server
@@ -96,6 +103,9 @@ func NewController(cfg *config.Config) (*Controller, error) {
        }
 
        c := &Controller{
+               name:               podName,
+               namespace:          podNamespace,
+               cfg:                cfg,
                apiServer:          apiSrv,
                metricsCollector:   metrics.NewPrometheusCollector(podName, podNamespace),
                clientset:          kube.GetKubeClient(),
@@ -115,30 +125,102 @@ func (c *Controller) goAttach(handler func()) {
        }()
 }
 
+// Eventf implements the resourcelock.EventRecorder interface.
+func (c *Controller) Eventf(_ runtime.Object, eventType string, reason string, message string, _ ...interface{}) {
+       log.Infow(reason, zap.String("message", message), zap.String("event_type", eventType))
+}
+
 // Run launches the controller.
 func (c *Controller) Run(stop chan struct{}) error {
-       // TODO leader election.
+       rootCtx, rootCancel := context.WithCancel(context.Background())
+       defer rootCancel()
+       go func() {
+               <-stop
+               rootCancel()
+       }()
+       c.metricsCollector.ResetLeader(false)
+
+       go func() {
+               if err := c.apiServer.Run(rootCtx.Done()); err != nil {
+                       log.Errorf("failed to launch API Server: %s", err)
+               }
+       }()
+
+       lock := &resourcelock.LeaseLock{
+               LeaseMeta: metav1.ObjectMeta{
+                       Namespace: c.namespace,
+                       Name:      c.cfg.Kubernetes.ElectionID,
+               },
+               Client: c.clientset.CoordinationV1(),
+               LockConfig: resourcelock.ResourceLockConfig{
+                       Identity:      c.name,
+                       EventRecorder: c,
+               },
+       }
+       cfg := leaderelection.LeaderElectionConfig{
+               Lock:          lock,
+               LeaseDuration: 15 * time.Second,
+               RenewDeadline: 5 * time.Second,
+               RetryPeriod:   2 * time.Second,
+               Callbacks: leaderelection.LeaderCallbacks{
+                       OnStartedLeading: c.run,
+                       OnNewLeader: func(identity string) {
+                               log.Warnf("found a new leader %s", identity)
+                               if identity != c.name {
+                                       log.Infow("controller now is running as a candidate",
+                                               zap.String("namespace", c.namespace),
+                                               zap.String("pod", c.name),
+                                       )
+                               }
+                       },
+                       OnStoppedLeading: func() {
+                               log.Infow("controller now is running as a candidate",
+                                       zap.String("namespace", c.namespace),
+                                       zap.String("pod", c.name),
+                               )
+                               c.metricsCollector.ResetLeader(false)
+                       },
+               },
+               ReleaseOnCancel: true,
+               Name:            "ingress-apisix",
+       }
+
+       elector, err := leaderelection.NewLeaderElector(cfg)
+       if err != nil {
+               log.Errorf("failed to create leader elector: %s", err.Error())
+               return err
+       }
+
+election:
+       elector.Run(rootCtx)
+       select {
+       case <-rootCtx.Done():
+               return nil
+       default:
+               goto election
+       }
+}
+
+func (c *Controller) run(ctx context.Context) {
+       log.Infow("controller now is running as leader",
+               zap.String("namespace", c.namespace),
+               zap.String("pod", c.name),
+       )
        c.metricsCollector.ResetLeader(true)
-       log.Info("controller run as leader")
 
        ac := &Api6Controller{
                KubeClientSet:             c.clientset,
                Api6ClientSet:             c.crdClientset,
                SharedInformerFactory:     c.crdInformerFactory,
                CoreSharedInformerFactory: kube.CoreSharedInformerFactory,
-               Stop:                      stop,
+               Stop:                      ctx.Done(),
        }
        epInformer := ac.CoreSharedInformerFactory.Core().V1().Endpoints()
        kube.EndpointsInformer = epInformer
        // endpoint
        ac.Endpoint(c)
        c.goAttach(func() {
-               ac.CoreSharedInformerFactory.Start(stop)
-       })
-       c.goAttach(func() {
-               if err := c.apiServer.Run(stop); err != nil {
-                       log.Errorf("failed to launch API Server: %s", err)
-               }
+               ac.CoreSharedInformerFactory.Start(ctx.Done())
        })
 
        // ApisixRoute
@@ -151,12 +233,11 @@ func (c *Controller) Run(stop chan struct{}) error {
        ac.ApisixTLS(c)
 
        c.goAttach(func() {
-               ac.SharedInformerFactory.Start(stop)
+               ac.SharedInformerFactory.Start(ctx.Done())
        })
 
-       <-stop
+       <-ctx.Done()
        c.wg.Wait()
-       return nil
 }
 
 // namespaceWatching accepts a resource key, getting the namespace part
@@ -182,7 +263,7 @@ type Api6Controller struct {
        Api6ClientSet             clientSet.Interface
        SharedInformerFactory     externalversions.SharedInformerFactory
        CoreSharedInformerFactory informers.SharedInformerFactory
-       Stop                      chan struct{}
+       Stop                      <-chan struct{}
 }
 
 func (api6 *Api6Controller) ApisixRoute(controller *Controller) {
diff --git a/samples/deploy/deployment/ingress-controller.yaml b/samples/deploy/deployment/ingress-controller.yaml
index 98267f9..eae43e1 100644
--- a/samples/deploy/deployment/ingress-controller.yaml
+++ b/samples/deploy/deployment/ingress-controller.yaml
@@ -50,6 +50,15 @@ spec:
         - mountPath: /ingress-apisix/conf/config.yaml
           name: apisix-ingress-configmap
           subPath: config.yaml
+        env:
+          - name: POD_NAMESPACE
+            valueFrom:
+              fieldRef:
+                fieldPath: metadata.namespace
+          - name: POD_NAME
+            valueFrom:
+              fieldRef:
+                fieldPath: metadata.name
       volumes:
         - configMap:
             name: apisix-ingress-cm
diff --git a/test/e2e/ingress/namespace.go b/test/e2e/ingress/namespace.go
index 23601ff..8cc654b 100644
--- a/test/e2e/ingress/namespace.go
+++ b/test/e2e/ingress/namespace.go
@@ -19,6 +19,7 @@ import (
        "encoding/json"
        "fmt"
        "net/http"
+       "time"
 
        "github.com/api7/ingress-controller/test/e2e/scaffold"
        "github.com/onsi/ginkgo"
@@ -49,6 +50,10 @@ spec:
                assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes")
                assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
 
+               // TODO When ingress controller can feedback the lifecycle of CRDs to the
+               // status field, we can poll it rather than sleeping.
+               time.Sleep(3 * time.Second)
+
                body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
                var placeholder ip
                err := json.Unmarshal([]byte(body), &placeholder)
diff --git a/test/e2e/ingress/resourcepushing.go b/test/e2e/ingress/resourcepushing.go
index 2ffba3f..70490ff 100644
--- a/test/e2e/ingress/resourcepushing.go
+++ b/test/e2e/ingress/resourcepushing.go
@@ -49,10 +49,11 @@ spec:
                assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
                err = s.EnsureNumApisixUpstreamsCreated(1)
                assert.Nil(ginkgo.GinkgoT(), err, "Checking number of 
upstreams")
-               scale := 2
-               err = s.ScaleHTTPBIN(scale)
-               assert.Nil(ginkgo.GinkgoT(), err)
-               time.Sleep(5 * time.Second) // wait for ingress to sync
+               assert.Nil(ginkgo.GinkgoT(), s.ScaleHTTPBIN(2), "scaling number of httpbin instances")
+               assert.Nil(ginkgo.GinkgoT(), s.WaitAllHTTPBINPoddsAvailable(), "waiting for all httpbin pods ready")
+               // TODO When ingress controller can feedback the lifecycle of CRDs to the
+               // status field, we can poll it rather than sleeping.
+               time.Sleep(5 * time.Second)
                ups, err := s.ListApisixUpstreams()
                assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
                assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 2, "upstreams nodes not expect")
@@ -84,7 +85,10 @@ spec:
 
                // remove
                assert.Nil(ginkgo.GinkgoT(), s.RemoveResourceByString(apisixRoute))
-               time.Sleep(10 * time.Second) // wait for ingress to sync
+
+               // TODO When ingress controller can feedback the lifecycle of CRDs to the
+               // status field, we can poll it rather than sleeping.
+               time.Sleep(10 * time.Second)
                ups, err := s.ListApisixUpstreams()
                assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
                assert.Len(ginkgo.GinkgoT(), ups, 0, "upstreams nodes not expect")
diff --git a/test/e2e/ingress/sanity.go b/test/e2e/ingress/sanity.go
index 5b2c5cd..fc43fc6 100644
--- a/test/e2e/ingress/sanity.go
+++ b/test/e2e/ingress/sanity.go
@@ -17,6 +17,7 @@ package ingress
 import (
        "encoding/json"
        "net/http"
+       "time"
 
        "github.com/api7/ingress-controller/test/e2e/scaffold"
        "github.com/onsi/ginkgo"
@@ -51,6 +52,11 @@ var _ = ginkgo.Describe("single-route", func() {
                assert.Nil(ginkgo.GinkgoT(), err, "checking number of routes")
                err = s.EnsureNumApisixUpstreamsCreated(1)
                assert.Nil(ginkgo.GinkgoT(), err, "checking number of 
upstreams")
+
+               // TODO When ingress controller can feedback the lifecycle of CRDs to the
+               // status field, we can poll it rather than sleeping.
+               time.Sleep(3 * time.Second)
+
                body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
                var placeholder ip
                err = json.Unmarshal([]byte(body), &placeholder)
@@ -91,6 +97,9 @@ var _ = ginkgo.Describe("double-routes", func() {
                assert.Nil(ginkgo.GinkgoT(), err, "checking number of routes")
                err = s.EnsureNumApisixUpstreamsCreated(1)
                assert.Nil(ginkgo.GinkgoT(), err, "checking number of 
upstreams")
+               // TODO When ingress controller can feedback the lifecycle of CRDs to the
+               // status field, we can poll it rather than sleeping.
+               time.Sleep(3 * time.Second)
                body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
                var placeholder ip
                err = json.Unmarshal([]byte(body), &placeholder)
@@ -103,3 +112,58 @@ var _ = ginkgo.Describe("double-routes", func() {
                // We don't care the json data, only make sure it's a normal json string.
        })
 })
+
+var _ = ginkgo.Describe("leader election", func() {
+       s := scaffold.NewScaffold(&scaffold.Options{
+               Name:                    "leaderelection",
+               Kubeconfig:              scaffold.GetKubeconfig(),
+               APISIXConfigPath:        "testdata/apisix-gw-config.yaml",
+               APISIXDefaultConfigPath: "testdata/apisix-gw-config-default.yaml",
+               IngressAPISIXReplicas:   2,
+       })
+       ginkgo.It("lease check", func() {
+               pods, err := s.GetIngressPodDetails()
+               assert.Nil(ginkgo.GinkgoT(), err)
+               assert.Len(ginkgo.GinkgoT(), pods, 2)
+               lease, err := s.WaitGetLeaderLease()
+               assert.Nil(ginkgo.GinkgoT(), err)
+               assert.Equal(ginkgo.GinkgoT(), *lease.Spec.LeaseDurationSeconds, int32(15))
+               if *lease.Spec.HolderIdentity != pods[0].Name && *lease.Spec.HolderIdentity != pods[1].Name {
+                       assert.Fail(ginkgo.GinkgoT(), "bad leader lease holder identity")
+               }
+       })
+
+       ginkgo.It("leader failover", func() {
+               pods, err := s.GetIngressPodDetails()
+               assert.Nil(ginkgo.GinkgoT(), err)
+               assert.Len(ginkgo.GinkgoT(), pods, 2)
+
+               lease, err := s.WaitGetLeaderLease()
+               assert.Nil(ginkgo.GinkgoT(), err)
+
+               leaderIdx := 0
+               if *lease.Spec.HolderIdentity == pods[1].Name {
+                       leaderIdx = 1
+               }
+               ginkgo.GinkgoT().Logf("lease is %s", *lease.Spec.HolderIdentity)
+               assert.Nil(ginkgo.GinkgoT(), s.KillPod(pods[leaderIdx].Name))
+
+               // Wait for the old lease to expire and a new leader to be elected.
+               time.Sleep(25 * time.Second)
+
+               newLease, err := s.WaitGetLeaderLease()
+               assert.Nil(ginkgo.GinkgoT(), err)
+
+               newPods, err := s.GetIngressPodDetails()
+               assert.Nil(ginkgo.GinkgoT(), err)
+               assert.Len(ginkgo.GinkgoT(), pods, 2)
+
+               assert.NotEqual(ginkgo.GinkgoT(), *newLease.Spec.HolderIdentity, *lease.Spec.HolderIdentity)
+               assert.Greater(ginkgo.GinkgoT(), *newLease.Spec.LeaseTransitions, *lease.Spec.LeaseTransitions)
+
+               if *newLease.Spec.HolderIdentity != newPods[0].Name && *newLease.Spec.HolderIdentity != newPods[1].Name {
+                       assert.Failf(ginkgo.GinkgoT(), "bad leader lease holder identity: %s, should be %s or %s",
+                               *newLease.Spec.HolderIdentity, newPods[0].Name, newPods[1].Name)
+               }
+       })
+})
diff --git a/test/e2e/scaffold/httpbin.go b/test/e2e/scaffold/httpbin.go
index 33401b3..f9e1f87 100644
--- a/test/e2e/scaffold/httpbin.go
+++ b/test/e2e/scaffold/httpbin.go
@@ -18,6 +18,9 @@ import (
        "fmt"
        "time"
 
+       "github.com/onsi/ginkgo"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
        "github.com/gruntwork-io/terratest/modules/k8s"
        corev1 "k8s.io/api/core/v1"
 )
@@ -112,3 +115,32 @@ func (s *Scaffold) ScaleHTTPBIN(desired int) error {
        }
        return nil
 }
+
+// WaitAllHTTPBINPoddsAvailable waits until all httpbin pods are ready.
+func (s *Scaffold) WaitAllHTTPBINPoddsAvailable() error {
+       opts := metav1.ListOptions{
+               LabelSelector: "app=httpbin-deployment-e2e-test",
+       }
+       condFunc := func() (bool, error) {
+               items, err := k8s.ListPodsE(s.t, s.kubectlOptions, opts)
+               if err != nil {
+                       return false, err
+               }
+               if len(items) == 0 {
+                       ginkgo.GinkgoT().Log("no httpbin pods created")
+                       return false, nil
+               }
+               for _, item := range items {
+                       for _, cond := range item.Status.Conditions {
+                               if cond.Type != corev1.PodReady {
+                                       continue
+                               }
+                               if cond.Status != "True" {
+                                       return false, nil
+                               }
+                       }
+               }
+               return true, nil
+       }
+       return waitExponentialBackoff(condFunc)
+}
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 9a6de56..6076eb9 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -15,11 +15,15 @@
 package scaffold
 
 import (
+       "context"
        "fmt"
 
        "github.com/gruntwork-io/terratest/modules/k8s"
        "github.com/onsi/ginkgo"
+       coordinationv1 "k8s.io/api/coordination/v1"
        corev1 "k8s.io/api/core/v1"
+       v1 "k8s.io/api/core/v1"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
@@ -45,7 +49,7 @@ kind: Deployment
 metadata:
   name: ingress-apisix-controller-deployment-e2e-test
 spec:
-  replicas: 1
+  replicas: %d
   selector:
     matchLabels:
       app: ingress-apisix-controller-deployment-e2e-test
@@ -77,6 +81,15 @@ spec:
             tcpSocket:
               port: 8080
             timeoutSeconds: 2
+          env:
+            - name: POD_NAMESPACE
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.namespace
+            - name: POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
           image: "apache/apisix-ingress-controller:dev"
           imagePullPolicy: Never
           name: ingress-apisix-controller-deployment-e2e-test
@@ -102,7 +115,7 @@ spec:
 )
 
 func (s *Scaffold) newIngressAPISIXController() error {
-       ingressAPISIXDeployment := fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.namespace)
+       ingressAPISIXDeployment := fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.opts.IngressAPISIXReplicas, s.namespace)
        if err := k8s.CreateServiceAccountE(s.t, s.kubectlOptions, _serviceAccount); err != nil {
                return err
        }
@@ -144,3 +157,35 @@ func (s *Scaffold) waitAllIngressControllerPodsAvailable() error {
        }
        return waitExponentialBackoff(condFunc)
 }
+
+// WaitGetLeaderLease waits for the lease to be created and returns it.
+func (s *Scaffold) WaitGetLeaderLease() (*coordinationv1.Lease, error) {
+       cli, err := k8s.GetKubernetesClientE(s.t)
+       if err != nil {
+               return nil, err
+       }
+       var lease *coordinationv1.Lease
+       condFunc := func() (bool, error) {
+               l, err := cli.CoordinationV1().Leases(s.namespace).Get(context.TODO(), "ingress-apisix-leader", metav1.GetOptions{})
+               if err != nil {
+                       if k8serrors.IsNotFound(err) {
+                               return false, nil
+                       }
+                       return false, err
+               }
+               lease = l
+               return true, nil
+       }
+       if err := waitExponentialBackoff(condFunc); err != nil {
+               return nil, err
+       }
+       return lease, nil
+}
+
+// GetIngressPodDetails returns a batch of pod description
+// about apisix-ingress-controller.
+func (s *Scaffold) GetIngressPodDetails() ([]v1.Pod, error) {
+       return k8s.ListPodsE(s.t, s.kubectlOptions, metav1.ListOptions{
+               LabelSelector: "app=ingress-apisix-controller-deployment-e2e-test",
+       })
+}
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index fdd1998..6e3e851 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -15,6 +15,7 @@
 package scaffold
 
 import (
+       "context"
        "fmt"
        "io/ioutil"
        "net/http"
@@ -26,6 +27,8 @@ import (
        "text/template"
        "time"
 
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
        "github.com/gavv/httpexpect/v2"
        "github.com/gruntwork-io/terratest/modules/k8s"
        "github.com/gruntwork-io/terratest/modules/testing"
@@ -41,6 +44,7 @@ type Options struct {
        Kubeconfig              string
        APISIXConfigPath        string
        APISIXDefaultConfigPath string
+       IngressAPISIXReplicas   int
 }
 
 type Scaffold struct {
@@ -101,10 +105,20 @@ func NewDefaultScaffold() *Scaffold {
                Kubeconfig:              GetKubeconfig(),
                APISIXConfigPath:        "testdata/apisix-gw-config.yaml",
                APISIXDefaultConfigPath: "testdata/apisix-gw-config-default.yaml",
+               IngressAPISIXReplicas:   1,
        }
        return NewScaffold(opts)
 }
 
+// KillPod kills the pod whose name is podName.
+func (s *Scaffold) KillPod(podName string) error {
+       cli, err := k8s.GetKubernetesClientE(s.t)
+       if err != nil {
+               return err
+       }
+       return cli.CoreV1().Pods(s.namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{})
+}
+
 // DefaultHTTPBackend returns the service name and service ports
 // of the default http backend.
 func (s *Scaffold) DefaultHTTPBackend() (string, []int32) {
@@ -149,14 +163,12 @@ func (s *Scaffold) beforeEach() {
        s.etcdService, err = s.newEtcd()
        assert.Nil(s.t, err, "initializing etcd")
 
-       // We don't use k8s.WaitUntilServiceAvailable since it hacks for Minikube.
-       err = k8s.WaitUntilNumPodsCreatedE(s.t, s.kubectlOptions, s.labelSelector("app=etcd-deployment-e2e-test"), 1, 5, 2*time.Second)
+       err = s.waitAllEtcdPodsAvailable()
        assert.Nil(s.t, err, "waiting for etcd ready")
 
        s.apisixService, err = s.newAPISIX()
        assert.Nil(s.t, err, "initializing Apache APISIX")
 
-       // We don't use k8s.WaitUntilServiceAvailable since it hacks for Minikube.
        err = s.waitAllAPISIXPodsAvailable()
        assert.Nil(s.t, err, "waiting for apisix ready")
 
@@ -180,6 +192,10 @@ func (s *Scaffold) afterEach() {
        for _, f := range s.finializers {
                f()
        }
+
+       // Wait for a while to prevent the worker node from being overwhelmed
+       // (new cases will be run).
+       time.Sleep(3 * time.Second)
 }
 
 func (s *Scaffold) addFinializer(f func()) {
@@ -202,10 +218,9 @@ func (s *Scaffold) renderConfig(path string) (string, error) {
 
 func waitExponentialBackoff(condFunc func() (bool, error)) error {
        backoff := wait.Backoff{
-               Duration: 100 * time.Millisecond,
-               Factor:   3,
-               Jitter:   0,
-               Steps:    6,
+               Duration: 500 * time.Millisecond,
+               Factor:   2,
+               Steps:    8,
        }
        return wait.ExponentialBackoff(backoff, condFunc)
 }
