Revolyssup commented on code in PR #2564:
URL:
https://github.com/apache/apisix-ingress-controller/pull/2564#discussion_r2378472720
##########
internal/manager/controllers.go:
##########
@@ -121,6 +124,14 @@ func setupControllers(ctx context.Context, mgr
manager.Manager, pro provider.Pro
Updater: updater,
Readier: readier,
},
+ &controller.TCPRouteReconciler{
+ Client: mgr.GetClient(),
+ Scheme: mgr.GetScheme(),
+ Log:
ctrl.LoggerFrom(ctx).WithName("controllers").WithName("TCPRoute"),
Review Comment:
done
##########
internal/manager/controllers.go:
##########
@@ -81,6 +81,9 @@ import (
//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/status,verbs=get;update
//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes,verbs=get;list;watch
//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes/status,verbs=get;update
+//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes,verbs=get;list;watch
+//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes/status,verbs=get;update
+//
+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=referencegrants,verbs=list;watch;update
Review Comment:
done
##########
internal/controller/tcproute_controller.go:
##########
@@ -0,0 +1,504 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package controller
+
+import (
+ "cmp"
+ "context"
+ "fmt"
+
+ "github.com/apache/apisix-ingress-controller/api/v1alpha1"
+
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
+ "github.com/apache/apisix-ingress-controller/internal/controller/status"
+ "github.com/apache/apisix-ingress-controller/internal/manager/readiness"
+ "github.com/apache/apisix-ingress-controller/internal/provider"
+ "github.com/apache/apisix-ingress-controller/internal/types"
+ "github.com/apache/apisix-ingress-controller/internal/utils"
+ "github.com/go-logr/logr"
+ corev1 "k8s.io/api/core/v1"
+ discoveryv1 "k8s.io/api/discovery/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ k8stypes "k8s.io/apimachinery/pkg/types"
+ ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/builder"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/event"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/predicate"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
+ gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
+ "sigs.k8s.io/gateway-api/apis/v1beta1"
+)
+
+// TCPRouteReconciler reconciles a TCPRoute object.
+type TCPRouteReconciler struct { //nolint:revive
+ client.Client
+ Scheme *runtime.Scheme
+
+ Log logr.Logger
+
+ Provider provider.Provider
+
+ Updater status.Updater
+ Readier readiness.ReadinessManager
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *TCPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
+
+ bdr := ctrl.NewControllerManagedBy(mgr).
+ For(&gatewayv1alpha2.TCPRoute{}).
+ WithEventFilter(predicate.GenerationChangedPredicate{}).
+ Watches(&discoveryv1.EndpointSlice{},
+
handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesByServiceRef),
+ ).
+ Watches(&gatewayv1.Gateway{},
+
handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForGateway),
+ builder.WithPredicates(
+ predicate.Funcs{
+ GenericFunc: func(e event.GenericEvent)
bool {
+ return false
+ },
+ DeleteFunc: func(e event.DeleteEvent)
bool {
+ return false
+ },
+ CreateFunc: func(e event.CreateEvent)
bool {
+ return true
+ },
+ UpdateFunc: func(e event.UpdateEvent)
bool {
+ return true
+ },
+ },
+ ),
+ ).
+ Watches(&v1alpha1.BackendTrafficPolicy{},
+
handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForBackendTrafficPolicy),
+ ).
+ Watches(&v1alpha1.GatewayProxy{},
+
handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForGatewayProxy),
+ )
+
+ if GetEnableReferenceGrant() {
+ bdr.Watches(&v1beta1.ReferenceGrant{},
+
handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForReferenceGrant),
+
builder.WithPredicates(referenceGrantPredicates(KindTCPRoute)),
+ )
+ }
+
+ return bdr.Complete(r)
+}
+
+func (r *TCPRouteReconciler) listTCPRoutesForBackendTrafficPolicy(ctx
context.Context, obj client.Object) []reconcile.Request {
+ policy, ok := obj.(*v1alpha1.BackendTrafficPolicy)
+ if !ok {
+ r.Log.Error(fmt.Errorf("unexpected object type"), "failed to
convert object to BackendTrafficPolicy")
+ return nil
+ }
+
+ tcprouteList := []gatewayv1alpha2.TCPRoute{}
+ for _, targetRef := range policy.Spec.TargetRefs {
+ service := &corev1.Service{}
+ if err := r.Get(ctx, client.ObjectKey{
+ Namespace: policy.Namespace,
+ Name: string(targetRef.Name),
+ }, service); err != nil {
+ if client.IgnoreNotFound(err) != nil {
+ r.Log.Error(err, "failed to get service",
"namespace", policy.Namespace, "name", targetRef.Name)
+ }
+ continue
+ }
+ tcprList := &gatewayv1alpha2.TCPRouteList{}
+ if err := r.List(ctx, tcprList, client.MatchingFields{
+ indexer.ServiceIndexRef:
indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)),
+ }); err != nil {
+ r.Log.Error(err, "failed to list tcproutes by service
reference", "service", targetRef.Name)
+ return nil
+ }
+ tcprouteList = append(tcprouteList, tcprList.Items...)
+ }
+ var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{})
+ requests := make([]reconcile.Request, 0, len(tcprouteList))
+ for _, tr := range tcprouteList {
+ key := k8stypes.NamespacedName{
+ Namespace: tr.Namespace,
+ Name: tr.Name,
+ }
+ if _, ok := namespacedNameMap[key]; !ok {
+ namespacedNameMap[key] = struct{}{}
+ requests = append(requests, reconcile.Request{
+ NamespacedName: client.ObjectKey{
+ Namespace: tr.Namespace,
+ Name: tr.Name,
+ },
+ })
+ }
+ }
+ return requests
+}
+
+func (r *TCPRouteReconciler) listTCPRoutesForGateway(ctx context.Context, obj
client.Object) []reconcile.Request {
+ gateway, ok := obj.(*gatewayv1.Gateway)
+ if !ok {
+ r.Log.Error(fmt.Errorf("unexpected object type"), "failed to
convert object to Gateway")
+ }
+ tcprList := &gatewayv1alpha2.TCPRouteList{}
+ if err := r.List(ctx, tcprList, client.MatchingFields{
+ indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace,
gateway.Name),
+ }); err != nil {
+ r.Log.Error(err, "failed to list tcproutes by gateway",
"gateway", gateway.Name)
+ return nil
+ }
+
+ requests := make([]reconcile.Request, 0, len(tcprList.Items))
+ for _, tcr := range tcprList.Items {
+ requests = append(requests, reconcile.Request{
+ NamespacedName: client.ObjectKey{
+ Namespace: tcr.Namespace,
+ Name: tcr.Name,
+ },
+ })
+ }
+ return requests
+}
+
+// listTCPRoutesForGatewayProxy list all TCPRoute resources that are affected
by a given GatewayProxy
+func (r *TCPRouteReconciler) listTCPRoutesForGatewayProxy(ctx context.Context,
obj client.Object) []reconcile.Request {
+ gatewayProxy, ok := obj.(*v1alpha1.GatewayProxy)
+ if !ok {
+ r.Log.Error(fmt.Errorf("unexpected object type"), "failed to
convert object to GatewayProxy")
+ return nil
+ }
+
+ namespace := gatewayProxy.GetNamespace()
+ name := gatewayProxy.GetName()
+
+ // find all gateways that reference this gateway proxy
+ gatewayList := &gatewayv1.GatewayList{}
+ if err := r.List(ctx, gatewayList, client.MatchingFields{
+ indexer.ParametersRef: indexer.GenIndexKey(namespace, name),
+ }); err != nil {
+ r.Log.Error(err, "failed to list gateways for gateway proxy",
"gatewayproxy", gatewayProxy.GetName())
+ return nil
+ }
+
+ var requests []reconcile.Request
+
+ // for each gateway, find all TCPRoute resources that reference it
+ for _, gateway := range gatewayList.Items {
+ tcpRouteList := &gatewayv1alpha2.TCPRouteList{}
+ if err := r.List(ctx, tcpRouteList, client.MatchingFields{
+ indexer.ParentRefs:
indexer.GenIndexKey(gateway.Namespace, gateway.Name),
+ }); err != nil {
+ r.Log.Error(err, "failed to list tcproutes for
gateway", "gateway", gateway.Name)
+ continue
+ }
+
+ for _, tcpRoute := range tcpRouteList.Items {
+ requests = append(requests, reconcile.Request{
+ NamespacedName: client.ObjectKey{
+ Namespace: tcpRoute.Namespace,
+ Name: tcpRoute.Name,
+ },
+ })
+ }
+ }
+
+ return requests
+}
+
+func (r *TCPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request)
(ctrl.Result, error) {
+ defer r.Readier.Done(&gatewayv1alpha2.TCPRoute{}, req.NamespacedName)
+ tr := new(gatewayv1alpha2.TCPRoute)
+ if err := r.Get(ctx, req.NamespacedName, tr); err != nil {
+ if client.IgnoreNotFound(err) == nil {
+ tr.Namespace = req.Namespace
+ tr.Name = req.Name
+
+ tr.TypeMeta = metav1.TypeMeta{
+ Kind: KindTCPRoute,
+ APIVersion:
gatewayv1alpha2.GroupVersion.String(),
+ }
+
+ if err := r.Provider.Delete(ctx, tr); err != nil {
+ r.Log.Error(err, "failed to delete tcproute",
"tcproute", tr)
+ return ctrl.Result{}, err
+ }
+ return ctrl.Result{}, nil
+ }
+ return ctrl.Result{}, err
+ }
+
+ type ResourceStatus struct {
+ status bool
+ msg string
+ }
+
+ acceptStatus := ResourceStatus{
+ status: true,
+ msg: "Route is accepted",
+ }
+
+ gateways, err := ParseRouteParentRefs(ctx, r.Client, tr,
tr.Spec.ParentRefs)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+
+ if len(gateways) == 0 {
+ return ctrl.Result{}, nil
+ }
+
+ tctx := provider.NewDefaultTranslateContext(ctx)
+
+ tctx.RouteParentRefs = tr.Spec.ParentRefs
+ rk := utils.NamespacedNameKind(tr)
+ for _, gateway := range gateways {
+ if err := ProcessGatewayProxy(r.Client, r.Log, tctx,
gateway.Gateway, rk); err != nil {
+ acceptStatus.status = false
+ acceptStatus.msg = err.Error()
+ }
+ }
+
+ var backendRefErr error
+ if err := r.processTCPRoute(tctx, tr); err != nil {
+ // When encountering a backend reference error, it should not
affect the acceptance status
+ if types.IsSomeReasonError(err,
gatewayv1.RouteReasonInvalidKind) {
+ backendRefErr = err
+ } else {
+ acceptStatus.status = false
+ acceptStatus.msg = err.Error()
+ }
+ }
+
+ // Store the backend reference error for later use.
+ // If the backend reference error is because of an invalid kind, use
this error first
+ if err := r.processTCPRouteBackendRefs(tctx, req.NamespacedName); err
!= nil && backendRefErr == nil {
+ backendRefErr = err
+ }
+
+ ProcessBackendTrafficPolicy(r.Client, r.Log, tctx)
+ tr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0,
len(gateways))
+ for _, gateway := range gateways {
+ parentStatus := gatewayv1.RouteParentStatus{}
+ SetRouteParentRef(&parentStatus, gateway.Gateway.Name,
gateway.Gateway.Namespace)
+ for _, condition := range gateway.Conditions {
+ parentStatus.Conditions =
MergeCondition(parentStatus.Conditions, condition)
+ }
+ SetRouteConditionAccepted(&parentStatus, tr.GetGeneration(),
acceptStatus.status, acceptStatus.msg)
+ SetRouteConditionResolvedRefs(&parentStatus,
tr.GetGeneration(), backendRefErr)
+
+ tr.Status.Parents = append(tr.Status.Parents, parentStatus)
+ }
+
+ r.Updater.Update(status.Update{
+ NamespacedName: utils.NamespacedName(tr),
+ Resource: &gatewayv1alpha2.TCPRoute{},
+ Mutator: status.MutatorFunc(func(obj client.Object)
client.Object {
+ t, ok := obj.(*gatewayv1alpha2.TCPRoute)
+ if !ok {
+ err := fmt.Errorf("unsupported object type %T",
obj)
+ panic(err)
+ }
+ tCopy := t.DeepCopy()
+ tCopy.Status = tr.Status
+ return tCopy
+ }),
+ })
+ UpdateStatus(r.Updater, r.Log, tctx)
+ if isRouteAccepted(gateways) {
+ routeToUpdate := tr
+ if err := r.Provider.Update(ctx, tctx, routeToUpdate); err !=
nil {
+ return ctrl.Result{}, err
+ }
+ }
+ return ctrl.Result{}, nil
+}
+
+func (r *TCPRouteReconciler) processTCPRoute(tctx *provider.TranslateContext,
tcpRoute *gatewayv1alpha2.TCPRoute) error {
+ var terror error
+ for _, rule := range tcpRoute.Spec.Rules {
+ for _, backend := range rule.BackendRefs {
+ if backend.Kind != nil && *backend.Kind != KindService {
+ terror =
types.NewInvalidKindError(*backend.Kind)
+ continue
+ }
+ tctx.BackendRefs = append(tctx.BackendRefs,
gatewayv1.BackendRef{
+ BackendObjectReference:
gatewayv1.BackendObjectReference{
+ Name: backend.Name,
+ Namespace: cmp.Or(backend.Namespace,
(*gatewayv1.Namespace)(&tcpRoute.Namespace)),
+ Port: backend.Port,
+ },
+ })
+ }
+ }
+
+ return terror
+}
+
+func (r *TCPRouteReconciler) processTCPRouteBackendRefs(tctx
*provider.TranslateContext, trNN k8stypes.NamespacedName) error {
+ var terr error
+ for _, backend := range tctx.BackendRefs {
+ targetNN := k8stypes.NamespacedName{
+ Namespace: trNN.Namespace,
+ Name: string(backend.Name),
+ }
+ if backend.Namespace != nil {
+ targetNN.Namespace = string(*backend.Namespace)
+ }
+
+ if backend.Kind != nil && *backend.Kind != KindService {
+ terr = types.NewInvalidKindError(*backend.Kind)
+ continue
+ }
+
+ if backend.Port == nil {
+ terr = fmt.Errorf("port is required")
+ continue
+ }
+
+ var service corev1.Service
+ if err := r.Get(tctx, targetNN, &service); err != nil {
+ terr = err
+ if client.IgnoreNotFound(err) == nil {
+ terr = types.ReasonError{
+ Reason:
string(gatewayv1.RouteReasonBackendNotFound),
+ Message: fmt.Sprintf("Service %s not
found", targetNN),
+ }
+ }
+ continue
+ }
+
+ // if cross namespaces between TCPRoute and referenced Service,
check ReferenceGrant
+ if trNN.Namespace != targetNN.Namespace {
+ if permitted := checkReferenceGrant(tctx,
+ r.Client,
+ v1beta1.ReferenceGrantFrom{
+ Group: gatewayv1.GroupName,
+ Kind: KindTCPRoute,
+ Namespace:
v1beta1.Namespace(trNN.Namespace),
+ },
+ gatewayv1.ObjectReference{
+ Group: corev1.GroupName,
+ Kind: KindService,
+ Name:
gatewayv1.ObjectName(targetNN.Name),
+ Namespace:
(*gatewayv1.Namespace)(&targetNN.Namespace),
+ },
+ ); !permitted {
+ terr = types.ReasonError{
+ Reason:
string(v1beta1.RouteReasonRefNotPermitted),
+ Message: fmt.Sprintf("%s is in a
different namespace than the TCPRoute %s and no ReferenceGrant allowing
reference is configured", targetNN, trNN),
+ }
+ continue
+ }
+ }
+
+ if service.Spec.Type == corev1.ServiceTypeExternalName {
+ tctx.Services[targetNN] = &service
+ continue
+ }
+
+ portExists := false
+ for _, port := range service.Spec.Ports {
+ if port.Port == int32(*backend.Port) {
+ portExists = true
+ break
+ }
+ }
+ if !portExists {
+ terr = fmt.Errorf("port %d not found in service %s",
*backend.Port, targetNN.Name)
+ continue
+ }
+ tctx.Services[targetNN] = &service
+
+ endpointSliceList := new(discoveryv1.EndpointSliceList)
+ if err := r.List(tctx, endpointSliceList,
+ client.InNamespace(targetNN.Namespace),
+ client.MatchingLabels{
+ discoveryv1.LabelServiceName: targetNN.Name,
+ },
+ ); err != nil {
+ r.Log.Error(err, "failed to list endpoint slices",
"Service", targetNN)
+ terr = err
+ continue
+ }
+
+ tctx.EndpointSlices[targetNN] = endpointSliceList.Items
+ }
+ return terr
+}
+
+func (r *TCPRouteReconciler) listTCPRoutesForReferenceGrant(ctx
context.Context, obj client.Object) (requests []reconcile.Request) {
+ grant, ok := obj.(*v1beta1.ReferenceGrant)
+ if !ok {
+ r.Log.Error(fmt.Errorf("unexpected object type"), "failed to
convert object to ReferenceGrant")
+ return nil
+ }
+
+ var tcpRouteList gatewayv1alpha2.TCPRouteList
+ if err := r.List(ctx, &tcpRouteList); err != nil {
+ r.Log.Error(err, "failed to list tcproutes for reference
ReferenceGrant", "ReferenceGrant", k8stypes.NamespacedName{Namespace:
obj.GetNamespace(), Name: obj.GetName()})
+ return nil
+ }
+
+ for _, tcpRoute := range tcpRouteList.Items {
+ hr := v1beta1.ReferenceGrantFrom{
Review Comment:
fixed
##########
internal/controller/indexer/indexer.go:
##########
@@ -500,6 +535,24 @@ func HTTPRouteServiceIndexFunc(rawObj client.Object)
[]string {
return keys
}
+func TCPPRouteServiceIndexFunc(rawObj client.Object) []string {
+ hr := rawObj.(*gatewayv1alpha2.TCPRoute)
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]