Code-Fight commented on code in PR #779:
URL:
https://github.com/apache/incubator-seata-go/pull/779#discussion_r2311905000
##########
pkg/discovery/zk.go:
##########
@@ -14,17 +14,247 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package discovery
-type ZkRegistryService struct{}
+import (
+ "fmt"
+ "github.com/go-zookeeper/zk"
+ "seata.apache.org/seata-go/pkg/discovery/mock"
+ "seata.apache.org/seata-go/pkg/util/log"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+// zkConnAdapter wraps a real *zk.Conn to implement ZkConnInterface.
+type zkConnAdapter struct {
+ conn *zk.Conn
+}
+
+func (a *zkConnAdapter) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error) {
+ return a.conn.GetW(path)
+}
+
+func (a *zkConnAdapter) Get(path string) ([]byte, *zk.Stat, error) {
+ return a.conn.Get(path)
+}
+
+func (a *zkConnAdapter) Close() error {
+ a.conn.Close()
+ return nil
+}
+
+const (
+ zookeeperClusterPrefix = "/registry-seata"
+)
+
+type ZookeeperRegistryService struct {
+ conn mock.ZkConnInterface
+ vgroupMapping map[string]string
+ grouplist map[string][]*ServiceInstance
+ rwLock sync.RWMutex
+ stopCh chan struct{}
+}
+
+func newZookeeperRegistryService(config *ServiceConfig, zkConfig *ZookeeperConfig) RegistryService {
+ if zkConfig == nil {
+ log.Fatalf("zookeeper config is nil")
+ panic("zookeeper config is nil")
+ }
+
+ // Connect to the actual *zk.Conn
+ conn, _, err := zk.Connect([]string{zkConfig.ServerAddr}, zkConfig.SessionTimeout)
+ if err != nil {
+ log.Fatalf("failed to create zookeeper client")
+ panic("failed to create zookeeper client")
+ }
+
+ // Wrap it into an adapter
+ adapter := &zkConnAdapter{conn: conn}
+
+ vgroupMapping := config.VgroupMapping
+
+ // init grouplist
+ grouplist, err := initFromServiceConfig(config)
+ if err != nil {
+ log.Errorf("Error initializing service config: %v", err)
+ return nil
+ }
+
+ zkRegistryService := &ZookeeperRegistryService{
+ conn: adapter,
+ vgroupMapping: vgroupMapping,
+ grouplist: grouplist,
+ stopCh: make(chan struct{}),
+ }
+
+ //go zkRegistryService.watch(zookeeperClusterPrefix)
+ go zkRegistryService.watch(zkConfig.NodePath)
+
+ return zkRegistryService
+}
+
+func (s *ZookeeperRegistryService) watch(path string) {
+ for {
+ // Get initial data and set the watch
+ data, _, events, err := s.conn.GetW(path)
+ if err != nil {
+ log.Infof("Failed to get server instances from
Zookeeper: %v", err)
+ return
Review Comment:
Here, if an error occurs, the entire watch will just end. Should we consider
adding a retry mechanism?
```
for {
data, _, events, err := s.conn.GetW(path)
if err != nil {
if retries < maxRetries {
retries++
time.Sleep(time.Second * time.Duration(retries))
continue
}
log.Errorf("Failed to watch after %d retries: %v", maxRetries,
err)
return
}
retries = 0 // Reset the retry count.
// ...
}
```
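For reference, a minimal self-contained version of that retry idea, assuming the fields and helpers shown in the diff above; the `watchWithRetry` and `maxRetries` names, the linear backoff, and the extra `time` import are illustrative, not part of this PR:
```
// Sketch only: bounded retries around GetW before giving up on the watch.
const maxRetries = 5

func (s *ZookeeperRegistryService) watchWithRetry(path string) {
	retries := 0
	for {
		data, _, events, err := s.conn.GetW(path)
		if err != nil {
			if retries < maxRetries {
				retries++
				// Back off a little longer on each consecutive failure.
				time.Sleep(time.Second * time.Duration(retries))
				continue
			}
			log.Errorf("Failed to watch %s after %d retries: %v", path, maxRetries, err)
			return
		}
		retries = 0 // Reset once GetW succeeds again.
		s.handleZookeeperData(path, data)

		select {
		case <-events:
			// Watch fired; loop back and re-register it with GetW.
		case <-s.stopCh:
			return
		}
	}
}
```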
##########
pkg/discovery/zk.go:
##########
@@ -14,17 +14,247 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package discovery
-type ZkRegistryService struct{}
+import (
+ "fmt"
+ "github.com/go-zookeeper/zk"
+ "seata.apache.org/seata-go/pkg/discovery/mock"
+ "seata.apache.org/seata-go/pkg/util/log"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+// zkConnAdapter wraps a real *zk.Conn to implement ZkConnInterface.
+type zkConnAdapter struct {
+ conn *zk.Conn
+}
+
+func (a *zkConnAdapter) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error) {
+ return a.conn.GetW(path)
+}
+
+func (a *zkConnAdapter) Get(path string) ([]byte, *zk.Stat, error) {
+ return a.conn.Get(path)
+}
+
+func (a *zkConnAdapter) Close() error {
+ a.conn.Close()
+ return nil
+}
+
+const (
+ zookeeperClusterPrefix = "/registry-seata"
+)
+
+type ZookeeperRegistryService struct {
+ conn mock.ZkConnInterface
+ vgroupMapping map[string]string
+ grouplist map[string][]*ServiceInstance
+ rwLock sync.RWMutex
+ stopCh chan struct{}
+}
+
+func newZookeeperRegistryService(config *ServiceConfig, zkConfig *ZookeeperConfig) RegistryService {
+ if zkConfig == nil {
+ log.Fatalf("zookeeper config is nil")
+ panic("zookeeper config is nil")
+ }
+
+ // Connect to the actual *zk.Conn
+ conn, _, err := zk.Connect([]string{zkConfig.ServerAddr}, zkConfig.SessionTimeout)
+ if err != nil {
+ log.Fatalf("failed to create zookeeper client")
+ panic("failed to create zookeeper client")
+ }
+
+ // Wrap it into an adapter
+ adapter := &zkConnAdapter{conn: conn}
+
+ vgroupMapping := config.VgroupMapping
+
+ // init grouplist
+ grouplist, err := initFromServiceConfig(config)
+ if err != nil {
+ log.Errorf("Error initializing service config: %v", err)
+ return nil
+ }
+
+ zkRegistryService := &ZookeeperRegistryService{
+ conn: adapter,
+ vgroupMapping: vgroupMapping,
+ grouplist: grouplist,
+ stopCh: make(chan struct{}),
+ }
+
+ //go zkRegistryService.watch(zookeeperClusterPrefix)
+ go zkRegistryService.watch(zkConfig.NodePath)
+
+ return zkRegistryService
+}
+
+func (s *ZookeeperRegistryService) watch(path string) {
+ for {
+ // Get initial data and set the watch
+ data, _, events, err := s.conn.GetW(path)
+ if err != nil {
+ log.Infof("Failed to get server instances from
Zookeeper: %v", err)
+ return
+ }
-func (s *ZkRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
- //TODO implement me
- panic("implement me")
+ // Handle initial data
+ s.handleZookeeperData(path, data)
+
+ // Listen for changes to Zookeeper nodes
+ for {
+ select {
+ case event := <-events:
+ // Re-establish the watch to continue listening for changes
+ data, _, events, err = s.conn.GetW(path)
+ if err != nil {
+ log.Errorf("Failed to set watch on Zookeeper node: %v", err)
+ return
+ }
+ switch event.Type {
+ case zk.EventNodeCreated, zk.EventNodeDataChanged:
+ log.Infof("Node updated: %s", event.Path)
+ s.handleZookeeperData(event.Path, data)
+
+ case zk.EventNodeDeleted:
+ log.Infof("Node deleted: %s", event.Path)
+ s.removeServiceInstance(event.Path)
+ }
+
+ case <-s.stopCh:
+ log.Warn("Received stop signal, stopping
watch.")
+ return
+ }
+ }
+ }
+}
+
+func (s *ZookeeperRegistryService) handleZookeeperData(path string, data []byte) {
+ clusterName, serverInstance, err := parseZookeeperData(path, data)
+ if err != nil {
+ log.Errorf("Zookeeper data error: %s", err)
+ return
+ }
+
+ s.rwLock.Lock()
+ if s.grouplist[clusterName] == nil {
+ s.grouplist[clusterName] = []*ServiceInstance{serverInstance}
+ } else {
+ s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
Review Comment:
• This may append the same service instance repeatedly.
• We should check whether the instance already exists before adding it.
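One possible shape for that check, as a minimal sketch: the `containsInstance` helper and the `Addr`/`Port` comparison are assumptions for illustration, not something this PR defines; compare on whatever uniquely identifies a `ServiceInstance` in this package.
```
// Sketch only: guard the append so an instance is stored at most once.
// containsInstance and the Addr/Port fields are illustrative assumptions.
func containsInstance(list []*ServiceInstance, candidate *ServiceInstance) bool {
	for _, inst := range list {
		if inst.Addr == candidate.Addr && inst.Port == candidate.Port {
			return true
		}
	}
	return false
}

// Inside handleZookeeperData, under the existing write lock:
//
//	if !containsInstance(s.grouplist[clusterName], serverInstance) {
//		s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
//	}
```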
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]