Copilot commented on code in PR #1371:
URL: https://github.com/apache/dubbo-admin/pull/1371#discussion_r2635260578


##########
pkg/discovery/zk/listerwatcher/listerwatcher.go:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package listerwatcher
+
+import (
+       "fmt"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       k8sruntime "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/watch"
+       "k8s.io/client-go/tools/cache"
+
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/discovery/zk/zkwatcher"
+)
+
+type ToUpsertResourceFunc func(mesh, nodePath, nodeData string) 
coremodel.Resource
+
+type ToDeleteResourceFunc func(mesh, nodePath string) coremodel.Resource
+
+type ListerWatcher[T coremodel.Resource] struct {
+       basePath             string
+       rk                   coremodel.ResourceKind
+       conn                 *zk.Conn
+       cfg                  *discoverycfg.Config
+       toUpsertResourceFunc ToUpsertResourceFunc
+       toDeleteResourceFunc ToDeleteResourceFunc
+       newResourceFunc      coremodel.NewResourceFunc
+       newResListFunc       coremodel.NewResourceListFunc
+       resultChan           chan watch.Event
+       stopChan             chan bool
+}
+
+func NewListerWatcher(
+       rk coremodel.ResourceKind,
+       toResourceFunc ToUpsertResourceFunc,
+       toDeleteResourceFunc ToDeleteResourceFunc,
+       basePath string,
+       conn *zk.Conn,
+       cfg *discoverycfg.Config) (*ListerWatcher[coremodel.Resource], error) {
+       newResourceFunc, err := 
coremodel.ResourceSchemaRegistry().NewResourceFunc(rk)
+       if err != nil {
+               return nil, err
+       }
+       newResListFunc, err := 
coremodel.ResourceSchemaRegistry().NewResourceListFunc(rk)
+       if err != nil {
+               return nil, err
+       }
+       return &ListerWatcher[coremodel.Resource]{
+               rk:                   rk,
+               toUpsertResourceFunc: toResourceFunc,
+               toDeleteResourceFunc: toDeleteResourceFunc,
+               basePath:             basePath,
+               conn:                 conn,
+               cfg:                  cfg,
+               newResourceFunc:      newResourceFunc,
+               newResListFunc:       newResListFunc,
+               resultChan:           make(chan watch.Event, 1000),

Review Comment:
   The resultChan buffer size of 1000 (line 78) is arbitrary and could lead to 
blocking if watch events are produced faster than the client consumes them. 
Consider making this configurable or implementing backpressure handling.



##########
pkg/discovery/zk/zkwatcher/watcher.go:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zkwatcher
+
+import (
+       "sync"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       set "github.com/duke-git/lancet/v2/datastructure/set"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+type EventType string
+
+const (
+       NodeCreated EventType = "NodeCreated"
+       NodeDeleted EventType = "NodeDeleted"
+       NodeChanged EventType = "NodeChanged"
+)
+
+// ZookeeperEvent represents a zookeeper event
+type ZookeeperEvent struct {
+       Type EventType
+       // node path
+       Path string
+       // node data
+       Data string
+       // whether it is a leaf node
+       leafNode bool
+}
+
+// RecursiveWatcher recursive watcher
+type RecursiveWatcher struct {
+       conn      *zk.Conn
+       basePath  string
+       eventChan chan ZookeeperEvent
+       stopChan  chan struct{}
+       mu        sync.Mutex
+}
+
+// NewRecursiveWatcher create a new recursive watcher
+func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher {
+       return &RecursiveWatcher{
+               conn:      conn,
+               basePath:  basePath,
+               eventChan: make(chan ZookeeperEvent, 1000),
+               stopChan:  make(chan struct{}),
+       }
+}
+
+// Start begin watching
+func (rw *RecursiveWatcher) Start() error {
+       logger.Infof("Start watching path: %s", rw.basePath)
+       // Recursively watch the initial path
+       return rw.watchPathRecursively(rw.basePath)
+}
+
+// Stop watching
+func (rw *RecursiveWatcher) Stop() {
+       close(rw.stopChan)
+       rw.conn.Close()
+       close(rw.eventChan)
+}
+
+// EventChan return event channel
+func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent {
+       return rw.eventChan
+}
+
+// watchPathRecursively watch path recursively
+func (rw *RecursiveWatcher) watchPathRecursively(path string) error {
+       // Watch data changes for the current path
+       go rw.watchDataChanges(path)
+
+       // Get children of current path and watch
+       children, _, err := rw.conn.Children(path)
+       if err != nil {
+               logger.Errorf("Failed to get children of path: %s, cause: %v", 
path, err)
+               return err
+       }
+       // Watch children list changes
+       go rw.watchChildrenChanges(path)
+
+       // Recursively watch child nodes
+       for _, child := range children {
+               childPath := path + "/" + child
+               if err := rw.watchPathRecursively(childPath); err != nil {
+                       logger.Errorf("Failed to watch subpath %s: %v", 
childPath, err)
+               }
+       }
+       return nil
+}
+
+// watchDataChanges watch node data changes
+func (rw *RecursiveWatcher) watchDataChanges(path string) {
+       logger.Debugf("Watching data changes for path: %s", path)
+       for {
+
+               select {
+               case <-rw.stopChan:
+                       return
+               default:
+               }
+
+               // Get data and set up watch
+               data, stat, watcher, err := rw.conn.GetW(path)
+               if err != nil {
+                       logger.Errorf("Failed to watch data changes for path 
%s, cause: %v", path, err)
+                       time.Sleep(time.Second)
+                       continue
+               }
+               rw.eventChan <- ZookeeperEvent{
+                       Type:     NodeCreated,
+                       Path:     path,
+                       Data:     string(data),
+                       leafNode: stat.NumChildren == 0,
+               }
+
+               select {
+               case event := <-watcher.EvtCh:
+                       if event.Type == zk.EventNodeDataChanged {
+                               logger.Debugf("node data changed: %s", path)
+
+                               // Re-watch data changes
+                               go rw.watchDataChanges(path)
+                               return
+                       } else if event.Type == zk.EventNodeDeleted {
+                               // Node deleted, stop watching
+                               logger.Debugf("node deleted: %s", path)
+                               rw.eventChan <- ZookeeperEvent{
+                                       Type:     NodeDeleted,
+                                       Path:     path,
+                                       leafNode: stat.NumChildren == 0,
+                               }
+                               return
+                       }
+               case <-rw.stopChan:
+                       return
+               }
+
+               // If node exists but no changes, continue watching
+               if stat != nil {
+                       continue
+               }
+       }
+}
+
+// watchChildrenChanges watch child node changes
+func (rw *RecursiveWatcher) watchChildrenChanges(path string) {
+       logger.Debugf("Watching child node changes for path: %s", path)
+       for {
+               // Check if stopped
+               select {
+               case <-rw.stopChan:
+                       return
+               default:
+               }
+
+               // Get child nodes and set up watch
+               children, stat, watcher, err := rw.conn.ChildrenW(path)
+               if err != nil {
+                       logger.Errorf("Failed to watch child node changes for 
path %s: %v", path, err)
+                       time.Sleep(time.Second)
+                       continue
+               }
+
+               select {
+               case event := <-watcher.EvtCh:
+                       if event.Type == zk.EventNodeChildrenChanged {
+                               logger.Debugf("Child node list changed: %s", 
path)
+
+                               // Get current child nodes
+                               newChildren, _, err := rw.conn.Children(path)
+                               if err != nil {
+                                       logger.Debugf("Failed to get new child 
nodes: %v", err)
+                                       continue
+                               }
+                               // Calculate added and deleted child nodes
+                               newChildSet := set.FromSlice(newChildren)
+                               oldChildSet := set.FromSlice(children)
+                               addedChildren := newChildSet.Minus(oldChildSet)
+                               deletedChildren := 
oldChildSet.Minus(newChildSet)
+
+                               // watch newly added child nodes
+                               for _, c := range addedChildren.ToSlice() {
+                                       childPath := path + "/" + c
+                                       logger.Debugf("New child node added: 
%s", childPath)
+                                       // Recursively watch new node
+                                       go func() {
+                                               err := 
rw.watchPathRecursively(childPath)
+                                               if err != nil {
+                                                       logger.Errorf("Failed 
to watch subpath %s: %v", childPath, err)
+                                               }
+                                       }()
+                               }
+
+                               // Check for deleted child nodes
+                               for _, c := range deletedChildren.ToSlice() {
+                                       logger.Debugf("child node %s of %s 
deleted", c, path)
+                               }
+
+                               // Re-watch child node changes
+                               go rw.watchChildrenChanges(path)

Review Comment:
   Like watchDataChanges, watchChildrenChanges spawns a new goroutine 
recursively (line 220) without tracking it. This could lead to goroutine leaks 
if child node lists change frequently. The same issue applies to the goroutines 
spawned in lines 206-211 for newly added child nodes.



##########
pkg/core/resource/apis/mesh/v1alpha1/rpc_instance_helper.go:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package v1alpha1
+
+import (
+       "encoding/json"
+       "strconv"
+       "time"
+
+       set "github.com/duke-git/lancet/v2/datastructure/set"
+       "github.com/duke-git/lancet/v2/maputil"
+
+       meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+func ToRPCInstance(
+       mesh string, appName string, ip string,
+       port int64, metadata map[string]string) *RPCInstanceResource {
+       resName := BuildInstanceResName(appName, ip, port)

Review Comment:
   The function builds an instance resource name using BuildInstanceResName but 
this function is not defined in the visible code. This creates a dependency 
that may not be clear. Ensure that BuildInstanceResName is properly exported 
and documented, especially since it's used in multiple places throughout the 
codebase.



##########
pkg/discovery/zk/zkwatcher/watcher.go:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zkwatcher
+
+import (
+       "sync"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       set "github.com/duke-git/lancet/v2/datastructure/set"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+type EventType string
+
+const (
+       NodeCreated EventType = "NodeCreated"
+       NodeDeleted EventType = "NodeDeleted"
+       NodeChanged EventType = "NodeChanged"
+)
+
+// ZookeeperEvent represents a zookeeper event
+type ZookeeperEvent struct {
+       Type EventType
+       // node path
+       Path string
+       // node data
+       Data string
+       // whether it is a leaf node
+       leafNode bool
+}
+
+// RecursiveWatcher recursive watcher
+type RecursiveWatcher struct {
+       conn      *zk.Conn
+       basePath  string
+       eventChan chan ZookeeperEvent
+       stopChan  chan struct{}
+       mu        sync.Mutex
+}
+
+// NewRecursiveWatcher create a new recursive watcher
+func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher {
+       return &RecursiveWatcher{
+               conn:      conn,
+               basePath:  basePath,
+               eventChan: make(chan ZookeeperEvent, 1000),
+               stopChan:  make(chan struct{}),
+       }
+}
+
+// Start begin watching
+func (rw *RecursiveWatcher) Start() error {
+       logger.Infof("Start watching path: %s", rw.basePath)
+       // Recursively watch the initial path
+       return rw.watchPathRecursively(rw.basePath)
+}
+
+// Stop watching
+func (rw *RecursiveWatcher) Stop() {
+       close(rw.stopChan)
+       rw.conn.Close()
+       close(rw.eventChan)
+}
+
+// EventChan return event channel
+func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent {
+       return rw.eventChan
+}
+
+// watchPathRecursively watch path recursively
+func (rw *RecursiveWatcher) watchPathRecursively(path string) error {
+       // Watch data changes for the current path
+       go rw.watchDataChanges(path)
+
+       // Get children of current path and watch
+       children, _, err := rw.conn.Children(path)
+       if err != nil {
+               logger.Errorf("Failed to get children of path: %s, cause: %v", 
path, err)
+               return err
+       }
+       // Watch children list changes
+       go rw.watchChildrenChanges(path)
+
+       // Recursively watch child nodes
+       for _, child := range children {
+               childPath := path + "/" + child
+               if err := rw.watchPathRecursively(childPath); err != nil {
+                       logger.Errorf("Failed to watch subpath %s: %v", 
childPath, err)
+               }
+       }
+       return nil
+}
+
+// watchDataChanges watch node data changes
+func (rw *RecursiveWatcher) watchDataChanges(path string) {
+       logger.Debugf("Watching data changes for path: %s", path)
+       for {
+
+               select {
+               case <-rw.stopChan:
+                       return
+               default:
+               }
+
+               // Get data and set up watch
+               data, stat, watcher, err := rw.conn.GetW(path)
+               if err != nil {
+                       logger.Errorf("Failed to watch data changes for path 
%s, cause: %v", path, err)
+                       time.Sleep(time.Second)
+                       continue
+               }
+               rw.eventChan <- ZookeeperEvent{
+                       Type:     NodeCreated,
+                       Path:     path,
+                       Data:     string(data),
+                       leafNode: stat.NumChildren == 0,
+               }
+
+               select {
+               case event := <-watcher.EvtCh:
+                       if event.Type == zk.EventNodeDataChanged {
+                               logger.Debugf("node data changed: %s", path)
+
+                               // Re-watch data changes
+                               go rw.watchDataChanges(path)
+                               return

Review Comment:
   The watchDataChanges function spawns a new goroutine recursively (line 142) 
without any mechanism to track or limit the number of goroutines. If a node's 
data changes frequently, this could lead to goroutine leaks and resource 
exhaustion. Consider using a single long-lived goroutine per path or 
implementing a goroutine pool.



##########
pkg/config/discovery/config.go:
##########
@@ -89,11 +78,17 @@ func (c *Config) PreProcess() error {
 }
 
 func (c *Config) Validate() error {
+       if strutil.IsBlank(c.ID) {
+               return bizerror.New(bizerror.InvalidArgument, "discovery id is 
needed")
+       }
        if strutil.IsBlank(c.Name) {
                return bizerror.New(bizerror.InvalidArgument, "discovery name 
is needed")
        }
        if strutil.IsBlank(string(c.Type)) {
                return bizerror.New(bizerror.InvalidArgument, "discovery type 
is needed")
        }
+       if strutil.IsBlank(c.Address.Registry) && c.Type != Mock {
+               return bizerror.New(bizerror.InvalidArgument, "discovery 
address is needed")
+       }

Review Comment:
   The validation on line 90 checks for a blank address only when Type is not 
Mock. However, the same check is already performed in the PreProcess method 
(line 63). This creates an inconsistency where PreProcess allows blank 
addresses for the Mock type, but Validate requires a non-blank address for 
non-Mock types. Consider removing this duplicate validation or ensuring both 
methods have consistent behavior.
   ```suggestion
   
   ```



##########
pkg/discovery/zk/listerwatcher/listerwatcher.go:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package listerwatcher
+
+import (
+       "fmt"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       k8sruntime "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/watch"
+       "k8s.io/client-go/tools/cache"
+
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/discovery/zk/zkwatcher"
+)
+
+type ToUpsertResourceFunc func(mesh, nodePath, nodeData string) 
coremodel.Resource
+
+type ToDeleteResourceFunc func(mesh, nodePath string) coremodel.Resource
+
+type ListerWatcher[T coremodel.Resource] struct {
+       basePath             string
+       rk                   coremodel.ResourceKind
+       conn                 *zk.Conn
+       cfg                  *discoverycfg.Config
+       toUpsertResourceFunc ToUpsertResourceFunc
+       toDeleteResourceFunc ToDeleteResourceFunc
+       newResourceFunc      coremodel.NewResourceFunc
+       newResListFunc       coremodel.NewResourceListFunc
+       resultChan           chan watch.Event
+       stopChan             chan bool
+}
+
+func NewListerWatcher(
+       rk coremodel.ResourceKind,
+       toResourceFunc ToUpsertResourceFunc,
+       toDeleteResourceFunc ToDeleteResourceFunc,
+       basePath string,
+       conn *zk.Conn,
+       cfg *discoverycfg.Config) (*ListerWatcher[coremodel.Resource], error) {
+       newResourceFunc, err := 
coremodel.ResourceSchemaRegistry().NewResourceFunc(rk)
+       if err != nil {
+               return nil, err
+       }
+       newResListFunc, err := 
coremodel.ResourceSchemaRegistry().NewResourceListFunc(rk)
+       if err != nil {
+               return nil, err
+       }
+       return &ListerWatcher[coremodel.Resource]{
+               rk:                   rk,
+               toUpsertResourceFunc: toResourceFunc,
+               toDeleteResourceFunc: toDeleteResourceFunc,
+               basePath:             basePath,
+               conn:                 conn,
+               cfg:                  cfg,
+               newResourceFunc:      newResourceFunc,
+               newResListFunc:       newResListFunc,
+               resultChan:           make(chan watch.Event, 1000),
+               stopChan:             make(chan bool),
+       }, nil
+}
+
+func (lw *ListerWatcher[T]) List(_ metav1.ListOptions) (k8sruntime.Object, 
error) {
+       resList, err := lw.listRecursively(lw.basePath)
+       if err != nil {
+               logger.Errorf("list all %s under path %s in zk %s failed, 
cause: %v", lw.rk, lw.basePath, lw.zkAddr(), err)
+               return nil, err
+       }
+       resListObj := lw.newResListFunc()
+       resListObj.SetItems(resList)
+       return resListObj, nil
+}
+
+func (lw *ListerWatcher[T]) listRecursively(path string) 
([]coremodel.Resource, error) {
+       resList := make([]coremodel.Resource, 0)
+       nodeData, _, err := lw.conn.Get(path)
+       if err != nil {
+               errStr := fmt.Sprintf("get %s from zk failed, path: %s, addr: 
%s, cause: %v", lw.rk, path, lw.zkAddr(), err)
+               logger.Errorf(errStr)
+               return nil, bizerror.Wrap(err, bizerror.ZKError, errStr)
+       }
+       res := lw.toUpsertResourceFunc(lw.mesh(), path, string(nodeData))
+       if res != nil {
+               resList = append(resList, res)
+       }
+       children, _, err := lw.conn.Children(path)
+       if err != nil {
+               errStr := fmt.Sprintf("list %s from zk failed, path: %s, addr: 
%s, cause: %v", lw.rk, path, lw.zkAddr(), err)
+               logger.Errorf(errStr)
+               return nil, bizerror.Wrap(err, bizerror.ZKError, errStr)
+       }
+       if len(children) == 0 {
+               return resList, nil
+       }
+       for _, childPath := range children {
+               resources, err := lw.listRecursively(path + 
constants.PathSeparator + childPath)
+               if err != nil {
+                       return nil, err
+               }
+               resList = append(resList, resources...)
+       }
+       return resList, nil
+}
+
+func (lw *ListerWatcher[T]) Watch(_ metav1.ListOptions) (watch.Interface, 
error) {
+       watcher := zkwatcher.NewRecursiveWatcher(lw.conn, lw.basePath)
+       go func() {
+               for {
+                       select {
+                       case event, ok := <-watcher.EventChan():
+                               if !ok {
+                                       logger.Warnf("zookeeper watcher 
stopped, path: %s, addr: %s", lw.basePath, lw.zkAddr())
+                                       return
+                               }
+                               lw.handleEvent(event)
+                       case <-lw.stopChan:
+                               logger.Warnf("stop watching %s in %s,", 
lw.basePath, lw.zkAddr())
+                               return
+                       }
+
+               }
+       }()
+       err := watcher.Start()
+       if err != nil {
+               return nil, err
+       }

Review Comment:
   The Watch method starts the watcher after spawning a goroutine that consumes 
events from it. If watcher.Start() fails (lines 143-146), the goroutine at 
lines 127-142 will continue running and may encounter nil pointer dereferences 
or other issues since the watcher failed to start properly. The goroutine 
should be stopped if Start() fails, or the order should be reversed to start 
the watcher before spawning the consumer goroutine.



##########
pkg/discovery/zk/zkwatcher/watcher.go:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zkwatcher
+
+import (
+       "sync"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       set "github.com/duke-git/lancet/v2/datastructure/set"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+type EventType string
+
+const (
+       NodeCreated EventType = "NodeCreated"
+       NodeDeleted EventType = "NodeDeleted"
+       NodeChanged EventType = "NodeChanged"
+)
+
+// ZookeeperEvent represents a zookeeper event
+type ZookeeperEvent struct {
+       Type EventType
+       // node path
+       Path string
+       // node data
+       Data string
+       // whether it is a leaf node
+       leafNode bool

Review Comment:
   The leafNode field in ZookeeperEvent struct is unexported (lowercase 'l'), 
making it inaccessible to consumers of this event. This appears to be 
unintentional as other fields are exported. Consider changing 'leafNode' to 
'LeafNode' to make it consistent with other fields and accessible to external 
packages.



##########
pkg/core/discovery/subscriber/zk_metadata.go:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package subscriber
+
+import (
+       "fmt"
+       "reflect"
+       "strings"
+
+       "k8s.io/client-go/tools/cache"
+
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       "github.com/apache/dubbo-admin/pkg/core/events"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/core/store"
+)
+
+type ZKMetadataEventSubscriber struct {
+       emitter     events.Emitter
+       storeRouter store.Router
+}
+
+func NewZKMetadataEventSubscriber(eventEmitter events.Emitter, storeRouter 
store.Router) *ZKMetadataEventSubscriber {
+       return &ZKMetadataEventSubscriber{
+               emitter:     eventEmitter,
+               storeRouter: storeRouter,
+       }
+}
+
+func (z *ZKMetadataEventSubscriber) ResourceKind() coremodel.ResourceKind {
+       return meshresource.ZKMetadataKind
+}
+
+func (z *ZKMetadataEventSubscriber) Name() string {
+       return "Discovery-" + z.ResourceKind().ToString()
+}
+
+func (z *ZKMetadataEventSubscriber) ProcessEvent(event events.Event) error {
+       newObj, ok := event.NewObj().(*meshresource.ZKMetadataResource)
+       if !ok && event.NewObj() != nil {
+               return bizerror.NewAssertionError(reflect.TypeOf(newObj), 
event.NewObj())
+       }
+       oldObj, ok := event.OldObj().(*meshresource.ZKMetadataResource)
+       if !ok && event.OldObj() != nil {
+               return bizerror.NewAssertionError(reflect.TypeOf(oldObj), 
event.OldObj())
+       }
+       var processErr error
+       switch event.Type() {
+       case cache.Added, cache.Updated, cache.Replaced, cache.Sync:
+               if newObj == nil || newObj.Spec == nil {
+                       errStr := "process zk metadata upsert event, but new 
obj is nil, skipped processing"
+                       logger.Errorf(errStr)
+                       return bizerror.New(bizerror.EventError, errStr)
+               }
+               processErr = z.processUpsert(newObj)
+       case cache.Deleted:
+               if oldObj == nil {
+                       errStr := "process zk metadata delete event, but old 
obj is nil, skipped processing"
+                       logger.Errorf(errStr)
+                       return bizerror.New(bizerror.EventError, errStr)
+               }
+               // Metadata is an ephemeral znode, dubbo client only 
adds/updates the metadata znode and never deletes.
+               // And we can't identify the service only by the node path, so 
for delete event, we just ignored
+               logger.Infof("ignored zk metadata delete event")
+       }
+       if processErr != nil {
+               logger.Errorf("process zk metadata event failed, cause: %s, 
event: %s", processErr.Error(), event.String())
+               return processErr
+       }
+       logger.Infof("process zk metadata event successfully, event: %s", 
event.String())
+       return nil
+}
+
+func (z *ZKMetadataEventSubscriber) processUpsert(metadataRes 
*meshresource.ZKMetadataResource) error {
+       paths := strings.Split(metadataRes.Spec.NodePath, 
constants.PathSeparator)
+       if len(paths) < 2 {
+               return bizerror.New(bizerror.ZKError, fmt.Sprintf("invalid zk 
metadata node path: %s", metadataRes.Spec.NodePath))
+       }
+       if paths[len(paths)-2] == constants.ProviderSide {
+               return 
processMetadataUpsert[*meshresource.ServiceProviderMetadataResource](
+                       metadataRes, meshresource.ToServiceProviderMetadataRes, 
z.storeRouter, z.emitter)
+       } else if paths[len(paths)-2] == constants.ConsumerSide {
+               return 
processMetadataUpsert[*meshresource.ServiceConsumerMetadataResource](
+                       metadataRes, 
meshresource.ToServiceConsumerMetadataByRawData, z.storeRouter, z.emitter)
+       }
+       logger.Warnf("unknown metadata, node path: %s, node data: %s", 
metadataRes.Spec.NodePath, metadataRes.Spec.NodeData)
+       return nil
+}
+
+// processMetadataUpsert handle service provider/consumer metadata upsert
+func processMetadataUpsert[T coremodel.Resource](
+       zkMetadataRes *meshresource.ZKMetadataResource,
+       toMetadataRes meshresource.ToMetadataResFunc,
+       router store.Router,
+       emitter events.Emitter) error {
+       newMetadataRes := toMetadataRes(zkMetadataRes.Mesh, 
zkMetadataRes.Spec.NodeData)
+       if newMetadataRes == nil {
+               logger.Errorf("cannot unmarshal metadata in zk %s, raw content: 
%s", zkMetadataRes.Mesh, zkMetadataRes.Spec.NodeData)
+               return bizerror.New(bizerror.ZKError, "cannot unmarshal 
metadata")
+       }
+       st, err := router.ResourceKindRoute(newMetadataRes.ResourceKind())
+       if err != nil {
+               logger.Errorf("get %s store failed, cause: %s", 
newMetadataRes.ResourceKind(), err.Error())
+               return err
+       }
+       oldRes, exists, err := st.GetByKey(newMetadataRes.ResourceKey())
+       if err != nil {
+               logger.Errorf("get metadata %s from store failed, cause: %s", 
newMetadataRes.ResourceKey(), err.Error())
+               return err
+       }
+       if !exists {
+               err := st.Add(newMetadataRes)
+               if err != nil {
+                       logger.Errorf("add metadata %s to store failed, cause: 
%s", newMetadataRes.ResourceKey(), err.Error())
+                       return err
+               }
+               emitter.Send(events.NewResourceChangedEvent(cache.Added, nil, 
newMetadataRes))
+               return nil

Review Comment:
   The processMetadataUpsert function calls toMetadataRes which may return nil 
(line 114), but the subsequent code assumes newMetadataRes is non-nil when 
calling newMetadataRes.ResourceKind() on line 119 and 
newMetadataRes.ResourceKey() on lines 124, 126, etc. This could cause a nil 
pointer dereference panic. Add a nil check after line 118 before using 
newMetadataRes.



##########
pkg/core/discovery/subscriber/zk_config.go:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package subscriber
+
+import (
+       "fmt"
+       "reflect"
+       "strings"
+
+       "k8s.io/client-go/tools/cache"
+
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       "github.com/apache/dubbo-admin/pkg/core/events"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/core/store"
+)
+
+type ZKConfigEventSubscriber struct {
+       emitter     events.Emitter
+       storeRouter store.Router
+}
+
+func NewZKConfigEventSubscriber(eventEmitter events.Emitter, storeRouter 
store.Router) *ZKConfigEventSubscriber {
+       return &ZKConfigEventSubscriber{
+               emitter:     eventEmitter,
+               storeRouter: storeRouter,
+       }
+}
+
+func (z *ZKConfigEventSubscriber) ResourceKind() coremodel.ResourceKind {
+       return meshresource.ZKConfigKind
+}
+
+func (z *ZKConfigEventSubscriber) Name() string {
+       return "Discovery-" + z.ResourceKind().ToString()
+}
+
+func (z *ZKConfigEventSubscriber) ProcessEvent(event events.Event) error {
+       newObj, ok := event.NewObj().(*meshresource.ZKConfigResource)
+       if !ok && event.NewObj() != nil {
+               return bizerror.NewAssertionError(reflect.TypeOf(newObj), 
event.NewObj())
+       }
+       oldObj, ok := event.OldObj().(*meshresource.ZKConfigResource)
+       if !ok && event.OldObj() != nil {
+               return bizerror.NewAssertionError(reflect.TypeOf(oldObj), 
event.OldObj())
+       }
+       var processErr error
+       switch event.Type() {
+       case cache.Added, cache.Updated, cache.Replaced, cache.Sync:
+               if newObj == nil || newObj.Spec == nil {
+                       errStr := "process zk config upsert event, but new obj 
is nil, skipped processing"
+                       logger.Errorf(errStr)
+                       return bizerror.New(bizerror.EventError, errStr)
+               }
+               processErr = z.processUpsert(newObj)
+       case cache.Deleted:
+               if oldObj == nil {
+                       errStr := "process zk config delete event, but old obj 
is nil, skipped processing"
+                       logger.Errorf(errStr)
+                       return bizerror.New(bizerror.EventError, errStr)
+               }
+               processErr = z.processDelete(oldObj)
+       }
+       if processErr != nil {
+               logger.Errorf("process zk config event failed, cause: %s, 
event: %s", processErr.Error(), event.String())
+               return processErr
+       }
+       logger.Infof("process zk config event successfully, event: %s", 
event.String())
+       return nil
+}
+
+func (z *ZKConfigEventSubscriber) processUpsert(configRes 
*meshresource.ZKConfigResource) error {
+       parts := strings.Split(configRes.Spec.NodeName, constants.DotSeparator)
+       suffix := parts[len(parts)-1]
+       if !constants.RuleSuffixSet.Contain(suffix) {
+               logger.Warnf("node name is not end with rule suffix, skipped 
processing, nodeName: %s", configRes.Spec.NodeName)
+               return nil
+       }
+       logger.Debugf("process zk config upsert event, config res: %s", 
configRes.String())
+       switch suffix {
+       case constants.TagRuleSuffix:
+               return processConfigUpsert[*meshresource.TagRouteResource](
+                       configRes, meshresource.ToTagRouteResource, 
z.storeRouter, z.emitter)
+       case constants.ConditionRuleSuffix:
+               return 
processConfigUpsert[*meshresource.ConditionRouteResource](
+                       configRes, meshresource.ToConditionRouteResource, 
z.storeRouter, z.emitter)
+       case constants.ConfiguratorsSuffix:
+               return processConfigUpsert[*meshresource.DynamicConfigResource](
+                       configRes, meshresource.ToDynamicConfigResource, 
z.storeRouter, z.emitter)
+       default:
+               return bizerror.New(bizerror.UnknownError,
+                       fmt.Sprintf("unknown rule type in mesh %s, skipped 
processing, path: %s, raw content: %s",
+                               configRes.Mesh, configRes.Spec.NodeName, 
configRes.Spec.NodeData))
+       }
+}
+
+func (z *ZKConfigEventSubscriber) processDelete(configRes 
*meshresource.ZKConfigResource) error {
+       parts := strings.Split(configRes.Spec.NodeName, constants.DotSeparator)
+       suffix := parts[len(parts)-1]
+       if !constants.RuleSuffixSet.Contain(suffix) {
+               logger.Warnf("node name is not end with rule suffix, skipped 
processing, nodeName: %s", configRes.Spec.NodeName)
+               return nil
+       }
+       logger.Debugf("process zk config delete event, config res: %s", 
configRes.String())
+       switch suffix {
+       case constants.TagRuleSuffix:
+               return processConfigDelete[*meshresource.TagRouteResource](
+                       configRes, meshresource.ToTagRouteResource, 
z.storeRouter, z.emitter)
+       case constants.ConditionRuleSuffix:
+               return 
processConfigDelete[*meshresource.ConditionRouteResource](
+                       configRes, meshresource.ToConditionRouteResource, 
z.storeRouter, z.emitter)
+       case constants.ConfiguratorsSuffix:
+               return processConfigDelete[*meshresource.DynamicConfigResource](
+                       configRes, meshresource.ToDynamicConfigResource, 
z.storeRouter, z.emitter)
+       default:
+               return bizerror.New(bizerror.UnknownError,
+                       fmt.Sprintf("unknown rule type in mesh %s, skipped 
processing, node: %s",
+                               configRes.Mesh, configRes.Spec.NodeName))
+       }
+}
+
+func processConfigUpsert[T coremodel.Resource](
+       configRes *meshresource.ZKConfigResource,
+       toRuleRes meshresource.ToRuleResourceFunc,
+       router store.Router,
+       emitter events.Emitter) error {
+       newRuleRes := toRuleRes(configRes.Mesh, configRes.Name, 
configRes.Spec.NodeData)
+       if newRuleRes == nil {
+               logger.Errorf("cannot unmarshal config in zk %s, raw content: 
%s", configRes.Mesh, configRes.Spec.NodeData)
+               return bizerror.New(bizerror.ZKError, "cannot unmarshal rule")
+       }
+       st, err := router.ResourceKindRoute(newRuleRes.ResourceKind())
+       if err != nil {
+               logger.Errorf("get %s store failed, cause: %s", 
newRuleRes.ResourceKind(), err.Error())
+               return err
+       }
+       oldRes, exists, err := st.GetByKey(newRuleRes.ResourceKey())
+       if err != nil {
+               logger.Errorf("get rule %s from store failed, cause: %s", 
newRuleRes.ResourceKey(), err.Error())
+               return err
+       }
+       if !exists {
+               err := st.Add(newRuleRes)
+               if err != nil {
+                       logger.Errorf("add rule %s to store failed, cause: %s", 
newRuleRes.ResourceKey(), err.Error())
+                       return err
+               }
+               emitter.Send(events.NewResourceChangedEvent(cache.Added, nil, 
newRuleRes))
+               return nil

Review Comment:
   The processConfigUpsert function calls toRuleRes which may return nil (line 
145), but the subsequent code assumes newRuleRes is non-nil when calling 
newRuleRes.ResourceKind() on line 150 and newRuleRes.ResourceKey() on lines 
155, 157, etc. This could cause a nil pointer dereference panic. The nil check 
on lines 146-149 only handles the error logging but doesn't prevent the code 
from continuing to use newRuleRes.



##########
pkg/discovery/zk/factory.go:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zk
+
+import (
+       "encoding/json"
+       "net/url"
+       "strconv"
+       "strings"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       "github.com/duke-git/lancet/v2/strutil"
+
+       meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1"
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery"
+       "github.com/apache/dubbo-admin/pkg/core/controller"
+       "github.com/apache/dubbo-admin/pkg/core/discovery"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/discovery/zk/listerwatcher"
+)
+
+func init() {
+       discovery.RegisterListWatcherFactory(&Factory{})
+}
+
+type Factory struct {
+}
+
+func (f *Factory) Support(typ discoverycfg.Type) bool {
+       return discoverycfg.Zookeeper == typ
+}
+
+func (f *Factory) NewListWatchers(config *discoverycfg.Config) 
([]controller.ResourceListerWatcher, error) {
+       address := config.Address.Registry
+       zkUrl, err := url.Parse(address)
+       if err != nil {
+               return nil, err
+       }
+       conn, _, err := zk.Connect([]string{zkUrl.Host}, time.Second*1, func(c 
*zk.Conn) {
+               c.SetLogger(&zkLogger{})
+       })
+       if err != nil {
+               logger.Fatalf("connect to %s failed," + address)
+               return nil, bizerror.Wrap(err, bizerror.ZKError, "connect to 
zookeeper failed, addr: "+address)
+       }
+       mappingLw, err := listerwatcher.NewListerWatcher(
+               meshresource.ServiceProviderMappingKind,
+               toUpsertMappingResource,
+               toDeleteMappingResource,
+               "/dubbo/mapping",
+               conn,
+               config,
+       )
+       if err != nil {
+               return nil, err
+       }
+       //rpcInstanceLW, err := listerwatcher.NewListerWatcher(
+       //      meshresource.RPCInstanceKind,
+       //      toUpsertRPCInstanceResource,
+       //      toDeleteRPCInstanceResource,
+       //      "/services",
+       //      conn,
+       //      config,
+       //)
+       //if err != nil {
+       //      return nil, err
+       //}
+       configLW, err := listerwatcher.NewListerWatcher(
+               meshresource.ZKConfigKind,
+               toUpsertZKConfigResource,
+               toDeleteZKConfigResource,
+               "/dubbo/config",
+               conn,
+               config,
+       )
+
+       metadataLW, err := listerwatcher.NewListerWatcher(
+               meshresource.ZKMetadataKind,
+               toUpsertZKMetadataResource,
+               toDeleteZKMetadataResource,
+               "/dubbo/metadata",
+               conn,
+               config,
+       )

Review Comment:
   Missing error handling after creating configLW and metadataLW (lines 
88-104). If these calls return an error, it's silently ignored and execution 
continues to return the list including nil values. The error check pattern used 
for mappingLw (lines 74-76) should be applied here as well.



##########
pkg/discovery/zk/zkwatcher/watcher.go:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zkwatcher
+
+import (
+       "sync"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       set "github.com/duke-git/lancet/v2/datastructure/set"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+type EventType string
+
+const (
+       NodeCreated EventType = "NodeCreated"
+       NodeDeleted EventType = "NodeDeleted"
+       NodeChanged EventType = "NodeChanged"
+)
+
+// ZookeeperEvent represents a zookeeper event
+type ZookeeperEvent struct {
+       Type EventType
+       // node path
+       Path string
+       // node data
+       Data string
+       // whether it is a leaf node
+       leafNode bool
+}
+
+// RecursiveWatcher recursive watcher
+type RecursiveWatcher struct {
+       conn      *zk.Conn
+       basePath  string
+       eventChan chan ZookeeperEvent
+       stopChan  chan struct{}
+       mu        sync.Mutex
+}
+
+// NewRecursiveWatcher create a new recursive watcher
+func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher {
+       return &RecursiveWatcher{
+               conn:      conn,
+               basePath:  basePath,
+               eventChan: make(chan ZookeeperEvent, 1000),
+               stopChan:  make(chan struct{}),
+       }
+}
+
+// Start begin watching
+func (rw *RecursiveWatcher) Start() error {
+       logger.Infof("Start watching path: %s", rw.basePath)
+       // Recursively watch the initial path
+       return rw.watchPathRecursively(rw.basePath)
+}
+
+// Stop watching
+func (rw *RecursiveWatcher) Stop() {
+       close(rw.stopChan)
+       rw.conn.Close()
+       close(rw.eventChan)

Review Comment:
   The Stop method closes resources in a potentially unsafe order. Closing 
stopChan first (line 77) may cause goroutines to exit, but then the connection 
is closed (line 78) which could cause errors in those goroutines that are still 
processing. Finally, closing eventChan (line 79) while other goroutines might 
still be sending to it could cause a panic. The shutdown should instead be 
sequenced safely: first signal stop via stopChan, then wait for the goroutines 
to exit before closing the connection, and only then close eventChan.



##########
pkg/discovery/zk/zkwatcher/watcher.go:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zkwatcher
+
+import (
+       "sync"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       set "github.com/duke-git/lancet/v2/datastructure/set"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+type EventType string
+
+const (
+       NodeCreated EventType = "NodeCreated"
+       NodeDeleted EventType = "NodeDeleted"
+       NodeChanged EventType = "NodeChanged"
+)
+
+// ZookeeperEvent represents a zookeeper event
+type ZookeeperEvent struct {
+       Type EventType
+       // node path
+       Path string
+       // node data
+       Data string
+       // whether it is a leaf node
+       leafNode bool
+}
+
+// RecursiveWatcher recursive watcher
+type RecursiveWatcher struct {
+       conn      *zk.Conn
+       basePath  string
+       eventChan chan ZookeeperEvent
+       stopChan  chan struct{}
+       mu        sync.Mutex
+}
+
+// NewRecursiveWatcher create a new recursive watcher
+func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher {
+       return &RecursiveWatcher{
+               conn:      conn,
+               basePath:  basePath,
+               eventChan: make(chan ZookeeperEvent, 1000),
+               stopChan:  make(chan struct{}),
+       }
+}
+
+// Start begin watching
+func (rw *RecursiveWatcher) Start() error {
+       logger.Infof("Start watching path: %s", rw.basePath)
+       // Recursively watch the initial path
+       return rw.watchPathRecursively(rw.basePath)
+}
+
+// Stop watching
+func (rw *RecursiveWatcher) Stop() {
+       close(rw.stopChan)
+       rw.conn.Close()
+       close(rw.eventChan)
+}
+
+// EventChan return event channel
+func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent {
+       return rw.eventChan
+}
+
+// watchPathRecursively watch path recursively
+func (rw *RecursiveWatcher) watchPathRecursively(path string) error {
+       // Watch data changes for the current path
+       go rw.watchDataChanges(path)
+
+       // Get children of current path and watch
+       children, _, err := rw.conn.Children(path)
+       if err != nil {
+               logger.Errorf("Failed to get children of path: %s, cause: %v", 
path, err)
+               return err
+       }
+       // Watch children list changes
+       go rw.watchChildrenChanges(path)
+
+       // Recursively watch child nodes
+       for _, child := range children {
+               childPath := path + "/" + child
+               if err := rw.watchPathRecursively(childPath); err != nil {
+                       logger.Errorf("Failed to watch subpath %s: %v", 
childPath, err)
+               }
+       }
+       return nil
+}
+
+// watchDataChanges watch node data changes
+func (rw *RecursiveWatcher) watchDataChanges(path string) {
+       logger.Debugf("Watching data changes for path: %s", path)
+       for {
+
+               select {
+               case <-rw.stopChan:
+                       return
+               default:
+               }
+
+               // Get data and set up watch
+               data, stat, watcher, err := rw.conn.GetW(path)
+               if err != nil {
+                       logger.Errorf("Failed to watch data changes for path 
%s, cause: %v", path, err)
+                       time.Sleep(time.Second)
+                       continue
+               }
+               rw.eventChan <- ZookeeperEvent{
+                       Type:     NodeCreated,
+                       Path:     path,
+                       Data:     string(data),
+                       leafNode: stat.NumChildren == 0,
+               }
+
+               select {
+               case event := <-watcher.EvtCh:
+                       if event.Type == zk.EventNodeDataChanged {
+                               logger.Debugf("node data changed: %s", path)
+
+                               // Re-watch data changes
+                               go rw.watchDataChanges(path)
+                               return
+                       } else if event.Type == zk.EventNodeDeleted {
+                               // Node deleted, stop watching
+                               logger.Debugf("node deleted: %s", path)
+                               rw.eventChan <- ZookeeperEvent{
+                                       Type:     NodeDeleted,
+                                       Path:     path,
+                                       leafNode: stat.NumChildren == 0,
+                               }
+                               return
+                       }
+               case <-rw.stopChan:
+                       return
+               }
+
+               // If node exists but no changes, continue watching
+               if stat != nil {
+                       continue
+               }
+       }
+}
+
+// watchChildrenChanges watch child node changes
+func (rw *RecursiveWatcher) watchChildrenChanges(path string) {
+       logger.Debugf("Watching child node changes for path: %s", path)
+       for {
+               // Check if stopped
+               select {
+               case <-rw.stopChan:
+                       return
+               default:
+               }
+
+               // Get child nodes and set up watch
+               children, stat, watcher, err := rw.conn.ChildrenW(path)
+               if err != nil {
+                       logger.Errorf("Failed to watch child node changes for 
path %s: %v", path, err)
+                       time.Sleep(time.Second)
+                       continue
+               }
+
+               select {
+               case event := <-watcher.EvtCh:
+                       if event.Type == zk.EventNodeChildrenChanged {
+                               logger.Debugf("Child node list changed: %s", 
path)
+
+                               // Get current child nodes
+                               newChildren, _, err := rw.conn.Children(path)
+                               if err != nil {
+                                       logger.Debugf("Failed to get new child 
nodes: %v", err)
+                                       continue
+                               }
+                               // Calculate added and deleted child nodes
+                               newChildSet := set.FromSlice(newChildren)
+                               oldChildSet := set.FromSlice(children)
+                               addedChildren := newChildSet.Minus(oldChildSet)
+                               deletedChildren := 
oldChildSet.Minus(newChildSet)
+
+                               // watch newly added child nodes
+                               for _, c := range addedChildren.ToSlice() {
+                                       childPath := path + "/" + c
+                                       logger.Debugf("New child node added: 
%s", childPath)
+                                       // Recursively watch new node
+                                       go func() {
+                                               err := 
rw.watchPathRecursively(childPath)
+                                               if err != nil {
+                                                       logger.Errorf("Failed 
to watch subpath %s: %v", childPath, err)
+                                               }
+                                       }()
+                               }
+
+                               // Check for deleted child nodes
+                               for _, c := range deletedChildren.ToSlice() {
+                                       logger.Debugf("child node %s of %s 
deleted", c, path)
+                               }
+
+                               // Re-watch child node changes
+                               go rw.watchChildrenChanges(path)
+                               return
+                       } else if event.Type == zk.EventNodeDeleted {
+                               // Parent node deleted, stop watching
+                               logger.Debugf("parent node %s deleted, stop 
watching children changes", path)
+                               return
+                       }
+               case <-rw.stopChan:
+                       return
+               }
+
+               // If node exists but no changes, continue watching
+               if stat != nil {
+                       continue
+               }
+       }
+}
+
+// handleEvents handle events
+func (rw *RecursiveWatcher) handleEvents() {
+       for event := range rw.eventChan {
+               logger.Infof("Event type: %s, Path: %s, leafNode: %v, Data: 
%s", event.Type, event.Path, event.leafNode, event.Data)
+       }
+}

Review Comment:
   The function handleEvents (lines 238-243) is defined but never called. This 
appears to be dead code that should either be removed or integrated into the 
watcher's lifecycle if event handling logic is needed.



##########
pkg/discovery/zk/zkwatcher/watcher.go:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zkwatcher
+
+import (
+       "sync"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       set "github.com/duke-git/lancet/v2/datastructure/set"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+type EventType string
+
+const (
+       NodeCreated EventType = "NodeCreated"
+       NodeDeleted EventType = "NodeDeleted"
+       NodeChanged EventType = "NodeChanged"
+)
+
+// ZookeeperEvent represents a zookeeper event
+type ZookeeperEvent struct {
+       Type EventType
+       // node path
+       Path string
+       // node data
+       Data string
+       // whether it is a leaf node
+       leafNode bool
+}
+
+// RecursiveWatcher recursive watcher
+type RecursiveWatcher struct {
+       conn      *zk.Conn
+       basePath  string
+       eventChan chan ZookeeperEvent
+       stopChan  chan struct{}
+       mu        sync.Mutex
+}
+
+// NewRecursiveWatcher create a new recursive watcher
+func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher {
+       return &RecursiveWatcher{
+               conn:      conn,
+               basePath:  basePath,
+               eventChan: make(chan ZookeeperEvent, 1000),

Review Comment:
   The eventChan buffer size of 1000 (line 63) is arbitrary and could lead to 
blocking if events are produced faster than consumed. Consider making this 
configurable or implementing backpressure handling to avoid blocking the 
watcher goroutines.



##########
pkg/core/resource/apis/mesh/v1alpha1/service_helper.go:
##########
@@ -2,6 +2,9 @@ package v1alpha1
 
 import "github.com/apache/dubbo-admin/pkg/common/constants"
 
-func BuildServiceKey(serviceName, version, group string) string {
-       return serviceName + constants.ColonSeparator + version + 
constants.ColonSeparator + group
+// BuildServiceKey build service key
+// {service}:{version}:{group}:{appName}
+func BuildServiceKey(serviceName, version, group, appName string) string {
+       return serviceName + constants.ColonSeparator + version +
+               constants.ColonSeparator + group + constants.ColonSeparator + 
appName
 }

Review Comment:
   The BuildServiceKey function signature changed to include an appName 
parameter (line 7), but the function documentation doesn't explain what happens 
when appName is empty or how it affects the service key format. Consider adding 
documentation to clarify the expected behavior and format of the returned key.



##########
pkg/discovery/zk/listerwatcher/listerwatcher.go:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package listerwatcher
+
+import (
+       "fmt"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       k8sruntime "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/watch"
+       "k8s.io/client-go/tools/cache"
+
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/discovery/zk/zkwatcher"
+)
+
+type ToUpsertResourceFunc func(mesh, nodePath, nodeData string) 
coremodel.Resource
+
+type ToDeleteResourceFunc func(mesh, nodePath string) coremodel.Resource
+
+type ListerWatcher[T coremodel.Resource] struct {
+       basePath             string
+       rk                   coremodel.ResourceKind
+       conn                 *zk.Conn
+       cfg                  *discoverycfg.Config
+       toUpsertResourceFunc ToUpsertResourceFunc
+       toDeleteResourceFunc ToDeleteResourceFunc
+       newResourceFunc      coremodel.NewResourceFunc
+       newResListFunc       coremodel.NewResourceListFunc
+       resultChan           chan watch.Event
+       stopChan             chan bool
+}
+
+func NewListerWatcher(
+       rk coremodel.ResourceKind,
+       toResourceFunc ToUpsertResourceFunc,
+       toDeleteResourceFunc ToDeleteResourceFunc,
+       basePath string,
+       conn *zk.Conn,
+       cfg *discoverycfg.Config) (*ListerWatcher[coremodel.Resource], error) {
+       newResourceFunc, err := 
coremodel.ResourceSchemaRegistry().NewResourceFunc(rk)
+       if err != nil {
+               return nil, err
+       }
+       newResListFunc, err := 
coremodel.ResourceSchemaRegistry().NewResourceListFunc(rk)
+       if err != nil {
+               return nil, err
+       }
+       return &ListerWatcher[coremodel.Resource]{
+               rk:                   rk,
+               toUpsertResourceFunc: toResourceFunc,
+               toDeleteResourceFunc: toDeleteResourceFunc,
+               basePath:             basePath,
+               conn:                 conn,
+               cfg:                  cfg,
+               newResourceFunc:      newResourceFunc,
+               newResListFunc:       newResListFunc,
+               resultChan:           make(chan watch.Event, 1000),
+               stopChan:             make(chan bool),
+       }, nil
+}
+
+func (lw *ListerWatcher[T]) List(_ metav1.ListOptions) (k8sruntime.Object, 
error) {
+       resList, err := lw.listRecursively(lw.basePath)
+       if err != nil {
+               logger.Errorf("list all %s under path %s in zk %s failed, 
cause: %v", lw.rk, lw.basePath, lw.zkAddr(), err)
+               return nil, err
+       }
+       resListObj := lw.newResListFunc()
+       resListObj.SetItems(resList)
+       return resListObj, nil
+}
+
+func (lw *ListerWatcher[T]) listRecursively(path string) 
([]coremodel.Resource, error) {
+       resList := make([]coremodel.Resource, 0)
+       nodeData, _, err := lw.conn.Get(path)
+       if err != nil {
+               errStr := fmt.Sprintf("get %s from zk failed, path: %s, addr: 
%s, cause: %v", lw.rk, path, lw.zkAddr(), err)
+               logger.Errorf(errStr)
+               return nil, bizerror.Wrap(err, bizerror.ZKError, errStr)
+       }
+       res := lw.toUpsertResourceFunc(lw.mesh(), path, string(nodeData))
+       if res != nil {
+               resList = append(resList, res)
+       }
+       children, _, err := lw.conn.Children(path)
+       if err != nil {
+               errStr := fmt.Sprintf("list %s from zk failed, path: %s, addr: 
%s, cause: %v", lw.rk, path, lw.zkAddr(), err)
+               logger.Errorf(errStr)
+               return nil, bizerror.Wrap(err, bizerror.ZKError, errStr)
+       }
+       if len(children) == 0 {
+               return resList, nil
+       }
+       for _, childPath := range children {
+               resources, err := lw.listRecursively(path + 
constants.PathSeparator + childPath)
+               if err != nil {
+                       return nil, err
+               }
+               resList = append(resList, resources...)
+       }
+       return resList, nil
+}
+
+func (lw *ListerWatcher[T]) Watch(_ metav1.ListOptions) (watch.Interface, 
error) {
+       watcher := zkwatcher.NewRecursiveWatcher(lw.conn, lw.basePath)
+       go func() {
+               for {
+                       select {
+                       case event, ok := <-watcher.EventChan():
+                               if !ok {
+                                       logger.Warnf("zookeeper watcher 
stopped, path: %s, addr: %s", lw.basePath, lw.zkAddr())
+                                       return
+                               }
+                               lw.handleEvent(event)
+                       case <-lw.stopChan:
+                               logger.Warnf("stop watching %s in %s,", 
lw.basePath, lw.zkAddr())
+                               return
+                       }
+
+               }
+       }()
+       err := watcher.Start()
+       if err != nil {
+               return nil, err
+       }
+       return lw, nil
+}
+
+func (lw *ListerWatcher[T]) handleEvent(event zkwatcher.ZookeeperEvent) {
+       switch event.Type {
+       case zkwatcher.NodeCreated:
+               res := lw.toUpsertResourceFunc(lw.mesh(), event.Path, 
event.Data)
+               if res == nil {
+                       logger.Warnf("skip creating resource, parse zookeeper 
node data to %s failed, path: %s, zk: %s, event: %v",
+                               lw.rk, event.Path, lw.zkAddr(), event)
+                       return
+               }
+               logger.Infof("%s added, rsKey: %s", lw.rk, res.ResourceKey())
+               lw.resultChan <- watch.Event{
+                       Type:   watch.Added,
+                       Object: res,
+               }
+       case zkwatcher.NodeChanged:
+               res := lw.toUpsertResourceFunc(lw.mesh(), event.Path, 
event.Data)
+               if res == nil {
+                       logger.Warnf("skip updating resource, parse zookeeper 
node data to %s failed, path: %s, zk: %s, event: %v",
+                               lw.rk, event.Path, lw.zkAddr(), event)
+                       return
+               }
+               logger.Infof("%s modified, rsKey: %s", lw.rk, res.ResourceKey())
+               lw.resultChan <- watch.Event{
+                       Type:   watch.Modified,
+                       Object: res,
+               }
+       case zkwatcher.NodeDeleted:
+               res := lw.toDeleteResourceFunc(lw.mesh(), event.Path)
+               if res == nil {
+                       logger.Warnf("skip deleting resource, parse zookeeper 
node data to %s failed, path: %s, zk: %s, event: %v",
+                               lw.rk, event.Path, lw.zkAddr(), event)
+                       return
+               }
+               logger.Infof("%s deleted, rsKey: %s", lw.rk, res.ResourceKey())
+               lw.resultChan <- watch.Event{
+                       Type:   watch.Deleted,
+                       Object: res,
+               }
+       default:
+               logger.Warnf("unknown event type, event: %v", event)
+       }
+}
+
+func (lw *ListerWatcher[T]) zkAddr() string {
+       return lw.cfg.Address.Registry
+}
+
+func (lw *ListerWatcher[T]) mesh() string {
+       return lw.cfg.ID
+}
+
+func (lw *ListerWatcher[T]) ResourceKind() coremodel.ResourceKind {
+       return lw.rk
+}
+
+func (lw *ListerWatcher[T]) TransformFunc() cache.TransformFunc {
+       return nil
+}
+
+func (lw *ListerWatcher[T]) Stop() {
+       lw.stopChan <- true

Review Comment:
   The Stop method sends a value to stopChan (line 210) which is unbuffered. If 
no goroutine is listening on stopChan at the time Stop is called, this will 
block indefinitely. Consider making stopChan buffered or using close(stopChan) 
instead of sending a value.



##########
pkg/discovery/zk/factory.go:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zk
+
+import (
+       "encoding/json"
+       "net/url"
+       "strconv"
+       "strings"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       "github.com/duke-git/lancet/v2/strutil"
+
+       meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1"
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery"
+       "github.com/apache/dubbo-admin/pkg/core/controller"
+       "github.com/apache/dubbo-admin/pkg/core/discovery"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/discovery/zk/listerwatcher"
+)
+
+func init() {
+       discovery.RegisterListWatcherFactory(&Factory{})
+}
+
+type Factory struct {
+}
+
+func (f *Factory) Support(typ discoverycfg.Type) bool {
+       return discoverycfg.Zookeeper == typ
+}
+
+func (f *Factory) NewListWatchers(config *discoverycfg.Config) 
([]controller.ResourceListerWatcher, error) {
+       address := config.Address.Registry
+       zkUrl, err := url.Parse(address)
+       if err != nil {
+               return nil, err
+       }
+       conn, _, err := zk.Connect([]string{zkUrl.Host}, time.Second*1, func(c 
*zk.Conn) {
+               c.SetLogger(&zkLogger{})
+       })
+       if err != nil {
+               logger.Fatalf("connect to %s failed," + address)
+               return nil, bizerror.Wrap(err, bizerror.ZKError, "connect to 
zookeeper failed, addr: "+address)
+       }
+       mappingLw, err := listerwatcher.NewListerWatcher(
+               meshresource.ServiceProviderMappingKind,
+               toUpsertMappingResource,
+               toDeleteMappingResource,
+               "/dubbo/mapping",
+               conn,
+               config,
+       )
+       if err != nil {
+               return nil, err
+       }
+       //rpcInstanceLW, err := listerwatcher.NewListerWatcher(
+       //      meshresource.RPCInstanceKind,
+       //      toUpsertRPCInstanceResource,
+       //      toDeleteRPCInstanceResource,
+       //      "/services",
+       //      conn,
+       //      config,
+       //)
+       //if err != nil {
+       //      return nil, err
+       //}
+       configLW, err := listerwatcher.NewListerWatcher(
+               meshresource.ZKConfigKind,
+               toUpsertZKConfigResource,
+               toDeleteZKConfigResource,
+               "/dubbo/config",
+               conn,
+               config,
+       )
+
+       metadataLW, err := listerwatcher.NewListerWatcher(
+               meshresource.ZKMetadataKind,
+               toUpsertZKMetadataResource,
+               toDeleteZKMetadataResource,
+               "/dubbo/metadata",
+               conn,
+               config,
+       )
+
+       return []controller.ResourceListerWatcher{
+               //listerwatcher.NewRPCInstanceListerWatcher(conn, config),
+               mappingLw,
+               //rpcInstanceLW,
+               configLW,
+               metadataLW,
+       }, nil

Review Comment:
   The same zookeeper connection (conn) is being reused for multiple 
ListerWatchers (lines 66-112). If any ListerWatcher's Stop method closes the 
connection (as zkwatcher.RecursiveWatcher.Stop does on line 78 of 
watcher.go), it will affect all other ListerWatchers sharing the same 
connection, causing them to fail. Each ListerWatcher should either have its own 
connection, or the connection lifecycle should be managed separately from the 
watcher lifecycle.



##########
pkg/discovery/zk/factory.go:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zk
+
+import (
+       "encoding/json"
+       "net/url"
+       "strconv"
+       "strings"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       "github.com/duke-git/lancet/v2/strutil"
+
+       meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1"
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery"
+       "github.com/apache/dubbo-admin/pkg/core/controller"
+       "github.com/apache/dubbo-admin/pkg/core/discovery"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/discovery/zk/listerwatcher"
+)
+
+func init() {
+       discovery.RegisterListWatcherFactory(&Factory{})
+}
+
+type Factory struct {
+}
+
+func (f *Factory) Support(typ discoverycfg.Type) bool {
+       return discoverycfg.Zookeeper == typ
+}
+
+func (f *Factory) NewListWatchers(config *discoverycfg.Config) 
([]controller.ResourceListerWatcher, error) {
+       address := config.Address.Registry
+       zkUrl, err := url.Parse(address)
+       if err != nil {
+               return nil, err
+       }
+       conn, _, err := zk.Connect([]string{zkUrl.Host}, time.Second*1, func(c 
*zk.Conn) {
+               c.SetLogger(&zkLogger{})
+       })
+       if err != nil {
+               logger.Fatalf("connect to %s failed," + address)

Review Comment:
   The error message on line 63 uses string concatenation ("connect to %s 
failed," + address), which results in incorrect formatting: the %s verb is 
never given an argument, so the address is appended after the text (e.g. 
"connect to %s failed,zookeeper://...") instead of being substituted for %s. 
Either build the message with fmt.Sprintf or pass the address as a format 
argument, e.g. logger.Fatalf("connect to %s failed", address).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to