Copilot commented on code in PR #1371: URL: https://github.com/apache/dubbo-admin/pull/1371#discussion_r2635030086
########## pkg/discovery/zk/zkwatcher/watcher_test.go: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "fmt" + "testing" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +func TestLocalhost(t *testing.T) { + zkServers := []string{"localhost:2181"} + basePath := "/services" + conn, _, err := zk.Connect(zkServers, time.Second*10) + if err != nil { + logger.Fatalf("Failed to connect to zookeeper: %v", err) + } + watcher := NewRecursiveWatcher(conn, basePath) + + // Start listening + if err := watcher.Start(); err != nil { + logger.Fatalf("Failed to start watching: %v", err) Review Comment: The test uses `logger.Fatalf` which will exit the entire test process instead of properly failing the test. Use `t.Fatalf` instead to properly report test failures within the testing framework. ########## pkg/discovery/zk/zkwatcher/watcher_test.go: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "fmt" + "testing" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +func TestLocalhost(t *testing.T) { Review Comment: The test name "TestLocalhost" is not descriptive of what is being tested. Consider renaming to something like "TestRecursiveWatcher" or "TestRecursiveWatcherBasicFunctionality" to better describe the test's purpose. ########## pkg/core/discovery/subscriber/zk_metadata.go: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package subscriber + +import ( + "reflect" + "strings" + + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +type ZKMetadataEventSubscriber struct { + emitter events.Emitter + storeRouter store.Router +} + +func NewZKMetadataEventSubscriber(eventEmitter events.Emitter, storeRouter store.Router) *ZKMetadataEventSubscriber { + return &ZKMetadataEventSubscriber{ + emitter: eventEmitter, + storeRouter: storeRouter, + } +} + +func (z *ZKMetadataEventSubscriber) ResourceKind() coremodel.ResourceKind { + return meshresource.ZKMetadataKind +} + +func (z *ZKMetadataEventSubscriber) Name() string { + return "Discovery-" + z.ResourceKind().ToString() +} + +func (z *ZKMetadataEventSubscriber) ProcessEvent(event events.Event) error { + newObj, ok := event.NewObj().(*meshresource.ZKMetadataResource) + if !ok && event.NewObj() != nil { + return bizerror.NewAssertionError(reflect.TypeOf(newObj), event.NewObj()) + } + oldObj, ok := event.OldObj().(*meshresource.ZKMetadataResource) + if !ok && event.OldObj() != nil { + return bizerror.NewAssertionError(reflect.TypeOf(oldObj), event.OldObj()) + } + var processErr error + switch event.Type() { + case cache.Added, cache.Updated, cache.Replaced, cache.Sync: + if newObj == nil || newObj.Spec == nil { + errStr := "process zk metadata upsert event, but new obj is nil, skipped processing" + logger.Errorf(errStr) + return bizerror.New(bizerror.EventError, errStr) + } + processErr = z.processUpsert(newObj) + case cache.Deleted: + if oldObj == nil { + errStr := "process zk metadata delete event, but old obj is nil, skipped processing" + logger.Errorf(errStr) + return bizerror.New(bizerror.EventError, errStr) + } + // metadata is an ephemeral znode, dubbo client only adds/updates the metadata znode and never deletes. + // Plus, we can't identify the service only by the node path + logger.Infof("ignored zk metadata delete event") + } + if processErr != nil { + logger.Errorf("process zk metadata event failed, cause: %s, event: %s", processErr.Error(), event.String()) + return processErr + } + logger.Infof("process zk metadata event successfully, event: %s", event.String()) + return nil +} + +func (z *ZKMetadataEventSubscriber) processUpsert(metadataRes *meshresource.ZKMetadataResource) error { + paths := strings.Split(metadataRes.Spec.NodePath, constants.PathSeparator) + if paths[len(paths)-2] == constants.ProviderSide { + return processMetadataUpsert[*meshresource.ServiceProviderMetadataResource]( + metadataRes, meshresource.ToServiceProviderMetadataRes, z.storeRouter, z.emitter) + } else if paths[len(paths)-2] == constants.ConsumerSide { + return processMetadataUpsert[*meshresource.ServiceConsumerMetadataResource]( + metadataRes, meshresource.ToServiceConsumerMetadataByRawData, z.storeRouter, z.emitter) + } Review Comment: Potential index out of bounds error. The code accesses `paths[len(paths)-2]` without checking if the paths slice has at least 2 elements. If the NodePath has fewer path segments, this will cause a panic. Add a length check before accessing the array element. ########## pkg/discovery/zk/zkwatcher/watcher_test.go: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "fmt" + "testing" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +func TestLocalhost(t *testing.T) { + zkServers := []string{"localhost:2181"} + basePath := "/services" + conn, _, err := zk.Connect(zkServers, time.Second*10) + if err != nil { + logger.Fatalf("Failed to connect to zookeeper: %v", err) + } + watcher := NewRecursiveWatcher(conn, basePath) + + // Start listening + if err := watcher.Start(); err != nil { + logger.Fatalf("Failed to start watching: %v", err) + } + + // Keep program running + select { + case <-watcher.stopChan: + fmt.Println("Watcher stopped") + } +} Review Comment: This test lacks proper cleanup. The ZooKeeper connection and watcher are never properly closed, which will leak resources. Add a `defer watcher.Stop()` after line 37 to ensure cleanup happens even if the test fails. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + leafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + wg sync.WaitGroup + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// Start begin watching +func (rw *RecursiveWatcher) Start() error { + logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + close(rw.stopChan) + rw.wg.Wait() + rw.conn.Close() + close(rw.eventChan) +} + +// EventChan return event channel +func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent { + return rw.eventChan +} + +// watchPathRecursively watch path recursively +func (rw *RecursiveWatcher) watchPathRecursively(path string) error { + // Watch data changes for the current path + go rw.watchDataChanges(path) + + // Get children of current path and watch + children, _, err := rw.conn.Children(path) + if err != nil { + logger.Errorf("Failed to get children of path: %s, cause: %v", path, err) + return err + } + // Watch children list changes + go rw.watchChildrenChanges(path) + + // Recursively watch child nodes + for _, child := range children { + childPath := path + "/" + child + if err := rw.watchPathRecursively(childPath); err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + } + return nil +} + +// watchDataChanges watch node data changes +func (rw *RecursiveWatcher) watchDataChanges(path string) { + logger.Debugf("Watching data changes for path: %s", path) + for { + + select { + case <-rw.stopChan: + return + default: + } + + // Get data and set up watch + data, stat, watcher, err := rw.conn.GetW(path) + if err != nil { + logger.Errorf("Failed to watch data changes for path %s, cause: %v", path, err) + time.Sleep(time.Second) + continue + } + rw.eventChan <- ZookeeperEvent{ + Type: NodeCreated, + Path: path, + Data: string(data), + leafNode: stat.NumChildren == 0, + } + + select { + case event := <-watcher.EvtCh: + if event.Type == zk.EventNodeDataChanged { + logger.Debugf("node data changed: %s", path) + rw.eventChan <- ZookeeperEvent{ + Type: NodeChanged, + Path: path, + Data: string(data), + leafNode: stat.NumChildren == 0, + } Review Comment: Using stale data after a node change event. When emitting the NodeChanged event, the code uses the old `data` and `stat` values that were captured before the event occurred. This should fetch fresh data after detecting the change to ensure the event contains the updated node data. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + leafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + wg sync.WaitGroup + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// Start begin watching +func (rw *RecursiveWatcher) Start() error { + logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + close(rw.stopChan) + rw.wg.Wait() + rw.conn.Close() + close(rw.eventChan) +} + +// EventChan return event channel +func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent { + return rw.eventChan +} + +// watchPathRecursively watch path recursively +func (rw *RecursiveWatcher) watchPathRecursively(path string) error { + // Watch data changes for the current path + go rw.watchDataChanges(path) + + // Get children of current path and watch + children, _, err := rw.conn.Children(path) + if err != nil { + logger.Errorf("Failed to get children of path: %s, cause: %v", path, err) + return err + } + // Watch children list changes + go rw.watchChildrenChanges(path) Review Comment: Goroutines started in watchPathRecursively are not tracked. The sync.WaitGroup field `wg` exists in the RecursiveWatcher struct but is never used to track these goroutines, which means Stop() won't wait for them to complete. Add `rw.wg.Add(1)` before spawning goroutines and `defer rw.wg.Done()` in the goroutine functions to properly track and wait for all goroutines during shutdown. ########## pkg/core/discovery/subscriber/zk_metadata.go: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package subscriber + +import ( + "reflect" + "strings" + + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +type ZKMetadataEventSubscriber struct { + emitter events.Emitter + storeRouter store.Router +} + +func NewZKMetadataEventSubscriber(eventEmitter events.Emitter, storeRouter store.Router) *ZKMetadataEventSubscriber { + return &ZKMetadataEventSubscriber{ + emitter: eventEmitter, + storeRouter: storeRouter, + } +} + +func (z *ZKMetadataEventSubscriber) ResourceKind() coremodel.ResourceKind { + return meshresource.ZKMetadataKind +} + +func (z *ZKMetadataEventSubscriber) Name() string { + return "Discovery-" + z.ResourceKind().ToString() +} + +func (z *ZKMetadataEventSubscriber) ProcessEvent(event events.Event) error { + newObj, ok := event.NewObj().(*meshresource.ZKMetadataResource) + if !ok && event.NewObj() != nil { + return bizerror.NewAssertionError(reflect.TypeOf(newObj), event.NewObj()) + } + oldObj, ok := event.OldObj().(*meshresource.ZKMetadataResource) + if !ok && event.OldObj() != nil { + return bizerror.NewAssertionError(reflect.TypeOf(oldObj), event.OldObj()) + } + var processErr error + switch event.Type() { + case cache.Added, cache.Updated, cache.Replaced, cache.Sync: + if newObj == nil || newObj.Spec == nil { + errStr := "process zk metadata upsert event, but new obj is nil, skipped processing" + logger.Errorf(errStr) + return bizerror.New(bizerror.EventError, errStr) + } + processErr = z.processUpsert(newObj) + case cache.Deleted: + if oldObj == nil { + errStr := "process zk metadata delete event, but old obj is nil, skipped processing" + logger.Errorf(errStr) + return bizerror.New(bizerror.EventError, errStr) + } + // metadata is an ephemeral znode, dubbo client only adds/updates the metadata znode and never deletes. + // Plus, we can't identify the service only by the node path + logger.Infof("ignored zk metadata delete event") + } + if processErr != nil { + logger.Errorf("process zk metadata event failed, cause: %s, event: %s", processErr.Error(), event.String()) + return processErr + } + logger.Infof("process zk metadata event successfully, event: %s", event.String()) + return nil +} + +func (z *ZKMetadataEventSubscriber) processUpsert(metadataRes *meshresource.ZKMetadataResource) error { + paths := strings.Split(metadataRes.Spec.NodePath, constants.PathSeparator) + if paths[len(paths)-2] == constants.ProviderSide { + return processMetadataUpsert[*meshresource.ServiceProviderMetadataResource]( + metadataRes, meshresource.ToServiceProviderMetadataRes, z.storeRouter, z.emitter) + } else if paths[len(paths)-2] == constants.ConsumerSide { + return processMetadataUpsert[*meshresource.ServiceConsumerMetadataResource]( + metadataRes, meshresource.ToServiceConsumerMetadataByRawData, z.storeRouter, z.emitter) + } + logger.Warnf("unkonwn metadata, node path: %s, node data: %s", metadataRes.Spec.NodePath, metadataRes.Spec.NodeData) Review Comment: Typo in log message: "unkonwn" should be "unknown". ```suggestion logger.Warnf("unknown metadata, node path: %s, node data: %s", metadataRes.Spec.NodePath, metadataRes.Spec.NodeData) ``` ########## pkg/core/discovery/subscriber/zk_config.go: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package subscriber + +import ( + "fmt" + "reflect" + "strings" + + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +type ZKConfigEventSubscriber struct { + emitter events.Emitter + storeRouter store.Router +} + +func NewZKConfigEventSubscriber(eventEmitter events.Emitter, storeRouter store.Router) *ZKConfigEventSubscriber { + return &ZKConfigEventSubscriber{ + emitter: eventEmitter, + storeRouter: storeRouter, + } +} + +func (z *ZKConfigEventSubscriber) ResourceKind() coremodel.ResourceKind { + return meshresource.ZKConfigKind +} + +func (z *ZKConfigEventSubscriber) Name() string { + return "Discovery-" + z.ResourceKind().ToString() +} + +func (z *ZKConfigEventSubscriber) ProcessEvent(event events.Event) error { + newObj, ok := event.NewObj().(*meshresource.ZKConfigResource) + if !ok && event.NewObj() != nil { + return bizerror.NewAssertionError(reflect.TypeOf(newObj), event.NewObj()) + } + oldObj, ok := event.OldObj().(*meshresource.ZKConfigResource) + if !ok && event.OldObj() != nil { + return bizerror.NewAssertionError(reflect.TypeOf(oldObj), event.OldObj()) + } + var processErr error + switch event.Type() { + case cache.Added, cache.Updated, cache.Replaced, cache.Sync: + if newObj == nil || newObj.Spec == nil { + errStr := "process zk config upsert event, but new obj is nil, skipped processing" + logger.Errorf(errStr) + return bizerror.New(bizerror.EventError, errStr) + } + processErr = z.processUpsert(newObj) + case cache.Deleted: + if oldObj == nil { + errStr := "process zk config delete event, but old obj is nil, skipped processing" + logger.Errorf(errStr) + return bizerror.New(bizerror.EventError, errStr) + } + processErr = z.processDelete(oldObj) + } + if processErr != nil { + logger.Errorf("process zk config event failed, cause: %s, event: %s", processErr.Error(), event.String()) + return processErr + } + logger.Infof("process zk config event successfully, event: %s", event.String()) + return nil +} + +func (z *ZKConfigEventSubscriber) processUpsert(configRes *meshresource.ZKConfigResource) error { + parts := strings.Split(configRes.Spec.NodeName, constants.DotSeparator) + suffix := parts[len(parts)-1] + if !constants.RuleSuffixSet.Contain(suffix) { + logger.Warnf("node name is not end with rule suffix, skipped processing, nodeName: %s", configRes.Spec.NodeName) + return nil + } + logger.Debugf("process zk config upsert event, config res: %s", configRes.String()) + switch suffix { + case constants.TagRuleSuffix: + return processConfigUpsert[*meshresource.TagRouteResource]( + configRes, meshresource.ToTagRouteResource, z.storeRouter, z.emitter) + case constants.ConditionRuleSuffix: + return processConfigUpsert[*meshresource.ConditionRouteResource]( + configRes, meshresource.ToConditionRouteResource, z.storeRouter, z.emitter) + case constants.ConfiguratorsSuffix: + return processConfigUpsert[*meshresource.DynamicConfigResource]( + configRes, meshresource.ToDynamicConfigResource, z.storeRouter, z.emitter) + default: + return bizerror.New(bizerror.UnknownError, + fmt.Sprintf("unknown rule type in mesh %s, skipped processing, path: %s, raw content: %s", + configRes.Mesh, configRes.Spec.NodeName, configRes.Spec.NodeData)) + } +} + +func (z *ZKConfigEventSubscriber) processDelete(configRes *meshresource.ZKConfigResource) error { + parts := strings.Split(configRes.Spec.NodeName, constants.DotSeparator) + suffix := parts[len(parts)-1] Review Comment: Potential index out of bounds error. The code accesses `parts[len(parts)-1]` without checking if the parts slice is empty. If NodeName is an empty string or doesn't contain the dot separator, this will cause a panic. Add a length check before accessing the array element. ########## pkg/discovery/zk/listerwatcher/listerwatcher.go: ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package listerwatcher + +import ( + "fmt" + + "github.com/dubbogo/go-zookeeper/zk" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery" + "github.com/apache/dubbo-admin/pkg/core/logger" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/discovery/zk/zkwatcher" +) + +type ToUpsertResourceFunc func(mesh, nodePath, nodeData string) coremodel.Resource + +type ToDeleteResourceFunc func(mesh, nodePath string) coremodel.Resource + +type ListerWatcher[T coremodel.Resource] struct { + basePath string + rk coremodel.ResourceKind + conn *zk.Conn + cfg *discoverycfg.Config + toUpsertResourceFunc ToUpsertResourceFunc + toDeleteResourceFunc ToDeleteResourceFunc + newResourceFunc coremodel.NewResourceFunc + newResListFunc coremodel.NewResourceListFunc + resultChan chan watch.Event + stopChan chan bool +} + +func NewListerWatcher( + rk coremodel.ResourceKind, + toResourceFunc ToUpsertResourceFunc, + toDeleteResourceFunc ToDeleteResourceFunc, + basePath string, + conn *zk.Conn, + cfg *discoverycfg.Config) (*ListerWatcher[coremodel.Resource], error) { + newResourceFunc, err := coremodel.ResourceSchemaRegistry().NewResourceFunc(rk) + if err != nil { + return nil, err + } + newResListFunc, err := coremodel.ResourceSchemaRegistry().NewResourceListFunc(rk) + if err != nil { + return nil, err + } + return &ListerWatcher[coremodel.Resource]{ + rk: rk, + toUpsertResourceFunc: toResourceFunc, + toDeleteResourceFunc: toDeleteResourceFunc, + basePath: basePath, + conn: conn, + cfg: cfg, + newResourceFunc: newResourceFunc, + newResListFunc: newResListFunc, + resultChan: make(chan watch.Event, 1000), + stopChan: make(chan bool), + }, nil +} + +func (lw *ListerWatcher[T]) List(_ metav1.ListOptions) (k8sruntime.Object, error) { + resList, err := lw.listRecursively(lw.basePath) + if err != nil { + logger.Errorf("list all %s under path %s in zk %s failed, cause: %v", lw.rk, lw.basePath, lw.zkAddr(), err) + return nil, err + } + resListObj := lw.newResListFunc() + resListObj.SetItems(resList) + return resListObj, nil +} + +func (lw *ListerWatcher[T]) listRecursively(path string) ([]coremodel.Resource, error) { + resList := make([]coremodel.Resource, 0) + nodeData, _, err := lw.conn.Get(path) + if err != nil { + errStr := fmt.Sprintf("get %s from zk failed, path: %s, addr: %s, cause: %v", lw.rk, path, lw.zkAddr(), err) + logger.Errorf(errStr) + return nil, bizerror.Wrap(err, bizerror.ZKError, errStr) + } + res := lw.toUpsertResourceFunc(lw.mesh(), path, string(nodeData)) + if res != nil { + resList = append(resList, res) + } + children, _, err := lw.conn.Children(path) + if err != nil { + errStr := fmt.Sprintf("list %s from zk failed, path: %s, addr: %s, cause: %v", lw.rk, path, lw.zkAddr(), err) + logger.Errorf(errStr) + return nil, bizerror.Wrap(err, bizerror.ZKError, errStr) + } + if len(children) == 0 { + return resList, nil + } + for _, childPath := range children { + resources, err := lw.listRecursively(path + constants.PathSeparator + childPath) + if err != nil { + return nil, err + } + resList = append(resList, resources...) + } + return resList, nil +} + +func (lw *ListerWatcher[T]) Watch(_ metav1.ListOptions) (watch.Interface, error) { + watcher := zkwatcher.NewRecursiveWatcher(lw.conn, lw.basePath) + go func() { + for { + select { + case event, ok := <-watcher.EventChan(): + if !ok { + logger.Warnf("zookeeper watcher stopped, path: %s, addr: %s", lw.basePath, lw.zkAddr()) + return + } + lw.handleEvent(event) + case <-lw.stopChan: + logger.Warnf("stop watching %s in %s,", lw.basePath, lw.zkAddr()) + } + + } + }() + err := watcher.Start() + if err != nil { + return nil, err + } + return lw, nil Review Comment: The watcher instance created in Watch() is never stopped or cleaned up. This will leak the ZooKeeper watcher and its goroutines. Consider storing the watcher reference in the ListerWatcher struct and calling its Stop() method in the ListerWatcher's Stop() method. ########## pkg/discovery/zk/zkwatcher/watcher_test.go: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "fmt" + "testing" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +func TestLocalhost(t *testing.T) { + zkServers := []string{"localhost:2181"} + basePath := "/services" + conn, _, err := zk.Connect(zkServers, time.Second*10) + if err != nil { + logger.Fatalf("Failed to connect to zookeeper: %v", err) + } + watcher := NewRecursiveWatcher(conn, basePath) + + // Start listening + if err := watcher.Start(); err != nil { + logger.Fatalf("Failed to start watching: %v", err) + } + + // Keep program running + select { + case <-watcher.stopChan: Review Comment: The test directly accesses the unexported field `stopChan` which breaks encapsulation. The test should use the `Stop()` method instead to properly shut down the watcher. ########## pkg/discovery/zk/listerwatcher/listerwatcher.go: ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package listerwatcher + +import ( + "fmt" + + "github.com/dubbogo/go-zookeeper/zk" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery" + "github.com/apache/dubbo-admin/pkg/core/logger" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/discovery/zk/zkwatcher" +) + +type ToUpsertResourceFunc func(mesh, nodePath, nodeData string) coremodel.Resource + +type ToDeleteResourceFunc func(mesh, nodePath string) coremodel.Resource + +type ListerWatcher[T coremodel.Resource] struct { + basePath string + rk coremodel.ResourceKind + conn *zk.Conn + cfg *discoverycfg.Config + toUpsertResourceFunc ToUpsertResourceFunc + toDeleteResourceFunc ToDeleteResourceFunc + newResourceFunc coremodel.NewResourceFunc + newResListFunc coremodel.NewResourceListFunc + resultChan chan watch.Event + stopChan chan bool +} + +func NewListerWatcher( + rk coremodel.ResourceKind, + toResourceFunc ToUpsertResourceFunc, + toDeleteResourceFunc ToDeleteResourceFunc, + basePath string, + conn *zk.Conn, + cfg *discoverycfg.Config) (*ListerWatcher[coremodel.Resource], error) { + newResourceFunc, err := coremodel.ResourceSchemaRegistry().NewResourceFunc(rk) + if err != nil { + return nil, err + } + newResListFunc, err := coremodel.ResourceSchemaRegistry().NewResourceListFunc(rk) + if err != nil { + return nil, err + } + return &ListerWatcher[coremodel.Resource]{ + rk: rk, + toUpsertResourceFunc: toResourceFunc, + toDeleteResourceFunc: toDeleteResourceFunc, + basePath: basePath, + conn: conn, + cfg: cfg, + newResourceFunc: newResourceFunc, + newResListFunc: newResListFunc, + resultChan: make(chan watch.Event, 1000), + stopChan: make(chan bool), + }, nil +} + +func (lw *ListerWatcher[T]) List(_ metav1.ListOptions) (k8sruntime.Object, error) { + resList, err := lw.listRecursively(lw.basePath) + if err != nil { + logger.Errorf("list all %s under path %s in zk %s failed, cause: %v", lw.rk, lw.basePath, lw.zkAddr(), err) + return nil, err + } + resListObj := lw.newResListFunc() + resListObj.SetItems(resList) + return resListObj, nil +} + +func (lw *ListerWatcher[T]) listRecursively(path string) ([]coremodel.Resource, error) { + resList := make([]coremodel.Resource, 0) + nodeData, _, err := lw.conn.Get(path) + if err != nil { + errStr := fmt.Sprintf("get %s from zk failed, path: %s, addr: %s, cause: %v", lw.rk, path, lw.zkAddr(), err) + logger.Errorf(errStr) + return nil, bizerror.Wrap(err, bizerror.ZKError, errStr) + } + res := lw.toUpsertResourceFunc(lw.mesh(), path, string(nodeData)) + if res != nil { + resList = append(resList, res) + } + children, _, err := lw.conn.Children(path) + if err != nil { + errStr := fmt.Sprintf("list %s from zk failed, path: %s, addr: %s, cause: %v", lw.rk, path, lw.zkAddr(), err) + logger.Errorf(errStr) + return nil, bizerror.Wrap(err, bizerror.ZKError, errStr) + } + if len(children) == 0 { + return resList, nil + } + for _, childPath := range children { + resources, err := lw.listRecursively(path + constants.PathSeparator + childPath) + if err != nil { + return nil, err + } + resList = append(resList, resources...) + } + return resList, nil +} + +func (lw *ListerWatcher[T]) Watch(_ metav1.ListOptions) (watch.Interface, error) { + watcher := zkwatcher.NewRecursiveWatcher(lw.conn, lw.basePath) + go func() { + for { + select { + case event, ok := <-watcher.EventChan(): + if !ok { + logger.Warnf("zookeeper watcher stopped, path: %s, addr: %s", lw.basePath, lw.zkAddr()) + return + } + lw.handleEvent(event) + case <-lw.stopChan: + logger.Warnf("stop watching %s in %s,", lw.basePath, lw.zkAddr()) + } Review Comment: Potential goroutine leak in the Watch implementation. When the stopChan is triggered on line 136-137, the goroutine logs a message but doesn't return, causing it to continue the loop indefinitely. Add a `return` statement after the log message to properly exit the goroutine. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
