This is an automated email from the ASF dual-hosted git repository. ronething pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push: new 27eb3e9f feat: gatewayproxy controller (#2444) 27eb3e9f is described below commit 27eb3e9ffd0ff736ee87aca2dbc35b5c400d966e Author: 悟空 <rainchan...@163.com> AuthorDate: Thu Jul 3 18:00:57 2025 +0800 feat: gatewayproxy controller (#2444) --- internal/controller/gatewayproxy_controller.go | 179 +++++++++++++++++++++++++ internal/controller/indexer/indexer.go | 20 +++ internal/manager/controllers.go | 6 + internal/provider/adc/adc.go | 2 + internal/provider/adc/config.go | 44 ++++-- internal/provider/provider.go | 5 +- test/e2e/crds/gatewayproxy.go | 176 ++++++++++++++++++++++++ test/e2e/framework/k8s.go | 10 +- test/e2e/framework/manifests/apisix.yaml | 2 +- test/e2e/scaffold/apisix_deployer.go | 21 ++- test/e2e/scaffold/deployer.go | 1 + test/e2e/scaffold/scaffold.go | 33 ++++- 12 files changed, 484 insertions(+), 15 deletions(-) diff --git a/internal/controller/gatewayproxy_controller.go b/internal/controller/gatewayproxy_controller.go new file mode 100644 index 00000000..d1a4b803 --- /dev/null +++ b/internal/controller/gatewayproxy_controller.go @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controller + +import ( + "context" + "errors" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +// GatewayProxyController reconciles a GatewayProxy object. +type GatewayProxyController struct { + client.Client + + Scheme *runtime.Scheme + Log logr.Logger + Provider provider.Provider +} + +func (r *GatewayProxyController) SetupWithManager(mrg ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mrg). + For(&v1alpha1.GatewayProxy{}). + Watches(&corev1.Service{}, + handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForProviderService), + ). + Watches(&discoveryv1.EndpointSlice{}, + handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForProviderEndpointSlice), + ). + Watches(&corev1.Secret{}, + handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForSecret), + ). + Complete(r) +} + +func (r *GatewayProxyController) Reconcile(ctx context.Context, req ctrl.Request) (reconcile.Result, error) { + var tctx = provider.NewDefaultTranslateContext(ctx) + + var gp v1alpha1.GatewayProxy + if err := r.Get(ctx, req.NamespacedName, &gp); err != nil { + if client.IgnoreNotFound(err) == nil { + gp.Namespace = req.Namespace + gp.Name = req.Name + err = r.Provider.Update(ctx, tctx, &gp) + } + return ctrl.Result{}, err + } + + // if there is no provider, update with empty translate context + if gp.Spec.Provider == nil || gp.Spec.Provider.ControlPlane == nil { + return reconcile.Result{}, r.Provider.Update(ctx, tctx, &gp) + } + + // process endpoints for provider service + providerService := gp.Spec.Provider.ControlPlane.Service + if providerService == nil { + tctx.EndpointSlices[req.NamespacedName] = nil + } else { + if err := addProviderEndpointsToTranslateContext(tctx, r.Client, types.NamespacedName{ + Namespace: gp.Namespace, + Name: providerService.Name, + }); err != nil { + return reconcile.Result{}, err + } + } + + // process secret for provider auth + auth := gp.Spec.Provider.ControlPlane.Auth + if auth.AdminKey != nil && auth.AdminKey.ValueFrom != nil && auth.AdminKey.ValueFrom.SecretKeyRef != nil { + var ( + secret corev1.Secret + secretNN = types.NamespacedName{ + Namespace: gp.GetNamespace(), + Name: auth.AdminKey.ValueFrom.SecretKeyRef.Name, + } + ) + if err := r.Get(ctx, secretNN, &secret); err != nil { + r.Log.Error(err, "failed to get secret", "secret", secretNN) + return reconcile.Result{}, err + } + tctx.Secrets[secretNN] = &secret + } + + // list Gateways that reference the GatewayProxy + var ( + gatewayList gatewayv1.GatewayList + ingressClassList networkingv1.IngressClassList + indexKey = indexer.GenIndexKey(gp.GetNamespace(), gp.GetName()) + ) + if err := r.List(ctx, &gatewayList, client.MatchingFields{indexer.ParametersRef: indexKey}); err != nil { + r.Log.Error(err, "failed to list GatewayList") + return ctrl.Result{}, nil + } + + // list IngressClasses that reference the GatewayProxy + if err := r.List(ctx, &ingressClassList, client.MatchingFields{indexer.IngressClassParametersRef: indexKey}); err != nil { + r.Log.Error(err, "failed to list IngressClassList") + return reconcile.Result{}, err + } + + // append referrers to translate context + for _, item := range gatewayList.Items { + tctx.GatewayProxyReferrers[req.NamespacedName] = append(tctx.GatewayProxyReferrers[req.NamespacedName], utils.NamespacedNameKind(&item)) + } + for _, item := range ingressClassList.Items { + tctx.GatewayProxyReferrers[req.NamespacedName] = append(tctx.GatewayProxyReferrers[req.NamespacedName], utils.NamespacedNameKind(&item)) + } + + if err := r.Provider.Update(ctx, tctx, &gp); err != nil { + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +func (r *GatewayProxyController) listGatewayProxiesForProviderService(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + service, ok := obj.(*corev1.Service) + if !ok { + r.Log.Error(errors.New("unexpected object type"), "failed to convert object to Service") + return nil + } + + return ListRequests(ctx, r.Client, r.Log, &v1alpha1.GatewayProxyList{}, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(service.GetNamespace(), service.GetName()), + }) +} + +func (r *GatewayProxyController) listGatewayProxiesForProviderEndpointSlice(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + r.Log.Error(errors.New("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + + return ListRequests(ctx, r.Client, r.Log, &v1alpha1.GatewayProxyList{}, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(endpointSlice.GetNamespace(), endpointSlice.Labels[discoveryv1.LabelServiceName]), + }) +} + +func (r *GatewayProxyController) listGatewayProxiesForSecret(ctx context.Context, object client.Object) []reconcile.Request { + secret, ok := object.(*corev1.Secret) + if !ok { + r.Log.Error(errors.New("unexpected object type"), "failed to convert object to Secret") + return nil + } + return ListRequests(ctx, r.Client, r.Log, &v1alpha1.GatewayProxyList{}, client.MatchingFields{ + indexer.SecretIndexRef: indexer.GenIndexKey(secret.GetNamespace(), secret.GetName()), + }) +} diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index dfc06486..d9dd1638 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -241,6 +241,15 @@ func setupIngressClassIndexer(mgr ctrl.Manager) error { } func setupGatewayProxyIndexer(mgr ctrl.Manager) error { + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &v1alpha1.GatewayProxy{}, + ServiceIndexRef, + GatewayProxyServiceIndexFunc, + ); err != nil { + return err + } + if err := mgr.GetFieldIndexer().IndexField( context.Background(), &v1alpha1.GatewayProxy{}, @@ -272,6 +281,17 @@ func setupGatewayClassIndexer(mgr ctrl.Manager) error { ) } +func GatewayProxyServiceIndexFunc(rawObj client.Object) []string { + gatewayProxy := rawObj.(*v1alpha1.GatewayProxy) + if gatewayProxy.Spec.Provider != nil && + gatewayProxy.Spec.Provider.ControlPlane != nil && + gatewayProxy.Spec.Provider.ControlPlane.Service != nil { + service := gatewayProxy.Spec.Provider.ControlPlane.Service + return []string{GenIndexKey(gatewayProxy.GetNamespace(), service.Name)} + } + return nil +} + func GatewayProxySecretIndexFunc(rawObj client.Object) []string { gatewayProxy := rawObj.(*v1alpha1.GatewayProxy) secretKeys := make([]string, 0) diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go index 0e24a812..2af8cd6a 100644 --- a/internal/manager/controllers.go +++ b/internal/manager/controllers.go @@ -168,5 +168,11 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixUpstream"), Updater: updater, }, + &controller.GatewayProxyController{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("GatewayProxy"), + Provider: pro, + }, }, nil } diff --git a/internal/provider/adc/adc.go b/internal/provider/adc/adc.go index 4cf076d8..ae0a4513 100644 --- a/internal/provider/adc/adc.go +++ b/internal/provider/adc/adc.go @@ -133,6 +133,8 @@ func (d *adcClient) Update(ctx context.Context, tctx *provider.TranslateContext, case *apiv2.ApisixConsumer: result, err = d.translator.TranslateApisixConsumer(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, "consumer") + case *v1alpha1.GatewayProxy: + return d.updateConfigForGatewayProxy(tctx, t) } if err != nil { return err diff --git a/internal/provider/adc/config.go b/internal/provider/adc/config.go index e20c46e4..2bf512d3 100644 --- a/internal/provider/adc/config.go +++ b/internal/provider/adc/config.go @@ -18,20 +18,22 @@ package adc import ( + "errors" + "fmt" "net" "slices" "strconv" "github.com/api7/gopkg/pkg/log" - "github.com/pkg/errors" "go.uber.org/zap" k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" - v1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/apache/apisix-ingress-controller/internal/utils" ) func (d *adcClient) getConfigsForGatewayProxy(tctx *provider.TranslateContext, gatewayProxy *v1alpha1.GatewayProxy) (*adcConfig, error) { @@ -87,17 +89,17 @@ func (d *adcClient) getConfigsForGatewayProxy(tctx *provider.TranslateContext, g } _, ok := tctx.Services[namespacedName] if !ok { - return nil, errors.Errorf("no service found for service reference: %s", namespacedName) + return nil, fmt.Errorf("no service found for service reference: %s", namespacedName) } endpoint := tctx.EndpointSlices[namespacedName] if endpoint == nil { return nil, nil } - upstreamNodes, err := d.translator.TranslateBackendRef(tctx, v1.BackendRef{ - BackendObjectReference: v1.BackendObjectReference{ - Name: v1.ObjectName(provider.ControlPlane.Service.Name), - Namespace: (*v1.Namespace)(&gatewayProxy.Namespace), - Port: ptr.To(v1.PortNumber(provider.ControlPlane.Service.Port)), + upstreamNodes, err := d.translator.TranslateBackendRef(tctx, gatewayv1.BackendRef{ + BackendObjectReference: gatewayv1.BackendObjectReference{ + Name: gatewayv1.ObjectName(provider.ControlPlane.Service.Name), + Namespace: (*gatewayv1.Namespace)(&gatewayProxy.Namespace), + Port: ptr.To(gatewayv1.PortNumber(provider.ControlPlane.Service.Port)), }, }) if err != nil { @@ -167,6 +169,32 @@ func (d *adcClient) updateConfigs(rk types.NamespacedNameKind, tctx *provider.Tr return nil } +// updateConfigForGatewayProxy update config for all referrers of the GatewayProxy +func (d *adcClient) updateConfigForGatewayProxy(tctx *provider.TranslateContext, gp *v1alpha1.GatewayProxy) error { + d.Lock() + defer d.Unlock() + + config, err := d.getConfigsForGatewayProxy(tctx, gp) + if err != nil { + return err + } + + referrers := tctx.GatewayProxyReferrers[utils.NamespacedName(gp)] + + if config == nil { + for _, ref := range referrers { + delete(d.configs, ref) + } + return nil + } + + for _, ref := range referrers { + d.configs[ref] = *config + } + + return nil +} + func (d *adcClient) findConfigsToDelete(oldParentRefs, newParentRefs []types.NamespacedNameKind) []adcConfig { var deleteConfigs []adcConfig for _, parentRef := range oldParentRefs { diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 49e5222d..b06899fb 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -55,7 +55,9 @@ type TranslateContext struct { Upstreams map[k8stypes.NamespacedName]*apiv2.ApisixUpstream GatewayProxies map[types.NamespacedNameKind]v1alpha1.GatewayProxy ResourceParentRefs map[types.NamespacedNameKind][]types.NamespacedNameKind - HTTPRoutePolicies []v1alpha1.HTTPRoutePolicy + // GatewayProxyReferrers key is GatewayProxy, value is a list of resources that reference this GatewayProxy + GatewayProxyReferrers map[k8stypes.NamespacedName][]types.NamespacedNameKind + HTTPRoutePolicies []v1alpha1.HTTPRoutePolicy StatusUpdaters []status.Update } @@ -72,5 +74,6 @@ func NewDefaultTranslateContext(ctx context.Context) *TranslateContext { Upstreams: make(map[k8stypes.NamespacedName]*apiv2.ApisixUpstream), GatewayProxies: make(map[types.NamespacedNameKind]v1alpha1.GatewayProxy), ResourceParentRefs: make(map[types.NamespacedNameKind][]types.NamespacedNameKind), + GatewayProxyReferrers: make(map[k8stypes.NamespacedName][]types.NamespacedNameKind), } } diff --git a/test/e2e/crds/gatewayproxy.go b/test/e2e/crds/gatewayproxy.go new file mode 100644 index 00000000..57098df4 --- /dev/null +++ b/test/e2e/crds/gatewayproxy.go @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package gatewayapi + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/gruntwork-io/terratest/modules/k8s" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/ptr" + + "github.com/apache/apisix-ingress-controller/test/e2e/framework" + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" +) + +var _ = Describe("Test GatewayProxy", Label("apisix.apache.org", "v1alpha1", "gatewayproxy"), func() { + var ( + s = scaffold.NewDefaultScaffold() + err error + ) + + const gatewayProxySpec = ` +apiVersion: apisix.apache.org/v1alpha1 +kind: GatewayProxy +metadata: + name: apisix-proxy-config +spec: + provider: + type: ControlPlane + controlPlane: + service: + name: %s + port: 9180 + auth: + type: AdminKey + adminKey: + value: "%s" + plugins: + - name: response-rewrite + enabled: true + config: + headers: + "X-Pod-Hostname": "$hostname" +` + const gatewayClassSpec = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: GatewayClass +metadata: + name: %s +spec: + controllerName: %s +` + const gatewaySpec = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: apisix +spec: + gatewayClassName: %s + listeners: + - name: http1 + protocol: HTTP + port: 80 + infrastructure: + parametersRef: + group: apisix.apache.org + kind: GatewayProxy + name: apisix-proxy-config +` + const httpRouteSpec = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: httpbin +spec: + parentRefs: + - name: apisix + hostnames: + - "httpbin.org" + rules: + - matches: + - path: + type: Exact + value: /get + backendRefs: + - name: httpbin-service-e2e-test + port: 80 +` + BeforeEach(func() { + By("create GatewayProxy") + err = s.CreateResourceFromString(fmt.Sprintf(gatewayProxySpec, framework.ProviderType, s.AdminKey())) + Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy") + time.Sleep(time.Second) + + By("create GatewayClass") + gatewayClassName := fmt.Sprintf("apisix-%d", time.Now().Unix()) + err = s.CreateResourceFromString(fmt.Sprintf(gatewayClassSpec, gatewayClassName, s.GetControllerName())) + Expect(err).NotTo(HaveOccurred(), "creating GatewayClass") + time.Sleep(time.Second) + + By("create Gateway") + err = s.CreateResourceFromStringWithNamespace(fmt.Sprintf(gatewaySpec, gatewayClassName), s.Namespace()) + Expect(err).NotTo(HaveOccurred(), "creating Gateway") + time.Sleep(time.Second) + + By("create HTTPRoute") + s.ApplyHTTPRoute(types.NamespacedName{Namespace: s.Namespace(), Name: "httpbin"}, httpRouteSpec) + + Eventually(func() int { + return s.NewAPISIXClient().GET("/get").WithHost("httpbin.org").Expect().Raw().StatusCode + }).WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK)) + }) + + Context("Test GatewayProxy update configs", func() { + It("scaling apisix pods to test that the controller watches endpoints", func() { + By("scale apisix to replicas 2") + s.Deployer.DeployDataplane(scaffold.DeployDataplaneOptions{ + Replicas: ptr.To(2), + }) + + By("check pod ready") + err = wait.PollUntilContextTimeout(context.Background(), time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + pods := s.GetPods(s.Namespace(), "app.kubernetes.io/name=apisix") + if len(pods) != 2 { + return false, nil + } + for _, pod := range pods { + if pod.Status.PodIP == "" { + return false, nil + } + } + return true, nil + }) + Expect(err).NotTo(HaveOccurred(), "check pods ready") + + By("request every pod to check configuration effect") + pods := s.GetPods(s.Namespace(), "app.kubernetes.io/name=apisix") + for i, pod := range pods { + s.Logf("pod name: %s", pod.GetName()) + tunnel := k8s.NewTunnel(s.KubeOpts(), k8s.ResourceTypePod, pod.GetName(), 9080+i, 9080) + err := tunnel.ForwardPortE(s.GinkgoT) + Expect(err).NotTo(HaveOccurred(), "forward pod: %s", pod.Name) + + err = wait.PollUntilContextTimeout(context.Background(), time.Second, 30*time.Second, true, func(ctx context.Context) (done bool, err error) { + resp := scaffold.NewClient("http", tunnel.Endpoint()). + GET("/get").WithHost("httpbin.org").Expect().Raw() + return resp.StatusCode == http.StatusOK && resp.Header.Get("X-Pod-Hostname") == pod.GetName(), nil + }) + Expect(err).NotTo(HaveOccurred(), "request the pod: %s", pod.GetName()) + + tunnel.Close() + } + }) + }) +}) diff --git a/test/e2e/framework/k8s.go b/test/e2e/framework/k8s.go index ec97601e..4848b62d 100644 --- a/test/e2e/framework/k8s.go +++ b/test/e2e/framework/k8s.go @@ -195,18 +195,24 @@ func (f *Framework) Scale(name string, replicas int32) { UpdateScale(context.Background(), name, scale, metav1.UpdateOptions{}) f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), fmt.Sprintf("scale deployment %s to %v failed", name, replicas)) + // FIXME: The service name and the deployment name may not be the same err = f.ensureService(name, _namespace, int(replicas)) f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), fmt.Sprintf("ensure service %s/%s has %v endpoints failed", _namespace, name, replicas)) } func (f *Framework) GetPodIP(selector string) string { - podList, err := f.clientset.CoreV1().Pods(_namespace).List(f.Context, metav1.ListOptions{ + pods := f.GetPods("", selector) + return pods[0].Status.PodIP +} + +func (f *Framework) GetPods(namespace, selector string) []corev1.Pod { + podList, err := f.clientset.CoreV1().Pods(cmp.Or(namespace, _namespace)).List(f.Context, metav1.ListOptions{ LabelSelector: selector, }) f.GomegaT.Expect(err).ShouldNot(HaveOccurred()) f.GomegaT.Expect(podList.Items).ShouldNot(BeEmpty()) - return podList.Items[0].Status.PodIP + return podList.Items } //nolint:unused diff --git a/test/e2e/framework/manifests/apisix.yaml b/test/e2e/framework/manifests/apisix.yaml index 61d19763..affa4bfb 100644 --- a/test/e2e/framework/manifests/apisix.yaml +++ b/test/e2e/framework/manifests/apisix.yaml @@ -50,7 +50,7 @@ metadata: labels: app.kubernetes.io/name: apisix spec: - replicas: 1 + replicas: {{ default 1 .Replicas }} selector: matchLabels: app.kubernetes.io/name: apisix diff --git a/test/e2e/scaffold/apisix_deployer.go b/test/e2e/scaffold/apisix_deployer.go index 6d28b732..a0e2d531 100644 --- a/test/e2e/scaffold/apisix_deployer.go +++ b/test/e2e/scaffold/apisix_deployer.go @@ -28,6 +28,7 @@ import ( . "github.com/onsi/gomega" //nolint:staticcheck corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" "github.com/apache/apisix-ingress-controller/internal/provider/adc" "github.com/apache/apisix-ingress-controller/pkg/utils" @@ -44,6 +45,7 @@ type APISIXDeployOptions struct { ServiceHTTPSPort int ConfigProvider string + Replicas *int } type APISIXDeployer struct { @@ -147,6 +149,7 @@ func (s *APISIXDeployer) DeployDataplane(deployOpts DeployDataplaneOptions) { AdminKey: s.opts.APISIXAdminAPIKey, ServiceHTTPPort: 9080, ServiceHTTPSPort: 9443, + Replicas: ptr.To(1), } if deployOpts.Namespace != "" { @@ -161,6 +164,19 @@ func (s *APISIXDeployer) DeployDataplane(deployOpts DeployDataplaneOptions) { if deployOpts.ServiceHTTPSPort != 0 { opts.ServiceHTTPSPort = deployOpts.ServiceHTTPSPort } + if deployOpts.Replicas != nil { + opts.Replicas = deployOpts.Replicas + } + + for _, tunnel := range []*k8s.Tunnel{ + s.adminTunnel, + s.apisixHttpTunnel, + s.apisixHttpsTunnel, + } { + if tunnel != nil { + tunnel.Close() + } + } svc := s.deployDataplane(&opts) s.dataplaneService = svc @@ -286,7 +302,10 @@ func (s *APISIXDeployer) createAdminTunnel(svc *corev1.Service) (*k8s.Tunnel, er if err := adminTunnel.ForwardPortE(s.t); err != nil { return nil, err } - s.addFinalizers(adminTunnel.Close) + s.addFinalizers(func() { + adminTunnel.Close() + s.adminTunnel = nil + }) return adminTunnel, nil } diff --git a/test/e2e/scaffold/deployer.go b/test/e2e/scaffold/deployer.go index 2f4d02b4..cfbe4043 100644 --- a/test/e2e/scaffold/deployer.go +++ b/test/e2e/scaffold/deployer.go @@ -42,4 +42,5 @@ type DeployDataplaneOptions struct { SkipCreateTunnels bool ServiceHTTPPort int ServiceHTTPSPort int + Replicas *int } diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go index 35c30f3b..f2f97a2d 100644 --- a/test/e2e/scaffold/scaffold.go +++ b/test/e2e/scaffold/scaffold.go @@ -313,13 +313,19 @@ func (s *Scaffold) createDataplaneTunnels( if err := httpTunnel.ForwardPortE(s.t); err != nil { return nil, nil, err } - s.addFinalizers(httpTunnel.Close) + s.addFinalizers(func() { + httpTunnel.Close() + s.apisixHttpTunnel = nil + }) if err := httpsTunnel.ForwardPortE(s.t); err != nil { httpTunnel.Close() return nil, nil, err } - s.addFinalizers(httpsTunnel.Close) + s.addFinalizers(func() { + httpsTunnel.Close() + s.apisixHttpsTunnel = nil + }) return httpTunnel, httpsTunnel, nil } @@ -406,3 +412,26 @@ func (s *Scaffold) GetGatewayHTTPSEndpoint(identifier string) (string, error) { func (s *Scaffold) GetDataplaneService() *corev1.Service { return s.dataplaneService } + +func (s *Scaffold) KubeOpts() *k8s.KubectlOptions { + return s.kubectlOptions +} + +func NewClient(scheme, host string) *httpexpect.Expect { + u := url.URL{ + Scheme: scheme, + Host: host, + } + return httpexpect.WithConfig(httpexpect.Config{ + BaseURL: u.String(), + Client: &http.Client{ + Transport: &http.Transport{}, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + }, + Reporter: httpexpect.NewAssertReporter( + httpexpect.NewAssertReporter(GinkgoT()), + ), + }) +}