This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 9d88f3186 fix PolarisServiceWatcher bug (#1988)
9d88f3186 is described below

commit 9d88f3186841515d7009ce572f4bccc3e367e492
Author: Jason Deng <[email protected]>
AuthorDate: Tue Aug 2 20:24:37 2022 +0800

    fix PolarisServiceWatcher bug (#1988)
    
    Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.47.0 
to 1.48.0.
    - [Release notes](https://github.com/grpc/grpc-go/releases)
    - [Commits](https://github.com/grpc/grpc-go/compare/v1.47.0...v1.48.0)
    
    ---
    updated-dependencies:
    - dependency-name: google.golang.org/grpc
      dependency-type: direct:production
      update-type: version-update:semver-minor
    ...
    
    Signed-off-by: dependabot[bot] <[email protected]>
    
    Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
    
    Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
---
 registry/polaris/core.go                     |  59 ++--
 registry/zookeeper/service_discovery_test.go | 391 +++++++++------------------
 2 files changed, 159 insertions(+), 291 deletions(-)

diff --git a/registry/polaris/core.go b/registry/polaris/core.go
index 8c82b7b99..e87fac7c9 100644
--- a/registry/polaris/core.go
+++ b/registry/polaris/core.go
@@ -57,9 +57,8 @@ func newPolarisWatcher(param *api.WatchServiceRequest, 
consumer api.ConsumerAPI)
 // AddSubscriber add subscriber into watcher's subscribers
 func (watcher *PolarisServiceWatcher) AddSubscriber(subscriber 
func(remoting.EventType, []model.Instance)) {
 
-       watcher.lazyRun()
-
        watcher.lock.Lock()
+       watcher.lazyRun()
        defer watcher.lock.Unlock()
 
        watcher.subscribers = append(watcher.subscribers, subscriber)
@@ -74,48 +73,50 @@ func (watcher *PolarisServiceWatcher) lazyRun() {
 
 // startWatch start run work to watch target service by polaris
 func (watcher *PolarisServiceWatcher) startWatch() {
-
        for {
                resp, err := 
watcher.consumer.WatchService(watcher.subscribeParam)
                if err != nil {
                        time.Sleep(time.Duration(500 * time.Millisecond))
                        continue
                }
-
                watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
                        Value:      resp.GetAllInstancesResp.Instances,
                        ConfigType: remoting.EventTypeAdd,
                })
 
-               select {
-               case event := <-resp.EventChannel:
-                       eType := event.GetSubScribeEventType()
-                       if eType == api.EventInstance {
-                               insEvent := event.(*model.InstanceEvent)
-                               if insEvent.AddEvent != nil {
-                                       
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
-                                               Value:      
insEvent.AddEvent.Instances,
-                                               ConfigType: 
remoting.EventTypeAdd,
-                                       })
-                               }
-                               if insEvent.UpdateEvent != nil {
-                                       instances := make([]model.Instance, 
len(insEvent.UpdateEvent.UpdateList))
-                                       for i := range 
insEvent.UpdateEvent.UpdateList {
-                                               instances[i] = 
insEvent.UpdateEvent.UpdateList[i].After
+               for {
+                       select {
+                       case event := <-resp.EventChannel:
+                               eType := event.GetSubScribeEventType()
+                               if eType == api.EventInstance {
+                                       insEvent := event.(*model.InstanceEvent)
+
+                                       if insEvent.AddEvent != nil {
+                                               
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+                                                       Value:      
insEvent.AddEvent.Instances,
+                                                       ConfigType: 
remoting.EventTypeAdd,
+                                               })
+                                       }
+                                       if insEvent.UpdateEvent != nil {
+                                               instances := 
make([]model.Instance, len(insEvent.UpdateEvent.UpdateList))
+                                               for i := range 
insEvent.UpdateEvent.UpdateList {
+                                                       instances[i] = 
insEvent.UpdateEvent.UpdateList[i].After
+                                               }
+                                               
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+                                                       Value:      instances,
+                                                       ConfigType: 
remoting.EventTypeUpdate,
+                                               })
+                                       }
+                                       if insEvent.DeleteEvent != nil {
+                                               
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+                                                       Value:      
insEvent.DeleteEvent.Instances,
+                                                       ConfigType: 
remoting.EventTypeDel,
+                                               })
                                        }
-                                       
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
-                                               Value:      instances,
-                                               ConfigType: 
remoting.EventTypeUpdate,
-                                       })
-                               }
-                               if insEvent.DeleteEvent != nil {
-                                       
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
-                                               Value:      
insEvent.DeleteEvent.Instances,
-                                               ConfigType: 
remoting.EventTypeDel,
-                                       })
                                }
                        }
                }
+
        }
 }
 
