Copilot commented on code in PR #2489: URL: https://github.com/apache/apisix-ingress-controller/pull/2489#discussion_r2224633578
########## internal/provider/adc/adc.go: ########## @@ -296,6 +300,8 @@ func (d *adcClient) Delete(ctx context.Context, obj client.Object) error { } func (d *adcClient) Start(ctx context.Context) error { + d.readier.WaitReady(ctx, 5*time.Minute) Review Comment: The timeout duration (5 minutes) is hardcoded. Consider making this configurable through the Options struct or defining it as a constant to improve maintainability. ```suggestion timeout := d.WaitReadyTimeout if timeout == 0 { timeout = 5 * time.Minute } d.readier.WaitReady(ctx, timeout) ``` ########## internal/manager/readiness/manager.go: ########## @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package readiness + +import ( + "context" + "fmt" + "sync" + "time" + + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + k8stypes "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + types "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/api7/gopkg/pkg/log" +) + +// Filter defines an interface to match unstructured Kubernetes objects. +type Filter interface { + Match(obj *unstructured.Unstructured) bool +} + +// GVKFilter is a functional implementation of Filter using a function type. +type GVKFilter func(obj *unstructured.Unstructured) bool + +func (f GVKFilter) Match(obj *unstructured.Unstructured) bool { + return f(obj) +} + +// GVKConfig defines a set of GVKs and an optional filter to match the objects. +type GVKConfig struct { + GVKs []schema.GroupVersionKind + Filter Filter +} + +// readinessManager prevents premature full sync to the data plane on controller startup. +// +// Background: +// On startup, the controller watches CRDs and periodically performs full sync to the data plane. +// If a sync occurs before all resources have been reconciled, it may push incomplete data, +// causing traffic disruption. +// +// This manager tracks whether all relevant resources have been processed at least once. +// It is used to delay full sync until initial reconciliation is complete. +type ReadinessManager interface { + RegisterGVK(configs ...GVKConfig) + Start(ctx context.Context) error + IsReady() bool + WaitReady(ctx context.Context, timeout time.Duration) bool + Done(obj client.Object, namespacedName k8stypes.NamespacedName) +} + +type readinessManager struct { + client client.Client + configs []GVKConfig + state map[schema.GroupVersionKind]map[k8stypes.NamespacedName]struct{} + mu sync.RWMutex + startOnce sync.Once + started chan struct{} + done chan struct{} + + isReady bool +} + +// ReadinessManager tracks readiness of specific resources across the cluster. +func NewReadinessManager(client client.Client) ReadinessManager { + return &readinessManager{ + client: client, + state: make(map[schema.GroupVersionKind]map[k8stypes.NamespacedName]struct{}), + started: make(chan struct{}), + done: make(chan struct{}), + } +} + +// RegisterGVK registers one or more GVKConfig objects for readiness tracking. +func (r *readinessManager) RegisterGVK(configs ...GVKConfig) { + r.mu.Lock() + defer r.mu.Unlock() + r.configs = append(r.configs, configs...) +} + +// Start initializes the readiness state from the Kubernetes API. +// Should be called only after informer cache has synced. +func (r *readinessManager) Start(ctx context.Context) error { + var err error + r.startOnce.Do(func() { + for _, cfg := range r.configs { + for _, gvk := range cfg.GVKs { + uList := &unstructured.UnstructuredList{} + uList.SetGroupVersionKind(gvk) + if listErr := r.client.List(ctx, uList); listErr != nil { + err = fmt.Errorf("list %s failed: %w", gvk.String(), listErr) + return + } + var expected []k8stypes.NamespacedName + for _, item := range uList.Items { + if cfg.Filter != nil && !cfg.Filter.Match(&item) { + continue + } + expected = append(expected, k8stypes.NamespacedName{ + Namespace: item.GetNamespace(), + Name: item.GetName(), + }) + } + if len(expected) > 0 { + log.Warnw("registering readiness state", zap.Any("gvk", gvk), zap.Any("expected", expected)) Review Comment: This log message uses WARN level for normal operational information. Consider using INFO level instead, as registering readiness state is expected behavior during startup, not a warning condition. ```suggestion log.Infow("registering readiness state", zap.Any("gvk", gvk), zap.Any("expected", expected)) ``` ########## internal/manager/controllers.go: ########## @@ -176,3 +194,87 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro }, }, nil } + +func registerReadinessGVK(c client.Client, readier readiness.ReadinessManager) { + readier.RegisterGVK([]readiness.GVKConfig{ + { + GVKs: []schema.GroupVersionKind{ + types.GvkOf(&gatewayv1.HTTPRoute{}), + }, + }, + { + GVKs: []schema.GroupVersionKind{ + types.GvkOf(&netv1.Ingress{}), + types.GvkOf(&apiv2.ApisixRoute{}), + types.GvkOf(&apiv2.ApisixGlobalRule{}), + types.GvkOf(&apiv2.ApisixPluginConfig{}), + types.GvkOf(&apiv2.ApisixTls{}), + types.GvkOf(&apiv2.ApisixConsumer{}), + }, + Filter: readiness.GVKFilter(func(obj *unstructured.Unstructured) bool { + icName, _, _ := unstructured.NestedString(obj.Object, "spec", "ingressClassName") + if icName == "" { + insList := &netv1.IngressClassList{} + if err := c.List(context.Background(), insList, client.MatchingFields{ + indexer.IngressClass: config.GetControllerName(), + }); err != nil { + return false + } + for _, ic := range insList.Items { + if ic.Annotations[types.DefaultIngressClassAnnotation] == "true" { + return true + } + } + return false + } else { + var ingressClass netv1.IngressClass + if err := c.Get(context.Background(), client.ObjectKey{ + Name: icName, + }, &ingressClass); err != nil { + return false + } + return true + } Review Comment: The filter logic contains duplicated validation patterns. Consider extracting the IngressClass validation logic into a separate helper function to reduce code duplication and improve maintainability. ```suggestion return isIngressClassValid(c, icName) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org