This is an automated email from the ASF dual-hosted git repository.
alinsran pushed a commit to branch v2.0.0
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/v2.0.0 by this push:
new 9d7e0183 feat: add synchronization status to CRD (#2460)
9d7e0183 is described below
commit 9d7e01831f5b1b1e97bdaa522ff8a5161ba19290
Author: AlinsRan <[email protected]>
AuthorDate: Mon Jul 7 14:37:01 2025 +0800
feat: add synchronization status to CRD (#2460)
---
api/adc/types.go | 9 +
internal/controller/apisixroute_controller.go | 37 +--
internal/controller/httproute_controller.go | 29 +--
internal/controller/utils.go | 34 +--
internal/controller/utils/utils.go | 71 ++++++
internal/manager/run.go | 2 +-
internal/provider/adc/adc.go | 29 ++-
internal/provider/adc/executor.go | 52 ++--
internal/provider/adc/status.go | 237 ++++++++++++++++++
internal/provider/adc/store.go | 93 ++++++-
internal/types/error.go | 94 +++++++
internal/types/{types.go => k8s.go} | 23 +-
internal/types/types.go | 11 +
internal/utils/k8s.go | 8 +
test/e2e/apisix/status.go | 342 ++++++++++++++++++++++++++
test/e2e/framework/manifests/ingress.yaml | 1 +
test/e2e/scaffold/apisix_deployer.go | 6 +
test/e2e/scaffold/deployer.go | 1 +
18 files changed, 980 insertions(+), 99 deletions(-)
diff --git a/api/adc/types.go b/api/adc/types.go
index d2ecba2d..0eb7d12a 100644
--- a/api/adc/types.go
+++ b/api/adc/types.go
@@ -77,6 +77,15 @@ const (
PassHostRewrite = "rewrite"
)
+const (
+ TypeRoute = "route"
+ TypeService = "service"
+ TypeConsumer = "consumer"
+ TypeSSL = "ssl"
+ TypeGlobalRule = "global_rule"
+ TypePluginMetadata = "plugin_metadata"
+)
+
type Object interface {
GetLabels() map[string]string
}
diff --git a/internal/controller/apisixroute_controller.go
b/internal/controller/apisixroute_controller.go
index bce27913..a07c6e76 100644
--- a/internal/controller/apisixroute_controller.go
+++ b/internal/controller/apisixroute_controller.go
@@ -32,7 +32,7 @@ import (
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
+ k8stypes "k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -45,6 +45,7 @@ import (
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
"github.com/apache/apisix-ingress-controller/internal/controller/status"
"github.com/apache/apisix-ingress-controller/internal/provider"
+ "github.com/apache/apisix-ingress-controller/internal/types"
"github.com/apache/apisix-ingress-controller/internal/utils"
pkgutils "github.com/apache/apisix-ingress-controller/pkg/utils"
)
@@ -134,7 +135,7 @@ func (r *ApisixRouteReconciler) Reconcile(ctx
context.Context, req ctrl.Request)
return ctrl.Result{}, err
}
if err = r.Provider.Update(ctx, tctx, &ar); err != nil {
- err = ReasonError{
+ err = types.ReasonError{
Reason: string(apiv2.ConditionReasonSyncFailed),
Message: err.Error(),
}
@@ -152,7 +153,7 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx
context.Context, tc *prov
for httpIndex, http := range in.Spec.HTTP {
// check rule names
if _, ok := rules[http.Name]; ok {
- return ReasonError{
+ return types.ReasonError{
Reason:
string(apiv2.ConditionReasonInvalidSpec),
Message: "duplicate route rule name",
}
@@ -178,7 +179,7 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx
context.Context, tc *prov
// check vars
if _, err := http.Match.NginxVars.ToVars(); err != nil {
- return ReasonError{
+ return types.ReasonError{
Reason:
string(apiv2.ConditionReasonInvalidSpec),
Message:
fmt.Sprintf(".spec.http[%d].match.exprs: %s", httpIndex, err.Error()),
}
@@ -186,7 +187,7 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx
context.Context, tc *prov
// validate remote address
if err := utils.ValidateRemoteAddrs(http.Match.RemoteAddrs);
err != nil {
- return ReasonError{
+ return types.ReasonError{
Reason:
string(apiv2.ConditionReasonInvalidSpec),
Message:
fmt.Sprintf(".spec.http[%d].match.remoteAddrs: %s", httpIndex, err.Error()),
}
@@ -220,7 +221,7 @@ func (r *ApisixRouteReconciler) validatePluginConfig(ctx
context.Context, tc *pr
pcNN = utils.NamespacedName(&pc)
)
if err := r.Get(ctx, pcNN, &pc); err != nil {
- return ReasonError{
+ return types.ReasonError{
Reason: string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("failed to get ApisixPluginConfig:
%s", pcNN),
}
@@ -230,13 +231,13 @@ func (r *ApisixRouteReconciler) validatePluginConfig(ctx
context.Context, tc *pr
if in.Spec.IngressClassName != pc.Spec.IngressClassName &&
pc.Spec.IngressClassName != "" {
var pcIC networkingv1.IngressClass
if err := r.Get(ctx, client.ObjectKey{Name:
pc.Spec.IngressClassName}, &pcIC); err != nil {
- return ReasonError{
+ return types.ReasonError{
Reason:
string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("failed to get
IngressClass %s for ApisixPluginConfig %s: %v", pc.Spec.IngressClassName, pcNN,
err),
}
}
if !matchesController(pcIC.Spec.Controller) {
- return ReasonError{
+ return types.ReasonError{
Reason:
string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("ApisixPluginConfig %s
references IngressClass %s with non-matching controller", pcNN,
pc.Spec.IngressClassName),
}
@@ -271,7 +272,7 @@ func (r *ApisixRouteReconciler) validateSecrets(ctx
context.Context, tc *provide
secretNN = utils.NamespacedName(&secret)
)
if err := r.Get(ctx, secretNN, &secret); err != nil {
- return ReasonError{
+ return types.ReasonError{
Reason: string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("failed to get Secret: %s",
secretNN),
}
@@ -282,18 +283,18 @@ func (r *ApisixRouteReconciler) validateSecrets(ctx
context.Context, tc *provide
}
func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc
*provider.TranslateContext, in *apiv2.ApisixRoute, http apiv2.ApisixRouteHTTP)
error {
- var backends = make(map[types.NamespacedName]struct{})
+ var backends = make(map[k8stypes.NamespacedName]struct{})
for _, backend := range http.Backends {
var (
au apiv2.ApisixUpstream
service corev1.Service
- serviceNN = types.NamespacedName{
+ serviceNN = k8stypes.NamespacedName{
Namespace: in.GetNamespace(),
Name: backend.ServiceName,
}
)
if _, ok := backends[serviceNN]; ok {
- return ReasonError{
+ return types.ReasonError{
Reason:
string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("duplicate backend
service: %s", serviceNN),
}
@@ -344,7 +345,7 @@ func (r *ApisixRouteReconciler) validateBackends(ctx
context.Context, tc *provid
discoveryv1.LabelServiceName: service.Name,
},
); err != nil {
- return ReasonError{
+ return types.ReasonError{
Reason:
string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("failed to list endpoint
slices: %v", err),
}
@@ -366,7 +367,7 @@ func (r *ApisixRouteReconciler) validateUpstreams(ctx
context.Context, tc *provi
}
var (
ups apiv2.ApisixUpstream
- upsNN = types.NamespacedName{
+ upsNN = k8stypes.NamespacedName{
Namespace: ar.GetNamespace(),
Name: upstream.Name,
}
@@ -384,7 +385,7 @@ func (r *ApisixRouteReconciler) validateUpstreams(ctx
context.Context, tc *provi
if node.Type == apiv2.ExternalTypeService {
var (
service corev1.Service
- serviceNN =
types.NamespacedName{Namespace: ups.GetNamespace(), Name: node.Name}
+ serviceNN =
k8stypes.NamespacedName{Namespace: ups.GetNamespace(), Name: node.Name}
)
if err := r.Get(ctx, serviceNN, &service); err
!= nil {
r.Log.Error(err, "failed to get service
in ApisixUpstream", "ApisixUpstream", upsNN, "Service", serviceNN)
@@ -400,7 +401,7 @@ func (r *ApisixRouteReconciler) validateUpstreams(ctx
context.Context, tc *provi
if ups.Spec.TLSSecret != nil && ups.Spec.TLSSecret.Name != "" {
var (
secret corev1.Secret
- secretNN = types.NamespacedName{Namespace:
cmp.Or(ups.Spec.TLSSecret.Namespace, ar.GetNamespace()), Name:
ups.Spec.TLSSecret.Name}
+ secretNN = k8stypes.NamespacedName{Namespace:
cmp.Or(ups.Spec.TLSSecret.Namespace, ar.GetNamespace()), Name:
ups.Spec.TLSSecret.Name}
)
if err := r.Get(ctx, secretNN, &secret); err != nil {
r.Log.Error(err, "failed to get secret in
ApisixUpstream", "ApisixUpstream", upsNN, "Secret", secretNN)
@@ -578,7 +579,7 @@ func (r *ApisixRouteReconciler)
listApisixRoutesForPluginConfig(ctx context.Cont
return pkgutils.DedupComparable(requests)
}
-func (r *ApisixRouteReconciler) getSubsetLabels(tctx
*provider.TranslateContext, auNN types.NamespacedName, backend
apiv2.ApisixRouteHTTPBackend) map[string]string {
+func (r *ApisixRouteReconciler) getSubsetLabels(tctx
*provider.TranslateContext, auNN k8stypes.NamespacedName, backend
apiv2.ApisixRouteHTTPBackend) map[string]string {
if backend.Subset == "" {
return nil
}
@@ -621,7 +622,7 @@ func (r *ApisixRouteReconciler)
filterEndpointSliceByTargetPod(ctx context.Conte
var (
pod corev1.Pod
- podNN = types.NamespacedName{
+ podNN = k8stypes.NamespacedName{
Namespace: v.TargetRef.Namespace,
Name: v.TargetRef.Name,
}
diff --git a/internal/controller/httproute_controller.go
b/internal/controller/httproute_controller.go
index 43d878fd..ae869761 100644
--- a/internal/controller/httproute_controller.go
+++ b/internal/controller/httproute_controller.go
@@ -31,7 +31,7 @@ import (
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
+ k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -49,6 +49,7 @@ import (
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
"github.com/apache/apisix-ingress-controller/internal/controller/status"
"github.com/apache/apisix-ingress-controller/internal/provider"
+ "github.com/apache/apisix-ingress-controller/internal/types"
"github.com/apache/apisix-ingress-controller/internal/utils"
)
@@ -186,7 +187,7 @@ func (r *HTTPRouteReconciler) Reconcile(ctx
context.Context, req ctrl.Request) (
var backendRefErr error
if err := r.processHTTPRoute(tctx, hr); err != nil {
// When encountering a backend reference error, it should not
affect the acceptance status
- if IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) {
+ if types.IsSomeReasonError(err,
gatewayv1.RouteReasonInvalidKind) {
backendRefErr = err
} else {
acceptStatus.status = false
@@ -340,10 +341,10 @@ func (r *HTTPRouteReconciler)
listHTTPRoutesForBackendTrafficPolicy(ctx context.
}
httprouteList = append(httprouteList, hrList.Items...)
}
- var namespacedNameMap = make(map[types.NamespacedName]struct{})
+ var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{})
requests := make([]reconcile.Request, 0, len(httprouteList))
for _, hr := range httprouteList {
- key := types.NamespacedName{
+ key := k8stypes.NamespacedName{
Namespace: hr.Namespace,
Name: hr.Name,
}
@@ -391,12 +392,12 @@ func (r *HTTPRouteReconciler)
listHTTPRouteByHTTPRoutePolicy(ctx context.Context
return nil
}
- var keys = make(map[types.NamespacedName]struct{})
+ var keys = make(map[k8stypes.NamespacedName]struct{})
for _, ref := range httpRoutePolicy.Spec.TargetRefs {
if ref.Kind != "HTTPRoute" {
continue
}
- key := types.NamespacedName{
+ key := k8stypes.NamespacedName{
Namespace: obj.GetNamespace(),
Name: string(ref.Name),
}
@@ -441,10 +442,10 @@ func (r *HTTPRouteReconciler)
listHTTPRouteForGenericEvent(ctx context.Context,
}
}
-func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx
*provider.TranslateContext, hrNN types.NamespacedName) error {
+func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx
*provider.TranslateContext, hrNN k8stypes.NamespacedName) error {
var terr error
for _, backend := range tctx.BackendRefs {
- targetNN := types.NamespacedName{
+ targetNN := k8stypes.NamespacedName{
Namespace: hrNN.Namespace,
Name: string(backend.Name),
}
@@ -453,7 +454,7 @@ func (r *HTTPRouteReconciler)
processHTTPRouteBackendRefs(tctx *provider.Transla
}
if backend.Kind != nil && *backend.Kind != "Service" {
- terr = newInvalidKindError(*backend.Kind)
+ terr = types.NewInvalidKindError(*backend.Kind)
continue
}
@@ -466,7 +467,7 @@ func (r *HTTPRouteReconciler)
processHTTPRouteBackendRefs(tctx *provider.Transla
if err := r.Get(tctx, targetNN, &service); err != nil {
terr = err
if client.IgnoreNotFound(err) == nil {
- terr = ReasonError{
+ terr = types.ReasonError{
Reason:
string(gatewayv1.RouteReasonBackendNotFound),
Message: fmt.Sprintf("Service %s not
found", targetNN),
}
@@ -490,7 +491,7 @@ func (r *HTTPRouteReconciler)
processHTTPRouteBackendRefs(tctx *provider.Transla
Namespace:
(*gatewayv1.Namespace)(&targetNN.Namespace),
},
); !permitted {
- terr = ReasonError{
+ terr = types.ReasonError{
Reason:
string(v1beta1.RouteReasonRefNotPermitted),
Message: fmt.Sprintf("%s is in a
different namespace than the HTTPRoute %s and no ReferenceGrant allowing
reference is configured", targetNN, hrNN),
}
@@ -549,7 +550,7 @@ func (r *HTTPRouteReconciler) processHTTPRoute(tctx
*provider.TranslateContext,
terror = err
continue
}
- tctx.PluginConfigs[types.NamespacedName{
+ tctx.PluginConfigs[k8stypes.NamespacedName{
Namespace: httpRoute.GetNamespace(),
Name:
string(filter.ExtensionRef.Name),
}] = pluginconfig
@@ -557,7 +558,7 @@ func (r *HTTPRouteReconciler) processHTTPRoute(tctx
*provider.TranslateContext,
}
for _, backend := range rule.BackendRefs {
if backend.Kind != nil && *backend.Kind != "Service" {
- terror = newInvalidKindError(*backend.Kind)
+ terror =
types.NewInvalidKindError(*backend.Kind)
continue
}
tctx.BackendRefs = append(tctx.BackendRefs,
gatewayv1.BackendRef{
@@ -659,7 +660,7 @@ func (r *HTTPRouteReconciler)
listHTTPRoutesForReferenceGrant(ctx context.Contex
var httpRouteList gatewayv1.HTTPRouteList
if err := r.List(ctx, &httpRouteList); err != nil {
- r.Log.Error(err, "failed to list httproutes for reference
ReferenceGrant", "ReferenceGrant", types.NamespacedName{Namespace:
obj.GetNamespace(), Name: obj.GetName()})
+ r.Log.Error(err, "failed to list httproutes for reference
ReferenceGrant", "ReferenceGrant", k8stypes.NamespacedName{Namespace:
obj.GetNamespace(), Name: obj.GetName()})
return nil
}
diff --git a/internal/controller/utils.go b/internal/controller/utils.go
index 45cbf12c..0465f952 100644
--- a/internal/controller/utils.go
+++ b/internal/controller/utils.go
@@ -287,7 +287,7 @@ func SetRouteConditionResolvedRefs(routeParentStatus
*gatewayv1.RouteParentStatu
condition.Status = metav1.ConditionFalse
condition.Message = err.Error()
- var re ReasonError
+ var re types.ReasonError
if errors.As(err, &re) {
condition.Reason = re.Reason
}
@@ -436,7 +436,7 @@ func SetApisixCRDConditionAccepted(status
*apiv2.ApisixStatus, generation int64,
condition.Reason = string(apiv2.ConditionReasonInvalidSpec)
condition.Message = err.Error()
- var re ReasonError
+ var re types.ReasonError
if errors.As(err, &re) {
condition.Reason = re.Reason
}
@@ -984,36 +984,6 @@ func FullTypeName(a any) string {
return path.Join(path.Dir(pkgPath), name)
}
-type ReasonError struct {
- Reason string
- Message string
-}
-
-func (e ReasonError) Error() string {
- return e.Message
-}
-
-func IsSomeReasonError[Reason ~string](err error, reasons ...Reason) bool {
- if err == nil {
- return false
- }
- var re ReasonError
- if !errors.As(err, &re) {
- return false
- }
- if len(reasons) == 0 {
- return true
- }
- return slices.Contains(reasons, Reason(re.Reason))
-}
-
-func newInvalidKindError[Kind ~string](kind Kind) ReasonError {
- return ReasonError{
- Reason: string(gatewayv1.RouteReasonInvalidKind),
- Message: fmt.Sprintf("Invalid kind %s, only Service is
supported", kind),
- }
-}
-
// filterHostnames accepts a list of gateways and an HTTPRoute, and returns a
copy of the HTTPRoute with only the hostnames that match the listener hostnames
of the gateways.
// If the HTTPRoute hostnames do not intersect with the listener hostnames of
the gateways, it returns an ErrNoMatchingListenerHostname error.
func filterHostnames(gateways []RouteParentRefContext, httpRoute
*gatewayv1.HTTPRoute) (*gatewayv1.HTTPRoute, error) {
diff --git a/internal/controller/utils/utils.go
b/internal/controller/utils/utils.go
new file mode 100644
index 00000000..a22babc1
--- /dev/null
+++ b/internal/controller/utils/utils.go
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package utils
+
+import (
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ apiv2 "github.com/apache/apisix-ingress-controller/api/v2"
+ "github.com/apache/apisix-ingress-controller/internal/utils"
+)
+
+func SetApisixCRDConditionWithGeneration(status *apiv2.ApisixStatus,
generation int64, condition metav1.Condition) {
+ condition.ObservedGeneration = generation
+ SetApisixCRDCondition(status, condition)
+}
+
+func SetApisixCRDCondition(status *apiv2.ApisixStatus, condition
metav1.Condition) {
+ for i, cond := range status.Conditions {
+ if cond.Type == condition.Type {
+ if cond.Status == condition.Status &&
+ cond.ObservedGeneration >
condition.ObservedGeneration {
+ return
+ }
+ status.Conditions[i] = condition
+ return
+ }
+ }
+
+ status.Conditions = append(status.Conditions, condition)
+}
+
+func NewConditionTypeAccepted(reason apiv2.ApisixRouteConditionReason, status
bool, generation int64, msg string) metav1.Condition {
+ var condition = metav1.Condition{
+ Type: string(apiv2.ConditionTypeAccepted),
+ Status: utils.ConditionStatus(status),
+ ObservedGeneration: generation,
+ LastTransitionTime: metav1.Now(),
+ Reason: string(reason),
+ Message: msg,
+ }
+ return condition
+}
+
+func MergeCondition(conditions []metav1.Condition, newCondition
metav1.Condition) []metav1.Condition {
+ if newCondition.LastTransitionTime.IsZero() {
+ newCondition.LastTransitionTime = metav1.Now()
+ }
+ newConditions := []metav1.Condition{}
+ for _, condition := range conditions {
+ if condition.Type != newCondition.Type {
+ newConditions = append(newConditions, condition)
+ }
+ }
+ newConditions = append(newConditions, newCondition)
+ return newConditions
+}
diff --git a/internal/manager/run.go b/internal/manager/run.go
index bb34b9cb..54ee31c8 100644
--- a/internal/manager/run.go
+++ b/internal/manager/run.go
@@ -156,7 +156,7 @@ func Run(ctx context.Context, logger logr.Logger) error {
return err
}
- provider, err := adc.New(&adc.Options{
+ provider, err := adc.New(updater.Writer(), &adc.Options{
SyncTimeout: config.ControllerConfig.ExecADCTimeout.Duration,
SyncPeriod:
config.ControllerConfig.ProviderConfig.SyncPeriod.Duration,
InitSyncDelay:
config.ControllerConfig.ProviderConfig.InitSyncDelay.Duration,
diff --git a/internal/provider/adc/adc.go b/internal/provider/adc/adc.go
index 0ab02d44..d3eb3f47 100644
--- a/internal/provider/adc/adc.go
+++ b/internal/provider/adc/adc.go
@@ -37,6 +37,7 @@ import (
"github.com/apache/apisix-ingress-controller/api/v1alpha1"
apiv2 "github.com/apache/apisix-ingress-controller/api/v2"
"github.com/apache/apisix-ingress-controller/internal/controller/label"
+ "github.com/apache/apisix-ingress-controller/internal/controller/status"
"github.com/apache/apisix-ingress-controller/internal/provider"
"github.com/apache/apisix-ingress-controller/internal/provider/adc/translator"
"github.com/apache/apisix-ingress-controller/internal/types"
@@ -73,6 +74,9 @@ type adcClient struct {
executor ADCExecutor
Options
+
+ updater status.Updater
+ statusUpdateMap map[types.NamespacedNameKind][]string
}
type Task struct {
@@ -83,7 +87,7 @@ type Task struct {
configs []adcConfig
}
-func New(opts ...Option) (provider.Provider, error) {
+func New(updater status.Updater, opts ...Option) (provider.Provider, error) {
o := Options{}
o.ApplyOptions(opts)
@@ -94,6 +98,7 @@ func New(opts ...Option) (provider.Provider, error) {
parentRefs:
make(map[types.NamespacedNameKind][]types.NamespacedNameKind),
store: NewStore(),
executor: &DefaultADCExecutor{},
+ updater: updater,
}, nil
}
@@ -318,6 +323,7 @@ func (d *adcClient) Sync(ctx context.Context) error {
log.Debugw("syncing resources with multiple configs",
zap.Any("configs", cfg))
+ failedMap := map[string]types.ADCExecutionErrors{}
var failedConfigs []string
for name, config := range cfg {
resources, err := d.store.GetResources(name)
@@ -337,8 +343,13 @@ func (d *adcClient) Sync(ctx context.Context) error {
}); err != nil {
log.Errorw("failed to sync resources",
zap.String("name", name), zap.Error(err))
failedConfigs = append(failedConfigs, name)
+ var execErrs types.ADCExecutionErrors
+ if errors.As(err, &execErrs) {
+ failedMap[name] = execErrs
+ }
}
}
+ d.handleADCExecutionErrors(failedMap)
if len(failedConfigs) > 0 {
return fmt.Errorf("failed to sync %d configs: %s",
len(failedConfigs),
@@ -363,15 +374,18 @@ func (d *adcClient) sync(ctx context.Context, task Task)
error {
args := BuildADCExecuteArgs(syncFilePath, task.Labels,
task.ResourceTypes)
- var failedConfigs []string
+ var errs types.ADCExecutionErrors
for _, config := range task.configs {
if err := d.executor.Execute(ctx, d.BackendMode, config, args);
err != nil {
log.Errorw("failed to execute adc command",
zap.Error(err), zap.Any("config", config))
- failedConfigs = append(failedConfigs, config.Name)
+ var execErr types.ADCExecutionError
+ if errors.As(err, &execErr) {
+ errs.Errors = append(errs.Errors, execErr)
+ }
}
}
- if len(failedConfigs) > 0 {
- return fmt.Errorf("failed to execute adc command for configs:
%s", strings.Join(failedConfigs, ", "))
+ if len(errs.Errors) > 0 {
+ return errs
}
return nil
}
@@ -399,3 +413,8 @@ func prepareSyncFile(resources any) (string, func(), error)
{
return tmpFile.Name(), cleanup, nil
}
+
+func (d *adcClient) handleADCExecutionErrors(statusesMap
map[string]types.ADCExecutionErrors) {
+ statusUpdateMap := d.resolveADCExecutionErrors(statusesMap)
+ d.handleStatusUpdate(statusUpdateMap)
+}
diff --git a/internal/provider/adc/executor.go
b/internal/provider/adc/executor.go
index bd7b6c7f..857ef14d 100644
--- a/internal/provider/adc/executor.go
+++ b/internal/provider/adc/executor.go
@@ -33,6 +33,7 @@ import (
"go.uber.org/zap"
adctypes "github.com/apache/apisix-ingress-controller/api/adc"
+ "github.com/apache/apisix-ingress-controller/internal/types"
)
type ADCExecutor interface {
@@ -51,15 +52,26 @@ func (e *DefaultADCExecutor) Execute(ctx context.Context,
mode string, config ad
}
func (e *DefaultADCExecutor) runADC(ctx context.Context, mode string, config
adcConfig, args []string) error {
- var failedAddrs []string
+ var execErrs = types.ADCExecutionError{
+ Name: config.Name,
+ }
+
for _, addr := range config.ServerAddrs {
if err := e.runForSingleServerWithTimeout(ctx, addr, mode,
config, args); err != nil {
log.Errorw("failed to run adc for server",
zap.String("server", addr), zap.Error(err))
- failedAddrs = append(failedAddrs, addr)
+ var execErr types.ADCExecutionServerAddrError
+ if errors.As(err, &execErr) {
+ execErrs.FailedErrors =
append(execErrs.FailedErrors, execErr)
+ } else {
+ execErrs.FailedErrors =
append(execErrs.FailedErrors, types.ADCExecutionServerAddrError{
+ ServerAddr: addr,
+ Err: err.Error(),
+ })
+ }
}
}
- if len(failedAddrs) > 0 {
- return fmt.Errorf("failed to run adc for servers: [%s]",
strings.Join(failedAddrs, ", "))
+ if len(execErrs.FailedErrors) > 0 {
+ return execErrs
}
return nil
}
@@ -95,7 +107,25 @@ func (e *DefaultADCExecutor) runForSingleServer(ctx
context.Context, serverAddr,
return e.buildCmdError(err, stdout.Bytes(), stderr.Bytes())
}
- return e.handleOutput(stdout.Bytes())
+ result, err := e.handleOutput(stdout.Bytes())
+ if err != nil {
+ log.Errorw("failed to handle adc output",
+ zap.Error(err),
+ zap.String("stdout", stdout.String()),
+ zap.String("stderr", stderr.String()),
+ )
+ return fmt.Errorf("failed to handle adc output: %w", err)
+ }
+ if result.FailedCount > 0 && len(result.Failed) > 0 {
+ log.Errorw("adc sync failed", zap.Any("result", result))
+ return types.ADCExecutionServerAddrError{
+ ServerAddr: serverAddr,
+ Err: result.Failed[0].Reason,
+ FailedStatuses: result.Failed,
+ }
+ }
+ log.Debugw("adc sync success", zap.Any("result", result))
+ return nil
}
func (e *DefaultADCExecutor) prepareEnv(serverAddr, mode, token string)
[]string {
@@ -121,7 +151,7 @@ func (e *DefaultADCExecutor) buildCmdError(runErr error,
stdout, stderr []byte)
return errors.New("failed to sync resources: " + errMsg + ", exit err:
" + runErr.Error())
}
-func (e *DefaultADCExecutor) handleOutput(output []byte) error {
+func (e *DefaultADCExecutor) handleOutput(output []byte)
(*adctypes.SyncResult, error) {
var result adctypes.SyncResult
log.Debugf("adc output: %s", string(output))
if lines := bytes.Split(output, []byte{'\n'}); len(lines) > 0 {
@@ -132,16 +162,10 @@ func (e *DefaultADCExecutor) handleOutput(output []byte)
error {
zap.Error(err),
zap.String("stdout", string(output)),
)
- return errors.New("failed to parse adc result: " + err.Error())
- }
-
- if result.FailedCount > 0 && len(result.Failed) > 0 {
- log.Errorw("adc sync failed", zap.Any("result", result))
- return errors.New(result.Failed[0].Reason)
+ return nil, errors.New("failed to parse adc result: " +
err.Error())
}
- log.Debugw("adc sync success", zap.Any("result", result))
- return nil
+ return &result, nil
}
func BuildADCExecuteArgs(filePath string, labels map[string]string, types
[]string) []string {
diff --git a/internal/provider/adc/status.go b/internal/provider/adc/status.go
new file mode 100644
index 00000000..8d1f70f8
--- /dev/null
+++ b/internal/provider/adc/status.go
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package adc
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/api7/gopkg/pkg/log"
+ "go.uber.org/zap"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
+
+ apiv2 "github.com/apache/apisix-ingress-controller/api/v2"
+ "github.com/apache/apisix-ingress-controller/internal/controller/label"
+ "github.com/apache/apisix-ingress-controller/internal/controller/status"
+ cutils
"github.com/apache/apisix-ingress-controller/internal/controller/utils"
+ "github.com/apache/apisix-ingress-controller/internal/types"
+)
+
+// handleStatusUpdate updates resource conditions based on the latest sync
results.
+//
+// It maintains a history of failed resources in d.statusUpdateMap.
+//
+// For resources in the current failure map (statusUpdateMap), it marks them
as failed.
+// For resources that exist only in the previous failure history (i.e. not in
this sync's failures),
+// it marks them as accepted (success).
+func (d *adcClient) handleStatusUpdate(statusUpdateMap
map[types.NamespacedNameKind][]string) {
+ // Mark all resources in the current failure set as failed.
+ for nnk, msgs := range statusUpdateMap {
+ d.updateStatus(nnk, cutils.NewConditionTypeAccepted(
+ apiv2.ConditionReasonSyncFailed,
+ false,
+ 0,
+ strings.Join(msgs, "; "),
+ ))
+ }
+
+ // Mark resources that exist only in the previous failure history as
successful.
+ for nnk := range d.statusUpdateMap {
+ if _, ok := statusUpdateMap[nnk]; !ok {
+ d.updateStatus(nnk, cutils.NewConditionTypeAccepted(
+ apiv2.ConditionReasonAccepted,
+ true,
+ 0,
+ "",
+ ))
+ }
+ }
+ // Update the failure history with the current failure set.
+ d.statusUpdateMap = statusUpdateMap
+}
+
+func (d *adcClient) updateStatus(nnk types.NamespacedNameKind, condition
metav1.Condition) {
+ switch nnk.Kind {
+ case types.KindApisixRoute:
+ d.updater.Update(status.Update{
+ NamespacedName: nnk.NamespacedName(),
+ Resource: &apiv2.ApisixRoute{},
+ Mutator: status.MutatorFunc(func(obj client.Object)
client.Object {
+ cp := obj.(*apiv2.ApisixRoute).DeepCopy()
+
cutils.SetApisixCRDConditionWithGeneration(&cp.Status, cp.GetGeneration(),
condition)
+ return cp
+ }),
+ })
+ case types.KindApisixGlobalRule:
+ d.updater.Update(status.Update{
+ NamespacedName: nnk.NamespacedName(),
+ Resource: &apiv2.ApisixGlobalRule{},
+ Mutator: status.MutatorFunc(func(obj client.Object)
client.Object {
+ cp := obj.(*apiv2.ApisixGlobalRule).DeepCopy()
+
cutils.SetApisixCRDConditionWithGeneration(&cp.Status, cp.GetGeneration(),
condition)
+ return cp
+ }),
+ })
+ case types.KindApisixTls:
+ d.updater.Update(status.Update{
+ NamespacedName: nnk.NamespacedName(),
+ Resource: &apiv2.ApisixTls{},
+ Mutator: status.MutatorFunc(func(obj client.Object)
client.Object {
+ cp := obj.(*apiv2.ApisixTls).DeepCopy()
+
cutils.SetApisixCRDConditionWithGeneration(&cp.Status, cp.GetGeneration(),
condition)
+ return cp
+ }),
+ })
+ case types.KindApisixConsumer:
+ d.updater.Update(status.Update{
+ NamespacedName: nnk.NamespacedName(),
+ Resource: &apiv2.ApisixConsumer{},
+ Mutator: status.MutatorFunc(func(obj client.Object)
client.Object {
+ cp := obj.(*apiv2.ApisixConsumer).DeepCopy()
+
cutils.SetApisixCRDConditionWithGeneration(&cp.Status, cp.GetGeneration(),
condition)
+ return cp
+ }),
+ })
+ case types.KindHTTPRoute:
+ parentRefs := d.getParentRefs(nnk)
+ gatewayRefs := map[types.NamespacedNameKind]struct{}{}
+ for _, parentRef := range parentRefs {
+ if parentRef.Kind == types.KindGateway {
+ gatewayRefs[parentRef] = struct{}{}
+ }
+ }
+ d.updater.Update(status.Update{
+ NamespacedName: nnk.NamespacedName(),
+ Resource: &gatewayv1.HTTPRoute{},
+ Mutator: status.MutatorFunc(func(obj client.Object)
client.Object {
+ cp := obj.(*gatewayv1.HTTPRoute).DeepCopy()
+ gatewayNs := cp.GetNamespace()
+ for i, ref := range cp.Status.Parents {
+ ns := gatewayNs
+ if ref.ParentRef.Namespace != nil {
+ ns =
string(*ref.ParentRef.Namespace)
+ }
+ if ref.ParentRef.Kind == nil ||
*ref.ParentRef.Kind == types.KindGateway {
+ nnk := types.NamespacedNameKind{
+ Name:
string(ref.ParentRef.Name),
+ Namespace: ns,
+ Kind:
types.KindGateway,
+ }
+ if _, ok := gatewayRefs[nnk];
ok {
+ ref.Conditions =
cutils.MergeCondition(ref.Conditions, condition)
+ cp.Status.Parents[i] =
ref
+ }
+ }
+ }
+ return cp
+ }),
+ })
+ }
+}
+
+func (d *adcClient) resolveADCExecutionErrors(
+ statusesMap map[string]types.ADCExecutionErrors,
+) map[types.NamespacedNameKind][]string {
+ statusUpdateMap := map[types.NamespacedNameKind][]string{}
+ for configName, execErrors := range statusesMap {
+ for _, execErr := range execErrors.Errors {
+ for _, failedStatus := range execErr.FailedErrors {
+ if len(failedStatus.FailedStatuses) == 0 {
+ d.handleEmptyFailedStatuses(configName,
failedStatus, statusUpdateMap)
+ } else {
+
d.handleDetailedFailedStatuses(configName, failedStatus, statusUpdateMap)
+ }
+ }
+ }
+ }
+
+ return statusUpdateMap
+}
+
+func (d *adcClient) handleEmptyFailedStatuses(
+ configName string,
+ failedStatus types.ADCExecutionServerAddrError,
+ statusUpdateMap map[types.NamespacedNameKind][]string,
+) {
+ resource, err := d.store.GetResources(configName)
+ if err != nil {
+ log.Errorw("failed to get resources from store",
zap.String("configName", configName), zap.Error(err))
+ return
+ }
+
+ for _, obj := range resource.Services {
+ d.addResourceToStatusUpdateMap(obj.GetLabels(),
failedStatus.Error(), statusUpdateMap)
+ }
+
+ for _, obj := range resource.Consumers {
+ d.addResourceToStatusUpdateMap(obj.GetLabels(),
failedStatus.Error(), statusUpdateMap)
+ }
+
+ for _, obj := range resource.SSLs {
+ d.addResourceToStatusUpdateMap(obj.GetLabels(),
failedStatus.Error(), statusUpdateMap)
+ }
+
+ globalRules, err := d.store.ListGlobalRules(configName)
+ if err != nil {
+ log.Errorw("failed to list global rules",
zap.String("configName", configName), zap.Error(err))
+ return
+ }
+ for _, rule := range globalRules {
+ d.addResourceToStatusUpdateMap(rule.GetLabels(),
failedStatus.Error(), statusUpdateMap)
+ }
+}
+
+func (d *adcClient) handleDetailedFailedStatuses(
+ configName string,
+ failedStatus types.ADCExecutionServerAddrError,
+ statusUpdateMap map[types.NamespacedNameKind][]string,
+) {
+ for _, status := range failedStatus.FailedStatuses {
+ id := status.Event.ResourceID
+ labels, err := d.store.GetResourceLabel(configName,
status.Event.ResourceType, id)
+ if err != nil {
+ log.Errorw("failed to get resource label",
+ zap.String("configName", configName),
+ zap.String("resourceType",
status.Event.ResourceType),
+ zap.String("id", id),
+ zap.Error(err),
+ )
+ continue
+ }
+ d.addResourceToStatusUpdateMap(
+ labels,
+ fmt.Sprintf("ServerAddr: %s, Error: %s",
failedStatus.ServerAddr, status.Reason),
+ statusUpdateMap,
+ )
+ }
+}
+
+func (d *adcClient) addResourceToStatusUpdateMap(
+ labels map[string]string,
+ msg string,
+ statusUpdateMap map[types.NamespacedNameKind][]string,
+) {
+ statusKey := types.NamespacedNameKind{
+ Name: labels[label.LabelName],
+ Namespace: labels[label.LabelNamespace],
+ Kind: labels[label.LabelKind],
+ }
+ statusUpdateMap[statusKey] = append(statusUpdateMap[statusKey], msg)
+}
diff --git a/internal/provider/adc/store.go b/internal/provider/adc/store.go
index 62c554dc..d5626c93 100644
--- a/internal/provider/adc/store.go
+++ b/internal/provider/adc/store.go
@@ -18,6 +18,7 @@
package adc
import (
+ "fmt"
"sync"
"github.com/api7/gopkg/pkg/log"
@@ -63,7 +64,7 @@ func (s *Store) Insert(name string, resourceTypes []string,
resources adctypes.R
}
for _, resourceType := range resourceTypes {
switch resourceType {
- case "service":
+ case adctypes.TypeService:
services, err := targetCache.ListServices(selector)
if err != nil {
return err
@@ -78,7 +79,7 @@ func (s *Store) Insert(name string, resourceTypes []string,
resources adctypes.R
return err
}
}
- case "consumer":
+ case adctypes.TypeConsumer:
consumers, err := targetCache.ListConsumers(selector)
if err != nil {
return err
@@ -93,7 +94,7 @@ func (s *Store) Insert(name string, resourceTypes []string,
resources adctypes.R
return err
}
}
- case "ssl":
+ case adctypes.TypeSSL:
ssls, err := targetCache.ListSSL(selector)
if err != nil {
return err
@@ -109,7 +110,7 @@ func (s *Store) Insert(name string, resourceTypes []string,
resources adctypes.R
return err
}
}
- case "global_rule":
+ case adctypes.TypeGlobalRule:
// List existing global rules that match the selector
globalRules, err :=
targetCache.ListGlobalRules(selector)
if err != nil {
@@ -136,7 +137,7 @@ func (s *Store) Insert(name string, resourceTypes []string,
resources adctypes.R
return err
}
}
- case "plugin_metadata":
+ case adctypes.TypePluginMetadata:
s.pluginMetadataMap[name] = resources.PluginMetadata
default:
continue
@@ -159,7 +160,7 @@ func (s *Store) Delete(name string, resourceTypes []string,
Labels map[string]st
}
for _, resourceType := range resourceTypes {
switch resourceType {
- case "service":
+ case adctypes.TypeService:
services, err := targetCache.ListServices(selector)
if err != nil {
log.Errorf("failed to list services: %v", err)
@@ -169,7 +170,7 @@ func (s *Store) Delete(name string, resourceTypes []string,
Labels map[string]st
log.Errorf("failed to delete service
%s: %v", service.ID, err)
}
}
- case "ssl":
+ case adctypes.TypeSSL:
ssls, err := targetCache.ListSSL(selector)
if err != nil {
log.Errorf("failed to list ssl: %v", err)
@@ -179,7 +180,7 @@ func (s *Store) Delete(name string, resourceTypes []string,
Labels map[string]st
log.Errorf("failed to delete ssl %s:
%v", ssl.ID, err)
}
}
- case "consumer":
+ case adctypes.TypeConsumer:
consumers, err := targetCache.ListConsumers(selector)
if err != nil {
log.Errorf("failed to list consumers: %v", err)
@@ -189,7 +190,7 @@ func (s *Store) Delete(name string, resourceTypes []string,
Labels map[string]st
log.Errorf("failed to delete consumer
%s: %v", consumer.Username, err)
}
}
- case "global_rule":
+ case adctypes.TypeGlobalRule:
globalRules, err :=
targetCache.ListGlobalRules(selector)
if err != nil {
log.Errorf("failed to list global rules: %v",
err)
@@ -199,7 +200,7 @@ func (s *Store) Delete(name string, resourceTypes []string,
Labels map[string]st
log.Errorf("failed to delete global
rule %s: %v", globalRule.ID, err)
}
}
- case "plugin_metadata":
+ case adctypes.TypePluginMetadata:
delete(s.pluginMetadataMap, name)
}
}
@@ -244,3 +245,75 @@ func (s *Store) GetResources(name string)
(*adctypes.Resources, error) {
PluginMetadata: metadata,
}, nil
}
+
+func (s *Store) ListGlobalRules(name string) ([]*adctypes.GlobalRuleItem,
error) {
+ s.Lock()
+ defer s.Unlock()
+ targetCache, ok := s.cacheMap[name]
+ if !ok {
+ return nil, fmt.Errorf("cache not found for name: %s", name)
+ }
+ globalRules, err := targetCache.ListGlobalRules()
+ if err != nil {
+ return nil, fmt.Errorf("failed to list global rules: %w", err)
+ }
+ return globalRules, nil
+}
+
+func (s *Store) GetResourceLabel(name, resourceType string, id string)
(map[string]string, error) {
+ s.Lock()
+ defer s.Unlock()
+ targetCache, ok := s.cacheMap[name]
+ if !ok {
+ return nil, fmt.Errorf("cache not found for name: %s", name)
+ }
+ switch resourceType {
+ case adctypes.TypeService:
+ service, err := targetCache.GetService(id)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get service: %w", err)
+ }
+ return service.Labels, nil
+ case adctypes.TypeRoute:
+ services, err := targetCache.ListServices()
+ if err != nil {
+ return nil, fmt.Errorf("failed to list services: %w",
err)
+ }
+ for _, service := range services {
+ for _, route := range service.Routes {
+ if route.ID == id {
+ // Return labels from the service that
contains the route
+ return route.GetLabels(), nil
+ }
+ }
+ }
+ return nil, fmt.Errorf("route not found: %s", id)
+ case adctypes.TypeSSL:
+ ssl, err := targetCache.GetSSL(id)
+ if err != nil {
+ return nil, err
+ }
+ if ssl != nil {
+ return ssl.GetLabels(), nil
+ }
+ case adctypes.TypeConsumer:
+ consumer, err := targetCache.GetConsumer(id)
+ if err != nil {
+ return nil, err
+ }
+ if consumer != nil {
+ return consumer.Labels, nil
+ }
+ case adctypes.TypeGlobalRule:
+ globalRule, err := targetCache.GetGlobalRule(id)
+ if err != nil {
+ return nil, err
+ }
+ if globalRule != nil {
+ return globalRule.GetLabels(), nil
+ }
+ default:
+ return nil, fmt.Errorf("unknown resource type: %s",
resourceType)
+ }
+ return nil, nil
+}
diff --git a/internal/types/error.go b/internal/types/error.go
new file mode 100644
index 00000000..80dbf568
--- /dev/null
+++ b/internal/types/error.go
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package types
+
+import (
+ "errors"
+ "fmt"
+ "slices"
+ "strings"
+
+ gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
+
+ "github.com/apache/apisix-ingress-controller/api/adc"
+)
+
+type ReasonError struct {
+ Reason string
+ Message string
+}
+
+func (e ReasonError) Error() string {
+ return e.Message
+}
+
+func IsSomeReasonError[Reason ~string](err error, reasons ...Reason) bool {
+ if err == nil {
+ return false
+ }
+ var re ReasonError
+ if !errors.As(err, &re) {
+ return false
+ }
+ if len(reasons) == 0 {
+ return true
+ }
+ return slices.Contains(reasons, Reason(re.Reason))
+}
+
+func NewInvalidKindError[Kind ~string](kind Kind) ReasonError {
+ return ReasonError{
+ Reason: string(gatewayv1.RouteReasonInvalidKind),
+ Message: fmt.Sprintf("Invalid kind %s, only Service is
supported", kind),
+ }
+}
+
+type ADCExecutionErrors struct {
+ Errors []ADCExecutionError
+}
+
+func (e ADCExecutionErrors) Error() string {
+ messages := make([]string, 0, len(e.Errors))
+ for _, err := range e.Errors {
+ messages = append(messages, err.Error())
+ }
+ return fmt.Sprintf("ADC execution errors: [%s]", strings.Join(messages,
"; "))
+}
+
+type ADCExecutionError struct {
+ Name string
+ FailedErrors []ADCExecutionServerAddrError
+}
+
+func (e ADCExecutionError) Error() string {
+ messages := make([]string, 0, len(e.FailedErrors))
+ for _, failed := range e.FailedErrors {
+ messages = append(messages, failed.Error())
+ }
+ return fmt.Sprintf("ADC execution error for %s: [%s]", e.Name,
strings.Join(messages, "; "))
+}
+
+type ADCExecutionServerAddrError struct {
+ Err string
+ ServerAddr string
+ FailedStatuses []adc.SyncStatus
+}
+
+func (e ADCExecutionServerAddrError) Error() string {
+ return fmt.Sprintf("ServerAddr: %s, Err: %s", e.ServerAddr, e.Err)
+}
diff --git a/internal/types/types.go b/internal/types/k8s.go
similarity index 55%
copy from internal/types/types.go
copy to internal/types/k8s.go
index d501db75..4bdf1e93 100644
--- a/internal/types/types.go
+++ b/internal/types/k8s.go
@@ -17,8 +17,21 @@
package types
-type NamespacedNameKind struct {
- Namespace string
- Name string
- Kind string
-}
+const DefaultIngressClassAnnotation =
"ingressclass.kubernetes.io/is-default-class"
+
+const (
+ KindGateway = "Gateway"
+ KindHTTPRoute = "HTTPRoute"
+ KindGatewayClass = "GatewayClass"
+ KindIngress = "Ingress"
+ KindIngressClass = "IngressClass"
+ KindGatewayProxy = "GatewayProxy"
+ KindSecret = "Secret"
+ KindService = "Service"
+ KindApisixRoute = "ApisixRoute"
+ KindApisixGlobalRule = "ApisixGlobalRule"
+ KindApisixPluginConfig = "ApisixPluginConfig"
+ KindPod = "Pod"
+ KindApisixTls = "ApisixTls"
+ KindApisixConsumer = "ApisixConsumer"
+)
diff --git a/internal/types/types.go b/internal/types/types.go
index d501db75..f695da87 100644
--- a/internal/types/types.go
+++ b/internal/types/types.go
@@ -17,8 +17,19 @@
package types
+import (
+ k8stypes "k8s.io/apimachinery/pkg/types"
+)
+
type NamespacedNameKind struct {
Namespace string
Name string
Kind string
}
+
+func (n *NamespacedNameKind) NamespacedName() k8stypes.NamespacedName {
+ return k8stypes.NamespacedName{
+ Namespace: n.Namespace,
+ Name: n.Name,
+ }
+}
diff --git a/internal/utils/k8s.go b/internal/utils/k8s.go
index 023831f1..0bca19d0 100644
--- a/internal/utils/k8s.go
+++ b/internal/utils/k8s.go
@@ -21,6 +21,7 @@ import (
"net"
"regexp"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -91,3 +92,10 @@ func IsSubsetOf(a, b map[string]string) bool {
}
return true
}
+
+func ConditionStatus(status bool) metav1.ConditionStatus {
+ if status {
+ return metav1.ConditionTrue
+ }
+ return metav1.ConditionFalse
+}
diff --git a/test/e2e/apisix/status.go b/test/e2e/apisix/status.go
new file mode 100644
index 00000000..1c72e412
--- /dev/null
+++ b/test/e2e/apisix/status.go
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package apisix
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "k8s.io/apimachinery/pkg/types"
+
+ apiv2 "github.com/apache/apisix-ingress-controller/api/v2"
+ "github.com/apache/apisix-ingress-controller/test/e2e/framework"
+ "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = Describe("Test CRD Status", Label("apisix.apache.org", "v2",
"apisixroute"), func() {
+ var (
+ s = scaffold.NewScaffold(&scaffold.Options{
+ ControllerName:
"apisix.apache.org/apisix-ingress-controller",
+ })
+ applier = framework.NewApplier(s.GinkgoT, s.K8sClient,
s.CreateResourceFromString)
+ )
+
+ assertion := func(actualOrCtx any, args ...any) AsyncAssertion {
+ return
Eventually(actualOrCtx).WithArguments(args...).WithTimeout(30 *
time.Second).ProbeEvery(time.Second)
+ }
+
+ Context("Test ApisixRoute Sync Status", func() {
+ BeforeEach(func() {
+ By("create GatewayProxy")
+ gatewayProxy := fmt.Sprintf(gatewayProxyYaml,
s.Deployer.GetAdminEndpoint(), s.AdminKey())
+ err :=
s.CreateResourceFromStringWithNamespace(gatewayProxy, "default")
+ Expect(err).NotTo(HaveOccurred(), "creating
GatewayProxy")
+ time.Sleep(5 * time.Second)
+
+ By("create IngressClass")
+ err =
s.CreateResourceFromStringWithNamespace(ingressClassYaml, "")
+ Expect(err).NotTo(HaveOccurred(), "creating
IngressClass")
+ time.Sleep(5 * time.Second)
+ })
+ const ar = `
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+ name: default
+spec:
+ ingressClassName: apisix
+ http:
+ - name: rule0
+ match:
+ hosts:
+ - httpbin
+ paths:
+ - /*
+ backends:
+ - serviceName: httpbin-service-e2e-test
+ servicePort: 80
+`
+ const arWithInvalidPlugin = `
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+ name: default
+spec:
+ ingressClassName: apisix
+ http:
+ - name: rule0
+ match:
+ hosts:
+ - httpbin
+ paths:
+ - /*
+ backends:
+ - serviceName: httpbin-service-e2e-test
+ servicePort: 80
+ plugins:
+ - name: non-existent-plugin
+ enable: true
+`
+
+ getRequest := func(path string) func() int {
+ return func() int {
+ return
s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode
+ }
+ }
+
+ It("unknown plugin", func() {
+ if os.Getenv("PROVIDER_TYPE") == "apisix-standalone" {
+ Skip("apisix standalone does not validate
unknown plugins")
+ }
+ By("apply ApisixRoute with valid plugin")
+ applier.MustApplyAPIv2(types.NamespacedName{Namespace:
s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, arWithInvalidPlugin)
+
+ By("check ApisixRoute status")
+ assertion(func() string {
+ output, _ := s.GetOutputFromString("ar",
"default", "-o", "yaml")
+ return output
+ }).Should(
+ And(
+ ContainSubstring(`status: "False"`),
+ ContainSubstring(`reason: SyncFailed`),
+ ContainSubstring(`unknown plugin
[non-existent-plugin]`),
+ ),
+ )
+
+ By("Update ApisixRoute")
+ applier.MustApplyAPIv2(types.NamespacedName{Namespace:
s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, ar)
+
+ By("check ApisixRoute status")
+ assertion(func() string {
+ output, _ := s.GetOutputFromString("ar",
"default", "-o", "yaml")
+ return output
+ }).Should(
+ And(
+ ContainSubstring(`status: "True"`),
+ ContainSubstring(`reason: Accepted`),
+ ),
+ )
+
+ By("check route in APISIX")
+ assertion(getRequest("/get")).Should(Equal(200),
"should be able to access the route")
+ })
+
+ It("dataplane unavailable", func() {
+ By("apply ApisixRoute")
+ applier.MustApplyAPIv2(types.NamespacedName{Namespace:
s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, ar)
+
+ By("check ApisixRoute status")
+ assertion(func() string {
+ output, _ := s.GetOutputFromString("ar",
"default", "-o", "yaml")
+ return output
+ }).Should(
+ And(
+ ContainSubstring(`status: "True"`),
+ ContainSubstring(`reason: Accepted`),
+ ),
+ )
+
+ By("check route in APISIX")
+ assertion(getRequest("/get")).Should(Equal(200),
"should be able to access the route")
+
+ s.Deployer.ScaleDataplane(0)
+
+ By("check ApisixRoute status")
+ assertion(func() string {
+ output, _ := s.GetOutputFromString("ar",
"default", "-o", "yaml")
+ return output
+ }).Should(
+ And(
+ ContainSubstring(`status: "False"`),
+ ContainSubstring(`reason: SyncFailed`),
+ ),
+ )
+
+ s.Deployer.ScaleDataplane(1)
+
+ By("check ApisixRoute status after scaling up")
+ assertion(func() string {
+ output, _ := s.GetOutputFromString("ar",
"default", "-o", "yaml")
+ return output
+ }).Should(
+ And(
+ ContainSubstring(`status: "True"`),
+ ContainSubstring(`reason: Accepted`),
+ ),
+ )
+
+ By("check route in APISIX")
+ assertion(getRequest("/get")).Should(Equal(200),
"should be able to access the route")
+ })
+ })
+
+ Context("Test HTTPRoute Sync Status", func() {
+ const httproute = `
+apiVersion: gateway.networking.k8s.io/v1
+kind: HTTPRoute
+metadata:
+ name: httpbin
+spec:
+ parentRefs:
+ - name: apisix
+ hostnames:
+ - "httpbin"
+ rules:
+ - matches:
+ - path:
+ type: Exact
+ value: /get
+ backendRefs:
+ - name: httpbin-service-e2e-test
+ port: 80
+`
+ const gatewayClass = `
+apiVersion: gateway.networking.k8s.io/v1
+kind: GatewayClass
+metadata:
+ name: %s
+spec:
+ controllerName: %s
+`
+ const gatewayProxy = `
+apiVersion: apisix.apache.org/v1alpha1
+kind: GatewayProxy
+metadata:
+ name: apisix-proxy-config
+spec:
+ provider:
+ type: ControlPlane
+ controlPlane:
+ endpoints:
+ - %s
+ auth:
+ type: AdminKey
+ adminKey:
+ value: "%s"
+`
+ const defaultGateway = `
+apiVersion: gateway.networking.k8s.io/v1
+kind: Gateway
+metadata:
+ name: apisix
+spec:
+ gatewayClassName: %s
+ listeners:
+ - name: http1
+ protocol: HTTP
+ port: 80
+ infrastructure:
+ parametersRef:
+ group: apisix.apache.org
+ kind: GatewayProxy
+ name: apisix-proxy-config
+`
+ BeforeEach(func() {
+ By("create GatewayProxy")
+ gatewayProxy := fmt.Sprintf(gatewayProxy,
s.Deployer.GetAdminEndpoint(), s.AdminKey())
+ err := s.CreateResourceFromString(gatewayProxy)
+ Expect(err).NotTo(HaveOccurred(), "creating
GatewayProxy")
+ time.Sleep(5 * time.Second)
+
+ By("create GatewayClass")
+ gatewayClassName := fmt.Sprintf("apisix-%d",
time.Now().Unix())
+ err =
s.CreateResourceFromStringWithNamespace(fmt.Sprintf(gatewayClass,
gatewayClassName, s.GetControllerName()), "")
+ Expect(err).NotTo(HaveOccurred(), "creating
GatewayClass")
+ time.Sleep(5 * time.Second)
+
+ By("create Gateway")
+ err =
s.CreateResourceFromString(fmt.Sprintf(defaultGateway, gatewayClassName))
+ Expect(err).NotTo(HaveOccurred(), "creating Gateway")
+ time.Sleep(5 * time.Second)
+
+ By("check Gateway condition")
+ gwyaml, err := s.GetResourceYaml("Gateway", "apisix")
+ Expect(err).NotTo(HaveOccurred(), "getting Gateway
yaml")
+ Expect(gwyaml).To(ContainSubstring(`status: "True"`),
"checking Gateway condition status")
+ Expect(gwyaml).To(ContainSubstring("message: the
gateway has been accepted by the apisix-ingress-controller"), "checking Gateway
condition message")
+ })
+ AfterEach(func() {
+ _ = s.DeleteResource("Gateway", "apisix")
+ })
+ getRequest := func(path string) func() int {
+ return func() int {
+ return
s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode
+ }
+ }
+ var resourceApplied = func(resourType, resourceName,
resourceRaw string, observedGeneration int) {
+ Expect(s.CreateResourceFromString(resourceRaw)).
+ NotTo(HaveOccurred(), fmt.Sprintf("creating
%s", resourType))
+
+ Eventually(func() string {
+ hryaml, err := s.GetResourceYaml(resourType,
resourceName)
+ Expect(err).NotTo(HaveOccurred(),
fmt.Sprintf("getting %s yaml", resourType))
+ return hryaml
+ }, "8s", "2s").
+ Should(
+ SatisfyAll(
+ ContainSubstring(`status:
"True"`),
+
ContainSubstring(fmt.Sprintf("observedGeneration: %d", observedGeneration)),
+ ),
+ fmt.Sprintf("checking %s condition
status", resourType),
+ )
+ time.Sleep(5 * time.Second)
+ }
+
+ It("dataplane unavailable", func() {
+ By("Create HTTPRoute")
+ resourceApplied("HTTPRoute", "httpbin", httproute, 1)
+
+ By("check route in APISIX")
+ assertion(getRequest("/get")).Should(Equal(200),
"should be able to access the route")
+
+ s.Deployer.ScaleDataplane(0)
+ time.Sleep(10 * time.Second)
+
+ By("check ApisixRoute status")
+ assertion(func() string {
+ output, _ := s.GetOutputFromString("httproute",
"httpbin", "-o", "yaml")
+ return output
+ }).Should(
+ And(
+ ContainSubstring(`status: "False"`),
+ ContainSubstring(`reason: SyncFailed`),
+ ),
+ )
+
+ s.Deployer.ScaleDataplane(1)
+ time.Sleep(10 * time.Second)
+
+ By("check ApisixRoute status after scaling up")
+ assertion(func() string {
+ output, _ := s.GetOutputFromString("httproute",
"httpbin", "-o", "yaml")
+ return output
+ }).Should(
+ And(
+ ContainSubstring(`status: "True"`),
+ ContainSubstring(`reason: Accepted`),
+ ),
+ )
+
+ By("check route in APISIX")
+ assertion(getRequest("/get")).Should(Equal(200),
"should be able to access the route")
+ })
+ })
+})
diff --git a/test/e2e/framework/manifests/ingress.yaml
b/test/e2e/framework/manifests/ingress.yaml
index 1ed00c10..c4bb1014 100644
--- a/test/e2e/framework/manifests/ingress.yaml
+++ b/test/e2e/framework/manifests/ingress.yaml
@@ -327,6 +327,7 @@ data:
controller_name: {{ .ControllerName | default
"apisix.apache.org/apisix-ingress-controller" }}
leader_election_id: "apisix-ingress-controller-leader"
+
provider:
type: {{ .ProviderType | default "apisix" }}
sync_period: {{ .ProviderSyncPeriod | default "0s" }}
diff --git a/test/e2e/scaffold/apisix_deployer.go
b/test/e2e/scaffold/apisix_deployer.go
index 67f20a86..b6fe7feb 100644
--- a/test/e2e/scaffold/apisix_deployer.go
+++ b/test/e2e/scaffold/apisix_deployer.go
@@ -250,6 +250,12 @@ func (s *APISIXDeployer) deployDataplane(opts
*APISIXDeployOptions) *corev1.Serv
return svc
}
+func (s *APISIXDeployer) ScaleDataplane(replicas int) {
+ s.DeployDataplane(DeployDataplaneOptions{
+ Replicas: ptr.To(replicas),
+ })
+}
+
func (s *APISIXDeployer) DeployIngress() {
s.Framework.DeployIngress(framework.IngressDeployOpts{
ControllerName: s.opts.ControllerName,
diff --git a/test/e2e/scaffold/deployer.go b/test/e2e/scaffold/deployer.go
index cfbe4043..94d740cd 100644
--- a/test/e2e/scaffold/deployer.go
+++ b/test/e2e/scaffold/deployer.go
@@ -26,6 +26,7 @@ type Deployer interface {
DeployDataplane(opts DeployDataplaneOptions)
DeployIngress()
ScaleIngress(replicas int)
+ ScaleDataplane(replicas int)
BeforeEach()
AfterEach()
CreateAdditionalGateway(namePrefix string) (string, *corev1.Service,
error)