diff --git a/registry/zookeeper/service_discovery_test.go 
b/registry/zookeeper/service_discovery_test.go
index 477fa5eef..20e2e4659 100644
--- a/registry/zookeeper/service_discovery_test.go
+++ b/registry/zookeeper/service_discovery_test.go
@@ -17,265 +17,132 @@
 
 package zookeeper
 
-//
-//import (
-//     "context"
-//     "strconv"
-//     "sync"
-//     "testing"
-//)
-//
-//import (
-//     "github.com/dubbogo/go-zookeeper/zk"
-//
-//     gxset "github.com/dubbogo/gost/container/set"
-//
-//     "github.com/stretchr/testify/assert"
-//)
-//
-//import (
-//     "dubbo.apache.org/dubbo-go/v3/common"
-//     "dubbo.apache.org/dubbo-go/v3/common/constant"
-//     "dubbo.apache.org/dubbo-go/v3/common/extension"
-//     "github.com/dubbogo/gost/gof/observer"
-//     "dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher"
-//     "dubbo.apache.org/dubbo-go/v3/config"
-//     "dubbo.apache.org/dubbo-go/v3/metadata/mapping"
-//     "dubbo.apache.org/dubbo-go/v3/protocol"
-//     "dubbo.apache.org/dubbo-go/v3/registry"
-//     "dubbo.apache.org/dubbo-go/v3/registry/event"
-//)
-//
-//const testName = "test"
-//
-//func prepareData(t *testing.T) *zk.TestCluster {
-//     var err error
-//     tc, err := zk.StartTestCluster(1, nil, nil)
-//     assert.NoError(t, err)
-//     assert.NotNil(t, tc.Servers[0])
-//     address := "127.0.0.1:" + strconv.Itoa(tc.Servers[0].Port)
-//     //address := "127.0.0.1:2181"
-//
-//     config.GetRootConfig().ServiceDiscoveries[testName] = 
&config.ServiceDiscoveryConfig{
-//             Protocol:  "zookeeper",
-//             RemoteRef: "test",
-//     }
-//
-//     config.GetRootConfig().Remotes[testName] = &config.RemoteConfig{
-//             Address:    address,
-//             TimeoutStr: "10s",
-//     }
-//     return tc
-//}
-//
-//func TestNewZookeeperServiceDiscovery(t *testing.T) {
-//     _, err := newZookeeperServiceDiscovery()
-//
-//     // the ShutdownConfig not found
-//     // err: could not init the instance because the config is invalid
-//     assert.NotNil(t, err)
-//
-//     //sdc := &config.ServiceDiscoveryConfig{
-//     //      Protocol:  "zookeeper",
-//     //      RemoteRef: "mock",
-//     //}
-//     //config.GetRootConfig().ServiceDiscoveries[name] = sdc
-//     _, err = newZookeeperServiceDiscovery()
-//
-//     // RemoteConfig not found
-//     // err: could not find the remote config for name: mock
-//     assert.NotNil(t, err)
-//}
-//
-//func TestZookeeperServiceDiscovery_CURDAndListener(t *testing.T) {
-//     tc := prepareData(t)
-//     defer func() {
-//             _ = tc.Stop()
-//     }()
-//     t.Run("testCURDZookeeperServiceDiscovery", 
testCURDZookeeperServiceDiscovery)
-//     t.Run("testAddListenerZookeeperServiceDiscovery", 
testAddListenerZookeeperServiceDiscovery)
-//}
-//
-//func testCURDZookeeperServiceDiscovery(t *testing.T) {
-//     prepareData(t)
-//     extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
-//             return dispatcher.NewMockEventDispatcher()
-//     })
-//     extension.SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping 
{
-//             return mapping.NewMockServiceNameMapping()
-//     })
-//
-//     extension.SetProtocol("mock", func() protocol.Protocol {
-//             return &mockProtocol{}
-//     })
-//
-//     sd, err := newZookeeperServiceDiscovery()
-//     assert.Nil(t, err)
-//     defer func() {
-//             _ = sd.Destroy()
-//     }()
-//     ins := &registry.DefaultServiceInstance{
-//             ID:          "testID",
-//             ServiceName: testName,
-//             Host:        "127.0.0.1",
-//             Port:        2233,
-//             Enable:      true,
-//             Healthy:     true,
-//             Metadata:    nil,
-//     }
-//     ins.Metadata = map[string]string{"t1": "test1", 
constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: 
`{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
-//     err = sd.Register(ins)
-//
-//     assert.Nil(t, err)
-//
-//     testsPager := sd.GetHealthyInstancesByPage(testName, 0, 1, true)
-//     assert.Equal(t, 1, testsPager.GetDataSize())
-//     assert.Equal(t, 1, testsPager.GetTotalPages())
-//     test := testsPager.GetData()[0].(registry.ServiceInstance)
-//     assert.Equal(t, "127.0.0.1:2233", test.GetID())
-//     assert.Equal(t, "test1", test.GetMetadata()["t1"])
-//
-//     ins = &registry.DefaultServiceInstance{
-//             ID:          "testID",
-//             ServiceName: testName,
-//             Host:        "127.0.0.1",
-//             Port:        2233,
-//             Enable:      true,
-//             Healthy:     true,
-//     }
-//     ins.Metadata = map[string]string{"t1": "test12", 
constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: 
`{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
-//
-//     err = sd.Update(ins)
-//
-//     assert.Nil(t, err)
-//
-//     testsPager = sd.GetInstancesByPage(testName, 0, 1)
-//     assert.Equal(t, 1, testsPager.GetDataSize())
-//     test = testsPager.GetData()[0].(registry.ServiceInstance)
-//     assert.Equal(t, "test12", test.GetMetadata()["t1"])
-//
-//     testsMap := sd.GetRequestInstances([]string{testName}, 0, 1)
-//     assert.Equal(t, 1, len(testsMap))
-//     assert.Equal(t, 1, testsMap[testName].GetDataSize())
-//     test = testsMap[testName].GetData()[0].(registry.ServiceInstance)
-//     assert.Equal(t, "test12", test.GetMetadata()["t1"])
-//
-//     names := sd.GetServices()
-//     assert.Equal(t, 1, names.Size())
-//     assert.Equal(t, testName, names.Values()[0])
-//
-//     err = sd.Unregister(&registry.DefaultServiceInstance{
-//             ID:          "testID",
-//             ServiceName: testName,
-//             Host:        "127.0.0.1",
-//             Port:        2233,
-//             Enable:      true,
-//             Healthy:     true,
-//             Metadata:    nil,
-//     })
-//     assert.Nil(t, err)
-//}
-//
-//func testAddListenerZookeeperServiceDiscovery(t *testing.T) {
-//     sd, err := newZookeeperServiceDiscovery()
-//     assert.Nil(t, err)
-//     defer func() {
-//             _ = sd.Destroy()
-//     }()
-//
-//     ins := &registry.DefaultServiceInstance{
-//             ID:          "testID",
-//             ServiceName: testName,
-//             Host:        "127.0.0.1",
-//             Port:        2233,
-//             Enable:      true,
-//             Healthy:     true,
-//             Metadata:    nil,
-//     }
-//     ins.Metadata = map[string]string{"t1": "test12", 
constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: 
`{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
-//     err = sd.Register(ins)
-//
-//     assert.Nil(t, err)
-//     wg := &sync.WaitGroup{}
-//     wg.Add(1)
-//     tn := &testNotify{
-//             wg: wg,
-//             t:  t,
-//     }
-//     hs := gxset.NewSet()
-//     hs.Add(testName)
-//
-//     sicl := event.NewServiceInstancesChangedListener(hs)
-//     sicl.AddListenerAndNotify(testName, tn)
-//     extension.SetAndInitGlobalDispatcher("direct")
-//     extension.GetGlobalDispatcher().AddEventListener(sicl)
-//     err = sd.AddListener(sicl)
-//     assert.NoError(t, err)
-//
-//     ins = &registry.DefaultServiceInstance{
-//             ID:          "testID",
-//             ServiceName: testName,
-//             Host:        "127.0.0.1",
-//             Port:        2233,
-//             Enable:      true,
-//             Healthy:     true,
-//             Metadata:    nil,
-//     }
-//     ins.Metadata = map[string]string{"t1": "test12", 
constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: 
`{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
-//     err = sd.Update(ins)
-//     assert.NoError(t, err)
-//     tn.wg.Wait()
-//}
-//
-//type testNotify struct {
-//     wg *sync.WaitGroup
-//     t  *testing.T
-//}
-//
-//func (tn *testNotify) Notify(e *registry.ServiceEvent) {
-//     assert.Equal(tn.t, "2233", e.Service.Port)
-//     tn.wg.Done()
-//}
-//func (tn *testNotify) NotifyAll([]*registry.ServiceEvent, func()) {
-//
-//}
-//
-//type mockProtocol struct{}
-//
-//func (m mockProtocol) Export(protocol.Invoker) protocol.Exporter {
-//     panic("implement me")
-//}
-//
-//func (m mockProtocol) Refer(*common.URL) protocol.Invoker {
-//     return &mockInvoker{}
-//}
-//
-//func (m mockProtocol) Destroy() {
-//     panic("implement me")
-//}
-//
-//type mockInvoker struct{}
-//
-//func (m *mockInvoker) GetURL() *common.URL {
-//     panic("implement me")
-//}
-//
-//func (m *mockInvoker) IsAvailable() bool {
-//     panic("implement me")
-//}
-//
-//func (m *mockInvoker) Destroy() {
-//     panic("implement me")
-//}
-//
-//func (m *mockInvoker) Invoke(context.Context, protocol.Invocation) 
protocol.Result {
-//     // for getMetadataInfo and ServiceInstancesChangedListenerImpl onEvent
-//     serviceInfo := &common.ServiceInfo{ServiceKey: "test", MatchKey: "test"}
-//     services := make(map[string]*common.ServiceInfo)
-//     services["test"] = serviceInfo
-//     return &protocol.RPCResult{
-//             Rest: &common.MetadataInfo{
-//                     Services: services,
-//             },
-//     }
-//}
+import (
+       "context"
+       "sync"
+       "testing"
+)
+
+import (
+       "github.com/nacos-group/nacos-sdk-go/model"
+       "github.com/nacos-group/nacos-sdk-go/vo"
+
+       "github.com/stretchr/testify/assert"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/common"
+       "dubbo.apache.org/dubbo-go/v3/common/constant"
+       "dubbo.apache.org/dubbo-go/v3/protocol"
+       "dubbo.apache.org/dubbo-go/v3/registry"
+)
+
+func Test_newZookeeperServiceDiscovery(t *testing.T) {
+       url, _ := common.NewURL("dubbo://127.0.0.1:2181",
+               common.WithParamsValue(constant.ClientNameKey, "zk-client"))
+       sd, err := newZookeeperServiceDiscovery(url)
+       assert.Nil(t, err)
+       err = sd.Destroy()
+       assert.Nil(t, err)
+
+}
+func Test_zookeeperServiceDiscovery_DataChange(t *testing.T) {
+       serviceDiscovery := &zookeeperServiceDiscovery{}
+       assert.Equal(t, registry.DefaultPageSize, 
serviceDiscovery.GetDefaultPageSize())
+}
+
+type testNotify struct {
+       wg *sync.WaitGroup
+       t  *testing.T
+}
+
+func (tn *testNotify) Notify(e *registry.ServiceEvent) {
+       assert.Equal(tn.t, "2181", e.Service.Port)
+       tn.wg.Done()
+}
+
+func (tn *testNotify) NotifyAll([]*registry.ServiceEvent, func()) {}
+
+type mockClient struct {
+       instance []interface{}
+}
+
+func (c mockClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, 
error) {
+       return true, nil
+}
+
+func (c mockClient) DeregisterInstance(param vo.DeregisterInstanceParam) 
(bool, error) {
+       return true, nil
+}
+
+func (c mockClient) UpdateInstance(param vo.UpdateInstanceParam) (bool, error) 
{
+       return true, nil
+}
+
+func (c mockClient) GetService(param vo.GetServiceParam) (model.Service, 
error) {
+       panic("implement me")
+}
+
+func (c mockClient) SelectInstances(param vo.SelectInstancesParam) 
([]model.Instance, error) {
+       panic("implement me")
+}
+
+func (c mockClient) SelectAllInstances(param vo.SelectAllInstancesParam) 
([]model.Instance, error) {
+       panic("implement me")
+}
+
+func (c mockClient) SelectOneHealthyInstance(param 
vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
+       panic("implement me")
+}
+
+func (c mockClient) Subscribe(param *vo.SubscribeParam) error {
+       return nil
+}
+
+func (c mockClient) Unsubscribe(param *vo.SubscribeParam) error {
+       panic("implement me")
+}
+
+func (c mockClient) GetAllServicesInfo(param vo.GetAllServiceInfoParam) 
(model.ServiceList, error) {
+       panic("implement me")
+}
+
+type mockProtocol struct{}
+
+func (m mockProtocol) Export(protocol.Invoker) protocol.Exporter {
+       panic("implement me")
+}
+
+func (m mockProtocol) Refer(*common.URL) protocol.Invoker {
+       return &mockInvoker{}
+}
+
+func (m mockProtocol) Destroy() {
+       panic("implement me")
+}
+
+type mockInvoker struct{}
+
+func (m *mockInvoker) GetURL() *common.URL {
+       panic("implement me")
+}
+
+func (m *mockInvoker) IsAvailable() bool {
+       panic("implement me")
+}
+
+func (m *mockInvoker) Destroy() {
+       panic("implement me")
+}
+
+func (m *mockInvoker) Invoke(context.Context, protocol.Invocation) 
protocol.Result {
+       // for getMetadataInfo and ServiceInstancesChangedListenerImpl onEvent
+       serviceInfo := &common.ServiceInfo{ServiceKey: "test", MatchKey: "test"}
+       services := make(map[string]*common.ServiceInfo)
+       services["test"] = serviceInfo
+       return &protocol.RPCResult{
+               Rest: &common.MetadataInfo{
+                       Services: services,
+               },
+       }
+}

Reply via email to