This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 9d88f3186 fix PolarisServiceWatcher bug (#1988)
9d88f3186 is described below
commit 9d88f3186841515d7009ce572f4bccc3e367e492
Author: Jason Deng <[email protected]>
AuthorDate: Tue Aug 2 20:24:37 2022 +0800
fix PolarisServiceWatcher bug (#1988)
Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.47.0
to 1.48.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.47.0...v1.48.0)
---
updated-dependencies:
- dependency-name: google.golang.org/grpc
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
---
registry/polaris/core.go | 59 ++--
registry/zookeeper/service_discovery_test.go | 391 +++++++++------------------
2 files changed, 159 insertions(+), 291 deletions(-)
diff --git a/registry/polaris/core.go b/registry/polaris/core.go
index 8c82b7b99..e87fac7c9 100644
--- a/registry/polaris/core.go
+++ b/registry/polaris/core.go
@@ -57,9 +57,8 @@ func newPolarisWatcher(param *api.WatchServiceRequest,
consumer api.ConsumerAPI)
// AddSubscriber add subscriber into watcher's subscribers
func (watcher *PolarisServiceWatcher) AddSubscriber(subscriber
func(remoting.EventType, []model.Instance)) {
- watcher.lazyRun()
-
watcher.lock.Lock()
+ watcher.lazyRun()
defer watcher.lock.Unlock()
watcher.subscribers = append(watcher.subscribers, subscriber)
@@ -74,48 +73,50 @@ func (watcher *PolarisServiceWatcher) lazyRun() {
// startWatch start run work to watch target service by polaris
func (watcher *PolarisServiceWatcher) startWatch() {
-
for {
resp, err :=
watcher.consumer.WatchService(watcher.subscribeParam)
if err != nil {
time.Sleep(time.Duration(500 * time.Millisecond))
continue
}
-
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
Value: resp.GetAllInstancesResp.Instances,
ConfigType: remoting.EventTypeAdd,
})
- select {
- case event := <-resp.EventChannel:
- eType := event.GetSubScribeEventType()
- if eType == api.EventInstance {
- insEvent := event.(*model.InstanceEvent)
- if insEvent.AddEvent != nil {
-
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
- Value:
insEvent.AddEvent.Instances,
- ConfigType:
remoting.EventTypeAdd,
- })
- }
- if insEvent.UpdateEvent != nil {
- instances := make([]model.Instance,
len(insEvent.UpdateEvent.UpdateList))
- for i := range
insEvent.UpdateEvent.UpdateList {
- instances[i] =
insEvent.UpdateEvent.UpdateList[i].After
+ for {
+ select {
+ case event := <-resp.EventChannel:
+ eType := event.GetSubScribeEventType()
+ if eType == api.EventInstance {
+ insEvent := event.(*model.InstanceEvent)
+
+ if insEvent.AddEvent != nil {
+
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+ Value:
insEvent.AddEvent.Instances,
+ ConfigType:
remoting.EventTypeAdd,
+ })
+ }
+ if insEvent.UpdateEvent != nil {
+ instances :=
make([]model.Instance, len(insEvent.UpdateEvent.UpdateList))
+ for i := range
insEvent.UpdateEvent.UpdateList {
+ instances[i] =
insEvent.UpdateEvent.UpdateList[i].After
+ }
+
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+ Value: instances,
+ ConfigType:
remoting.EventTypeUpdate,
+ })
+ }
+ if insEvent.DeleteEvent != nil {
+
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+ Value:
insEvent.DeleteEvent.Instances,
+ ConfigType:
remoting.EventTypeDel,
+ })
}
-
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
- Value: instances,
- ConfigType:
remoting.EventTypeUpdate,
- })
- }
- if insEvent.DeleteEvent != nil {
-
watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
- Value:
insEvent.DeleteEvent.Instances,
- ConfigType:
remoting.EventTypeDel,
- })
}
}
}
+
}
}
diff --git a/registry/zookeeper/service_discovery_test.go
b/registry/zookeeper/service_discovery_test.go
index 477fa5eef..20e2e4659 100644
--- a/registry/zookeeper/service_discovery_test.go
+++ b/registry/zookeeper/service_discovery_test.go
@@ -17,265 +17,132 @@
package zookeeper
-//
-//import (
-// "context"
-// "strconv"
-// "sync"
-// "testing"
-//)
-//
-//import (
-// "github.com/dubbogo/go-zookeeper/zk"
-//
-// gxset "github.com/dubbogo/gost/container/set"
-//
-// "github.com/stretchr/testify/assert"
-//)
-//
-//import (
-// "dubbo.apache.org/dubbo-go/v3/common"
-// "dubbo.apache.org/dubbo-go/v3/common/constant"
-// "dubbo.apache.org/dubbo-go/v3/common/extension"
-// "github.com/dubbogo/gost/gof/observer"
-// "dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher"
-// "dubbo.apache.org/dubbo-go/v3/config"
-// "dubbo.apache.org/dubbo-go/v3/metadata/mapping"
-// "dubbo.apache.org/dubbo-go/v3/protocol"
-// "dubbo.apache.org/dubbo-go/v3/registry"
-// "dubbo.apache.org/dubbo-go/v3/registry/event"
-//)
-//
-//const testName = "test"
-//
-//func prepareData(t *testing.T) *zk.TestCluster {
-// var err error
-// tc, err := zk.StartTestCluster(1, nil, nil)
-// assert.NoError(t, err)
-// assert.NotNil(t, tc.Servers[0])
-// address := "127.0.0.1:" + strconv.Itoa(tc.Servers[0].Port)
-// //address := "127.0.0.1:2181"
-//
-// config.GetRootConfig().ServiceDiscoveries[testName] =
&config.ServiceDiscoveryConfig{
-// Protocol: "zookeeper",
-// RemoteRef: "test",
-// }
-//
-// config.GetRootConfig().Remotes[testName] = &config.RemoteConfig{
-// Address: address,
-// TimeoutStr: "10s",
-// }
-// return tc
-//}
-//
-//func TestNewZookeeperServiceDiscovery(t *testing.T) {
-// _, err := newZookeeperServiceDiscovery()
-//
-// // the ShutdownConfig not found
-// // err: could not init the instance because the config is invalid
-// assert.NotNil(t, err)
-//
-// //sdc := &config.ServiceDiscoveryConfig{
-// // Protocol: "zookeeper",
-// // RemoteRef: "mock",
-// //}
-// //config.GetRootConfig().ServiceDiscoveries[name] = sdc
-// _, err = newZookeeperServiceDiscovery()
-//
-// // RemoteConfig not found
-// // err: could not find the remote config for name: mock
-// assert.NotNil(t, err)
-//}
-//
-//func TestZookeeperServiceDiscovery_CURDAndListener(t *testing.T) {
-// tc := prepareData(t)
-// defer func() {
-// _ = tc.Stop()
-// }()
-// t.Run("testCURDZookeeperServiceDiscovery",
testCURDZookeeperServiceDiscovery)
-// t.Run("testAddListenerZookeeperServiceDiscovery",
testAddListenerZookeeperServiceDiscovery)
-//}
-//
-//func testCURDZookeeperServiceDiscovery(t *testing.T) {
-// prepareData(t)
-// extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
-// return dispatcher.NewMockEventDispatcher()
-// })
-// extension.SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping
{
-// return mapping.NewMockServiceNameMapping()
-// })
-//
-// extension.SetProtocol("mock", func() protocol.Protocol {
-// return &mockProtocol{}
-// })
-//
-// sd, err := newZookeeperServiceDiscovery()
-// assert.Nil(t, err)
-// defer func() {
-// _ = sd.Destroy()
-// }()
-// ins := ®istry.DefaultServiceInstance{
-// ID: "testID",
-// ServiceName: testName,
-// Host: "127.0.0.1",
-// Port: 2233,
-// Enable: true,
-// Healthy: true,
-// Metadata: nil,
-// }
-// ins.Metadata = map[string]string{"t1": "test1",
constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME:
`{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
-// err = sd.Register(ins)
-//
-// assert.Nil(t, err)
-//
-// testsPager := sd.GetHealthyInstancesByPage(testName, 0, 1, true)
-// assert.Equal(t, 1, testsPager.GetDataSize())
-// assert.Equal(t, 1, testsPager.GetTotalPages())
-// test := testsPager.GetData()[0].(registry.ServiceInstance)
-// assert.Equal(t, "127.0.0.1:2233", test.GetID())
-// assert.Equal(t, "test1", test.GetMetadata()["t1"])
-//
-// ins = ®istry.DefaultServiceInstance{
-// ID: "testID",
-// ServiceName: testName,
-// Host: "127.0.0.1",
-// Port: 2233,
-// Enable: true,
-// Healthy: true,
-// }
-// ins.Metadata = map[string]string{"t1": "test12",
constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME:
`{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
-//
-// err = sd.Update(ins)
-//
-// assert.Nil(t, err)
-//
-// testsPager = sd.GetInstancesByPage(testName, 0, 1)
-// assert.Equal(t, 1, testsPager.GetDataSize())
-// test = testsPager.GetData()[0].(registry.ServiceInstance)
-// assert.Equal(t, "test12", test.GetMetadata()["t1"])
-//
-// testsMap := sd.GetRequestInstances([]string{testName}, 0, 1)
-// assert.Equal(t, 1, len(testsMap))
-// assert.Equal(t, 1, testsMap[testName].GetDataSize())
-// test = testsMap[testName].GetData()[0].(registry.ServiceInstance)
-// assert.Equal(t, "test12", test.GetMetadata()["t1"])
-//
-// names := sd.GetServices()
-// assert.Equal(t, 1, names.Size())
-// assert.Equal(t, testName, names.Values()[0])
-//
-// err = sd.Unregister(®istry.DefaultServiceInstance{
-// ID: "testID",
-// ServiceName: testName,
-// Host: "127.0.0.1",
-// Port: 2233,
-// Enable: true,
-// Healthy: true,
-// Metadata: nil,
-// })
-// assert.Nil(t, err)
-//}
-//
-//func testAddListenerZookeeperServiceDiscovery(t *testing.T) {
-// sd, err := newZookeeperServiceDiscovery()
-// assert.Nil(t, err)
-// defer func() {
-// _ = sd.Destroy()
-// }()
-//
-// ins := ®istry.DefaultServiceInstance{
-// ID: "testID",
-// ServiceName: testName,
-// Host: "127.0.0.1",
-// Port: 2233,
-// Enable: true,
-// Healthy: true,
-// Metadata: nil,
-// }
-// ins.Metadata = map[string]string{"t1": "test12",
constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME:
`{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
-// err = sd.Register(ins)
-//
-// assert.Nil(t, err)
-// wg := &sync.WaitGroup{}
-// wg.Add(1)
-// tn := &testNotify{
-// wg: wg,
-// t: t,
-// }
-// hs := gxset.NewSet()
-// hs.Add(testName)
-//
-// sicl := event.NewServiceInstancesChangedListener(hs)
-// sicl.AddListenerAndNotify(testName, tn)
-// extension.SetAndInitGlobalDispatcher("direct")
-// extension.GetGlobalDispatcher().AddEventListener(sicl)
-// err = sd.AddListener(sicl)
-// assert.NoError(t, err)
-//
-// ins = ®istry.DefaultServiceInstance{
-// ID: "testID",
-// ServiceName: testName,
-// Host: "127.0.0.1",
-// Port: 2233,
-// Enable: true,
-// Healthy: true,
-// Metadata: nil,
-// }
-// ins.Metadata = map[string]string{"t1": "test12",
constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME:
`{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
-// err = sd.Update(ins)
-// assert.NoError(t, err)
-// tn.wg.Wait()
-//}
-//
-//type testNotify struct {
-// wg *sync.WaitGroup
-// t *testing.T
-//}
-//
-//func (tn *testNotify) Notify(e *registry.ServiceEvent) {
-// assert.Equal(tn.t, "2233", e.Service.Port)
-// tn.wg.Done()
-//}
-//func (tn *testNotify) NotifyAll([]*registry.ServiceEvent, func()) {
-//
-//}
-//
-//type mockProtocol struct{}
-//
-//func (m mockProtocol) Export(protocol.Invoker) protocol.Exporter {
-// panic("implement me")
-//}
-//
-//func (m mockProtocol) Refer(*common.URL) protocol.Invoker {
-// return &mockInvoker{}
-//}
-//
-//func (m mockProtocol) Destroy() {
-// panic("implement me")
-//}
-//
-//type mockInvoker struct{}
-//
-//func (m *mockInvoker) GetURL() *common.URL {
-// panic("implement me")
-//}
-//
-//func (m *mockInvoker) IsAvailable() bool {
-// panic("implement me")
-//}
-//
-//func (m *mockInvoker) Destroy() {
-// panic("implement me")
-//}
-//
-//func (m *mockInvoker) Invoke(context.Context, protocol.Invocation)
protocol.Result {
-// // for getMetadataInfo and ServiceInstancesChangedListenerImpl onEvent
-// serviceInfo := &common.ServiceInfo{ServiceKey: "test", MatchKey: "test"}
-// services := make(map[string]*common.ServiceInfo)
-// services["test"] = serviceInfo
-// return &protocol.RPCResult{
-// Rest: &common.MetadataInfo{
-// Services: services,
-// },
-// }
-//}
+import (
+ "context"
+ "sync"
+ "testing"
+)
+
+import (
+ "github.com/nacos-group/nacos-sdk-go/model"
+ "github.com/nacos-group/nacos-sdk-go/vo"
+
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/registry"
+)
+
+func Test_newZookeeperServiceDiscovery(t *testing.T) {
+ url, _ := common.NewURL("dubbo://127.0.0.1:2181",
+ common.WithParamsValue(constant.ClientNameKey, "zk-client"))
+ sd, err := newZookeeperServiceDiscovery(url)
+ assert.Nil(t, err)
+ err = sd.Destroy()
+ assert.Nil(t, err)
+
+}
+func Test_zookeeperServiceDiscovery_DataChange(t *testing.T) {
+ serviceDiscovery := &zookeeperServiceDiscovery{}
+ assert.Equal(t, registry.DefaultPageSize,
serviceDiscovery.GetDefaultPageSize())
+}
+
+type testNotify struct {
+ wg *sync.WaitGroup
+ t *testing.T
+}
+
+func (tn *testNotify) Notify(e *registry.ServiceEvent) {
+ assert.Equal(tn.t, "2181", e.Service.Port)
+ tn.wg.Done()
+}
+
+func (tn *testNotify) NotifyAll([]*registry.ServiceEvent, func()) {}
+
+type mockClient struct {
+ instance []interface{}
+}
+
+func (c mockClient) RegisterInstance(param vo.RegisterInstanceParam) (bool,
error) {
+ return true, nil
+}
+
+func (c mockClient) DeregisterInstance(param vo.DeregisterInstanceParam)
(bool, error) {
+ return true, nil
+}
+
+func (c mockClient) UpdateInstance(param vo.UpdateInstanceParam) (bool, error)
{
+ return true, nil
+}
+
+func (c mockClient) GetService(param vo.GetServiceParam) (model.Service,
error) {
+ panic("implement me")
+}
+
+func (c mockClient) SelectInstances(param vo.SelectInstancesParam)
([]model.Instance, error) {
+ panic("implement me")
+}
+
+func (c mockClient) SelectAllInstances(param vo.SelectAllInstancesParam)
([]model.Instance, error) {
+ panic("implement me")
+}
+
+func (c mockClient) SelectOneHealthyInstance(param
vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
+ panic("implement me")
+}
+
+func (c mockClient) Subscribe(param *vo.SubscribeParam) error {
+ return nil
+}
+
+func (c mockClient) Unsubscribe(param *vo.SubscribeParam) error {
+ panic("implement me")
+}
+
+func (c mockClient) GetAllServicesInfo(param vo.GetAllServiceInfoParam)
(model.ServiceList, error) {
+ panic("implement me")
+}
+
+type mockProtocol struct{}
+
+func (m mockProtocol) Export(protocol.Invoker) protocol.Exporter {
+ panic("implement me")
+}
+
+func (m mockProtocol) Refer(*common.URL) protocol.Invoker {
+ return &mockInvoker{}
+}
+
+func (m mockProtocol) Destroy() {
+ panic("implement me")
+}
+
+type mockInvoker struct{}
+
+func (m *mockInvoker) GetURL() *common.URL {
+ panic("implement me")
+}
+
+func (m *mockInvoker) IsAvailable() bool {
+ panic("implement me")
+}
+
+func (m *mockInvoker) Destroy() {
+ panic("implement me")
+}
+
+func (m *mockInvoker) Invoke(context.Context, protocol.Invocation)
protocol.Result {
+ // for getMetadataInfo and ServiceInstancesChangedListenerImpl onEvent
+ serviceInfo := &common.ServiceInfo{ServiceKey: "test", MatchKey: "test"}
+ services := make(map[string]*common.ServiceInfo)
+ services["test"] = serviceInfo
+ return &protocol.RPCResult{
+ Rest: &common.MetadataInfo{
+ Services: services,
+ },
+ }
+}