Copilot commented on code in PR #1371:
URL: https://github.com/apache/dubbo-admin/pull/1371#discussion_r2635030086


##########
pkg/discovery/zk/zkwatcher/watcher_test.go:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zkwatcher
+
+import (
+       "fmt"
+       "testing"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+func TestLocalhost(t *testing.T) {
+       zkServers := []string{"localhost:2181"}
+       basePath := "/services"
+       conn, _, err := zk.Connect(zkServers, time.Second*10)
+       if err != nil {
+               logger.Fatalf("Failed to connect to zookeeper: %v", err)
+       }
+       watcher := NewRecursiveWatcher(conn, basePath)
+
+       // Start listening
+       if err := watcher.Start(); err != nil {
+               logger.Fatalf("Failed to start watching: %v", err)

Review Comment:
   The test uses `logger.Fatalf` which will exit the entire test process 
instead of properly failing the test. Use `t.Fatalf` instead to properly report 
test failures within the testing framework.



##########
pkg/discovery/zk/zkwatcher/watcher_test.go:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zkwatcher
+
+import (
+       "fmt"
+       "testing"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+func TestLocalhost(t *testing.T) {

Review Comment:
   The test name "TestLocalhost" is not descriptive of what is being tested. 
Consider renaming to something like "TestRecursiveWatcher" or 
"TestRecursiveWatcherBasicFunctionality" to better describe the test's purpose.



##########
pkg/core/discovery/subscriber/zk_metadata.go:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package subscriber
+
+import (
+       "reflect"
+       "strings"
+
+       "k8s.io/client-go/tools/cache"
+
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       "github.com/apache/dubbo-admin/pkg/core/events"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/core/store"
+)
+
+type ZKMetadataEventSubscriber struct {
+       emitter     events.Emitter
+       storeRouter store.Router
+}
+
+func NewZKMetadataEventSubscriber(eventEmitter events.Emitter, storeRouter 
store.Router) *ZKMetadataEventSubscriber {
+       return &ZKMetadataEventSubscriber{
+               emitter:     eventEmitter,
+               storeRouter: storeRouter,
+       }
+}
+
+func (z *ZKMetadataEventSubscriber) ResourceKind() coremodel.ResourceKind {
+       return meshresource.ZKMetadataKind
+}
+
+func (z *ZKMetadataEventSubscriber) Name() string {
+       return "Discovery-" + z.ResourceKind().ToString()
+}
+
+func (z *ZKMetadataEventSubscriber) ProcessEvent(event events.Event) error {
+       newObj, ok := event.NewObj().(*meshresource.ZKMetadataResource)
+       if !ok && event.NewObj() != nil {
+               return bizerror.NewAssertionError(reflect.TypeOf(newObj), 
event.NewObj())
+       }
+       oldObj, ok := event.OldObj().(*meshresource.ZKMetadataResource)
+       if !ok && event.OldObj() != nil {
+               return bizerror.NewAssertionError(reflect.TypeOf(oldObj), 
event.OldObj())
+       }
+       var processErr error
+       switch event.Type() {
+       case cache.Added, cache.Updated, cache.Replaced, cache.Sync:
+               if newObj == nil || newObj.Spec == nil {
+                       errStr := "process zk metadata upsert event, but new 
obj is nil, skipped processing"
+                       logger.Errorf(errStr)
+                       return bizerror.New(bizerror.EventError, errStr)
+               }
+               processErr = z.processUpsert(newObj)
+       case cache.Deleted:
+               if oldObj == nil {
+                       errStr := "process zk metadata delete event, but old 
obj is nil, skipped processing"
+                       logger.Errorf(errStr)
+                       return bizerror.New(bizerror.EventError, errStr)
+               }
+               // metadata is an ephemeral znode, dubbo client only 
adds/updates the metadata znode and never deletes.
+               // Plus, we can't identify the service only by the node path
+               logger.Infof("ignored zk metadata delete event")
+       }
+       if processErr != nil {
+               logger.Errorf("process zk metadata event failed, cause: %s, 
event: %s", processErr.Error(), event.String())
+               return processErr
+       }
+       logger.Infof("process zk metadata event successfully, event: %s", 
event.String())
+       return nil
+}
+
+func (z *ZKMetadataEventSubscriber) processUpsert(metadataRes 
*meshresource.ZKMetadataResource) error {
+       paths := strings.Split(metadataRes.Spec.NodePath, 
constants.PathSeparator)
+       if paths[len(paths)-2] == constants.ProviderSide {
+               return 
processMetadataUpsert[*meshresource.ServiceProviderMetadataResource](
+                       metadataRes, meshresource.ToServiceProviderMetadataRes, 
z.storeRouter, z.emitter)
+       } else if paths[len(paths)-2] == constants.ConsumerSide {
+               return 
processMetadataUpsert[*meshresource.ServiceConsumerMetadataResource](
+                       metadataRes, 
meshresource.ToServiceConsumerMetadataByRawData, z.storeRouter, z.emitter)
+       }

Review Comment:
   Potential index out of bounds error. The code accesses `paths[len(paths)-2]` 
without checking that the paths slice has at least 2 elements. If NodePath 
contains no path separator, `strings.Split` returns a single-element slice, the 
index evaluates to -1, and the access panics. Add a length check before 
indexing the slice.



##########
pkg/discovery/zk/zkwatcher/watcher_test.go:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zkwatcher
+
+import (
+       "fmt"
+       "testing"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+func TestLocalhost(t *testing.T) {
+       zkServers := []string{"localhost:2181"}
+       basePath := "/services"
+       conn, _, err := zk.Connect(zkServers, time.Second*10)
+       if err != nil {
+               logger.Fatalf("Failed to connect to zookeeper: %v", err)
+       }
+       watcher := NewRecursiveWatcher(conn, basePath)
+
+       // Start listening
+       if err := watcher.Start(); err != nil {
+               logger.Fatalf("Failed to start watching: %v", err)
+       }
+
+       // Keep program running
+       select {
+       case <-watcher.stopChan:
+               fmt.Println("Watcher stopped")
+       }
+}

Review Comment:
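   This test lacks proper cleanup. The ZooKeeper connection and watcher are 
never properly closed, which will leak resources. Add a `defer watcher.Stop()` 
right after the watcher is created (line 37 of watcher_test.go, the 
`NewRecursiveWatcher` call) to ensure cleanup happens even if the test fails. 
Note that the test blocks on `<-watcher.stopChan` with no timeout, so the 
deferred call cannot run until that receive returns; bound the wait as well.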



##########
pkg/discovery/zk/zkwatcher/watcher.go:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zkwatcher
+
+import (
+       "sync"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       set "github.com/duke-git/lancet/v2/datastructure/set"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+type EventType string
+
+const (
+       NodeCreated EventType = "NodeCreated"
+       NodeDeleted EventType = "NodeDeleted"
+       NodeChanged EventType = "NodeChanged"
+)
+
+// ZookeeperEvent represents a zookeeper event
+type ZookeeperEvent struct {
+       Type EventType
+       // node path
+       Path string
+       // node data
+       Data string
+       // whether it is a leaf node
+       leafNode bool
+}
+
+// RecursiveWatcher recursive watcher
+type RecursiveWatcher struct {
+       conn      *zk.Conn
+       basePath  string
+       eventChan chan ZookeeperEvent
+       stopChan  chan struct{}
+       wg        sync.WaitGroup
+       mu        sync.Mutex
+}
+
+// NewRecursiveWatcher create a new recursive watcher
+func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher {
+       return &RecursiveWatcher{
+               conn:      conn,
+               basePath:  basePath,
+               eventChan: make(chan ZookeeperEvent, 1000),
+               stopChan:  make(chan struct{}),
+       }
+}
+
+// Start begin watching
+func (rw *RecursiveWatcher) Start() error {
+       logger.Infof("Start watching path: %s", rw.basePath)
+       // Recursively watch the initial path
+       return rw.watchPathRecursively(rw.basePath)
+}
+
+// Stop watching
+func (rw *RecursiveWatcher) Stop() {
+       close(rw.stopChan)
+       rw.wg.Wait()
+       rw.conn.Close()
+       close(rw.eventChan)
+}
+
+// EventChan return event channel
+func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent {
+       return rw.eventChan
+}
+
+// watchPathRecursively watch path recursively
+func (rw *RecursiveWatcher) watchPathRecursively(path string) error {
+       // Watch data changes for the current path
+       go rw.watchDataChanges(path)
+
+       // Get children of current path and watch
+       children, _, err := rw.conn.Children(path)
+       if err != nil {
+               logger.Errorf("Failed to get children of path: %s, cause: %v", 
path, err)
+               return err
+       }
+       // Watch children list changes
+       go rw.watchChildrenChanges(path)
+
+       // Recursively watch child nodes
+       for _, child := range children {
+               childPath := path + "/" + child
+               if err := rw.watchPathRecursively(childPath); err != nil {
+                       logger.Errorf("Failed to watch subpath %s: %v", 
childPath, err)
+               }
+       }
+       return nil
+}
+
+// watchDataChanges watch node data changes
+func (rw *RecursiveWatcher) watchDataChanges(path string) {
+       logger.Debugf("Watching data changes for path: %s", path)
+       for {
+
+               select {
+               case <-rw.stopChan:
+                       return
+               default:
+               }
+
+               // Get data and set up watch
+               data, stat, watcher, err := rw.conn.GetW(path)
+               if err != nil {
+                       logger.Errorf("Failed to watch data changes for path 
%s, cause: %v", path, err)
+                       time.Sleep(time.Second)
+                       continue
+               }
+               rw.eventChan <- ZookeeperEvent{
+                       Type:     NodeCreated,
+                       Path:     path,
+                       Data:     string(data),
+                       leafNode: stat.NumChildren == 0,
+               }
+
+               select {
+               case event := <-watcher.EvtCh:
+                       if event.Type == zk.EventNodeDataChanged {
+                               logger.Debugf("node data changed: %s", path)
+                               rw.eventChan <- ZookeeperEvent{
+                                       Type:     NodeChanged,
+                                       Path:     path,
+                                       Data:     string(data),
+                                       leafNode: stat.NumChildren == 0,
+                               }

Review Comment:
   Using stale data after a node change event. When emitting the NodeChanged 
event, the code uses the old `data` and `stat` values that were captured before 
the event occurred. This should fetch fresh data after detecting the change to 
ensure the event contains the updated node data.



##########
pkg/discovery/zk/zkwatcher/watcher.go:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zkwatcher
+
+import (
+       "sync"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       set "github.com/duke-git/lancet/v2/datastructure/set"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+type EventType string
+
+const (
+       NodeCreated EventType = "NodeCreated"
+       NodeDeleted EventType = "NodeDeleted"
+       NodeChanged EventType = "NodeChanged"
+)
+
+// ZookeeperEvent represents a zookeeper event
+type ZookeeperEvent struct {
+       Type EventType
+       // node path
+       Path string
+       // node data
+       Data string
+       // whether it is a leaf node
+       leafNode bool
+}
+
+// RecursiveWatcher recursive watcher
+type RecursiveWatcher struct {
+       conn      *zk.Conn
+       basePath  string
+       eventChan chan ZookeeperEvent
+       stopChan  chan struct{}
+       wg        sync.WaitGroup
+       mu        sync.Mutex
+}
+
+// NewRecursiveWatcher create a new recursive watcher
+func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher {
+       return &RecursiveWatcher{
+               conn:      conn,
+               basePath:  basePath,
+               eventChan: make(chan ZookeeperEvent, 1000),
+               stopChan:  make(chan struct{}),
+       }
+}
+
+// Start begin watching
+func (rw *RecursiveWatcher) Start() error {
+       logger.Infof("Start watching path: %s", rw.basePath)
+       // Recursively watch the initial path
+       return rw.watchPathRecursively(rw.basePath)
+}
+
+// Stop watching
+func (rw *RecursiveWatcher) Stop() {
+       close(rw.stopChan)
+       rw.wg.Wait()
+       rw.conn.Close()
+       close(rw.eventChan)
+}
+
+// EventChan return event channel
+func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent {
+       return rw.eventChan
+}
+
+// watchPathRecursively watch path recursively
+func (rw *RecursiveWatcher) watchPathRecursively(path string) error {
+       // Watch data changes for the current path
+       go rw.watchDataChanges(path)
+
+       // Get children of current path and watch
+       children, _, err := rw.conn.Children(path)
+       if err != nil {
+               logger.Errorf("Failed to get children of path: %s, cause: %v", 
path, err)
+               return err
+       }
+       // Watch children list changes
+       go rw.watchChildrenChanges(path)

Review Comment:
   Goroutines started in watchPathRecursively are not tracked. The 
sync.WaitGroup field `wg` exists in the RecursiveWatcher struct but is never 
used to track these goroutines, which means Stop() won't wait for them to 
complete. Add `rw.wg.Add(1)` before spawning goroutines and `defer 
rw.wg.Done()` in the goroutine functions to properly track and wait for all 
goroutines during shutdown.



##########
pkg/core/discovery/subscriber/zk_metadata.go:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package subscriber
+
+import (
+       "reflect"
+       "strings"
+
+       "k8s.io/client-go/tools/cache"
+
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       "github.com/apache/dubbo-admin/pkg/core/events"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/core/store"
+)
+
+type ZKMetadataEventSubscriber struct {
+       emitter     events.Emitter
+       storeRouter store.Router
+}
+
+func NewZKMetadataEventSubscriber(eventEmitter events.Emitter, storeRouter 
store.Router) *ZKMetadataEventSubscriber {
+       return &ZKMetadataEventSubscriber{
+               emitter:     eventEmitter,
+               storeRouter: storeRouter,
+       }
+}
+
+func (z *ZKMetadataEventSubscriber) ResourceKind() coremodel.ResourceKind {
+       return meshresource.ZKMetadataKind
+}
+
+func (z *ZKMetadataEventSubscriber) Name() string {
+       return "Discovery-" + z.ResourceKind().ToString()
+}
+
+func (z *ZKMetadataEventSubscriber) ProcessEvent(event events.Event) error {
+       newObj, ok := event.NewObj().(*meshresource.ZKMetadataResource)
+       if !ok && event.NewObj() != nil {
+               return bizerror.NewAssertionError(reflect.TypeOf(newObj), 
event.NewObj())
+       }
+       oldObj, ok := event.OldObj().(*meshresource.ZKMetadataResource)
+       if !ok && event.OldObj() != nil {
+               return bizerror.NewAssertionError(reflect.TypeOf(oldObj), 
event.OldObj())
+       }
+       var processErr error
+       switch event.Type() {
+       case cache.Added, cache.Updated, cache.Replaced, cache.Sync:
+               if newObj == nil || newObj.Spec == nil {
+                       errStr := "process zk metadata upsert event, but new 
obj is nil, skipped processing"
+                       logger.Errorf(errStr)
+                       return bizerror.New(bizerror.EventError, errStr)
+               }
+               processErr = z.processUpsert(newObj)
+       case cache.Deleted:
+               if oldObj == nil {
+                       errStr := "process zk metadata delete event, but old 
obj is nil, skipped processing"
+                       logger.Errorf(errStr)
+                       return bizerror.New(bizerror.EventError, errStr)
+               }
+               // metadata is an ephemeral znode, dubbo client only 
adds/updates the metadata znode and never deletes.
+               // Plus, we can't identify the service only by the node path
+               logger.Infof("ignored zk metadata delete event")
+       }
+       if processErr != nil {
+               logger.Errorf("process zk metadata event failed, cause: %s, 
event: %s", processErr.Error(), event.String())
+               return processErr
+       }
+       logger.Infof("process zk metadata event successfully, event: %s", 
event.String())
+       return nil
+}
+
+func (z *ZKMetadataEventSubscriber) processUpsert(metadataRes 
*meshresource.ZKMetadataResource) error {
+       paths := strings.Split(metadataRes.Spec.NodePath, 
constants.PathSeparator)
+       if paths[len(paths)-2] == constants.ProviderSide {
+               return 
processMetadataUpsert[*meshresource.ServiceProviderMetadataResource](
+                       metadataRes, meshresource.ToServiceProviderMetadataRes, 
z.storeRouter, z.emitter)
+       } else if paths[len(paths)-2] == constants.ConsumerSide {
+               return 
processMetadataUpsert[*meshresource.ServiceConsumerMetadataResource](
+                       metadataRes, 
meshresource.ToServiceConsumerMetadataByRawData, z.storeRouter, z.emitter)
+       }
+       logger.Warnf("unkonwn metadata, node path: %s, node data: %s", 
metadataRes.Spec.NodePath, metadataRes.Spec.NodeData)

Review Comment:
   Typo in log message: "unkonwn" should be "unknown".
   ```suggestion
        logger.Warnf("unknown metadata, node path: %s, node data: %s", 
metadataRes.Spec.NodePath, metadataRes.Spec.NodeData)
   ```



##########
pkg/core/discovery/subscriber/zk_config.go:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package subscriber
+
+import (
+       "fmt"
+       "reflect"
+       "strings"
+
+       "k8s.io/client-go/tools/cache"
+
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       "github.com/apache/dubbo-admin/pkg/core/events"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/core/store"
+)
+
+type ZKConfigEventSubscriber struct {
+       emitter     events.Emitter
+       storeRouter store.Router
+}
+
+func NewZKConfigEventSubscriber(eventEmitter events.Emitter, storeRouter 
store.Router) *ZKConfigEventSubscriber {
+       return &ZKConfigEventSubscriber{
+               emitter:     eventEmitter,
+               storeRouter: storeRouter,
+       }
+}
+
+func (z *ZKConfigEventSubscriber) ResourceKind() coremodel.ResourceKind {
+       return meshresource.ZKConfigKind
+}
+
+func (z *ZKConfigEventSubscriber) Name() string {
+       return "Discovery-" + z.ResourceKind().ToString()
+}
+
+func (z *ZKConfigEventSubscriber) ProcessEvent(event events.Event) error {
+       newObj, ok := event.NewObj().(*meshresource.ZKConfigResource)
+       if !ok && event.NewObj() != nil {
+               return bizerror.NewAssertionError(reflect.TypeOf(newObj), 
event.NewObj())
+       }
+       oldObj, ok := event.OldObj().(*meshresource.ZKConfigResource)
+       if !ok && event.OldObj() != nil {
+               return bizerror.NewAssertionError(reflect.TypeOf(oldObj), 
event.OldObj())
+       }
+       var processErr error
+       switch event.Type() {
+       case cache.Added, cache.Updated, cache.Replaced, cache.Sync:
+               if newObj == nil || newObj.Spec == nil {
+                       errStr := "process zk config upsert event, but new obj 
is nil, skipped processing"
+                       logger.Errorf(errStr)
+                       return bizerror.New(bizerror.EventError, errStr)
+               }
+               processErr = z.processUpsert(newObj)
+       case cache.Deleted:
+               if oldObj == nil {
+                       errStr := "process zk config delete event, but old obj 
is nil, skipped processing"
+                       logger.Errorf(errStr)
+                       return bizerror.New(bizerror.EventError, errStr)
+               }
+               processErr = z.processDelete(oldObj)
+       }
+       if processErr != nil {
+               logger.Errorf("process zk config event failed, cause: %s, 
event: %s", processErr.Error(), event.String())
+               return processErr
+       }
+       logger.Infof("process zk config event successfully, event: %s", 
event.String())
+       return nil
+}
+
+func (z *ZKConfigEventSubscriber) processUpsert(configRes 
*meshresource.ZKConfigResource) error {
+       parts := strings.Split(configRes.Spec.NodeName, constants.DotSeparator)
+       suffix := parts[len(parts)-1]
+       if !constants.RuleSuffixSet.Contain(suffix) {
+               logger.Warnf("node name is not end with rule suffix, skipped 
processing, nodeName: %s", configRes.Spec.NodeName)
+               return nil
+       }
+       logger.Debugf("process zk config upsert event, config res: %s", 
configRes.String())
+       switch suffix {
+       case constants.TagRuleSuffix:
+               return processConfigUpsert[*meshresource.TagRouteResource](
+                       configRes, meshresource.ToTagRouteResource, 
z.storeRouter, z.emitter)
+       case constants.ConditionRuleSuffix:
+               return 
processConfigUpsert[*meshresource.ConditionRouteResource](
+                       configRes, meshresource.ToConditionRouteResource, 
z.storeRouter, z.emitter)
+       case constants.ConfiguratorsSuffix:
+               return processConfigUpsert[*meshresource.DynamicConfigResource](
+                       configRes, meshresource.ToDynamicConfigResource, 
z.storeRouter, z.emitter)
+       default:
+               return bizerror.New(bizerror.UnknownError,
+                       fmt.Sprintf("unknown rule type in mesh %s, skipped 
processing, path: %s, raw content: %s",
+                               configRes.Mesh, configRes.Spec.NodeName, 
configRes.Spec.NodeData))
+       }
+}
+
+func (z *ZKConfigEventSubscriber) processDelete(configRes 
*meshresource.ZKConfigResource) error {
+       parts := strings.Split(configRes.Spec.NodeName, constants.DotSeparator)
+       suffix := parts[len(parts)-1]

Review Comment:
   Unvalidated suffix extraction. `strings.Split` returns at least one element 
whenever the separator is non-empty, so `parts[len(parts)-1]` does not panic 
here; but if NodeName is an empty string or doesn't contain the dot separator, 
`suffix` is simply the whole NodeName. Validate the suffix before using it, as 
processUpsert does with `constants.RuleSuffixSet.Contain`.



##########
pkg/discovery/zk/listerwatcher/listerwatcher.go:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package listerwatcher
+
+import (
+       "fmt"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       k8sruntime "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/watch"
+       "k8s.io/client-go/tools/cache"
+
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/discovery/zk/zkwatcher"
+)
+
+type ToUpsertResourceFunc func(mesh, nodePath, nodeData string) 
coremodel.Resource
+
+type ToDeleteResourceFunc func(mesh, nodePath string) coremodel.Resource
+
+type ListerWatcher[T coremodel.Resource] struct {
+       basePath             string
+       rk                   coremodel.ResourceKind
+       conn                 *zk.Conn
+       cfg                  *discoverycfg.Config
+       toUpsertResourceFunc ToUpsertResourceFunc
+       toDeleteResourceFunc ToDeleteResourceFunc
+       newResourceFunc      coremodel.NewResourceFunc
+       newResListFunc       coremodel.NewResourceListFunc
+       resultChan           chan watch.Event
+       stopChan             chan bool
+}
+
+func NewListerWatcher(
+       rk coremodel.ResourceKind,
+       toResourceFunc ToUpsertResourceFunc,
+       toDeleteResourceFunc ToDeleteResourceFunc,
+       basePath string,
+       conn *zk.Conn,
+       cfg *discoverycfg.Config) (*ListerWatcher[coremodel.Resource], error) {
+       newResourceFunc, err := 
coremodel.ResourceSchemaRegistry().NewResourceFunc(rk)
+       if err != nil {
+               return nil, err
+       }
+       newResListFunc, err := 
coremodel.ResourceSchemaRegistry().NewResourceListFunc(rk)
+       if err != nil {
+               return nil, err
+       }
+       return &ListerWatcher[coremodel.Resource]{
+               rk:                   rk,
+               toUpsertResourceFunc: toResourceFunc,
+               toDeleteResourceFunc: toDeleteResourceFunc,
+               basePath:             basePath,
+               conn:                 conn,
+               cfg:                  cfg,
+               newResourceFunc:      newResourceFunc,
+               newResListFunc:       newResListFunc,
+               resultChan:           make(chan watch.Event, 1000),
+               stopChan:             make(chan bool),
+       }, nil
+}
+
+func (lw *ListerWatcher[T]) List(_ metav1.ListOptions) (k8sruntime.Object, 
error) {
+       resList, err := lw.listRecursively(lw.basePath)
+       if err != nil {
+               logger.Errorf("list all %s under path %s in zk %s failed, 
cause: %v", lw.rk, lw.basePath, lw.zkAddr(), err)
+               return nil, err
+       }
+       resListObj := lw.newResListFunc()
+       resListObj.SetItems(resList)
+       return resListObj, nil
+}
+
+func (lw *ListerWatcher[T]) listRecursively(path string) 
([]coremodel.Resource, error) {
+       resList := make([]coremodel.Resource, 0)
+       nodeData, _, err := lw.conn.Get(path)
+       if err != nil {
+               errStr := fmt.Sprintf("get %s from zk failed, path: %s, addr: 
%s, cause: %v", lw.rk, path, lw.zkAddr(), err)
+               logger.Errorf(errStr)
+               return nil, bizerror.Wrap(err, bizerror.ZKError, errStr)
+       }
+       res := lw.toUpsertResourceFunc(lw.mesh(), path, string(nodeData))
+       if res != nil {
+               resList = append(resList, res)
+       }
+       children, _, err := lw.conn.Children(path)
+       if err != nil {
+               errStr := fmt.Sprintf("list %s from zk failed, path: %s, addr: 
%s, cause: %v", lw.rk, path, lw.zkAddr(), err)
+               logger.Errorf(errStr)
+               return nil, bizerror.Wrap(err, bizerror.ZKError, errStr)
+       }
+       if len(children) == 0 {
+               return resList, nil
+       }
+       for _, childPath := range children {
+               resources, err := lw.listRecursively(path + 
constants.PathSeparator + childPath)
+               if err != nil {
+                       return nil, err
+               }
+               resList = append(resList, resources...)
+       }
+       return resList, nil
+}
+
+func (lw *ListerWatcher[T]) Watch(_ metav1.ListOptions) (watch.Interface, 
error) {
+       watcher := zkwatcher.NewRecursiveWatcher(lw.conn, lw.basePath)
+       go func() {
+               for {
+                       select {
+                       case event, ok := <-watcher.EventChan():
+                               if !ok {
+                                       logger.Warnf("zookeeper watcher 
stopped, path: %s, addr: %s", lw.basePath, lw.zkAddr())
+                                       return
+                               }
+                               lw.handleEvent(event)
+                       case <-lw.stopChan:
+                               logger.Warnf("stop watching %s in %s,", 
lw.basePath, lw.zkAddr())
+                       }
+
+               }
+       }()
+       err := watcher.Start()
+       if err != nil {
+               return nil, err
+       }
+       return lw, nil

Review Comment:
   The watcher instance created in Watch() is never stopped or cleaned up. This 
will leak the ZooKeeper watcher and its goroutines. Consider storing the 
watcher reference in the ListerWatcher struct and calling its Stop() method in 
the ListerWatcher's Stop() method.



##########
pkg/discovery/zk/zkwatcher/watcher_test.go:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zkwatcher
+
+import (
+       "fmt"
+       "testing"
+       "time"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+func TestLocalhost(t *testing.T) {
+       zkServers := []string{"localhost:2181"}
+       basePath := "/services"
+       conn, _, err := zk.Connect(zkServers, time.Second*10)
+       if err != nil {
+               logger.Fatalf("Failed to connect to zookeeper: %v", err)
+       }
+       watcher := NewRecursiveWatcher(conn, basePath)
+
+       // Start listening
+       if err := watcher.Start(); err != nil {
+               logger.Fatalf("Failed to start watching: %v", err)
+       }
+
+       // Keep program running
+       select {
+       case <-watcher.stopChan:

Review Comment:
   The test directly accesses the unexported field `stopChan`, which breaks 
encapsulation. The test should use the `Stop()` method instead to properly shut 
down the watcher.



##########
pkg/discovery/zk/listerwatcher/listerwatcher.go:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package listerwatcher
+
+import (
+       "fmt"
+
+       "github.com/dubbogo/go-zookeeper/zk"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       k8sruntime "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/watch"
+       "k8s.io/client-go/tools/cache"
+
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/common/constants"
+       discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/discovery/zk/zkwatcher"
+)
+
+type ToUpsertResourceFunc func(mesh, nodePath, nodeData string) 
coremodel.Resource
+
+type ToDeleteResourceFunc func(mesh, nodePath string) coremodel.Resource
+
+type ListerWatcher[T coremodel.Resource] struct {
+       basePath             string
+       rk                   coremodel.ResourceKind
+       conn                 *zk.Conn
+       cfg                  *discoverycfg.Config
+       toUpsertResourceFunc ToUpsertResourceFunc
+       toDeleteResourceFunc ToDeleteResourceFunc
+       newResourceFunc      coremodel.NewResourceFunc
+       newResListFunc       coremodel.NewResourceListFunc
+       resultChan           chan watch.Event
+       stopChan             chan bool
+}
+
+func NewListerWatcher(
+       rk coremodel.ResourceKind,
+       toResourceFunc ToUpsertResourceFunc,
+       toDeleteResourceFunc ToDeleteResourceFunc,
+       basePath string,
+       conn *zk.Conn,
+       cfg *discoverycfg.Config) (*ListerWatcher[coremodel.Resource], error) {
+       newResourceFunc, err := 
coremodel.ResourceSchemaRegistry().NewResourceFunc(rk)
+       if err != nil {
+               return nil, err
+       }
+       newResListFunc, err := 
coremodel.ResourceSchemaRegistry().NewResourceListFunc(rk)
+       if err != nil {
+               return nil, err
+       }
+       return &ListerWatcher[coremodel.Resource]{
+               rk:                   rk,
+               toUpsertResourceFunc: toResourceFunc,
+               toDeleteResourceFunc: toDeleteResourceFunc,
+               basePath:             basePath,
+               conn:                 conn,
+               cfg:                  cfg,
+               newResourceFunc:      newResourceFunc,
+               newResListFunc:       newResListFunc,
+               resultChan:           make(chan watch.Event, 1000),
+               stopChan:             make(chan bool),
+       }, nil
+}
+
+func (lw *ListerWatcher[T]) List(_ metav1.ListOptions) (k8sruntime.Object, 
error) {
+       resList, err := lw.listRecursively(lw.basePath)
+       if err != nil {
+               logger.Errorf("list all %s under path %s in zk %s failed, 
cause: %v", lw.rk, lw.basePath, lw.zkAddr(), err)
+               return nil, err
+       }
+       resListObj := lw.newResListFunc()
+       resListObj.SetItems(resList)
+       return resListObj, nil
+}
+
+func (lw *ListerWatcher[T]) listRecursively(path string) 
([]coremodel.Resource, error) {
+       resList := make([]coremodel.Resource, 0)
+       nodeData, _, err := lw.conn.Get(path)
+       if err != nil {
+               errStr := fmt.Sprintf("get %s from zk failed, path: %s, addr: 
%s, cause: %v", lw.rk, path, lw.zkAddr(), err)
+               logger.Errorf(errStr)
+               return nil, bizerror.Wrap(err, bizerror.ZKError, errStr)
+       }
+       res := lw.toUpsertResourceFunc(lw.mesh(), path, string(nodeData))
+       if res != nil {
+               resList = append(resList, res)
+       }
+       children, _, err := lw.conn.Children(path)
+       if err != nil {
+               errStr := fmt.Sprintf("list %s from zk failed, path: %s, addr: 
%s, cause: %v", lw.rk, path, lw.zkAddr(), err)
+               logger.Errorf(errStr)
+               return nil, bizerror.Wrap(err, bizerror.ZKError, errStr)
+       }
+       if len(children) == 0 {
+               return resList, nil
+       }
+       for _, childPath := range children {
+               resources, err := lw.listRecursively(path + 
constants.PathSeparator + childPath)
+               if err != nil {
+                       return nil, err
+               }
+               resList = append(resList, resources...)
+       }
+       return resList, nil
+}
+
+func (lw *ListerWatcher[T]) Watch(_ metav1.ListOptions) (watch.Interface, 
error) {
+       watcher := zkwatcher.NewRecursiveWatcher(lw.conn, lw.basePath)
+       go func() {
+               for {
+                       select {
+                       case event, ok := <-watcher.EventChan():
+                               if !ok {
+                                       logger.Warnf("zookeeper watcher 
stopped, path: %s, addr: %s", lw.basePath, lw.zkAddr())
+                                       return
+                               }
+                               lw.handleEvent(event)
+                       case <-lw.stopChan:
+                               logger.Warnf("stop watching %s in %s,", 
lw.basePath, lw.zkAddr())
+                       }

Review Comment:
   Potential goroutine leak in the Watch implementation. When the stopChan is 
triggered on lines 136-137, the goroutine logs a message but doesn't return, 
causing it to continue the loop indefinitely. Add a `return` statement after 
the log message to properly exit the goroutine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to