Copilot commented on code in PR #1340: URL: https://github.com/apache/dubbo-admin/pull/1340#discussion_r2446485675
########## pkg/engine/kubernetes/engine.go: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "fmt" + "reflect" + + "github.com/duke-git/lancet/v2/slice" + "github.com/duke-git/lancet/v2/strutil" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1" + "github.com/apache/dubbo-admin/pkg/common/bizerror" + enginecfg "github.com/apache/dubbo-admin/pkg/config/engine" + "github.com/apache/dubbo-admin/pkg/core/consts" + "github.com/apache/dubbo-admin/pkg/core/controller" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" +) + +type PodListerWatcher struct { + cfg *enginecfg.Config + lw cache.ListerWatcher +} + +var _ controller.ResourceListerWatcher = &PodListerWatcher{} + +func NewPodListWatcher(clientset *kubernetes.Clientset, cfg *enginecfg.Config) 
(*PodListerWatcher, error) { + var selector fields.Selector + s := cfg.Properties.PodWatchSelector + if strutil.IsBlank(s) { + selector = fields.Everything() + } + selector, err := fields.ParseSelector(s) + if err != nil { + return nil, fmt.Errorf("parse selector %s failed: %v", s, err) + } Review Comment: Variable `selector` is assigned twice: it is first set on line 55 when `s` is blank, then unconditionally overwritten on line 57 (the `:=` there reuses the existing `selector` and only declares `err`). If `s` is blank, line 57 will overwrite the `fields.Everything()` value with the result of parsing an empty string, which may not be the intended behavior. Remove the assignment on line 55 or use a conditional to skip parsing when `s` is blank. ```suggestion } else { var err error selector, err = fields.ParseSelector(s) if err != nil { return nil, fmt.Errorf("parse selector %s failed: %v", s, err) } } ``` ########## pkg/core/runtime/runtime.go: ########## @@ -115,13 +115,18 @@ func (rt *runtime) Add(components ...Component) { func (rt *runtime) Start(stop <-chan struct{}) error { components := maputil.Values(rt.components) slice.SortBy(components, func(a, b Component) bool { - return a.Order() < b.Order() + return a.Order() > b.Order() }) for _, com := range components { - err := com.Start(rt, stop) - if err != nil { - return err - } + go func() { + err := com.Start(rt, stop) + if err != nil { + panic("component " + com.Type() + " running failed with error: " + err.Error()) + } + }() Review Comment: Starting each component in a separate goroutine without synchronization means errors won't be returned to the caller, and a panic in one component's goroutine won't be caught by recover in the parent. Consider using an error group (e.g., `errgroup.Group`) to properly propagate errors and coordinate shutdown. 
########## pkg/core/controller/informer.go: ########## @@ -91,13 +102,11 @@ type informer struct { transform cache.TransformFunc } -func NewInformerWithOptions(lw cache.ListerWatcher, emitter events.Emitter, store store.ResourceStore, - exampleObject runtime.Object, options Options) Informer { +func NewInformerWithOptions(lw cache.ListerWatcher, emitter events.Emitter, store store.ResourceStore, options Options) Informer { return &informer{ indexer: store, listerWatcher: lw, emitter: emitter, Review Comment: The `objectType` field is no longer set in the constructor (previously set on line 109 before removal), but it may still be read elsewhere by the `informer` code. Either verify that dropping this initialization doesn't break functionality, or remove the field from the struct definition if it is no longer needed. ```suggestion func NewInformerWithOptions(lw cache.ListerWatcher, emitter events.Emitter, store store.ResourceStore, objectType runtime.Object, options Options) Informer { return &informer{ indexer: store, listerWatcher: lw, emitter: emitter, objectType: objectType, ``` ########## pkg/core/engine/subscriber/runtime_instance.go: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package subscriber + +import ( + "errors" + "reflect" + + "github.com/duke-git/lancet/v2/strutil" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/store" + "github.com/apache/dubbo-admin/pkg/core/store/index" +) + +type RuntimeInstanceEventSubscriber struct { + instanceResourceStore store.ResourceStore + eventEmitter events.Emitter +} + +func (s *RuntimeInstanceEventSubscriber) ResourceKind() coremodel.ResourceKind { + return meshresource.RuntimeInstanceKind +} + +func (s *RuntimeInstanceEventSubscriber) Name() string { + return "Engine-" + s.ResourceKind().ToString() +} + +func (s *RuntimeInstanceEventSubscriber) ProcessEvent(event events.Event) error { + newObj, ok := event.NewObj().(*meshresource.RuntimeInstanceResource) + if !ok && newObj != nil { + return bizerror.NewAssertionError(meshresource.RuntimeInstanceKind, reflect.TypeOf(event.NewObj()).Name()) + } + oldObj, ok := event.OldObj().(*meshresource.RuntimeInstanceResource) + if !ok && oldObj != nil { + return bizerror.NewAssertionError(meshresource.RuntimeInstanceKind, reflect.TypeOf(event.OldObj()).Name()) + } + var processErr error + switch event.Type() { + case cache.Added, cache.Updated, cache.Replaced, cache.Sync: + if newObj == nil { + errStr := "process runtime instance upsert event, but new obj is nil, skipped processing" + logger.Error(errStr) + return errors.New(errStr) + } + processErr = s.processUpsert(newObj) + case cache.Deleted: + if oldObj == nil { + errStr := "process runtime instance delete event, but old obj is nil, skipped processing" + logger.Error(errStr) + return errors.New(errStr) + } + processErr = s.processDelete(oldObj) + } + eventStr := 
event.String() + if processErr == nil { + logger.Infof("process runtime instance event successfully, event: %s", eventStr) + } else { + logger.Errorf("process runtime instance event failed, event: %s, err: %s", eventStr, processErr.Error()) + } + return processErr +} + +func (s *RuntimeInstanceEventSubscriber) getRelatedInstanceResource( + rtInstance *meshresource.RuntimeInstanceResource) (*meshresource.InstanceResource, error) { + resources, err := s.instanceResourceStore.ListByIndexes(map[string]string{ + index.ByInstanceIpIndex: rtInstance.Spec.Ip, + }) + if err != nil { + return nil, err + } + if len(resources) == 0 { + return nil, nil + } + instanceResources := make([]*meshresource.InstanceResource, len(resources)) + for i, item := range resources { + if res, ok := item.(*meshresource.InstanceResource); ok { + instanceResources[i] = res + } else { + return nil, bizerror.NewAssertionError("InstanceResource", reflect.TypeOf(item).Name()) + } + } + return instanceResources[0], nil +} + +func (s *RuntimeInstanceEventSubscriber) mergeRuntimeInstance( + instanceRes *meshresource.InstanceResource, + rtInstanceRes *meshresource.RuntimeInstanceResource) { + instanceRes.Name = rtInstanceRes.Name + instanceRes.Spec.Name = rtInstanceRes.Spec.Name + instanceRes.Spec.Ip = rtInstanceRes.Spec.Ip + instanceRes.Labels = rtInstanceRes.Labels + instanceRes.Spec.Image = rtInstanceRes.Spec.Image + instanceRes.Spec.CreateTime = rtInstanceRes.Spec.CreateTime + instanceRes.Spec.StartTime = rtInstanceRes.Spec.StartTime + instanceRes.Spec.ReadyTime = rtInstanceRes.Spec.ReadyTime + instanceRes.Spec.DeployState = rtInstanceRes.Spec.Phase + instanceRes.Spec.WorkloadType = rtInstanceRes.Spec.WorkloadType + instanceRes.Spec.WorkloadName = rtInstanceRes.Spec.WorkloadName + instanceRes.Spec.Node = rtInstanceRes.Spec.Node + instanceRes.Spec.Probes = rtInstanceRes.Spec.Probes + instanceRes.Spec.Conditions = rtInstanceRes.Spec.Conditions +} + +func (s *RuntimeInstanceEventSubscriber) 
fromRuntimeInstance( + rtInstanceRes *meshresource.RuntimeInstanceResource) *meshresource.InstanceResource { + instanceRes := meshresource.NewInstanceResourceWithAttributes(rtInstanceRes.Name, rtInstanceRes.Mesh) + s.mergeRuntimeInstance(instanceRes, rtInstanceRes) + return instanceRes +} + +// processUpsert when runtime instance added or updated, we should add/update the corresponding instance resource +func (s *RuntimeInstanceEventSubscriber) processUpsert(rtInstanceRes *meshresource.RuntimeInstanceResource) error { + instanceResource, err := s.getRelatedInstanceResource(rtInstanceRes) + if err != nil { + return err + } + // If instance resource exists, the rpc instance resource exists in remote registry and has been watched by discovery. + // So we should merge the runtime info into it + if instanceResource != nil { + s.mergeRuntimeInstance(instanceResource, rtInstanceRes) + return s.instanceResourceStore.Update(instanceResource) + } + // If instance resource does not exist, we should create a new instance resource by runtime instance + // If the app name is empty, we cannot identify it as a dubbo app, so we skip it + if strutil.IsBlank(rtInstanceRes.Spec.AppName) { + logger.Warnf("cannot identify runtime instance %s as a dubbo app, skipped updating instance", rtInstanceRes.Name) + return nil + } + // Otherwise we can create a new instance resource by runtime instance + instanceRes := s.fromRuntimeInstance(rtInstanceRes) + if err = s.instanceResourceStore.Add(instanceRes); err != nil { + logger.Errorf("add instance resource failed, instance: %s, err: %s", instanceRes.ResourceKey(), err.Error()) + return err + } + instanceAddEvent := events.NewResourceChangedEvent(cache.Added, nil, instanceRes) + s.eventEmitter.Send(instanceAddEvent) + logger.Debugf("runtime instance upsert trigger instance add event, event: %s", instanceAddEvent.String()) + return nil +} + +// processDelete when runtime instance deleted, we should delete the corresponding instance resource +func 
(s *RuntimeInstanceEventSubscriber) processDelete(rtInstanceRes *meshresource.RuntimeInstanceResource) error { + instanceResource, err := s.getRelatedInstanceResource(rtInstanceRes) + if err != nil { + return err + } + if instanceResource == nil { + return nil + } + if err = s.instanceResourceStore.Delete(instanceResource.ResourceKey()); err != nil { + logger.Errorf("delete instance resource failed, instance: %s, err: %s", instanceResource.ResourceKey(), err.Error()) + return err + } + instanceDeleteEvent := events.NewResourceChangedEvent(cache.Deleted, instanceResource, nil) + s.eventEmitter.Send(instanceDeleteEvent) + logger.Debugf("runtime instance delete trigger instance delete event, event: %s", instanceDeleteEvent.String()) + return nil +} + +func NewRuntimeInstanceEventSubscriber(instanceResourceStore store.ResourceStore) events.Subscriber { + return &RuntimeInstanceEventSubscriber{ + instanceResourceStore: instanceResourceStore, Review Comment: The `eventEmitter` field is not initialized in the constructor but is used in `processUpsert` (line 157) and `processDelete` (line 176). This will cause a nil pointer dereference. Add an `eventEmitter` parameter to the constructor. 
```suggestion func NewRuntimeInstanceEventSubscriber(instanceResourceStore store.ResourceStore, eventEmitter events.Emitter) events.Subscriber { return &RuntimeInstanceEventSubscriber{ instanceResourceStore: instanceResourceStore, eventEmitter: eventEmitter, ``` ########## pkg/diagnostics/server.go: ########## @@ -79,31 +76,30 @@ func (s *diagnosticsServer) Start(_ runtime.Runtime, stop <-chan struct{}) error Addr: fmt.Sprintf(":%d", s.config.ServerPort), Handler: mux, ReadHeaderTimeout: time.Second, - ErrorLog: adapter.ToStd(diagnosticsServerLog), } - diagnosticsServerLog.Info("starting diagnostic server", "interface", "0.0.0.0", "port", s.config.ServerPort) + logger.Infof("starting diagnostic server, endpoint is 0.0.0.0: %d", s.config.ServerPort) errChan := make(chan error) go func() { defer close(errChan) var err error err = httpServer.ListenAndServe() if err != nil { - switch err { - case http.ErrServerClosed: - diagnosticsServerLog.Info("shutting down server") + switch { + case errors.Is(err, http.ErrServerClosed): + logger.Errorf("diagnostics http server closed, err: %s", err) Review Comment: The log message says 'diagnostics http server closed' with an error level, but `http.ErrServerClosed` is the expected normal shutdown error. This should be logged at Info level with message 'shutting down server' (matching the original behavior) rather than as an error. 
```suggestion logger.Info("shutting down diagnostics server") ``` ########## pkg/diagnostics/server.go: ########## @@ -79,31 +76,30 @@ func (s *diagnosticsServer) Start(_ runtime.Runtime, stop <-chan struct{}) error Addr: fmt.Sprintf(":%d", s.config.ServerPort), Handler: mux, ReadHeaderTimeout: time.Second, - ErrorLog: adapter.ToStd(diagnosticsServerLog), } - diagnosticsServerLog.Info("starting diagnostic server", "interface", "0.0.0.0", "port", s.config.ServerPort) + logger.Infof("starting diagnostic server, endpoint is 0.0.0.0: %d", s.config.ServerPort) errChan := make(chan error) go func() { defer close(errChan) var err error err = httpServer.ListenAndServe() if err != nil { - switch err { - case http.ErrServerClosed: - diagnosticsServerLog.Info("shutting down server") + switch { + case errors.Is(err, http.ErrServerClosed): + logger.Errorf("diagnostics http server closed, err: %s", err) default: - diagnosticsServerLog.Error(err, "could not start HTTP Server") + logger.Error("could not start diagnostics http Server, unknown err: %s", err) Review Comment: Corrected capitalization of 'Server' to 'server'. 
```suggestion logger.Error("could not start diagnostics http server, unknown err: %s", err) ``` ########## app/dubbo-admin/dubbo-admin.yaml: ########## @@ -39,17 +39,55 @@ console: store: type: memory discovery: - - type: nacos - id: nacos-44.33 - address: - registry: nacos://47.76.94.134:8848?username=nacos&password=nacos - configCenter: nacos://47.76.94.134:8848?username=nacos&password=nacos - metadataReport: nacos://47.76.94.134:8848?username=nacos&password=nacos - - type: istio +# - type: nacos +# id: nacos-44.33 +# address: +# registry: nacos://47.76.94.134:8848?username=nacos&password=nacos +# configCenter: nacos://47.76.94.134:8848?username=nacos&password=nacos +# metadataReport: nacos://47.76.94.134:8848?username=nacos&password=nacos + # mock discovery is only for development + - type: mock engine: name: k8s1.28.6 type: kubernetes properties: - apiServerAddress: https://192.168.1.1:6443 - kubeConfig: /etc/kubernetes/admin.conf + # [Kubernetes] Path to kubernetes config file, if not set, will use in cluster config + kubeConfigPath: /root/.kube/config + # [Kubernetes] Watch pods with specified labels, if not set, will watch all pods + # podWatchSelector: org.apache.dubbo/dubbo-apps=true + # [Kubernetes] Identify which Dubbo app the pod belongs to, if not set, [type = ByIP] will be used + # 1. ByLabels: Use the label value corresponding to the labelKey as the dubbo app name + # e.g. + # type: ByLabel + # labelKey: org.apache.dubbo/dubbo-app-name + # 2. ByAnnotation: Use the annotation value corresponding to the annotationKey as the dubbo app name + # e.g. + # type: ByAnnotation + # annotationKey: org.apache.dubbo/dubbo-app-name + # 3. ByIP(default): Use pod's IP to find if there is a same ip of an instance and use the instance's app name as the identifier, + # if there is no such association, the pod will not be seen as a pod of dubbo application. + # e.g. 
+ # type: ByIP +# dubboAppIdentifier: +# type: ByLabel +# labelKey: org.apache.dubbo/dubbo-app-name + # [Kubernetes] Strategy of choosing the main container, if not set, [type = ByIndex] and [index = 0] will be used + # 1. ByLast: choose the last container as the main container + # e.g. + # type: ByLast + # 2. ByIndex(default): choose the container at the specified index location as the main container + # e.g. + # type: ByIndex + # index: 0 + # 3. ByName: choose the container with the specified name + # e.g. + # type: ByName + # name: main + # 4. chooseByAnnotation: choose the container with the annotation key, specified annotation value will be used as the container name Review Comment: Corrected spelling of 'chooseByAnnotation' to 'ByAnnotation' to match the constant name. ```suggestion # 4. ByAnnotation: choose the container with the annotation key, specified annotation value will be used as the container name ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
