This is an automated email from the ASF dual-hosted git repository. alinsran pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push: new 66c2b0ac fix: full sync during restart results in loss of dataplane traffic (#2489) 66c2b0ac is described below commit 66c2b0acf14fc13f461e5f262753bdc0598f5d1a Author: AlinsRan <alins...@apache.org> AuthorDate: Fri Jul 25 09:05:11 2025 +0800 fix: full sync during restart results in loss of dataplane traffic (#2489) --- internal/controller/apisixconsumer_controller.go | 3 + internal/controller/apisixglobalrule_controller.go | 4 + internal/controller/apisixroute_controller.go | 3 + internal/controller/apisixtls_controller.go | 3 + internal/controller/consumer_controller.go | 31 +--- internal/controller/httproute_controller.go | 3 + internal/controller/indexer/indexer.go | 22 ++- internal/controller/ingress_controller.go | 3 + internal/controller/status/updater.go | 4 + internal/controller/utils.go | 35 ++++ internal/manager/controllers.go | 57 +++++- internal/manager/readiness/manager.go | 196 +++++++++++++++++++++ internal/manager/run.go | 12 +- internal/provider/adc/adc.go | 12 +- internal/provider/provider.go | 1 + internal/types/k8s.go | 86 ++++++++- test/e2e/crds/v1alpha1/consumer.go | 86 +++++++++ test/e2e/crds/v2/route.go | 107 +++++++++++ test/e2e/gatewayapi/httproute.go | 81 +++++++++ 19 files changed, 708 insertions(+), 41 deletions(-) diff --git a/internal/controller/apisixconsumer_controller.go b/internal/controller/apisixconsumer_controller.go index eb9b3c73..f9908b25 100644 --- a/internal/controller/apisixconsumer_controller.go +++ b/internal/controller/apisixconsumer_controller.go @@ -39,6 +39,7 @@ import ( apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -51,10 +52,12 @@ type ApisixConsumerReconciler struct { Provider provider.Provider Updater status.Updater + Readier readiness.ReadinessManager } // Reconcile FIXME: implement the reconcile logic (For now, it dose nothing other than directly accepting) func (r *ApisixConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&apiv2.ApisixConsumer{}, req.NamespacedName) r.Log.Info("reconcile", "request", req.NamespacedName) ac := &apiv2.ApisixConsumer{} diff --git a/internal/controller/apisixglobalrule_controller.go b/internal/controller/apisixglobalrule_controller.go index fc42b1fb..9431df67 100644 --- a/internal/controller/apisixglobalrule_controller.go +++ b/internal/controller/apisixglobalrule_controller.go @@ -38,6 +38,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -49,10 +50,13 @@ type ApisixGlobalRuleReconciler struct { Log logr.Logger Provider provider.Provider Updater status.Updater + + Readier readiness.ReadinessManager } // Reconcile implements the reconciliation logic for ApisixGlobalRule func (r *ApisixGlobalRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&apiv2.ApisixGlobalRule{}, req.NamespacedName) var globalRule apiv2.ApisixGlobalRule if err := r.Get(ctx, req.NamespacedName, &globalRule); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index a07c6e76..c4dde0d5 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -44,6 +44,7 @@ import ( apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/types" "github.com/apache/apisix-ingress-controller/internal/utils" @@ -57,6 +58,7 @@ type ApisixRouteReconciler struct { Log logr.Logger Provider provider.Provider Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -97,6 +99,7 @@ func (r *ApisixRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *ApisixRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&apiv2.ApisixRoute{}, req.NamespacedName) var ar apiv2.ApisixRoute if err := r.Get(ctx, req.NamespacedName, &ar); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/apisixtls_controller.go b/internal/controller/apisixtls_controller.go index 5cde26f6..7410fdb0 100644 --- a/internal/controller/apisixtls_controller.go +++ b/internal/controller/apisixtls_controller.go @@ -39,6 +39,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -50,6 +51,7 @@ type ApisixTlsReconciler struct { Log logr.Logger Provider provider.Provider Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -85,6 +87,7 @@ func (r *ApisixTlsReconciler) SetupWithManager(mgr ctrl.Manager) error { // Reconcile implements the reconciliation logic for ApisixTls func (r *ApisixTlsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&apiv2.ApisixTls{}, req.NamespacedName) var tls apiv2.ApisixTls if err := r.Get(ctx, req.NamespacedName, &tls); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/consumer_controller.go b/internal/controller/consumer_controller.go index fd51f87a..7869ad6b 100644 --- a/internal/controller/consumer_controller.go +++ b/internal/controller/consumer_controller.go @@ -38,6 +38,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -51,6 +52,7 @@ type ConsumerReconciler struct { //nolint:revive Provider provider.Provider Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -181,6 +183,7 @@ func (r *ConsumerReconciler) listConsumersForGatewayProxy(ctx context.Context, o } func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&v1alpha1.Consumer{}, req.NamespacedName) consumer := new(v1alpha1.Consumer) if err := r.Get(ctx, req.NamespacedName, consumer); err != nil { if client.IgnoreNotFound(err) == nil { @@ -303,31 +306,5 @@ func (r *ConsumerReconciler) checkGatewayRef(object client.Object) bool { if !ok { return false } - if consumer.Spec.GatewayRef.Name == "" { - return false - } - if consumer.Spec.GatewayRef.Kind != nil && *consumer.Spec.GatewayRef.Kind != KindGateway { - return false - } - if consumer.Spec.GatewayRef.Group != nil && *consumer.Spec.GatewayRef.Group != gatewayv1.GroupName { - return false - } - ns := consumer.GetNamespace() - if consumer.Spec.GatewayRef.Namespace != nil { - ns = *consumer.Spec.GatewayRef.Namespace - } - gateway := &gatewayv1.Gateway{} - if err := r.Get(context.Background(), client.ObjectKey{ - Name: consumer.Spec.GatewayRef.Name, - Namespace: ns, - }, gateway); err != nil { - r.Log.Error(err, "failed to get gateway", "gateway", consumer.Spec.GatewayRef.Name) - return false - } - gatewayClass := &gatewayv1.GatewayClass{} - if err := r.Get(context.Background(), client.ObjectKey{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil { - r.Log.Error(err, "failed to get gateway class", "gateway", gateway.GetName(), "gatewayclass", gateway.Spec.GatewayClassName) - return false - } - return matchesController(string(gatewayClass.Spec.ControllerName)) + return MatchConsumerGatewayRef(context.Background(), r.Client, r.Log, consumer) } diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index 50b6e3b2..b6f39287 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -48,6 +48,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/types" "github.com/apache/apisix-ingress-controller/internal/utils" @@ -65,6 +66,7 @@ type HTTPRouteReconciler struct { //nolint:revive genericEvent chan event.GenericEvent Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -130,6 +132,7 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&gatewayv1.HTTPRoute{}, req.NamespacedName) hr := new(gatewayv1.HTTPRoute) if err := r.Get(ctx, req.NamespacedName, hr); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index 2f1b7574..e621ac10 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -47,6 +47,7 @@ const ( GatewayClassIndexRef = "gatewayClassRef" ApisixUpstreamRef = "apisixUpstreamRef" PluginConfigIndexRef = "pluginConfigRefs" + ControllerName = "controllerName" ) func SetupIndexer(mgr ctrl.Manager) error { @@ -59,11 +60,11 @@ func SetupIndexer(mgr ctrl.Manager) error { setupIngressClassIndexer, setupGatewayProxyIndexer, setupGatewaySecretIndex, - setupGatewayClassIndexer, setupApisixRouteIndexer, setupApisixPluginConfigIndexer, setupApisixTlsIndexer, setupApisixConsumerIndexer, + setupGatewayClassIndexer, } { if err := setup(mgr); err != nil { return err @@ -81,6 +82,17 @@ func setupGatewayIndexer(mgr ctrl.Manager) error { ); err != nil { return err } + + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &gatewayv1.Gateway{}, + GatewayClassIndexRef, + func(obj client.Object) (requests []string) { + return []string{string(obj.(*gatewayv1.Gateway).Spec.GatewayClassName)} + }, + ); err != nil { + return err + } return nil } @@ -273,10 +285,10 @@ func setupGatewaySecretIndex(mgr ctrl.Manager) error { func setupGatewayClassIndexer(mgr ctrl.Manager) error { return mgr.GetFieldIndexer().IndexField( context.Background(), - &gatewayv1.Gateway{}, - GatewayClassIndexRef, - func(obj client.Object) (requests []string) { - return []string{string(obj.(*gatewayv1.Gateway).Spec.GatewayClassName)} + &gatewayv1.GatewayClass{}, + ControllerName, + func(obj client.Object) []string { + return []string{string(obj.(*gatewayv1.GatewayClass).Spec.ControllerName)} }, ) } diff --git a/internal/controller/ingress_controller.go b/internal/controller/ingress_controller.go index 86c4d7ff..25b52990 100644 --- a/internal/controller/ingress_controller.go +++ b/internal/controller/ingress_controller.go @@ -45,6 +45,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -59,6 +60,7 @@ type IngressReconciler struct { //nolint:revive genericEvent chan event.GenericEvent Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -117,6 +119,7 @@ func (r *IngressReconciler) SetupWithManager(mgr ctrl.Manager) error { // Reconcile handles the reconciliation of Ingress resources func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&networkingv1.Ingress{}, req.NamespacedName) ingress := new(networkingv1.Ingress) if err := r.Get(ctx, req.NamespacedName, ingress); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/status/updater.go b/internal/controller/status/updater.go index 019c1846..d00aec76 100644 --- a/internal/controller/status/updater.go +++ b/internal/controller/status/updater.go @@ -150,6 +150,10 @@ func (u *UpdateHandler) Start(ctx context.Context) error { } } +func (u *UpdateHandler) NeedsLeaderElection() bool { + return true +} + func (u *UpdateHandler) Writer() Updater { return &UpdateWriter{ updateChannel: u.updateChannel, diff --git a/internal/controller/utils.go b/internal/controller/utils.go index 81a44843..0917cf81 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -35,6 +35,7 @@ import ( corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -1448,3 +1449,37 @@ func TypePredicate[T client.Object]() func(obj client.Object) bool { return ok } } + +func MatchConsumerGatewayRef(ctx context.Context, c client.Client, log logr.Logger, consumer *v1alpha1.Consumer) bool { + if consumer.Spec.GatewayRef.Name == "" { + return false + } + if consumer.Spec.GatewayRef.Kind != nil && *consumer.Spec.GatewayRef.Kind != KindGateway { + return false + } + if consumer.Spec.GatewayRef.Group != nil && *consumer.Spec.GatewayRef.Group != gatewayv1.GroupName { + return false + } + ns := consumer.GetNamespace() + if consumer.Spec.GatewayRef.Namespace != nil { + ns = *consumer.Spec.GatewayRef.Namespace + } + gateway := &gatewayv1.Gateway{} + if err := c.Get(context.Background(), client.ObjectKey{ + Name: consumer.Spec.GatewayRef.Name, + Namespace: ns, + }, gateway); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "failed to get gateway", "gateway", consumer.Spec.GatewayRef.Name) + } + return false + } + gatewayClass := &gatewayv1.GatewayClass{} + if err := c.Get(context.Background(), client.ObjectKey{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "failed to get gateway class", "gateway", gateway.GetName(), "gatewayclass", gateway.Spec.GatewayClassName) + } + return false + } + return matchesController(string(gatewayClass.Spec.ControllerName)) +} diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go index 2af8cd6a..0688055d 100644 --- a/internal/manager/controllers.go +++ b/internal/manager/controllers.go @@ -20,13 +20,23 @@ package manager import ( "context" + netv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" + types "github.com/apache/apisix-ingress-controller/internal/types" ) // K8s @@ -83,7 +93,7 @@ type Controller interface { SetupWithManager(mgr manager.Manager) error } -func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Provider, updater status.Updater) ([]Controller, error) { +func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Provider, updater status.Updater, readier readiness.ReadinessManager) ([]Controller, error) { if err := indexer.SetupIndexer(mgr); err != nil { return nil, err } @@ -107,6 +117,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("HTTPRoute"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.IngressReconciler{ Client: mgr.GetClient(), @@ -114,6 +125,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("Ingress"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ConsumerReconciler{ Client: mgr.GetClient(), @@ -121,6 +133,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("Consumer"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.IngressClassReconciler{ Client: mgr.GetClient(), @@ -134,6 +147,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixGlobalRule"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ApisixRouteReconciler{ Client: mgr.GetClient(), @@ -141,6 +155,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixRoute"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ApisixConsumerReconciler{ Client: mgr.GetClient(), @@ -148,6 +163,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixConsumer"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ApisixPluginConfigReconciler{ Client: mgr.GetClient(), @@ -161,6 +177,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixTls"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ApisixUpstreamReconciler{ Client: mgr.GetClient(), @@ -176,3 +193,41 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro }, }, nil } + +func registerReadinessGVK(c client.Client, readier readiness.ReadinessManager) { + log := ctrl.LoggerFrom(context.Background()).WithName("readiness") + readier.RegisterGVK([]readiness.GVKConfig{ + { + GVKs: []schema.GroupVersionKind{ + types.GvkOf(&gatewayv1.HTTPRoute{}), + }, + }, + { + GVKs: []schema.GroupVersionKind{ + types.GvkOf(&netv1.Ingress{}), + types.GvkOf(&apiv2.ApisixRoute{}), + types.GvkOf(&apiv2.ApisixGlobalRule{}), + types.GvkOf(&apiv2.ApisixPluginConfig{}), + types.GvkOf(&apiv2.ApisixTls{}), + types.GvkOf(&apiv2.ApisixConsumer{}), + }, + Filter: readiness.GVKFilter(func(obj *unstructured.Unstructured) bool { + icName, _, _ := unstructured.NestedString(obj.Object, "spec", "ingressClassName") + ingressClass, _ := controller.GetIngressClass(context.Background(), c, log, icName) + return ingressClass != nil + }), + }, + { + GVKs: []schema.GroupVersionKind{ + types.GvkOf(&v1alpha1.Consumer{}), + }, + Filter: readiness.GVKFilter(func(obj *unstructured.Unstructured) bool { + consumer := &v1alpha1.Consumer{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, consumer); err != nil { + return false + } + return controller.MatchConsumerGatewayRef(context.Background(), c, log, consumer) + }), + }, + }...) +} diff --git a/internal/manager/readiness/manager.go b/internal/manager/readiness/manager.go new file mode 100644 index 00000000..7ba4b83b --- /dev/null +++ b/internal/manager/readiness/manager.go @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package readiness + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/api7/gopkg/pkg/log" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + k8stypes "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + types "github.com/apache/apisix-ingress-controller/internal/types" +) + +// Filter defines an interface to match unstructured Kubernetes objects. +type Filter interface { + Match(obj *unstructured.Unstructured) bool +} + +// GVKFilter is a functional implementation of Filter using a function type. +type GVKFilter func(obj *unstructured.Unstructured) bool + +func (f GVKFilter) Match(obj *unstructured.Unstructured) bool { + return f(obj) +} + +// GVKConfig defines a set of GVKs and an optional filter to match the objects. +type GVKConfig struct { + GVKs []schema.GroupVersionKind + Filter Filter +} + +// readinessManager prevents premature full sync to the data plane on controller startup. +// +// Background: +// On startup, the controller watches CRDs and periodically performs full sync to the data plane. +// If a sync occurs before all resources have been reconciled, it may push incomplete data, +// causing traffic disruption. +// +// This manager tracks whether all relevant resources have been processed at least once. +// It is used to delay full sync until initial reconciliation is complete. +type ReadinessManager interface { + RegisterGVK(configs ...GVKConfig) + Start(ctx context.Context) error + IsReady() bool + WaitReady(ctx context.Context, timeout time.Duration) bool + Done(obj client.Object, namespacedName k8stypes.NamespacedName) +} + +type readinessManager struct { + client client.Client + configs []GVKConfig + state map[schema.GroupVersionKind]map[k8stypes.NamespacedName]struct{} + mu sync.RWMutex + startOnce sync.Once + started chan struct{} + done chan struct{} + + isReady atomic.Bool +} + +// ReadinessManager tracks readiness of specific resources across the cluster. +func NewReadinessManager(client client.Client) ReadinessManager { + return &readinessManager{ + client: client, + state: make(map[schema.GroupVersionKind]map[k8stypes.NamespacedName]struct{}), + started: make(chan struct{}), + done: make(chan struct{}), + isReady: atomic.Bool{}, + } +} + +// RegisterGVK registers one or more GVKConfig objects for readiness tracking. +func (r *readinessManager) RegisterGVK(configs ...GVKConfig) { + r.mu.Lock() + defer r.mu.Unlock() + r.configs = append(r.configs, configs...) +} + +// Start initializes the readiness state from the Kubernetes API. +// Should be called only after informer cache has synced. +func (r *readinessManager) Start(ctx context.Context) error { + var err error + r.startOnce.Do(func() { + for _, cfg := range r.configs { + for _, gvk := range cfg.GVKs { + uList := &unstructured.UnstructuredList{} + uList.SetGroupVersionKind(gvk) + if listErr := r.client.List(ctx, uList); listErr != nil { + err = fmt.Errorf("list %s failed: %w", gvk.String(), listErr) + return + } + var expected []k8stypes.NamespacedName + for _, item := range uList.Items { + if cfg.Filter != nil && !cfg.Filter.Match(&item) { + continue + } + expected = append(expected, k8stypes.NamespacedName{ + Namespace: item.GetNamespace(), + Name: item.GetName(), + }) + } + if len(expected) > 0 { + log.Debugw("registering readiness state", zap.Any("gvk", gvk), zap.Any("expected", expected)) + r.registerState(gvk, expected) + } + } + } + close(r.started) + if len(r.state) == 0 && !r.isReady.Load() { + r.isReady.Store(true) + close(r.done) + } + }) + return err +} + +func (r *readinessManager) registerState(gvk schema.GroupVersionKind, list []k8stypes.NamespacedName) { + r.mu.Lock() + defer r.mu.Unlock() + if _, ok := r.state[gvk]; !ok { + r.state[gvk] = make(map[k8stypes.NamespacedName]struct{}) + } + for _, name := range list { + r.state[gvk][name] = struct{}{} + } +} + +// Done marks the resource as ready by removing it from the pending state. +func (r *readinessManager) Done(obj client.Object, nn k8stypes.NamespacedName) { + if r.IsReady() { + return + } + r.mu.Lock() + defer r.mu.Unlock() + gvk := types.GvkOf(obj) + if _, ok := r.state[gvk]; !ok { + return + } + delete(r.state[gvk], nn) + if len(r.state[gvk]) == 0 { + delete(r.state, gvk) + } + if len(r.state) == 0 && !r.isReady.Load() { + r.isReady.Store(true) + close(r.done) + } +} + +func (r *readinessManager) IsReady() bool { + return r.isReady.Load() +} + +// WaitReady blocks until readiness is achieved, a timeout occurs, or context is cancelled. +func (r *readinessManager) WaitReady(ctx context.Context, timeout time.Duration) bool { + if r.IsReady() { + return true + } + + select { + case <-r.started: + case <-ctx.Done(): + return false + } + + select { + case <-ctx.Done(): + return false + case <-time.After(timeout): + return true + case <-r.done: + return true + } +} diff --git a/internal/manager/run.go b/internal/manager/run.go index a27b2732..b75b383d 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -41,6 +41,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller" "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider/adc" _ "github.com/apache/apisix-ingress-controller/pkg/metrics" ) @@ -150,13 +151,20 @@ func Run(ctx context.Context, logger logr.Logger) error { return err } + readier := readiness.NewReadinessManager(mgr.GetClient()) + registerReadinessGVK(mgr.GetClient(), readier) + + if err := mgr.Add(readier); err != nil { + setupLog.Error(err, "unable to add readiness manager") + } + updater := status.NewStatusUpdateHandler(ctrl.LoggerFrom(ctx).WithName("status").WithName("updater"), mgr.GetClient()) if err := mgr.Add(updater); err != nil { setupLog.Error(err, "unable to add status updater") return err } - provider, err := adc.New(updater.Writer(), &adc.Options{ + provider, err := adc.New(updater.Writer(), readier, &adc.Options{ SyncTimeout: config.ControllerConfig.ExecADCTimeout.Duration, SyncPeriod: config.ControllerConfig.ProviderConfig.SyncPeriod.Duration, InitSyncDelay: config.ControllerConfig.ProviderConfig.InitSyncDelay.Duration, @@ -184,7 +192,7 @@ func Run(ctx context.Context, logger logr.Logger) error { controller.SetEnableReferenceGrant(err == nil) setupLog.Info("setting up controllers") - controllers, err := setupControllers(ctx, mgr, provider, updater.Writer()) + controllers, err := setupControllers(ctx, mgr, provider, updater.Writer(), readier) if err != nil { setupLog.Error(err, "unable to set up controllers") return err diff --git a/internal/provider/adc/adc.go b/internal/provider/adc/adc.go index a109dcbb..5ff00761 100644 --- a/internal/provider/adc/adc.go +++ b/internal/provider/adc/adc.go @@ -38,6 +38,7 @@ import ( apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/label" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/provider/adc/translator" "github.com/apache/apisix-ingress-controller/internal/types" @@ -93,6 +94,8 @@ type adcClient struct { updater status.Updater statusUpdateMap map[types.NamespacedNameKind][]string + readier readiness.ReadinessManager + syncCh chan struct{} } @@ -104,7 +107,7 @@ type Task struct { configs []adcConfig } -func New(updater status.Updater, opts ...Option) (provider.Provider, error) { +func New(updater status.Updater, readier readiness.ReadinessManager, opts ...Option) (provider.Provider, error) { o := Options{} o.ApplyOptions(opts) @@ -116,6 +119,7 @@ func New(updater status.Updater, opts ...Option) (provider.Provider, error) { store: NewStore(), executor: &DefaultADCExecutor{}, updater: updater, + readier: readier, syncCh: make(chan struct{}, 1), }, nil } @@ -302,6 +306,8 @@ func (d *adcClient) Delete(ctx context.Context, obj client.Object) error { } func (d *adcClient) Start(ctx context.Context) error { + d.readier.WaitReady(ctx, 5*time.Minute) + initalSyncDelay := d.InitSyncDelay if initalSyncDelay > 0 { time.AfterFunc(initalSyncDelay, func() { @@ -478,3 +484,7 @@ func (d *adcClient) handleADCExecutionErrors(statusesMap map[string]types.ADCExe statusUpdateMap := d.resolveADCExecutionErrors(statusesMap) d.handleStatusUpdate(statusUpdateMap) } + +func (d *adcClient) NeedLeaderElection() bool { + return true +} diff --git a/internal/provider/provider.go b/internal/provider/provider.go index b06899fb..d9b6cbfa 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -37,6 +37,7 @@ type Provider interface { Delete(context.Context, client.Object) error Sync(context.Context) error Start(context.Context) error + NeedLeaderElection() bool } type TranslateContext struct { diff --git a/internal/types/k8s.go b/internal/types/k8s.go index a115562e..d83158fe 100644 --- a/internal/types/k8s.go +++ b/internal/types/k8s.go @@ -18,11 +18,13 @@ package types import ( - "github.com/apache/apisix-ingress-controller/api/v1alpha1" - v2 "github.com/apache/apisix-ingress-controller/api/v2" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/networking/v1" + netv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/runtime/schema" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + v2 "github.com/apache/apisix-ingress-controller/api/v2" ) const DefaultIngressClassAnnotation = "ingressclass.kubernetes.io/is-default-class" @@ -56,9 +58,9 @@ func KindOf(obj any) string { return KindHTTPRoute case *gatewayv1.GatewayClass: return KindGatewayClass - case *v1.Ingress: + case *netv1.Ingress: return KindIngress - case *v1.IngressClass: + case *netv1.IngressClass: return KindIngressClass case *corev1.Secret: return KindSecret @@ -90,3 +92,77 @@ func KindOf(obj any) string { return "Unknown" } } + +func GvkOf(obj any) schema.GroupVersionKind { + kind := KindOf(obj) + switch obj.(type) { + case *gatewayv1.Gateway, *gatewayv1.HTTPRoute, *gatewayv1.GatewayClass: + return gatewayv1.SchemeGroupVersion.WithKind(kind) + case *netv1.Ingress, *netv1.IngressClass: + return netv1.SchemeGroupVersion.WithKind(kind) + case *corev1.Secret, *corev1.Service: + return corev1.SchemeGroupVersion.WithKind(kind) + case *v2.ApisixRoute: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixRoute, + } + case *v2.ApisixGlobalRule: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixGlobalRule, + } + case *v2.ApisixPluginConfig: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixPluginConfig, + } + case *v2.ApisixTls: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixTls, + } + case *v2.ApisixConsumer: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixConsumer, + } + case *v1alpha1.HTTPRoutePolicy: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindHTTPRoutePolicy, + } + case *v1alpha1.BackendTrafficPolicy: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindBackendTrafficPolicy, + } + case *v1alpha1.GatewayProxy: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindGatewayProxy, + } + case *v1alpha1.Consumer: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindConsumer, + } + case *v1alpha1.PluginConfig: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindPluginConfig, + } + default: + return schema.GroupVersionKind{} + } +} diff --git a/test/e2e/crds/v1alpha1/consumer.go b/test/e2e/crds/v1alpha1/consumer.go index d2db2a5b..daa092cd 100644 --- a/test/e2e/crds/v1alpha1/consumer.go +++ b/test/e2e/crds/v1alpha1/consumer.go @@ -579,4 +579,90 @@ spec: }) }) }) + + Context("Test Consumer sync during startup", func() { + var consumer1 = ` +apiVersion: apisix.apache.org/v1alpha1 +kind: Consumer +metadata: + name: consumer-sample +spec: + gatewayRef: + name: apisix + credentials: + - type: key-auth + name: key-auth-sample + config: + key: sample-key +` + var consumer2 = ` +apiVersion: apisix.apache.org/v1alpha1 +kind: Consumer +metadata: + name: consumer-unused +spec: + gatewayRef: + name: apisix-non-existent + credentials: + - type: key-auth + name: key-auth-sample + config: + key: sample-key2 +` + + BeforeEach(func() { + s.ApplyDefaultGatewayResource(defaultGatewayProxy, defaultGatewayClass, defaultGateway, defaultHTTPRoute) + }) + + It("Should sync Consumer during startup", func() { + Expect(s.CreateResourceFromString(consumer2)).NotTo(HaveOccurred(), "creating unused consumer") + s.ResourceApplied("Consumer", "consumer-sample", consumer1, 1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin.org", + Headers: map[string]string{ + "apikey": "sample-key", + }, + Check: scaffold.WithExpectedStatus(200), + }) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin.org", + Headers: map[string]string{ + "apikey": "sample-key2", + }, + Check: scaffold.WithExpectedStatus(401), + }) + + By("restarting the controller and dataplane") + s.Deployer.ScaleIngress(0) + s.Deployer.ScaleDataplane(0) + s.Deployer.ScaleDataplane(1) + s.Deployer.ScaleIngress(1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin.org", + Headers: map[string]string{ + "apikey": "sample-key", + }, + Check: scaffold.WithExpectedStatus(200), + }) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin.org", + Headers: map[string]string{ + "apikey": "sample-key2", + }, + Check: scaffold.WithExpectedStatus(401), + }) + }) + }) }) diff --git a/test/e2e/crds/v2/route.go b/test/e2e/crds/v2/route.go index ce1b9864..78d14636 100644 --- a/test/e2e/crds/v2/route.go +++ b/test/e2e/crds/v2/route.go @@ -650,4 +650,111 @@ spec: Expect(err).ShouldNot(HaveOccurred(), "check apisixupstream is referenced") }) }) + + Context("Test ApisixRoute sync during startup", func() { + const route = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default +spec: + ingressClassName: apisix + http: + - name: rule0 + match: + hosts: + - httpbin + paths: + - /get + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 +` + + const route2 = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: route2 +spec: + ingressClassName: apisix-nonexistent + http: + - name: rule0 + match: + hosts: + - httpbin2 + paths: + - /get + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 +` + const route3 = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: route3 +spec: + http: + - name: rule0 + match: + hosts: + - httpbin3 + paths: + - /get + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 +` + It("Should sync ApisixRoute during startup", func() { + By("apply ApisixRoute") + Expect(s.CreateResourceFromString(route2)).ShouldNot(HaveOccurred(), "apply ApisixRoute with nonexistent ingressClassName") + Expect(s.CreateResourceFromString(route3)).ShouldNot(HaveOccurred(), "apply ApisixRoute without ingressClassName") + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, route) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin2", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin3", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + + By("restart controller and dataplane") + s.Deployer.ScaleIngress(0) + s.Deployer.ScaleDataplane(0) + s.Deployer.ScaleDataplane(1) + s.Deployer.ScaleIngress(1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin2", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin3", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + }) + }) }) diff --git a/test/e2e/gatewayapi/httproute.go b/test/e2e/gatewayapi/httproute.go index daba664e..f4012aa1 100644 --- a/test/e2e/gatewayapi/httproute.go +++ b/test/e2e/gatewayapi/httproute.go @@ -1900,4 +1900,85 @@ spec: } }) }) + + Context("Test HTTPRoute sync during startup", func() { + BeforeEach(beforeEachHTTP) + var route = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: httpbin +spec: + parentRefs: + - name: apisix + hostnames: + - httpbin + rules: + - matches: + - path: + type: Exact + value: /get + backendRefs: + - name: httpbin-service-e2e-test + port: 80 +` + + var route2 = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: httpbin2 +spec: + parentRefs: + - name: apisix-nonexistent + hostnames: + - httpbin2 + rules: + - matches: + - path: + type: Exact + value: /get + backendRefs: + - name: httpbin-service-e2e-test + port: 80 +` + It("Should sync ApisixRoute during startup", func() { + By("apply ApisixRoute") + Expect(s.CreateResourceFromString(route2)).ShouldNot(HaveOccurred(), "applying HTTPRoute with non-existent parent") + s.ResourceApplied("HTTPRoute", "httpbin", route, 1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin2", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + + By("restart controller and dataplane") + s.Deployer.ScaleIngress(0) + s.Deployer.ScaleDataplane(0) + s.Deployer.ScaleDataplane(1) + s.Deployer.ScaleIngress(1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin2", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + }) + + }) })