This is an automated email from the ASF dual-hosted git repository.

ronething pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 27eb3e9f feat: gatewayproxy controller (#2444)
27eb3e9f is described below

commit 27eb3e9ffd0ff736ee87aca2dbc35b5c400d966e
Author: 悟空 <rainchan...@163.com>
AuthorDate: Thu Jul 3 18:00:57 2025 +0800

    feat: gatewayproxy controller (#2444)
---
 internal/controller/gatewayproxy_controller.go | 179 +++++++++++++++++++++++++
 internal/controller/indexer/indexer.go         |  20 +++
 internal/manager/controllers.go                |   6 +
 internal/provider/adc/adc.go                   |   2 +
 internal/provider/adc/config.go                |  44 ++++--
 internal/provider/provider.go                  |   5 +-
 test/e2e/crds/gatewayproxy.go                  | 176 ++++++++++++++++++++++++
 test/e2e/framework/k8s.go                      |  10 +-
 test/e2e/framework/manifests/apisix.yaml       |   2 +-
 test/e2e/scaffold/apisix_deployer.go           |  21 ++-
 test/e2e/scaffold/deployer.go                  |   1 +
 test/e2e/scaffold/scaffold.go                  |  33 ++++-
 12 files changed, 484 insertions(+), 15 deletions(-)

diff --git a/internal/controller/gatewayproxy_controller.go b/internal/controller/gatewayproxy_controller.go
new file mode 100644
index 00000000..d1a4b803
--- /dev/null
+++ b/internal/controller/gatewayproxy_controller.go
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package controller
+
+import (
+       "context"
+       "errors"
+
+       "github.com/go-logr/logr"
+       corev1 "k8s.io/api/core/v1"
+       discoveryv1 "k8s.io/api/discovery/v1"
+       networkingv1 "k8s.io/api/networking/v1"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/types"
+       ctrl "sigs.k8s.io/controller-runtime"
+       "sigs.k8s.io/controller-runtime/pkg/client"
+       "sigs.k8s.io/controller-runtime/pkg/handler"
+       "sigs.k8s.io/controller-runtime/pkg/reconcile"
+       gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
+
+       "github.com/apache/apisix-ingress-controller/api/v1alpha1"
+       
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
+       "github.com/apache/apisix-ingress-controller/internal/provider"
+       "github.com/apache/apisix-ingress-controller/internal/utils"
+)
+
+// GatewayProxyController reconciles a GatewayProxy object.
+type GatewayProxyController struct {
+       // Client is the embedded Kubernetes client used for the Get and List
+       // calls issued while reconciling.
+       client.Client
+
+       // Scheme is the runtime scheme handed over when the controller is built.
+       Scheme   *runtime.Scheme
+       // Log reports lookup, list and type-conversion failures.
+       Log      logr.Logger
+       // Provider receives the translate context and the GatewayProxy through
+       // its Update method.
+       Provider provider.Provider
+}
+
+// SetupWithManager registers the controller with the manager. GatewayProxy is
+// the primary resource; Service, EndpointSlice and Secret events are routed
+// through the matching list helper to produce reconcile requests.
+func (r *GatewayProxyController) SetupWithManager(mrg ctrl.Manager) error {
+       var (
+               onService       = handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForProviderService)
+               onEndpointSlice = handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForProviderEndpointSlice)
+               onSecret        = handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForSecret)
+       )
+
+       return ctrl.NewControllerManagedBy(mrg).
+               For(&v1alpha1.GatewayProxy{}).
+               Watches(&corev1.Service{}, onService).
+               Watches(&discoveryv1.EndpointSlice{}, onEndpointSlice).
+               Watches(&corev1.Secret{}, onSecret).
+               Complete(r)
+}
+
+// Reconcile pushes the current state of a GatewayProxy to the provider.
+//
+// It collects into a translate context the endpoints of the provider Service,
+// the admin-key Secret (when the key comes from a SecretKeyRef) and every
+// Gateway and IngressClass indexed as referencing the GatewayProxy, then calls
+// Provider.Update. A GatewayProxy that can no longer be found is forwarded to
+// the provider as a name-only object with an empty translate context.
+//
+// Every lookup or list failure is returned to the caller; none is swallowed.
+func (r *GatewayProxyController) Reconcile(ctx context.Context, req ctrl.Request) (reconcile.Result, error) {
+       tctx := provider.NewDefaultTranslateContext(ctx)
+
+       var gp v1alpha1.GatewayProxy
+       if err := r.Get(ctx, req.NamespacedName, &gp); err != nil {
+               if client.IgnoreNotFound(err) == nil {
+                       // The object is gone: hand the provider a name-only
+                       // GatewayProxy together with the empty translate context.
+                       gp.Namespace = req.Namespace
+                       gp.Name = req.Name
+                       err = r.Provider.Update(ctx, tctx, &gp)
+               }
+               return ctrl.Result{}, err
+       }
+
+       // if there is no provider, update with empty translate context
+       if gp.Spec.Provider == nil || gp.Spec.Provider.ControlPlane == nil {
+               return reconcile.Result{}, r.Provider.Update(ctx, tctx, &gp)
+       }
+
+       // process endpoints for provider service
+       providerService := gp.Spec.Provider.ControlPlane.Service
+       if providerService == nil {
+               tctx.EndpointSlices[req.NamespacedName] = nil
+       } else if err := addProviderEndpointsToTranslateContext(tctx, r.Client, types.NamespacedName{
+               Namespace: gp.Namespace,
+               Name:      providerService.Name,
+       }); err != nil {
+               return reconcile.Result{}, err
+       }
+
+       // process secret for provider auth
+       auth := gp.Spec.Provider.ControlPlane.Auth
+       if auth.AdminKey != nil && auth.AdminKey.ValueFrom != nil && auth.AdminKey.ValueFrom.SecretKeyRef != nil {
+               var (
+                       secret   corev1.Secret
+                       secretNN = types.NamespacedName{
+                               Namespace: gp.GetNamespace(),
+                               Name:      auth.AdminKey.ValueFrom.SecretKeyRef.Name,
+                       }
+               )
+               if err := r.Get(ctx, secretNN, &secret); err != nil {
+                       r.Log.Error(err, "failed to get secret", "secret", secretNN)
+                       return reconcile.Result{}, err
+               }
+               tctx.Secrets[secretNN] = &secret
+       }
+
+       var (
+               gatewayList      gatewayv1.GatewayList
+               ingressClassList networkingv1.IngressClassList
+               indexKey         = indexer.GenIndexKey(gp.GetNamespace(), gp.GetName())
+       )
+
+       // list Gateways that reference the GatewayProxy
+       if err := r.List(ctx, &gatewayList, client.MatchingFields{indexer.ParametersRef: indexKey}); err != nil {
+               r.Log.Error(err, "failed to list GatewayList")
+               // Return the error instead of nil so a failed list is surfaced
+               // rather than silently dropping this reconcile; this matches the
+               // IngressClass branch below.
+               return reconcile.Result{}, err
+       }
+
+       // list IngressClasses that reference the GatewayProxy
+       if err := r.List(ctx, &ingressClassList, client.MatchingFields{indexer.IngressClassParametersRef: indexKey}); err != nil {
+               r.Log.Error(err, "failed to list IngressClassList")
+               return reconcile.Result{}, err
+       }
+
+       // append referrers to translate context; indexing avoids copying each
+       // list item and taking the address of the loop variable
+       for i := range gatewayList.Items {
+               tctx.GatewayProxyReferrers[req.NamespacedName] = append(
+                       tctx.GatewayProxyReferrers[req.NamespacedName],
+                       utils.NamespacedNameKind(&gatewayList.Items[i]),
+               )
+       }
+       for i := range ingressClassList.Items {
+               tctx.GatewayProxyReferrers[req.NamespacedName] = append(
+                       tctx.GatewayProxyReferrers[req.NamespacedName],
+                       utils.NamespacedNameKind(&ingressClassList.Items[i]),
+               )
+       }
+
+       if err := r.Provider.Update(ctx, tctx, &gp); err != nil {
+               return reconcile.Result{}, err
+       }
+
+       return reconcile.Result{}, nil
+}
+
+// listGatewayProxiesForProviderService handles a Service event: it asks
+// ListRequests for the GatewayProxy list filtered on the ServiceIndexRef field
+// index, keyed by the Service's namespace and name. An object that is not a
+// Service is logged and produces no requests.
+func (r *GatewayProxyController) listGatewayProxiesForProviderService(ctx context.Context, obj client.Object) (requests []reconcile.Request) {
+       svc, isService := obj.(*corev1.Service)
+       if !isService {
+               r.Log.Error(errors.New("unexpected object type"), "failed to convert object to Service")
+               return nil
+       }
+
+       selector := client.MatchingFields{
+               indexer.ServiceIndexRef: indexer.GenIndexKey(svc.GetNamespace(), svc.GetName()),
+       }
+       return ListRequests(ctx, r.Client, r.Log, &v1alpha1.GatewayProxyList{}, selector)
+}
+
+// listGatewayProxiesForProviderEndpointSlice handles an EndpointSlice event: it
+// asks ListRequests for the GatewayProxy list filtered on the ServiceIndexRef
+// field index, keyed by the slice's namespace and the value of its
+// discoveryv1.LabelServiceName label. An object that is not an EndpointSlice is
+// logged and produces no requests.
+func (r *GatewayProxyController) listGatewayProxiesForProviderEndpointSlice(ctx context.Context, obj client.Object) (requests []reconcile.Request) {
+       slice, isSlice := obj.(*discoveryv1.EndpointSlice)
+       if !isSlice {
+               r.Log.Error(errors.New("unexpected object type"), "failed to convert object to EndpointSlice")
+               return nil
+       }
+
+       serviceName := slice.Labels[discoveryv1.LabelServiceName]
+       selector := client.MatchingFields{
+               indexer.ServiceIndexRef: indexer.GenIndexKey(slice.GetNamespace(), serviceName),
+       }
+       return ListRequests(ctx, r.Client, r.Log, &v1alpha1.GatewayProxyList{}, selector)
+}
+
+// listGatewayProxiesForSecret handles a Secret event: it asks ListRequests for
+// the GatewayProxy list filtered on the SecretIndexRef field index, keyed by
+// the Secret's namespace and name. An object that is not a Secret is logged and
+// produces no requests.
+func (r *GatewayProxyController) listGatewayProxiesForSecret(ctx context.Context, object client.Object) []reconcile.Request {
+       sec, isSecret := object.(*corev1.Secret)
+       if !isSecret {
+               r.Log.Error(errors.New("unexpected object type"), "failed to convert object to Secret")
+               return nil
+       }
+
+       selector := client.MatchingFields{
+               indexer.SecretIndexRef: indexer.GenIndexKey(sec.GetNamespace(), sec.GetName()),
+       }
+       return ListRequests(ctx, r.Client, r.Log, &v1alpha1.GatewayProxyList{}, selector)
+}
diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go
index dfc06486..d9dd1638 100644
--- a/internal/controller/indexer/indexer.go
+++ b/internal/controller/indexer/indexer.go
@@ -241,6 +241,15 @@ func setupIngressClassIndexer(mgr ctrl.Manager) error {
 }
 
 func setupGatewayProxyIndexer(mgr ctrl.Manager) error {
+       if err := mgr.GetFieldIndexer().IndexField(
+               context.Background(),
+               &v1alpha1.GatewayProxy{},
+               ServiceIndexRef,
+               GatewayProxyServiceIndexFunc,
+       ); err != nil {
+               return err
+       }
+
        if err := mgr.GetFieldIndexer().IndexField(
                context.Background(),
                &v1alpha1.GatewayProxy{},
@@ -272,6 +281,17 @@ func setupGatewayClassIndexer(mgr ctrl.Manager) error {
        )
 }
 
+// GatewayProxyServiceIndexFunc computes the index keys of a GatewayProxy for
+// the Service it points at. When spec.provider.controlPlane.service is set it
+// returns a single key built by GenIndexKey from the GatewayProxy's namespace
+// and the Service name; otherwise it returns nil.
+func GatewayProxyServiceIndexFunc(rawObj client.Object) []string {
+       gp := rawObj.(*v1alpha1.GatewayProxy)
+
+       prov := gp.Spec.Provider
+       if prov == nil || prov.ControlPlane == nil || prov.ControlPlane.Service == nil {
+               return nil
+       }
+
+       return []string{GenIndexKey(gp.GetNamespace(), prov.ControlPlane.Service.Name)}
+}
+
 func GatewayProxySecretIndexFunc(rawObj client.Object) []string {
        gatewayProxy := rawObj.(*v1alpha1.GatewayProxy)
        secretKeys := make([]string, 0)
diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go
index 0e24a812..2af8cd6a 100644
--- a/internal/manager/controllers.go
+++ b/internal/manager/controllers.go
@@ -168,5 +168,11 @@ func setupControllers(ctx context.Context, mgr 
manager.Manager, pro provider.Pro
                        Log:     
ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixUpstream"),
                        Updater: updater,
                },
+               &controller.GatewayProxyController{
+                       Client:   mgr.GetClient(),
+                       Scheme:   mgr.GetScheme(),
+                       Log:      
ctrl.LoggerFrom(ctx).WithName("controllers").WithName("GatewayProxy"),
+                       Provider: pro,
+               },
        }, nil
 }
