Copilot commented on code in PR #1371: URL: https://github.com/apache/dubbo-admin/pull/1371#discussion_r2635260578
########## pkg/discovery/zk/listerwatcher/listerwatcher.go: ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package listerwatcher + +import ( + "fmt" + + "github.com/dubbogo/go-zookeeper/zk" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery" + "github.com/apache/dubbo-admin/pkg/core/logger" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/discovery/zk/zkwatcher" +) + +type ToUpsertResourceFunc func(mesh, nodePath, nodeData string) coremodel.Resource + +type ToDeleteResourceFunc func(mesh, nodePath string) coremodel.Resource + +type ListerWatcher[T coremodel.Resource] struct { + basePath string + rk coremodel.ResourceKind + conn *zk.Conn + cfg *discoverycfg.Config + toUpsertResourceFunc ToUpsertResourceFunc + toDeleteResourceFunc ToDeleteResourceFunc + newResourceFunc coremodel.NewResourceFunc + newResListFunc coremodel.NewResourceListFunc + 
resultChan chan watch.Event + stopChan chan bool +} + +func NewListerWatcher( + rk coremodel.ResourceKind, + toResourceFunc ToUpsertResourceFunc, + toDeleteResourceFunc ToDeleteResourceFunc, + basePath string, + conn *zk.Conn, + cfg *discoverycfg.Config) (*ListerWatcher[coremodel.Resource], error) { + newResourceFunc, err := coremodel.ResourceSchemaRegistry().NewResourceFunc(rk) + if err != nil { + return nil, err + } + newResListFunc, err := coremodel.ResourceSchemaRegistry().NewResourceListFunc(rk) + if err != nil { + return nil, err + } + return &ListerWatcher[coremodel.Resource]{ + rk: rk, + toUpsertResourceFunc: toResourceFunc, + toDeleteResourceFunc: toDeleteResourceFunc, + basePath: basePath, + conn: conn, + cfg: cfg, + newResourceFunc: newResourceFunc, + newResListFunc: newResListFunc, + resultChan: make(chan watch.Event, 1000), Review Comment: The resultChan buffer size of 1000 (line 78) is arbitrary and could lead to blocking if watch events are produced faster than consumed by the client. Consider making this configurable or implementing backpressure handling. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + leafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// Start begin watching +func (rw *RecursiveWatcher) Start() error { + logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + close(rw.stopChan) + rw.conn.Close() + close(rw.eventChan) +} + +// EventChan return event channel +func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent { + return rw.eventChan +} + +// watchPathRecursively watch path recursively +func (rw *RecursiveWatcher) watchPathRecursively(path string) error { + // Watch data changes for the current path + go rw.watchDataChanges(path) + + // Get children of current path and watch + children, _, err := rw.conn.Children(path) + if err != nil { + logger.Errorf("Failed to get children of path: %s, cause: %v", path, err) + return err + } + // Watch children list changes + go rw.watchChildrenChanges(path) + + // Recursively watch 
child nodes + for _, child := range children { + childPath := path + "/" + child + if err := rw.watchPathRecursively(childPath); err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + } + return nil +} + +// watchDataChanges watch node data changes +func (rw *RecursiveWatcher) watchDataChanges(path string) { + logger.Debugf("Watching data changes for path: %s", path) + for { + + select { + case <-rw.stopChan: + return + default: + } + + // Get data and set up watch + data, stat, watcher, err := rw.conn.GetW(path) + if err != nil { + logger.Errorf("Failed to watch data changes for path %s, cause: %v", path, err) + time.Sleep(time.Second) + continue + } + rw.eventChan <- ZookeeperEvent{ + Type: NodeCreated, + Path: path, + Data: string(data), + leafNode: stat.NumChildren == 0, + } + + select { + case event := <-watcher.EvtCh: + if event.Type == zk.EventNodeDataChanged { + logger.Debugf("node data changed: %s", path) + + // Re-watch data changes + go rw.watchDataChanges(path) + return + } else if event.Type == zk.EventNodeDeleted { + // Node deleted, stop watching + logger.Debugf("node deleted: %s", path) + rw.eventChan <- ZookeeperEvent{ + Type: NodeDeleted, + Path: path, + leafNode: stat.NumChildren == 0, + } + return + } + case <-rw.stopChan: + return + } + + // If node exists but no changes, continue watching + if stat != nil { + continue + } + } +} + +// watchChildrenChanges watch child node changes +func (rw *RecursiveWatcher) watchChildrenChanges(path string) { + logger.Debugf("Watching child node changes for path: %s", path) + for { + // Check if stopped + select { + case <-rw.stopChan: + return + default: + } + + // Get child nodes and set up watch + children, stat, watcher, err := rw.conn.ChildrenW(path) + if err != nil { + logger.Errorf("Failed to watch child node changes for path %s: %v", path, err) + time.Sleep(time.Second) + continue + } + + select { + case event := <-watcher.EvtCh: + if event.Type == 
zk.EventNodeChildrenChanged { + logger.Debugf("Child node list changed: %s", path) + + // Get current child nodes + newChildren, _, err := rw.conn.Children(path) + if err != nil { + logger.Debugf("Failed to get new child nodes: %v", err) + continue + } + // Calculate added and deleted child nodes + newChildSet := set.FromSlice(newChildren) + oldChildSet := set.FromSlice(children) + addedChildren := newChildSet.Minus(oldChildSet) + deletedChildren := oldChildSet.Minus(newChildSet) + + // watch newly added child nodes + for _, c := range addedChildren.ToSlice() { + childPath := path + "/" + c + logger.Debugf("New child node added: %s", childPath) + // Recursively watch new node + go func() { + err := rw.watchPathRecursively(childPath) + if err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + }() + } + + // Check for deleted child nodes + for _, c := range deletedChildren.ToSlice() { + logger.Debugf("child node %s of %s deleted", c, path) + } + + // Re-watch child node changes + go rw.watchChildrenChanges(path) Review Comment: Similarly, watchChildrenChanges spawns a new goroutine recursively (line 220) without tracking. This could lead to goroutine leaks if child node lists change frequently. The same issue applies to the goroutines spawned in lines 206-211 for newly added child nodes. ########## pkg/core/resource/apis/mesh/v1alpha1/rpc_instance_helper.go: ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + "encoding/json" + "strconv" + "time" + + set "github.com/duke-git/lancet/v2/datastructure/set" + "github.com/duke-git/lancet/v2/maputil" + + meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1" + "github.com/apache/dubbo-admin/pkg/common/constants" + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +func ToRPCInstance( + mesh string, appName string, ip string, + port int64, metadata map[string]string) *RPCInstanceResource { + resName := BuildInstanceResName(appName, ip, port) Review Comment: The function builds an instance resource name using BuildInstanceResName but this function is not defined in the visible code. This creates a dependency that may not be clear. Ensure that BuildInstanceResName is properly exported and documented, especially since it's used in multiple places throughout the codebase. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + leafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// Start begin watching +func (rw *RecursiveWatcher) Start() error { + logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + close(rw.stopChan) + rw.conn.Close() + close(rw.eventChan) +} + +// EventChan return event channel +func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent { + return rw.eventChan +} + +// watchPathRecursively watch path recursively +func (rw 
*RecursiveWatcher) watchPathRecursively(path string) error { + // Watch data changes for the current path + go rw.watchDataChanges(path) + + // Get children of current path and watch + children, _, err := rw.conn.Children(path) + if err != nil { + logger.Errorf("Failed to get children of path: %s, cause: %v", path, err) + return err + } + // Watch children list changes + go rw.watchChildrenChanges(path) + + // Recursively watch child nodes + for _, child := range children { + childPath := path + "/" + child + if err := rw.watchPathRecursively(childPath); err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + } + return nil +} + +// watchDataChanges watch node data changes +func (rw *RecursiveWatcher) watchDataChanges(path string) { + logger.Debugf("Watching data changes for path: %s", path) + for { + + select { + case <-rw.stopChan: + return + default: + } + + // Get data and set up watch + data, stat, watcher, err := rw.conn.GetW(path) + if err != nil { + logger.Errorf("Failed to watch data changes for path %s, cause: %v", path, err) + time.Sleep(time.Second) + continue + } + rw.eventChan <- ZookeeperEvent{ + Type: NodeCreated, + Path: path, + Data: string(data), + leafNode: stat.NumChildren == 0, + } + + select { + case event := <-watcher.EvtCh: + if event.Type == zk.EventNodeDataChanged { + logger.Debugf("node data changed: %s", path) + + // Re-watch data changes + go rw.watchDataChanges(path) + return Review Comment: The watchDataChanges function re-spawns itself in a new goroutine (line 142) whenever the node's data changes. Nothing tracks or bounds these goroutines. The current goroutine returns immediately after spawning its replacement, so the number of goroutines per path does not actually grow with frequent data changes. The real problem is ownership: nothing owns these goroutines, so Stop() cannot wait for them to exit. A plain `continue` in the existing loop would re-arm the watch without creating a new goroutine. A more concrete leak is in the error path. If a watched node is missing when GetW runs (for example, it is deleted between watchPathRecursively's `go rw.watchDataChanges(path)` and the GetW call), GetW keeps failing, and the loop retries every second until stopChan is closed. The ChildrenW loop in watchChildrenChanges has the same retry-forever error path. Consider the following:
- Use a single long-lived goroutine per path (replace the re-spawn with `continue`).
- Exit that goroutine when the node no longer exists (zk.ErrNoNode).
- Track all watcher goroutines with a sync.WaitGroup so Stop() can wait for them.
A goroutine pool is another option.
########## pkg/config/discovery/config.go: ########## @@ -89,11 +78,17 @@ func (c *Config) PreProcess() error { } func (c *Config) Validate() error { + if strutil.IsBlank(c.ID) { + return bizerror.New(bizerror.InvalidArgument, "discovery id is needed") + } if strutil.IsBlank(c.Name) { return bizerror.New(bizerror.InvalidArgument, "discovery name is needed") } if strutil.IsBlank(string(c.Type)) { return bizerror.New(bizerror.InvalidArgument, "discovery type is needed") } + if strutil.IsBlank(c.Address.Registry) && c.Type != Mock { + return bizerror.New(bizerror.InvalidArgument, "discovery address is needed") + } Review Comment: The validation on line 90 rejects a blank address unless Type is Mock. The PreProcess method (line 63) already performs the same check. The two copies currently agree: a blank address is allowed only for the Mock type. So the problem is duplication rather than inconsistency. If one copy is changed later and the other is not, PreProcess and Validate will start enforcing different rules. Consider keeping this check in a single place (such as removing it here, as suggested below), or extract a shared helper that both methods call. ```suggestion ``` ########## pkg/discovery/zk/listerwatcher/listerwatcher.go: ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package listerwatcher + +import ( + "fmt" + + "github.com/dubbogo/go-zookeeper/zk" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery" + "github.com/apache/dubbo-admin/pkg/core/logger" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/discovery/zk/zkwatcher" +) + +type ToUpsertResourceFunc func(mesh, nodePath, nodeData string) coremodel.Resource + +type ToDeleteResourceFunc func(mesh, nodePath string) coremodel.Resource + +type ListerWatcher[T coremodel.Resource] struct { + basePath string + rk coremodel.ResourceKind + conn *zk.Conn + cfg *discoverycfg.Config + toUpsertResourceFunc ToUpsertResourceFunc + toDeleteResourceFunc ToDeleteResourceFunc + newResourceFunc coremodel.NewResourceFunc + newResListFunc coremodel.NewResourceListFunc + resultChan chan watch.Event + stopChan chan bool +} + +func NewListerWatcher( + rk coremodel.ResourceKind, + toResourceFunc ToUpsertResourceFunc, + toDeleteResourceFunc ToDeleteResourceFunc, + basePath string, + conn *zk.Conn, + cfg *discoverycfg.Config) (*ListerWatcher[coremodel.Resource], error) { + newResourceFunc, err := coremodel.ResourceSchemaRegistry().NewResourceFunc(rk) + if err != nil { + return nil, err + } + newResListFunc, err := coremodel.ResourceSchemaRegistry().NewResourceListFunc(rk) + if err != nil { + return nil, err + } + return &ListerWatcher[coremodel.Resource]{ + rk: rk, + toUpsertResourceFunc: toResourceFunc, + toDeleteResourceFunc: toDeleteResourceFunc, + basePath: basePath, + conn: conn, + cfg: cfg, + newResourceFunc: newResourceFunc, + newResListFunc: newResListFunc, + resultChan: make(chan watch.Event, 1000), + stopChan: make(chan bool), + }, nil +} + +func 
(lw *ListerWatcher[T]) List(_ metav1.ListOptions) (k8sruntime.Object, error) { + resList, err := lw.listRecursively(lw.basePath) + if err != nil { + logger.Errorf("list all %s under path %s in zk %s failed, cause: %v", lw.rk, lw.basePath, lw.zkAddr(), err) + return nil, err + } + resListObj := lw.newResListFunc() + resListObj.SetItems(resList) + return resListObj, nil +} + +func (lw *ListerWatcher[T]) listRecursively(path string) ([]coremodel.Resource, error) { + resList := make([]coremodel.Resource, 0) + nodeData, _, err := lw.conn.Get(path) + if err != nil { + errStr := fmt.Sprintf("get %s from zk failed, path: %s, addr: %s, cause: %v", lw.rk, path, lw.zkAddr(), err) + logger.Errorf(errStr) + return nil, bizerror.Wrap(err, bizerror.ZKError, errStr) + } + res := lw.toUpsertResourceFunc(lw.mesh(), path, string(nodeData)) + if res != nil { + resList = append(resList, res) + } + children, _, err := lw.conn.Children(path) + if err != nil { + errStr := fmt.Sprintf("list %s from zk failed, path: %s, addr: %s, cause: %v", lw.rk, path, lw.zkAddr(), err) + logger.Errorf(errStr) + return nil, bizerror.Wrap(err, bizerror.ZKError, errStr) + } + if len(children) == 0 { + return resList, nil + } + for _, childPath := range children { + resources, err := lw.listRecursively(path + constants.PathSeparator + childPath) + if err != nil { + return nil, err + } + resList = append(resList, resources...) 
+ } + return resList, nil +} + +func (lw *ListerWatcher[T]) Watch(_ metav1.ListOptions) (watch.Interface, error) { + watcher := zkwatcher.NewRecursiveWatcher(lw.conn, lw.basePath) + go func() { + for { + select { + case event, ok := <-watcher.EventChan(): + if !ok { + logger.Warnf("zookeeper watcher stopped, path: %s, addr: %s", lw.basePath, lw.zkAddr()) + return + } + lw.handleEvent(event) + case <-lw.stopChan: + logger.Warnf("stop watching %s in %s,", lw.basePath, lw.zkAddr()) + return + } + + } + }() + err := watcher.Start() + if err != nil { + return nil, err + } Review Comment: The Watch method spawns the goroutine that consumes watcher events before it starts the watcher. If watcher.Start() fails (lines 143-146), Watch returns an error, but the goroutine at lines 127-142 keeps running until lw.stopChan is closed. Stop() is never called, so watcher.EventChan() is never closed, and the goroutine leaks. There is no nil-pointer risk here, because NewRecursiveWatcher always returns a non-nil watcher. The watcher is also left partially started: watchPathRecursively launches watchDataChanges for the base path before its Children call can fail. That goroutine keeps retrying GetW, and the leaked consumer may keep handling any events it emits. On failure, both the consumer goroutine and the watcher should be shut down. Alternatively, start the watcher first and spawn the consumer only after Start() succeeds; the watcher must still be cleaned up on error. Note that Stop() is not safe to call here as currently written. It closes the shared zk.Conn (lw.conn), and it closes eventChan while producer goroutines may still be sending to it, which panics. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + leafNode bool Review Comment: The leafNode field in ZookeeperEvent struct is unexported (lowercase 'l'), making it inaccessible to consumers of this event. This appears to be unintentional as other fields are exported. Consider changing 'leafNode' to 'LeafNode' to make it consistent with other fields and accessible to external packages. ########## pkg/core/discovery/subscriber/zk_metadata.go: ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package subscriber + +import ( + "fmt" + "reflect" + "strings" + + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +type ZKMetadataEventSubscriber struct { + emitter events.Emitter + storeRouter store.Router +} + +func NewZKMetadataEventSubscriber(eventEmitter events.Emitter, storeRouter store.Router) *ZKMetadataEventSubscriber { + return &ZKMetadataEventSubscriber{ + emitter: eventEmitter, + storeRouter: storeRouter, + } +} + +func (z *ZKMetadataEventSubscriber) ResourceKind() coremodel.ResourceKind { + return meshresource.ZKMetadataKind +} + +func (z *ZKMetadataEventSubscriber) Name() string { + return "Discovery-" + z.ResourceKind().ToString() +} + +func (z *ZKMetadataEventSubscriber) ProcessEvent(event events.Event) error { + newObj, ok := event.NewObj().(*meshresource.ZKMetadataResource) + if !ok && event.NewObj() != nil { + return bizerror.NewAssertionError(reflect.TypeOf(newObj), event.NewObj()) + } + oldObj, ok := event.OldObj().(*meshresource.ZKMetadataResource) + if !ok && event.OldObj() != nil { + return bizerror.NewAssertionError(reflect.TypeOf(oldObj), event.OldObj()) + } + var processErr error + switch event.Type() { + case cache.Added, cache.Updated, cache.Replaced, cache.Sync: + if newObj == nil || newObj.Spec == nil { + errStr := "process zk metadata upsert event, but new obj is nil, skipped processing" + logger.Errorf(errStr) + return bizerror.New(bizerror.EventError, errStr) + } + processErr = z.processUpsert(newObj) + case cache.Deleted: + if oldObj == nil { + errStr := "process zk metadata delete event, but old obj is nil, skipped processing" + 
logger.Errorf(errStr) + return bizerror.New(bizerror.EventError, errStr) + } + // Metadata is an ephemeral znode, dubbo client only adds/updates the metadata znode and never deletes. + // And we can't identify the service only by the node path, so for delete event, we just ignored + logger.Infof("ignored zk metadata delete event") + } + if processErr != nil { + logger.Errorf("process zk metadata event failed, cause: %s, event: %s", processErr.Error(), event.String()) + return processErr + } + logger.Infof("process zk metadata event successfully, event: %s", event.String()) + return nil +} + +func (z *ZKMetadataEventSubscriber) processUpsert(metadataRes *meshresource.ZKMetadataResource) error { + paths := strings.Split(metadataRes.Spec.NodePath, constants.PathSeparator) + if len(paths) < 2 { + return bizerror.New(bizerror.ZKError, fmt.Sprintf("invalid zk metadata node path: %s", metadataRes.Spec.NodePath)) + } + if paths[len(paths)-2] == constants.ProviderSide { + return processMetadataUpsert[*meshresource.ServiceProviderMetadataResource]( + metadataRes, meshresource.ToServiceProviderMetadataRes, z.storeRouter, z.emitter) + } else if paths[len(paths)-2] == constants.ConsumerSide { + return processMetadataUpsert[*meshresource.ServiceConsumerMetadataResource]( + metadataRes, meshresource.ToServiceConsumerMetadataByRawData, z.storeRouter, z.emitter) + } + logger.Warnf("unknown metadata, node path: %s, node data: %s", metadataRes.Spec.NodePath, metadataRes.Spec.NodeData) + return nil +} + +// processMetadataUpsert handle service provider/consumer metadata upsert +func processMetadataUpsert[T coremodel.Resource]( + zkMetadataRes *meshresource.ZKMetadataResource, + toMetadataRes meshresource.ToMetadataResFunc, + router store.Router, + emitter events.Emitter) error { + newMetadataRes := toMetadataRes(zkMetadataRes.Mesh, zkMetadataRes.Spec.NodeData) + if newMetadataRes == nil { + logger.Errorf("cannot unmarshal metadata in zk %s, raw content: %s", zkMetadataRes.Mesh, 
zkMetadataRes.Spec.NodeData) + return bizerror.New(bizerror.ZKError, "cannot unmarshal metadata") + } + st, err := router.ResourceKindRoute(newMetadataRes.ResourceKind()) + if err != nil { + logger.Errorf("get %s store failed, cause: %s", newMetadataRes.ResourceKind(), err.Error()) + return err + } + oldRes, exists, err := st.GetByKey(newMetadataRes.ResourceKey()) + if err != nil { + logger.Errorf("get metadata %s from store failed, cause: %s", newMetadataRes.ResourceKey(), err.Error()) + return err + } + if !exists { + err := st.Add(newMetadataRes) + if err != nil { + logger.Errorf("add metadata %s to store failed, cause: %s", newMetadataRes.ResourceKey(), err.Error()) + return err + } + emitter.Send(events.NewResourceChangedEvent(cache.Added, nil, newMetadataRes)) + return nil Review Comment: In processMetadataUpsert, toMetadataRes may return nil (line 114). The later code calls newMetadataRes.ResourceKind() (line 119) and newMetadataRes.ResourceKey() (lines 124, 126, etc.), which would panic on a nil value. The `newMetadataRes == nil` guard in this hunk handles the case where toMetadataRes returns a literal nil. It does not cover every case, though. ToMetadataResFunc appears to return an interface type such as coremodel.Resource; please confirm. If so, and a converter (ToServiceProviderMetadataRes / ToServiceConsumerMetadataByRawData) returns a typed nil pointer on failure, the guard will not catch it. An interface holding a nil pointer compares non-nil, so the nil dereference panic described above can still occur. Make sure the converters return a literal nil on failure, or also check for a nil pointer, before using newMetadataRes. ########## pkg/core/discovery/subscriber/zk_config.go: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package subscriber + +import ( + "fmt" + "reflect" + "strings" + + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +type ZKConfigEventSubscriber struct { + emitter events.Emitter + storeRouter store.Router +} + +func NewZKConfigEventSubscriber(eventEmitter events.Emitter, storeRouter store.Router) *ZKConfigEventSubscriber { + return &ZKConfigEventSubscriber{ + emitter: eventEmitter, + storeRouter: storeRouter, + } +} + +func (z *ZKConfigEventSubscriber) ResourceKind() coremodel.ResourceKind { + return meshresource.ZKConfigKind +} + +func (z *ZKConfigEventSubscriber) Name() string { + return "Discovery-" + z.ResourceKind().ToString() +} + +func (z *ZKConfigEventSubscriber) ProcessEvent(event events.Event) error { + newObj, ok := event.NewObj().(*meshresource.ZKConfigResource) + if !ok && event.NewObj() != nil { + return bizerror.NewAssertionError(reflect.TypeOf(newObj), event.NewObj()) + } + oldObj, ok := event.OldObj().(*meshresource.ZKConfigResource) + if !ok && event.OldObj() != nil { + return bizerror.NewAssertionError(reflect.TypeOf(oldObj), event.OldObj()) + } + var processErr error + switch event.Type() { + case cache.Added, cache.Updated, cache.Replaced, cache.Sync: + if newObj == nil || newObj.Spec == nil { + errStr := "process zk config upsert event, but new obj is nil, skipped processing" + logger.Errorf(errStr) + return bizerror.New(bizerror.EventError, errStr) + } + processErr = z.processUpsert(newObj) + case cache.Deleted: + if oldObj == nil { + 
errStr := "process zk config delete event, but old obj is nil, skipped processing" + logger.Errorf(errStr) + return bizerror.New(bizerror.EventError, errStr) + } + processErr = z.processDelete(oldObj) + } + if processErr != nil { + logger.Errorf("process zk config event failed, cause: %s, event: %s", processErr.Error(), event.String()) + return processErr + } + logger.Infof("process zk config event successfully, event: %s", event.String()) + return nil +} + +func (z *ZKConfigEventSubscriber) processUpsert(configRes *meshresource.ZKConfigResource) error { + parts := strings.Split(configRes.Spec.NodeName, constants.DotSeparator) + suffix := parts[len(parts)-1] + if !constants.RuleSuffixSet.Contain(suffix) { + logger.Warnf("node name is not end with rule suffix, skipped processing, nodeName: %s", configRes.Spec.NodeName) + return nil + } + logger.Debugf("process zk config upsert event, config res: %s", configRes.String()) + switch suffix { + case constants.TagRuleSuffix: + return processConfigUpsert[*meshresource.TagRouteResource]( + configRes, meshresource.ToTagRouteResource, z.storeRouter, z.emitter) + case constants.ConditionRuleSuffix: + return processConfigUpsert[*meshresource.ConditionRouteResource]( + configRes, meshresource.ToConditionRouteResource, z.storeRouter, z.emitter) + case constants.ConfiguratorsSuffix: + return processConfigUpsert[*meshresource.DynamicConfigResource]( + configRes, meshresource.ToDynamicConfigResource, z.storeRouter, z.emitter) + default: + return bizerror.New(bizerror.UnknownError, + fmt.Sprintf("unknown rule type in mesh %s, skipped processing, path: %s, raw content: %s", + configRes.Mesh, configRes.Spec.NodeName, configRes.Spec.NodeData)) + } +} + +func (z *ZKConfigEventSubscriber) processDelete(configRes *meshresource.ZKConfigResource) error { + parts := strings.Split(configRes.Spec.NodeName, constants.DotSeparator) + suffix := parts[len(parts)-1] + if !constants.RuleSuffixSet.Contain(suffix) { + logger.Warnf("node name is not end 
with rule suffix, skipped processing, nodeName: %s", configRes.Spec.NodeName) + return nil + } + logger.Debugf("process zk config delete event, config res: %s", configRes.String()) + switch suffix { + case constants.TagRuleSuffix: + return processConfigDelete[*meshresource.TagRouteResource]( + configRes, meshresource.ToTagRouteResource, z.storeRouter, z.emitter) + case constants.ConditionRuleSuffix: + return processConfigDelete[*meshresource.ConditionRouteResource]( + configRes, meshresource.ToConditionRouteResource, z.storeRouter, z.emitter) + case constants.ConfiguratorsSuffix: + return processConfigDelete[*meshresource.DynamicConfigResource]( + configRes, meshresource.ToDynamicConfigResource, z.storeRouter, z.emitter) + default: + return bizerror.New(bizerror.UnknownError, + fmt.Sprintf("unknown rule type in mesh %s, skipped processing, node: %s", + configRes.Mesh, configRes.Spec.NodeName)) + } +} + +func processConfigUpsert[T coremodel.Resource]( + configRes *meshresource.ZKConfigResource, + toRuleRes meshresource.ToRuleResourceFunc, + router store.Router, + emitter events.Emitter) error { + newRuleRes := toRuleRes(configRes.Mesh, configRes.Name, configRes.Spec.NodeData) + if newRuleRes == nil { + logger.Errorf("cannot unmarshal config in zk %s, raw content: %s", configRes.Mesh, configRes.Spec.NodeData) + return bizerror.New(bizerror.ZKError, "cannot unmarshal rule") + } + st, err := router.ResourceKindRoute(newRuleRes.ResourceKind()) + if err != nil { + logger.Errorf("get %s store failed, cause: %s", newRuleRes.ResourceKind(), err.Error()) + return err + } + oldRes, exists, err := st.GetByKey(newRuleRes.ResourceKey()) + if err != nil { + logger.Errorf("get rule %s from store failed, cause: %s", newRuleRes.ResourceKey(), err.Error()) + return err + } + if !exists { + err := st.Add(newRuleRes) + if err != nil { + logger.Errorf("add rule %s to store failed, cause: %s", newRuleRes.ResourceKey(), err.Error()) + return err + } + 
emitter.Send(events.NewResourceChangedEvent(cache.Added, nil, newRuleRes)) + return nil Review Comment: The processConfigUpsert function calls toRuleRes which may return nil (line 145), but the subsequent code assumes newRuleRes is non-nil when calling newRuleRes.ResourceKind() on line 150 and newRuleRes.ResourceKey() on lines 155, 157, etc. This could cause a nil pointer dereference panic. The nil check on lines 146-149 only handles the error logging but doesn't prevent the code from continuing to use newRuleRes. ########## pkg/discovery/zk/factory.go: ########## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package zk + +import ( + "encoding/json" + "net/url" + "strconv" + "strings" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + "github.com/duke-git/lancet/v2/strutil" + + meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1" + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery" + "github.com/apache/dubbo-admin/pkg/core/controller" + "github.com/apache/dubbo-admin/pkg/core/discovery" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/discovery/zk/listerwatcher" +) + +func init() { + discovery.RegisterListWatcherFactory(&Factory{}) +} + +type Factory struct { +} + +func (f *Factory) Support(typ discoverycfg.Type) bool { + return discoverycfg.Zookeeper == typ +} + +func (f *Factory) NewListWatchers(config *discoverycfg.Config) ([]controller.ResourceListerWatcher, error) { + address := config.Address.Registry + zkUrl, err := url.Parse(address) + if err != nil { + return nil, err + } + conn, _, err := zk.Connect([]string{zkUrl.Host}, time.Second*1, func(c *zk.Conn) { + c.SetLogger(&zkLogger{}) + }) + if err != nil { + logger.Fatalf("connect to %s failed," + address) + return nil, bizerror.Wrap(err, bizerror.ZKError, "connect to zookeeper failed, addr: "+address) + } + mappingLw, err := listerwatcher.NewListerWatcher( + meshresource.ServiceProviderMappingKind, + toUpsertMappingResource, + toDeleteMappingResource, + "/dubbo/mapping", + conn, + config, + ) + if err != nil { + return nil, err + } + //rpcInstanceLW, err := listerwatcher.NewListerWatcher( + // meshresource.RPCInstanceKind, + // toUpsertRPCInstanceResource, + // toDeleteRPCInstanceResource, + // "/services", + // conn, + // config, + //) + //if err != nil { + // return nil, err + //} + 
configLW, err := listerwatcher.NewListerWatcher( + meshresource.ZKConfigKind, + toUpsertZKConfigResource, + toDeleteZKConfigResource, + "/dubbo/config", + conn, + config, + ) + + metadataLW, err := listerwatcher.NewListerWatcher( + meshresource.ZKMetadataKind, + toUpsertZKMetadataResource, + toDeleteZKMetadataResource, + "/dubbo/metadata", + conn, + config, + ) Review Comment: Missing error handling after creating configLW and metadataLW (lines 88-104). If these calls return an error, it's silently ignored and execution continues to return the list including nil values. The error check pattern used for mappingLw (lines 74-76) should be applied here as well. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + leafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// Start begin watching +func (rw *RecursiveWatcher) Start() error { + logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + close(rw.stopChan) + rw.conn.Close() + close(rw.eventChan) Review Comment: The Stop method closes resources in a potentially unsafe order. Closing stopChan first (line 77) may cause goroutines to exit, but the connection is then closed immediately (line 78), which could cause errors in goroutines that are still processing. Finally, closing eventChan (line 79) while other goroutines might still be sending to it could cause a panic, because a send on a closed channel panics. Instead, shutdown should be sequenced: first signal stop via stopChan, then wait for all watcher goroutines to exit, then close the connection, and only after that close eventChan.
########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + leafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// Start begin watching +func (rw *RecursiveWatcher) Start() error { + 
logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + close(rw.stopChan) + rw.conn.Close() + close(rw.eventChan) +} + +// EventChan return event channel +func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent { + return rw.eventChan +} + +// watchPathRecursively watch path recursively +func (rw *RecursiveWatcher) watchPathRecursively(path string) error { + // Watch data changes for the current path + go rw.watchDataChanges(path) + + // Get children of current path and watch + children, _, err := rw.conn.Children(path) + if err != nil { + logger.Errorf("Failed to get children of path: %s, cause: %v", path, err) + return err + } + // Watch children list changes + go rw.watchChildrenChanges(path) + + // Recursively watch child nodes + for _, child := range children { + childPath := path + "/" + child + if err := rw.watchPathRecursively(childPath); err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + } + return nil +} + +// watchDataChanges watch node data changes +func (rw *RecursiveWatcher) watchDataChanges(path string) { + logger.Debugf("Watching data changes for path: %s", path) + for { + + select { + case <-rw.stopChan: + return + default: + } + + // Get data and set up watch + data, stat, watcher, err := rw.conn.GetW(path) + if err != nil { + logger.Errorf("Failed to watch data changes for path %s, cause: %v", path, err) + time.Sleep(time.Second) + continue + } + rw.eventChan <- ZookeeperEvent{ + Type: NodeCreated, + Path: path, + Data: string(data), + leafNode: stat.NumChildren == 0, + } + + select { + case event := <-watcher.EvtCh: + if event.Type == zk.EventNodeDataChanged { + logger.Debugf("node data changed: %s", path) + + // Re-watch data changes + go rw.watchDataChanges(path) + return + } else if event.Type == zk.EventNodeDeleted { + // Node deleted, stop watching + 
logger.Debugf("node deleted: %s", path) + rw.eventChan <- ZookeeperEvent{ + Type: NodeDeleted, + Path: path, + leafNode: stat.NumChildren == 0, + } + return + } + case <-rw.stopChan: + return + } + + // If node exists but no changes, continue watching + if stat != nil { + continue + } + } +} + +// watchChildrenChanges watch child node changes +func (rw *RecursiveWatcher) watchChildrenChanges(path string) { + logger.Debugf("Watching child node changes for path: %s", path) + for { + // Check if stopped + select { + case <-rw.stopChan: + return + default: + } + + // Get child nodes and set up watch + children, stat, watcher, err := rw.conn.ChildrenW(path) + if err != nil { + logger.Errorf("Failed to watch child node changes for path %s: %v", path, err) + time.Sleep(time.Second) + continue + } + + select { + case event := <-watcher.EvtCh: + if event.Type == zk.EventNodeChildrenChanged { + logger.Debugf("Child node list changed: %s", path) + + // Get current child nodes + newChildren, _, err := rw.conn.Children(path) + if err != nil { + logger.Debugf("Failed to get new child nodes: %v", err) + continue + } + // Calculate added and deleted child nodes + newChildSet := set.FromSlice(newChildren) + oldChildSet := set.FromSlice(children) + addedChildren := newChildSet.Minus(oldChildSet) + deletedChildren := oldChildSet.Minus(newChildSet) + + // watch newly added child nodes + for _, c := range addedChildren.ToSlice() { + childPath := path + "/" + c + logger.Debugf("New child node added: %s", childPath) + // Recursively watch new node + go func() { + err := rw.watchPathRecursively(childPath) + if err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + }() + } + + // Check for deleted child nodes + for _, c := range deletedChildren.ToSlice() { + logger.Debugf("child node %s of %s deleted", c, path) + } + + // Re-watch child node changes + go rw.watchChildrenChanges(path) + return + } else if event.Type == zk.EventNodeDeleted { + // Parent node 
deleted, stop watching + logger.Debugf("parent node %s deleted, stop watching children changes", path) + return + } + case <-rw.stopChan: + return + } + + // If node exists but no changes, continue watching + if stat != nil { + continue + } + } +} + +// handleEvents handle events +func (rw *RecursiveWatcher) handleEvents() { + for event := range rw.eventChan { + logger.Infof("Event type: %s, Path: %s, leafNode: %v, Data: %s", event.Type, event.Path, event.leafNode, event.Data) + } +} Review Comment: The function handleEvents (lines 238-243) is defined but never called. This appears to be dead code that should either be removed or integrated into the watcher's lifecycle if event handling logic is needed. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + leafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), Review Comment: The eventChan buffer size of 1000 (line 63) is arbitrary and could lead to blocking if events are produced faster than consumed. Consider making this configurable or implementing backpressure handling to avoid blocking the watcher goroutines. 
########## pkg/core/resource/apis/mesh/v1alpha1/service_helper.go: ########## @@ -2,6 +2,9 @@ package v1alpha1 import "github.com/apache/dubbo-admin/pkg/common/constants" -func BuildServiceKey(serviceName, version, group string) string { - return serviceName + constants.ColonSeparator + version + constants.ColonSeparator + group +// BuildServiceKey build service key +// {service}:{version}:{group}:{appName} +func BuildServiceKey(serviceName, version, group, appName string) string { + return serviceName + constants.ColonSeparator + version + + constants.ColonSeparator + group + constants.ColonSeparator + appName } Review Comment: The BuildServiceKey function signature changed to include an appName parameter (line 7), but the function documentation doesn't explain what happens when appName is empty or how it affects the service key format. Consider adding documentation to clarify the expected behavior and format of the returned key. ########## pkg/discovery/zk/listerwatcher/listerwatcher.go: ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package listerwatcher + +import ( + "fmt" + + "github.com/dubbogo/go-zookeeper/zk" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery" + "github.com/apache/dubbo-admin/pkg/core/logger" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/discovery/zk/zkwatcher" +) + +type ToUpsertResourceFunc func(mesh, nodePath, nodeData string) coremodel.Resource + +type ToDeleteResourceFunc func(mesh, nodePath string) coremodel.Resource + +type ListerWatcher[T coremodel.Resource] struct { + basePath string + rk coremodel.ResourceKind + conn *zk.Conn + cfg *discoverycfg.Config + toUpsertResourceFunc ToUpsertResourceFunc + toDeleteResourceFunc ToDeleteResourceFunc + newResourceFunc coremodel.NewResourceFunc + newResListFunc coremodel.NewResourceListFunc + resultChan chan watch.Event + stopChan chan bool +} + +func NewListerWatcher( + rk coremodel.ResourceKind, + toResourceFunc ToUpsertResourceFunc, + toDeleteResourceFunc ToDeleteResourceFunc, + basePath string, + conn *zk.Conn, + cfg *discoverycfg.Config) (*ListerWatcher[coremodel.Resource], error) { + newResourceFunc, err := coremodel.ResourceSchemaRegistry().NewResourceFunc(rk) + if err != nil { + return nil, err + } + newResListFunc, err := coremodel.ResourceSchemaRegistry().NewResourceListFunc(rk) + if err != nil { + return nil, err + } + return &ListerWatcher[coremodel.Resource]{ + rk: rk, + toUpsertResourceFunc: toResourceFunc, + toDeleteResourceFunc: toDeleteResourceFunc, + basePath: basePath, + conn: conn, + cfg: cfg, + newResourceFunc: newResourceFunc, + newResListFunc: newResListFunc, + resultChan: make(chan watch.Event, 1000), + stopChan: make(chan bool), + }, nil +} + +func 
(lw *ListerWatcher[T]) List(_ metav1.ListOptions) (k8sruntime.Object, error) { + resList, err := lw.listRecursively(lw.basePath) + if err != nil { + logger.Errorf("list all %s under path %s in zk %s failed, cause: %v", lw.rk, lw.basePath, lw.zkAddr(), err) + return nil, err + } + resListObj := lw.newResListFunc() + resListObj.SetItems(resList) + return resListObj, nil +} + +func (lw *ListerWatcher[T]) listRecursively(path string) ([]coremodel.Resource, error) { + resList := make([]coremodel.Resource, 0) + nodeData, _, err := lw.conn.Get(path) + if err != nil { + errStr := fmt.Sprintf("get %s from zk failed, path: %s, addr: %s, cause: %v", lw.rk, path, lw.zkAddr(), err) + logger.Errorf(errStr) + return nil, bizerror.Wrap(err, bizerror.ZKError, errStr) + } + res := lw.toUpsertResourceFunc(lw.mesh(), path, string(nodeData)) + if res != nil { + resList = append(resList, res) + } + children, _, err := lw.conn.Children(path) + if err != nil { + errStr := fmt.Sprintf("list %s from zk failed, path: %s, addr: %s, cause: %v", lw.rk, path, lw.zkAddr(), err) + logger.Errorf(errStr) + return nil, bizerror.Wrap(err, bizerror.ZKError, errStr) + } + if len(children) == 0 { + return resList, nil + } + for _, childPath := range children { + resources, err := lw.listRecursively(path + constants.PathSeparator + childPath) + if err != nil { + return nil, err + } + resList = append(resList, resources...) 
+ } + return resList, nil +} + +func (lw *ListerWatcher[T]) Watch(_ metav1.ListOptions) (watch.Interface, error) { + watcher := zkwatcher.NewRecursiveWatcher(lw.conn, lw.basePath) + go func() { + for { + select { + case event, ok := <-watcher.EventChan(): + if !ok { + logger.Warnf("zookeeper watcher stopped, path: %s, addr: %s", lw.basePath, lw.zkAddr()) + return + } + lw.handleEvent(event) + case <-lw.stopChan: + logger.Warnf("stop watching %s in %s,", lw.basePath, lw.zkAddr()) + return + } + + } + }() + err := watcher.Start() + if err != nil { + return nil, err + } + return lw, nil +} + +func (lw *ListerWatcher[T]) handleEvent(event zkwatcher.ZookeeperEvent) { + switch event.Type { + case zkwatcher.NodeCreated: + res := lw.toUpsertResourceFunc(lw.mesh(), event.Path, event.Data) + if res == nil { + logger.Warnf("skip creating resource, parse zookeeper node data to %s failed, path: %s, zk: %s, event: %v", + lw.rk, event.Path, lw.zkAddr(), event) + return + } + logger.Infof("%s added, rsKey: %s", lw.rk, res.ResourceKey()) + lw.resultChan <- watch.Event{ + Type: watch.Added, + Object: res, + } + case zkwatcher.NodeChanged: + res := lw.toUpsertResourceFunc(lw.mesh(), event.Path, event.Data) + if res == nil { + logger.Warnf("skip updating resource, parse zookeeper node data to %s failed, path: %s, zk: %s, event: %v", + lw.rk, event.Path, lw.zkAddr(), event) + return + } + logger.Infof("%s modified, rsKey: %s", lw.rk, res.ResourceKey()) + lw.resultChan <- watch.Event{ + Type: watch.Modified, + Object: res, + } + case zkwatcher.NodeDeleted: + res := lw.toDeleteResourceFunc(lw.mesh(), event.Path) + if res == nil { + logger.Warnf("skip deleting resource, parse zookeeper node data to %s failed, path: %s, zk: %s, event: %v", + lw.rk, event.Path, lw.zkAddr(), event) + return + } + logger.Infof("%s deleted, rsKey: %s", lw.rk, res.ResourceKey()) + lw.resultChan <- watch.Event{ + Type: watch.Deleted, + Object: res, + } + default: + logger.Warnf("unknown event type, event: %v", 
event) + } +} + +func (lw *ListerWatcher[T]) zkAddr() string { + return lw.cfg.Address.Registry +} + +func (lw *ListerWatcher[T]) mesh() string { + return lw.cfg.ID +} + +func (lw *ListerWatcher[T]) ResourceKind() coremodel.ResourceKind { + return lw.rk +} + +func (lw *ListerWatcher[T]) TransformFunc() cache.TransformFunc { + return nil +} + +func (lw *ListerWatcher[T]) Stop() { + lw.stopChan <- true Review Comment: The Stop method sends a value to stopChan (line 210) which is unbuffered. If no goroutine is listening on stopChan at the time Stop is called, this will block indefinitely. Consider making stopChan buffered or using close(stopChan) instead of sending a value. ########## pkg/discovery/zk/factory.go: ########## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package zk + +import ( + "encoding/json" + "net/url" + "strconv" + "strings" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + "github.com/duke-git/lancet/v2/strutil" + + meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1" + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery" + "github.com/apache/dubbo-admin/pkg/core/controller" + "github.com/apache/dubbo-admin/pkg/core/discovery" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/discovery/zk/listerwatcher" +) + +func init() { + discovery.RegisterListWatcherFactory(&Factory{}) +} + +type Factory struct { +} + +func (f *Factory) Support(typ discoverycfg.Type) bool { + return discoverycfg.Zookeeper == typ +} + +func (f *Factory) NewListWatchers(config *discoverycfg.Config) ([]controller.ResourceListerWatcher, error) { + address := config.Address.Registry + zkUrl, err := url.Parse(address) + if err != nil { + return nil, err + } + conn, _, err := zk.Connect([]string{zkUrl.Host}, time.Second*1, func(c *zk.Conn) { + c.SetLogger(&zkLogger{}) + }) + if err != nil { + logger.Fatalf("connect to %s failed," + address) + return nil, bizerror.Wrap(err, bizerror.ZKError, "connect to zookeeper failed, addr: "+address) + } + mappingLw, err := listerwatcher.NewListerWatcher( + meshresource.ServiceProviderMappingKind, + toUpsertMappingResource, + toDeleteMappingResource, + "/dubbo/mapping", + conn, + config, + ) + if err != nil { + return nil, err + } + //rpcInstanceLW, err := listerwatcher.NewListerWatcher( + // meshresource.RPCInstanceKind, + // toUpsertRPCInstanceResource, + // toDeleteRPCInstanceResource, + // "/services", + // conn, + // config, + //) + //if err != nil { + // return nil, err + //} + 
configLW, err := listerwatcher.NewListerWatcher( + meshresource.ZKConfigKind, + toUpsertZKConfigResource, + toDeleteZKConfigResource, + "/dubbo/config", + conn, + config, + ) + + metadataLW, err := listerwatcher.NewListerWatcher( + meshresource.ZKMetadataKind, + toUpsertZKMetadataResource, + toDeleteZKMetadataResource, + "/dubbo/metadata", + conn, + config, + ) + + return []controller.ResourceListerWatcher{ + //listerwatcher.NewRPCInstanceListerWatcher(conn, config), + mappingLw, + //rpcInstanceLW, + configLW, + metadataLW, + }, nil Review Comment: The same zookeeper connection (conn) is being reused for multiple ListerWatchers (lines 66-112). If any of the ListerWatcher's Stop method closes the connection (which the zkwatcher.RecursiveWatcher.Stop does on line 78 of watcher.go), it will affect all other ListerWatchers sharing the same connection, causing them to fail. Each ListerWatcher should either have its own connection, or the connection lifecycle should be managed separately from the watcher lifecycle. ########## pkg/discovery/zk/factory.go: ########## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package zk + +import ( + "encoding/json" + "net/url" + "strconv" + "strings" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + "github.com/duke-git/lancet/v2/strutil" + + meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1" + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery" + "github.com/apache/dubbo-admin/pkg/core/controller" + "github.com/apache/dubbo-admin/pkg/core/discovery" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/discovery/zk/listerwatcher" +) + +func init() { + discovery.RegisterListWatcherFactory(&Factory{}) +} + +type Factory struct { +} + +func (f *Factory) Support(typ discoverycfg.Type) bool { + return discoverycfg.Zookeeper == typ +} + +func (f *Factory) NewListWatchers(config *discoverycfg.Config) ([]controller.ResourceListerWatcher, error) { + address := config.Address.Registry + zkUrl, err := url.Parse(address) + if err != nil { + return nil, err + } + conn, _, err := zk.Connect([]string{zkUrl.Host}, time.Second*1, func(c *zk.Conn) { + c.SetLogger(&zkLogger{}) + }) + if err != nil { + logger.Fatalf("connect to %s failed," + address) Review Comment: The error message on line 63 concatenates the address onto the format string ("connect to %s failed," + address) instead of passing it as an argument. Because no argument is supplied for the %s verb, a printf-style logger will output something like "connect to %!s(MISSING) failed,zookeeper://..." rather than a properly formatted address. Pass the address as an argument instead, e.g. logger.Fatalf("connect to %s failed", address), or build the message with fmt.Sprintf. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