diff --git a/internal/provider/adc/adc.go b/internal/provider/adc/adc.go
index 4cf076d8..ae0a4513 100644
--- a/internal/provider/adc/adc.go
+++ b/internal/provider/adc/adc.go
@@ -133,6 +133,8 @@ func (d *adcClient) Update(ctx context.Context, tctx 
*provider.TranslateContext,
        case *apiv2.ApisixConsumer:
                result, err = d.translator.TranslateApisixConsumer(tctx, 
t.DeepCopy())
                resourceTypes = append(resourceTypes, "consumer")
+       case *v1alpha1.GatewayProxy:
+               return d.updateConfigForGatewayProxy(tctx, t)
        }
        if err != nil {
                return err
diff --git a/internal/provider/adc/config.go b/internal/provider/adc/config.go
index e20c46e4..2bf512d3 100644
--- a/internal/provider/adc/config.go
+++ b/internal/provider/adc/config.go
@@ -18,20 +18,22 @@
 package adc
 
 import (
+       "errors"
+       "fmt"
        "net"
        "slices"
        "strconv"
 
        "github.com/api7/gopkg/pkg/log"
-       "github.com/pkg/errors"
        "go.uber.org/zap"
        k8stypes "k8s.io/apimachinery/pkg/types"
        "k8s.io/utils/ptr"
-       v1 "sigs.k8s.io/gateway-api/apis/v1"
+       gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
 
        "github.com/apache/apisix-ingress-controller/api/v1alpha1"
        "github.com/apache/apisix-ingress-controller/internal/provider"
        "github.com/apache/apisix-ingress-controller/internal/types"
+       "github.com/apache/apisix-ingress-controller/internal/utils"
 )
 
 func (d *adcClient) getConfigsForGatewayProxy(tctx *provider.TranslateContext, 
gatewayProxy *v1alpha1.GatewayProxy) (*adcConfig, error) {
@@ -87,17 +89,17 @@ func (d *adcClient) getConfigsForGatewayProxy(tctx 
*provider.TranslateContext, g
                }
                _, ok := tctx.Services[namespacedName]
                if !ok {
-                       return nil, errors.Errorf("no service found for service 
reference: %s", namespacedName)
+                       return nil, fmt.Errorf("no service found for service 
reference: %s", namespacedName)
                }
                endpoint := tctx.EndpointSlices[namespacedName]
                if endpoint == nil {
                        return nil, nil
                }
-               upstreamNodes, err := d.translator.TranslateBackendRef(tctx, 
v1.BackendRef{
-                       BackendObjectReference: v1.BackendObjectReference{
-                               Name:      
v1.ObjectName(provider.ControlPlane.Service.Name),
-                               Namespace: 
(*v1.Namespace)(&gatewayProxy.Namespace),
-                               Port:      
ptr.To(v1.PortNumber(provider.ControlPlane.Service.Port)),
+               upstreamNodes, err := d.translator.TranslateBackendRef(tctx, 
gatewayv1.BackendRef{
+                       BackendObjectReference: 
gatewayv1.BackendObjectReference{
+                               Name:      
gatewayv1.ObjectName(provider.ControlPlane.Service.Name),
+                               Namespace: 
(*gatewayv1.Namespace)(&gatewayProxy.Namespace),
+                               Port:      
ptr.To(gatewayv1.PortNumber(provider.ControlPlane.Service.Port)),
                        },
                })
                if err != nil {
@@ -167,6 +169,32 @@ func (d *adcClient) updateConfigs(rk 
types.NamespacedNameKind, tctx *provider.Tr
        return nil
 }
 
+// updateConfigForGatewayProxy refreshes the stored config of every referrer
+// listed for the GatewayProxy in tctx.GatewayProxyReferrers. The client lock is
+// held for the whole call. If getConfigsForGatewayProxy yields no config (nil
+// without an error), the referrers' stored configs are removed instead.
+func (d *adcClient) updateConfigForGatewayProxy(tctx *provider.TranslateContext, gp *v1alpha1.GatewayProxy) error {
+       d.Lock()
+       defer d.Unlock()
+
+       cfg, err := d.getConfigsForGatewayProxy(tctx, gp)
+       if err != nil {
+               return err
+       }
+
+       for _, referrer := range tctx.GatewayProxyReferrers[utils.NamespacedName(gp)] {
+               if cfg == nil {
+                       delete(d.configs, referrer)
+                       continue
+               }
+               d.configs[referrer] = *cfg
+       }
+
+       return nil
+}
+
 func (d *adcClient) findConfigsToDelete(oldParentRefs, newParentRefs 
[]types.NamespacedNameKind) []adcConfig {
        var deleteConfigs []adcConfig
        for _, parentRef := range oldParentRefs {
diff --git a/internal/provider/provider.go b/internal/provider/provider.go
index 49e5222d..b06899fb 100644
--- a/internal/provider/provider.go
+++ b/internal/provider/provider.go
@@ -55,7 +55,9 @@ type TranslateContext struct {
        Upstreams              map[k8stypes.NamespacedName]*apiv2.ApisixUpstream
        GatewayProxies         
map[types.NamespacedNameKind]v1alpha1.GatewayProxy
        ResourceParentRefs     
map[types.NamespacedNameKind][]types.NamespacedNameKind
-       HTTPRoutePolicies      []v1alpha1.HTTPRoutePolicy
+       // GatewayProxyReferrers key is GatewayProxy, value is a list of 
resources that reference this GatewayProxy
+       GatewayProxyReferrers 
map[k8stypes.NamespacedName][]types.NamespacedNameKind
+       HTTPRoutePolicies     []v1alpha1.HTTPRoutePolicy
 
        StatusUpdaters []status.Update
 }
@@ -72,5 +74,6 @@ func NewDefaultTranslateContext(ctx context.Context) 
*TranslateContext {
                Upstreams:              
make(map[k8stypes.NamespacedName]*apiv2.ApisixUpstream),
                GatewayProxies:         
make(map[types.NamespacedNameKind]v1alpha1.GatewayProxy),
                ResourceParentRefs:     
make(map[types.NamespacedNameKind][]types.NamespacedNameKind),
+               GatewayProxyReferrers:  
make(map[k8stypes.NamespacedName][]types.NamespacedNameKind),
        }
 }
diff --git a/test/e2e/crds/gatewayproxy.go b/test/e2e/crds/gatewayproxy.go
new file mode 100644
index 00000000..57098df4
--- /dev/null
+++ b/test/e2e/crds/gatewayproxy.go
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gatewayapi
+
+import (
+       "context"
+       "fmt"
+       "net/http"
+       "time"
+
+       "github.com/gruntwork-io/terratest/modules/k8s"
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "k8s.io/apimachinery/pkg/types"
+       "k8s.io/apimachinery/pkg/util/wait"
+       "k8s.io/utils/ptr"
+
+       "github.com/apache/apisix-ingress-controller/test/e2e/framework"
+       "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+// End-to-end spec for the GatewayProxy controller: after the data plane is
+// scaled to two replicas, every APISIX pod must serve the route and apply the
+// response-rewrite plugin configured on the GatewayProxy.
+var _ = Describe("Test GatewayProxy", Label("apisix.apache.org", "v1alpha1", "gatewayproxy"), func() {
+       var (
+               s   = scaffold.NewDefaultScaffold()
+               err error
+       )
+
+       const gatewayProxySpec = `
+apiVersion: apisix.apache.org/v1alpha1
+kind: GatewayProxy
+metadata:
+  name: apisix-proxy-config
+spec:
+  provider:
+    type: ControlPlane
+    controlPlane:
+      service:
+        name: %s
+        port: 9180
+      auth:
+        type: AdminKey
+        adminKey:
+          value: "%s"
+  plugins:
+  - name: response-rewrite
+    enabled: true
+    config: 
+      headers:
+        "X-Pod-Hostname": "$hostname"
+`
+       const gatewayClassSpec = `
+apiVersion: gateway.networking.k8s.io/v1
+kind: GatewayClass
+metadata:
+  name: %s
+spec:
+  controllerName: %s
+`
+       const gatewaySpec = `
+apiVersion: gateway.networking.k8s.io/v1
+kind: Gateway
+metadata:
+  name: apisix
+spec:
+  gatewayClassName: %s
+  listeners:
+    - name: http1
+      protocol: HTTP
+      port: 80
+  infrastructure:
+    parametersRef:
+      group: apisix.apache.org
+      kind: GatewayProxy
+      name: apisix-proxy-config
+`
+       const httpRouteSpec = `
+apiVersion: gateway.networking.k8s.io/v1
+kind: HTTPRoute
+metadata:
+  name: httpbin
+spec:
+  parentRefs:
+  - name: apisix
+  hostnames:
+  - "httpbin.org"
+  rules:
+  - matches: 
+    - path:
+        type: Exact
+        value: /get
+    backendRefs:
+    - name: httpbin-service-e2e-test
+      port: 80
+`
+       // Create GatewayProxy, GatewayClass, Gateway and HTTPRoute, then wait
+       // until the route answers 200 through the default APISIX client.
+       BeforeEach(func() {
+               By("create GatewayProxy")
+               err = s.CreateResourceFromString(fmt.Sprintf(gatewayProxySpec, framework.ProviderType, s.AdminKey()))
+               Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy")
+               time.Sleep(time.Second)
+
+               By("create GatewayClass")
+               gatewayClassName := fmt.Sprintf("apisix-%d", time.Now().Unix())
+               err = s.CreateResourceFromString(fmt.Sprintf(gatewayClassSpec, gatewayClassName, s.GetControllerName()))
+               Expect(err).NotTo(HaveOccurred(), "creating GatewayClass")
+               time.Sleep(time.Second)
+
+               By("create Gateway")
+               err = s.CreateResourceFromStringWithNamespace(fmt.Sprintf(gatewaySpec, gatewayClassName), s.Namespace())
+               Expect(err).NotTo(HaveOccurred(), "creating Gateway")
+               time.Sleep(time.Second)
+
+               By("create HTTPRoute")
+               s.ApplyHTTPRoute(types.NamespacedName{Namespace: s.Namespace(), Name: "httpbin"}, httpRouteSpec)
+
+               Eventually(func() int {
+                       return s.NewAPISIXClient().GET("/get").WithHost("httpbin.org").Expect().Raw().StatusCode
+               }).WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK))
+       })
+
+       Context("Test GatewayProxy update configs", func() {
+               It("scaling apisix pods to test that the controller watches endpoints", func() {
+                       By("scale apisix to replicas 2")
+                       s.Deployer.DeployDataplane(scaffold.DeployDataplaneOptions{
+                               Replicas: ptr.To(2),
+                       })
+
+                       By("check pod ready")
+                       err = wait.PollUntilContextTimeout(context.Background(), time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
+                               pods := s.GetPods(s.Namespace(), "app.kubernetes.io/name=apisix")
+                               if len(pods) != 2 {
+                                       return false, nil
+                               }
+                               for _, pod := range pods {
+                                       if pod.Status.PodIP == "" {
+                                               return false, nil
+                                       }
+                               }
+                               return true, nil
+                       })
+                       Expect(err).NotTo(HaveOccurred(), "check pods ready")
+
+                       By("request every pod to check configuration effect")
+                       pods := s.GetPods(s.Namespace(), "app.kubernetes.io/name=apisix")
+                       for i, pod := range pods {
+                               // Run each pod check in its own function so the deferred
+                               // Close fires per iteration, including when a failed
+                               // assertion aborts the spec; the tunnel used to leak then.
+                               func() {
+                                       s.Logf("pod name: %s", pod.GetName())
+                                       tunnel := k8s.NewTunnel(s.KubeOpts(), k8s.ResourceTypePod, pod.GetName(), 9080+i, 9080)
+                                       err := tunnel.ForwardPortE(s.GinkgoT)
+                                       Expect(err).NotTo(HaveOccurred(), "forward pod: %s", pod.Name)
+                                       defer tunnel.Close()
+
+                                       err = wait.PollUntilContextTimeout(context.Background(), time.Second, 30*time.Second, true, func(ctx context.Context) (done bool, err error) {
+                                               resp := scaffold.NewClient("http", tunnel.Endpoint()).
+                                                       GET("/get").WithHost("httpbin.org").Expect().Raw()
+                                               return resp.StatusCode == http.StatusOK && resp.Header.Get("X-Pod-Hostname") == pod.GetName(), nil
+                                       })
+                                       Expect(err).NotTo(HaveOccurred(), "request the pod: %s", pod.GetName())
+                               }()
+                       }
+               })
+       })
+})
diff --git a/test/e2e/framework/k8s.go b/test/e2e/framework/k8s.go
index ec97601e..4848b62d 100644
--- a/test/e2e/framework/k8s.go
+++ b/test/e2e/framework/k8s.go
@@ -195,18 +195,24 @@ func (f *Framework) Scale(name string, replicas int32) {
                UpdateScale(context.Background(), name, scale, 
metav1.UpdateOptions{})
        f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), fmt.Sprintf("scale 
deployment %s to %v failed", name, replicas))
 
+       // FIXME: The service name and the deployment name may not be the same
        err = f.ensureService(name, _namespace, int(replicas))
        f.GomegaT.Expect(err).ShouldNot(HaveOccurred(),
                fmt.Sprintf("ensure service %s/%s has %v endpoints failed", 
_namespace, name, replicas))
 }
 
 func (f *Framework) GetPodIP(selector string) string {
-       podList, err := f.clientset.CoreV1().Pods(_namespace).List(f.Context, 
metav1.ListOptions{
+       pods := f.GetPods("", selector)
+       return pods[0].Status.PodIP
+}
+
+func (f *Framework) GetPods(namespace, selector string) []corev1.Pod {
+       podList, err := f.clientset.CoreV1().Pods(cmp.Or(namespace, 
_namespace)).List(f.Context, metav1.ListOptions{
                LabelSelector: selector,
        })
        f.GomegaT.Expect(err).ShouldNot(HaveOccurred())
        f.GomegaT.Expect(podList.Items).ShouldNot(BeEmpty())
-       return podList.Items[0].Status.PodIP
+       return podList.Items
 }
 
 //nolint:unused
diff --git a/test/e2e/framework/manifests/apisix.yaml b/test/e2e/framework/manifests/apisix.yaml
index 61d19763..affa4bfb 100644
--- a/test/e2e/framework/manifests/apisix.yaml
+++ b/test/e2e/framework/manifests/apisix.yaml
@@ -50,7 +50,7 @@ metadata:
   labels:
     app.kubernetes.io/name: apisix
 spec:
-  replicas: 1
+  replicas: {{ default 1 .Replicas }}
   selector:
     matchLabels:
       app.kubernetes.io/name: apisix
diff --git a/test/e2e/scaffold/apisix_deployer.go b/test/e2e/scaffold/apisix_deployer.go
index 6d28b732..a0e2d531 100644
--- a/test/e2e/scaffold/apisix_deployer.go
+++ b/test/e2e/scaffold/apisix_deployer.go
@@ -28,6 +28,7 @@ import (
        . "github.com/onsi/gomega"    //nolint:staticcheck
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/utils/ptr"
 
        "github.com/apache/apisix-ingress-controller/internal/provider/adc"
        "github.com/apache/apisix-ingress-controller/pkg/utils"
@@ -44,6 +45,7 @@ type APISIXDeployOptions struct {
        ServiceHTTPSPort int
 
        ConfigProvider string
+       Replicas       *int
 }
 
 type APISIXDeployer struct {
@@ -147,6 +149,7 @@ func (s *APISIXDeployer) DeployDataplane(deployOpts 
DeployDataplaneOptions) {
                AdminKey:         s.opts.APISIXAdminAPIKey,
                ServiceHTTPPort:  9080,
                ServiceHTTPSPort: 9443,
+               Replicas:         ptr.To(1),
        }
 
        if deployOpts.Namespace != "" {
@@ -161,6 +164,19 @@ func (s *APISIXDeployer) DeployDataplane(deployOpts 
DeployDataplaneOptions) {
        if deployOpts.ServiceHTTPSPort != 0 {
                opts.ServiceHTTPSPort = deployOpts.ServiceHTTPSPort
        }
+       if deployOpts.Replicas != nil {
+               opts.Replicas = deployOpts.Replicas
+       }
+
+       for _, tunnel := range []*k8s.Tunnel{
+               s.adminTunnel,
+               s.apisixHttpTunnel,
+               s.apisixHttpsTunnel,
+       } {
+               if tunnel != nil {
+                       tunnel.Close()
+               }
+       }
 
        svc := s.deployDataplane(&opts)
        s.dataplaneService = svc
@@ -286,7 +302,10 @@ func (s *APISIXDeployer) createAdminTunnel(svc 
*corev1.Service) (*k8s.Tunnel, er
        if err := adminTunnel.ForwardPortE(s.t); err != nil {
                return nil, err
        }
-       s.addFinalizers(adminTunnel.Close)
+       s.addFinalizers(func() {
+               adminTunnel.Close()
+               s.adminTunnel = nil
+       })
 
        return adminTunnel, nil
 }
diff --git a/test/e2e/scaffold/deployer.go b/test/e2e/scaffold/deployer.go
index 2f4d02b4..cfbe4043 100644
--- a/test/e2e/scaffold/deployer.go
+++ b/test/e2e/scaffold/deployer.go
@@ -42,4 +42,5 @@ type DeployDataplaneOptions struct {
        SkipCreateTunnels bool
        ServiceHTTPPort   int
        ServiceHTTPSPort  int
+       Replicas          *int
 }
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index 35c30f3b..f2f97a2d 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -313,13 +313,19 @@ func (s *Scaffold) createDataplaneTunnels(
        if err := httpTunnel.ForwardPortE(s.t); err != nil {
                return nil, nil, err
        }
-       s.addFinalizers(httpTunnel.Close)
+       s.addFinalizers(func() {
+               httpTunnel.Close()
+               s.apisixHttpTunnel = nil
+       })
 
        if err := httpsTunnel.ForwardPortE(s.t); err != nil {
                httpTunnel.Close()
                return nil, nil, err
        }
-       s.addFinalizers(httpsTunnel.Close)
+       s.addFinalizers(func() {
+               httpsTunnel.Close()
+               s.apisixHttpsTunnel = nil
+       })
 
        return httpTunnel, httpsTunnel, nil
 }
@@ -406,3 +412,26 @@ func (s *Scaffold) GetGatewayHTTPSEndpoint(identifier 
string) (string, error) {
 func (s *Scaffold) GetDataplaneService() *corev1.Service {
        return s.dataplaneService
 }
+
+// KubeOpts returns the kubectl options held by the scaffold.
+func (s *Scaffold) KubeOpts() *k8s.KubectlOptions {
+       return s.kubectlOptions
+}
+
+// NewClient builds an httpexpect client whose base URL is made of the given
+// scheme and host. Its HTTP client returns redirect responses as-is instead of
+// following them, and assertion failures are reported through GinkgoT.
+func NewClient(scheme, host string) *httpexpect.Expect {
+       baseURL := (&url.URL{Scheme: scheme, Host: host}).String()
+
+       // Hand back the redirect response itself rather than following it.
+       noFollow := func(req *http.Request, via []*http.Request) error {
+               return http.ErrUseLastResponse
+       }
+       httpClient := &http.Client{
+               Transport:     &http.Transport{},
+               CheckRedirect: noFollow,
+       }
+
+       reporter := httpexpect.NewAssertReporter(
+               httpexpect.NewAssertReporter(GinkgoT()),
+       )
+
+       return httpexpect.WithConfig(httpexpect.Config{
+               BaseURL:  baseURL,
+               Client:   httpClient,
+               Reporter: reporter,
+       })
+}

Reply via email to