This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 02a1d5b93 add serviceMappingListener for application service discovery
(#2205)
02a1d5b93 is described below
commit 02a1d5b93591391a737d9c380e5199e04d47993e
Author: bobtthp <[email protected]>
AuthorDate: Sat Mar 25 09:01:22 2023 +0800
add serviceMappingListener for application service discovery (#2205)
* add serviceMappingListener
* rollback loadbalance change
* resolve cycle ref
* add ref
* fix lint
* fix ref
* fix protocol prom
* fix ci prom
* fix service mapping duplicated
* retry ci
* delete hc zip
---------
Co-authored-by: bobtthp <[email protected]>
---
config/instance/metadata_report_test.go | 7 +-
metadata/mapping/memory/service_name_mapping.go | 7 +-
metadata/mapping/metadata/service_name_mapping.go | 11 ++-
metadata/mapping/mock_service_name_mapping.go | 7 +-
metadata/mapping/service_name_mapping.go | 4 +-
metadata/report/etcd/report.go | 7 +-
metadata/report/nacos/report.go | 51 +++++++++-
metadata/report/report.go | 6 +-
metadata/report/zookeeper/report.go | 7 +-
metadata/service/remote/service_test.go | 7 +-
registry/event.go | 27 ++++++
registry/nacos/service_discovery_test.go | 4 +-
.../service_mapping_changed_listener.go | 23 +----
.../servicediscovery/service_discovery_registry.go | 79 ++++++++++-----
.../service_instances_changed_listener_impl.go | 2 +-
.../service_mapping_change_listener_impl.go | 108 +++++++++++++++++++++
16 files changed, 297 insertions(+), 60 deletions(-)
diff --git a/config/instance/metadata_report_test.go
b/config/instance/metadata_report_test.go
index 4a56883b4..8d49009f7 100644
--- a/config/instance/metadata_report_test.go
+++ b/config/instance/metadata_report_test.go
@@ -33,6 +33,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/metadata/identifier"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
"dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
func TestGetMetadataReportInstance(t *testing.T) {
@@ -56,7 +57,11 @@ func (m mockMetadataReport)
RegisterServiceAppMapping(string, string, string) er
panic("implement me")
}
-func (m mockMetadataReport) GetServiceAppMapping(string, string)
(*gxset.HashSet, error) {
+func (m mockMetadataReport) GetServiceAppMapping(string, string,
registry.MappingListener) (*gxset.HashSet, error) {
+ panic("implement me")
+}
+
+func (m mockMetadataReport) RemoveServiceAppMappingListener(string, string)
error {
panic("implement me")
}
diff --git a/metadata/mapping/memory/service_name_mapping.go
b/metadata/mapping/memory/service_name_mapping.go
index 73b0603ea..9d7a72010 100644
--- a/metadata/mapping/memory/service_name_mapping.go
+++ b/metadata/mapping/memory/service_name_mapping.go
@@ -30,6 +30,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/metadata/mapping"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
func init() {
@@ -42,10 +43,14 @@ func (i *InMemoryServiceNameMapping) Map(url *common.URL)
error {
return nil
}
-func (i *InMemoryServiceNameMapping) Get(url *common.URL) (*gxset.HashSet,
error) {
+func (i *InMemoryServiceNameMapping) Get(url *common.URL, listener
registry.MappingListener) (*gxset.HashSet, error) {
return gxset.NewSet(config.GetApplicationConfig().Name), nil
}
+func (i *InMemoryServiceNameMapping) Remove(url *common.URL) error {
+ return nil
+}
+
var (
serviceNameMappingInstance *InMemoryServiceNameMapping
serviceNameMappingOnce sync.Once
diff --git a/metadata/mapping/metadata/service_name_mapping.go
b/metadata/mapping/metadata/service_name_mapping.go
index ae934ae48..8c0104000 100644
--- a/metadata/mapping/metadata/service_name_mapping.go
+++ b/metadata/mapping/metadata/service_name_mapping.go
@@ -36,6 +36,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/config/instance"
"dubbo.apache.org/dubbo-go/v3/metadata/mapping"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
const (
@@ -75,10 +76,16 @@ func (d *MetadataServiceNameMapping) Map(url *common.URL)
error {
}
// Get will return the application-level services. If not found, the empty set
will be returned.
-func (d *MetadataServiceNameMapping) Get(url *common.URL) (*gxset.HashSet,
error) {
+func (d *MetadataServiceNameMapping) Get(url *common.URL, listener
registry.MappingListener) (*gxset.HashSet, error) {
serviceInterface := url.GetParam(constant.InterfaceKey, "")
metadataReport := instance.GetMetadataReportInstance()
- return metadataReport.GetServiceAppMapping(serviceInterface,
defaultGroup)
+ return metadataReport.GetServiceAppMapping(serviceInterface,
defaultGroup, listener)
+}
+
+func (d *MetadataServiceNameMapping) Remove(url *common.URL) error {
+ serviceInterface := url.GetParam(constant.InterfaceKey, "")
+ metadataReport := instance.GetMetadataReportInstance()
+ return metadataReport.RemoveServiceAppMappingListener(serviceInterface,
defaultGroup)
}
// buildMappingKey will return mapping key, it looks like
defaultGroup/serviceInterface
diff --git a/metadata/mapping/mock_service_name_mapping.go
b/metadata/mapping/mock_service_name_mapping.go
index c3aae3dab..0a5a437d5 100644
--- a/metadata/mapping/mock_service_name_mapping.go
+++ b/metadata/mapping/mock_service_name_mapping.go
@@ -23,6 +23,7 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
type MockServiceNameMapping struct{}
@@ -35,6 +36,10 @@ func (m *MockServiceNameMapping) Map(*common.URL) error {
return nil
}
-func (m *MockServiceNameMapping) Get(*common.URL) (*gxset.HashSet, error) {
+func (m *MockServiceNameMapping) Get(*common.URL, registry.MappingListener)
(*gxset.HashSet, error) {
+ panic("implement me")
+}
+
+func (m *MockServiceNameMapping) Remove(*common.URL) error {
panic("implement me")
}
diff --git a/metadata/mapping/service_name_mapping.go
b/metadata/mapping/service_name_mapping.go
index 75c9b679d..ed76d0f5d 100644
--- a/metadata/mapping/service_name_mapping.go
+++ b/metadata/mapping/service_name_mapping.go
@@ -23,6 +23,7 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
// ServiceNameMapping is the interface which trys to build the mapping
between application-level service and interface-level service.
@@ -32,5 +33,6 @@ import (
// Get method will return the application-level services
type ServiceNameMapping interface {
Map(url *common.URL) error
- Get(url *common.URL) (*gxset.HashSet, error)
+ Get(url *common.URL, listener registry.MappingListener)
(*gxset.HashSet, error)
+ Remove(url *common.URL) error
}
diff --git a/metadata/report/etcd/report.go b/metadata/report/etcd/report.go
index 6d066b17f..4483a2a91 100644
--- a/metadata/report/etcd/report.go
+++ b/metadata/report/etcd/report.go
@@ -37,6 +37,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/metadata/identifier"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
"dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
const DEFAULT_ROOT = "dubbo"
@@ -161,7 +162,7 @@ func (e *etcdMetadataReport) RegisterServiceAppMapping(key
string, group string,
}
// GetServiceAppMapping get the app names from the specified Dubbo service
interface
-func (e *etcdMetadataReport) GetServiceAppMapping(key string, group string)
(*gxset.HashSet, error) {
+func (e *etcdMetadataReport) GetServiceAppMapping(key string, group string,
listener registry.MappingListener) (*gxset.HashSet, error) {
path := e.root + constant.PathSeparator + group +
constant.PathSeparator + key
v, err := e.client.Get(path)
if err != nil {
@@ -175,6 +176,10 @@ func (e *etcdMetadataReport) GetServiceAppMapping(key
string, group string) (*gx
return set, nil
}
+func (e *etcdMetadataReport) RemoveServiceAppMappingListener(key string, group
string) error {
+ return nil
+}
+
type etcdMetadataReportFactory struct{}
// CreateMetadataReport get the MetadataReport instance of etcd
diff --git a/metadata/report/nacos/report.go b/metadata/report/nacos/report.go
index 655b57704..c4487f878 100644
--- a/metadata/report/nacos/report.go
+++ b/metadata/report/nacos/report.go
@@ -40,6 +40,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/metadata/identifier"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
"dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
+ "dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
)
@@ -209,6 +210,34 @@ func (n *nacosMetadataReport) getConfig(param
vo.ConfigParam) (string, error) {
return cfg, nil
}
+func (n *nacosMetadataReport) addListener(key string, group string, notify
registry.MappingListener) error {
+ return n.client.Client().ListenConfig(vo.ConfigParam{
+ DataId: key,
+ Group: group,
+ OnChange: func(namespace, group, dataId, data string) {
+ go callback(notify, dataId, data)
+ },
+ })
+}
+
+func callback(notify registry.MappingListener, dataId, data string) {
+ appNames := strings.Split(data, constant.CommaSeparator)
+ set := gxset.NewSet()
+ for _, app := range appNames {
+ set.Add(app)
+ }
+ if err := notify.OnEvent(registry.NewServiceMappingChangedEvent(dataId,
set)); err != nil {
+ logger.Errorf("serviceMapping callback err: %s", err.Error())
+ }
+}
+
+func (n *nacosMetadataReport) removeServiceMappingListener(key string, group
string) error {
+ return n.client.Client().CancelListenConfig(vo.ConfigParam{
+ DataId: key,
+ Group: group,
+ })
+}
+
// RegisterServiceAppMapping map the specified Dubbo service interface to
current Dubbo app name
func (n *nacosMetadataReport) RegisterServiceAppMapping(key string, group
string, value string) error {
oldVal, err := n.getConfig(vo.ConfigParam{
@@ -218,8 +247,13 @@ func (n *nacosMetadataReport)
RegisterServiceAppMapping(key string, group string
if err != nil {
return err
}
- if strings.Contains(oldVal, value) {
- return nil
+ oldApps := strings.Split(oldVal, constant.CommaSeparator)
+ if len(oldApps) > 0 {
+ for _, app := range oldApps {
+ if app == value {
+ return nil
+ }
+ }
}
if oldVal != "" {
value = oldVal + constant.CommaSeparator + value
@@ -232,7 +266,13 @@ func (n *nacosMetadataReport)
RegisterServiceAppMapping(key string, group string
}
// GetServiceAppMapping get the app names from the specified Dubbo service
interface
-func (n *nacosMetadataReport) GetServiceAppMapping(key string, group string)
(*gxset.HashSet, error) {
+func (n *nacosMetadataReport) GetServiceAppMapping(key string, group string,
listener registry.MappingListener) (*gxset.HashSet, error) {
+ // add service mapping listener
+ if listener != nil {
+ if err := n.addListener(key, group, listener); err != nil {
+ logger.Errorf("add serviceMapping listener err: %s",
err.Error())
+ }
+ }
v, err := n.getConfig(vo.ConfigParam{
DataId: key,
Group: group,
@@ -251,6 +291,11 @@ func (n *nacosMetadataReport) GetServiceAppMapping(key
string, group string) (*g
return set, nil
}
+// RemoveServiceAppMappingListener remove the serviceMapping listener from
metadata center
+func (n *nacosMetadataReport) RemoveServiceAppMappingListener(key string,
group string) error {
+ return n.removeServiceMappingListener(key, group)
+}
+
type nacosMetadataReportFactory struct{}
// nolint
diff --git a/metadata/report/report.go b/metadata/report/report.go
index bfba6382c..0995d1780 100644
--- a/metadata/report/report.go
+++ b/metadata/report/report.go
@@ -24,6 +24,7 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/metadata/identifier"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
// MetadataReport is an interface of remote metadata report.
@@ -71,5 +72,8 @@ type MetadataReport interface {
RegisterServiceAppMapping(string, string, string) error
// GetServiceAppMapping get the app names from the specified Dubbo
service interface
- GetServiceAppMapping(string, string) (*gxset.HashSet, error)
+ GetServiceAppMapping(string, string, registry.MappingListener)
(*gxset.HashSet, error)
+
+ // RemoveServiceAppMappingListener remove the serviceMapping listener
by key and group
+ RemoveServiceAppMappingListener(string, string) error
}
diff --git a/metadata/report/zookeeper/report.go
b/metadata/report/zookeeper/report.go
index e93c9153a..6e2525da9 100644
--- a/metadata/report/zookeeper/report.go
+++ b/metadata/report/zookeeper/report.go
@@ -37,6 +37,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/metadata/identifier"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
"dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
var emptyStrSlice = make([]string, 0)
@@ -161,7 +162,7 @@ func (m *zookeeperMetadataReport)
RegisterServiceAppMapping(key string, group st
}
// GetServiceAppMapping get the app names from the specified Dubbo service
interface
-func (m *zookeeperMetadataReport) GetServiceAppMapping(key string, group
string) (*gxset.HashSet, error) {
+func (m *zookeeperMetadataReport) GetServiceAppMapping(key string, group
string, listener registry.MappingListener) (*gxset.HashSet, error) {
path := m.rootDir + group + constant.PathSeparator + key
v, _, err := m.client.GetContent(path)
if err != nil {
@@ -175,6 +176,10 @@ func (m *zookeeperMetadataReport) GetServiceAppMapping(key
string, group string)
return set, nil
}
+func (m *zookeeperMetadataReport) RemoveServiceAppMappingListener(key string,
group string) error {
+ return nil
+}
+
type zookeeperMetadataReportFactory struct{}
// nolint
diff --git a/metadata/service/remote/service_test.go
b/metadata/service/remote/service_test.go
index e435ef7a4..2ed4511f1 100644
--- a/metadata/service/remote/service_test.go
+++ b/metadata/service/remote/service_test.go
@@ -38,6 +38,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/metadata/report"
"dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
"dubbo.apache.org/dubbo-go/v3/metadata/service/local"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
var (
@@ -61,7 +62,11 @@ func (mr metadataReport) RegisterServiceAppMapping(string,
string, string) error
panic("implement me")
}
-func (mr metadataReport) GetServiceAppMapping(string, string) (*gxset.HashSet,
error) {
+func (mr metadataReport) GetServiceAppMapping(string, string,
registry.MappingListener) (*gxset.HashSet, error) {
+ panic("implement me")
+}
+
+func (mr metadataReport) RemoveServiceAppMappingListener(string, string) error
{
panic("implement me")
}
diff --git a/registry/event.go b/registry/event.go
index 13e066e90..81e78781e 100644
--- a/registry/event.go
+++ b/registry/event.go
@@ -24,6 +24,7 @@ import (
)
import (
+ gxset "github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/gof/observer"
)
@@ -101,3 +102,29 @@ func NewServiceInstancesChangedEvent(serviceName string,
instances []ServiceInst
Instances: instances,
}
}
+
+type ServiceMappingChangeEvent struct {
+ observer.BaseEvent
+ ServiceKey string
+ ServiceNames *gxset.HashSet
+}
+
+// NewServiceMappingChangedEvent will create the ServiceMappingChangeEvent
+func NewServiceMappingChangedEvent(serviceKey string, serviceNames
*gxset.HashSet) *ServiceMappingChangeEvent {
+ return &ServiceMappingChangeEvent{
+ BaseEvent: observer.BaseEvent{
+ Source: serviceKey,
+ Timestamp: time.Now(),
+ },
+ ServiceKey: serviceKey,
+ ServiceNames: serviceNames,
+ }
+}
+
+func (sm *ServiceMappingChangeEvent) GetServiceNames() *gxset.HashSet {
+ return sm.ServiceNames
+}
+
+func (sm *ServiceMappingChangeEvent) GetServiceKey() string {
+ return sm.ServiceKey
+}
diff --git a/registry/nacos/service_discovery_test.go
b/registry/nacos/service_discovery_test.go
index 7eb18a626..99fc4984f 100644
--- a/registry/nacos/service_discovery_test.go
+++ b/registry/nacos/service_discovery_test.go
@@ -41,7 +41,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/registry"
- "dubbo.apache.org/dubbo-go/v3/registry/event"
+ "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery"
"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
)
@@ -95,7 +95,7 @@ func TestFunction(t *testing.T) {
hs := gxset.NewSet()
hs.Add(testName)
- sicl := event.NewServiceInstancesChangedListener(hs)
+ sicl := servicediscovery.NewServiceInstancesChangedListener(hs)
sicl.AddListenerAndNotify(testName, tn)
err = sd.AddListener(sicl)
assert.NoError(t, err)
diff --git a/metadata/mapping/mock_service_name_mapping.go
b/registry/service_mapping_changed_listener.go
similarity index 66%
copy from metadata/mapping/mock_service_name_mapping.go
copy to registry/service_mapping_changed_listener.go
index c3aae3dab..3c75ac115 100644
--- a/metadata/mapping/mock_service_name_mapping.go
+++ b/registry/service_mapping_changed_listener.go
@@ -15,26 +15,13 @@
* limitations under the License.
*/
-package mapping
+package registry
import (
- gxset "github.com/dubbogo/gost/container/set"
+ "github.com/dubbogo/gost/gof/observer"
)
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
-)
-
-type MockServiceNameMapping struct{}
-
-func NewMockServiceNameMapping() *MockServiceNameMapping {
- return &MockServiceNameMapping{}
-}
-
-func (m *MockServiceNameMapping) Map(*common.URL) error {
- return nil
-}
-
-func (m *MockServiceNameMapping) Get(*common.URL) (*gxset.HashSet, error) {
- panic("implement me")
+type MappingListener interface {
+ OnEvent(e observer.Event) error
+ Stop()
}
diff --git a/registry/servicediscovery/service_discovery_registry.go
b/registry/servicediscovery/service_discovery_registry.go
index 4bb4cf84b..6854439fa 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -40,7 +40,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/metadata/service"
"dubbo.apache.org/dubbo-go/v3/metadata/service/local"
"dubbo.apache.org/dubbo-go/v3/registry"
- "dubbo.apache.org/dubbo-go/v3/registry/event"
+ _ "dubbo.apache.org/dubbo-go/v3/registry/event"
"dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/synthesizer"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
@@ -49,12 +49,12 @@ func init() {
extension.SetRegistry(constant.ServiceRegistryProtocol,
newServiceDiscoveryRegistry)
}
-// serviceDiscoveryRegistry is the implementation of application-level
registry.
+// ServiceDiscoveryRegistry is the implementation of application-level
registry.
// It's completely different from other registry implementations
// This implementation is based on ServiceDiscovery abstraction and
ServiceNameMapping
// In order to keep compatible with interface-level registry,
// this implementation is
-type serviceDiscoveryRegistry struct {
+type ServiceDiscoveryRegistry struct {
lock sync.RWMutex
url *common.URL
serviceDiscovery registry.ServiceDiscovery
@@ -65,6 +65,7 @@ type serviceDiscoveryRegistry struct {
subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
serviceRevisionExportedURLsCache map[string]map[string][]*common.URL
serviceListeners
map[string]registry.ServiceInstancesChangedListener
+ serviceMappingListeners map[string]registry.MappingListener
}
func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
@@ -79,7 +80,7 @@ func newServiceDiscoveryRegistry(url *common.URL)
(registry.Registry, error) {
if err != nil {
return nil, perrors.WithMessage(err, "could not init metadata
service")
}
- return &serviceDiscoveryRegistry{
+ return &ServiceDiscoveryRegistry{
url: url,
serviceDiscovery: serviceDiscovery,
subscribedServices: subscribedServices,
@@ -89,17 +90,19 @@ func newServiceDiscoveryRegistry(url *common.URL)
(registry.Registry, error) {
serviceNameMapping: serviceNameMapping,
metaDataService: metaDataService,
serviceListeners:
make(map[string]registry.ServiceInstancesChangedListener),
+ // cache for mapping listener
+ serviceMappingListeners:
make(map[string]registry.MappingListener),
}, nil
}
-func (s *serviceDiscoveryRegistry) UnRegister(url *common.URL) error {
+func (s *ServiceDiscoveryRegistry) UnRegister(url *common.URL) error {
if !shouldRegister(url) {
return nil
}
return s.metaDataService.UnexportURL(url)
}
-func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener
registry.NotifyListener) error {
+func (s *ServiceDiscoveryRegistry) UnSubscribe(url *common.URL, listener
registry.NotifyListener) error {
if !shouldSubscribe(url) {
return nil
}
@@ -107,7 +110,7 @@ func (s *serviceDiscoveryRegistry) UnSubscribe(url
*common.URL, listener registr
if err != nil {
return err
}
- services := s.getServices(url)
+ services := s.getServices(url, nil)
if services == nil {
return nil
}
@@ -115,6 +118,11 @@ func (s *serviceDiscoveryRegistry) UnSubscribe(url
*common.URL, listener registr
serviceNamesKey := services.String()
l := s.serviceListeners[serviceNamesKey]
l.RemoveListener(url.ServiceKey())
+ s.stopListen(url)
+ err = s.serviceNameMapping.Remove(url)
+ if err != nil {
+ return err
+ }
return nil
}
@@ -140,29 +148,29 @@ func parseServices(literalServices string) *gxset.HashSet
{
return set
}
-func (s *serviceDiscoveryRegistry) GetServiceDiscovery()
registry.ServiceDiscovery {
+func (s *ServiceDiscoveryRegistry) GetServiceDiscovery()
registry.ServiceDiscovery {
return s.serviceDiscovery
}
-func (s *serviceDiscoveryRegistry) GetURL() *common.URL {
+func (s *ServiceDiscoveryRegistry) GetURL() *common.URL {
return s.url
}
-func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+func (s *ServiceDiscoveryRegistry) IsAvailable() bool {
if s.serviceDiscovery.GetServices() == nil {
return false
}
return len(s.serviceDiscovery.GetServices().Values()) > 0
}
-func (s *serviceDiscoveryRegistry) Destroy() {
+func (s *ServiceDiscoveryRegistry) Destroy() {
err := s.serviceDiscovery.Destroy()
if err != nil {
logger.Errorf("destroy serviceDiscovery catch error:%s",
err.Error())
}
}
-func (s *serviceDiscoveryRegistry) Register(url *common.URL) error {
+func (s *ServiceDiscoveryRegistry) Register(url *common.URL) error {
if !shouldRegister(url) {
return nil
}
@@ -189,7 +197,7 @@ func shouldRegister(url *common.URL) bool {
return false
}
-func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify
registry.NotifyListener) error {
+func (s *ServiceDiscoveryRegistry) Subscribe(url *common.URL, notify
registry.NotifyListener) error {
if !shouldSubscribe(url) {
return nil
}
@@ -197,17 +205,25 @@ func (s *serviceDiscoveryRegistry) Subscribe(url
*common.URL, notify registry.No
if err != nil {
return perrors.WithMessage(err, "subscribe url error:
"+url.String())
}
- services := s.getServices(url)
+
+ mappingListener := NewMappingListener(s.url, url, s.subscribedServices,
notify)
+ services := s.getServices(url, mappingListener)
if services.Empty() {
- return perrors.Errorf("Should has at least one way to know
which services this interface belongs to, "+
- "subscription url:%s", url.String())
+ return nil
}
+ // first notify
+
mappingListener.OnEvent(registry.NewServiceMappingChangedEvent(url.ServiceKey(),
services))
+ return nil
+}
+
+func (s *ServiceDiscoveryRegistry) SubscribeURL(url *common.URL, notify
registry.NotifyListener, services *gxset.HashSet) {
// FIXME ServiceNames.String() is not good
+ var err error
serviceNamesKey := services.String()
protocolServiceKey := url.ServiceKey() + ":" + url.Protocol
listener := s.serviceListeners[serviceNamesKey]
if listener == nil {
- listener = event.NewServiceInstancesChangedListener(services)
+ listener = NewServiceInstancesChangedListener(services)
for _, serviceNameTmp := range services.Values() {
serviceName := serviceNameTmp.(string)
instances :=
s.serviceDiscovery.GetInstances(serviceName)
@@ -222,16 +238,14 @@ func (s *serviceDiscoveryRegistry) Subscribe(url
*common.URL, notify registry.No
}
s.serviceListeners[serviceNamesKey] = listener
listener.AddListenerAndNotify(protocolServiceKey, notify)
-
err = s.serviceDiscovery.AddListener(listener)
if err != nil {
logger.Errorf("add instance listener catch error,url:%s
err:%s", url.String(), err.Error())
}
- return nil
}
// LoadSubscribeInstances load subscribe instance
-func (s *serviceDiscoveryRegistry) LoadSubscribeInstances(url *common.URL,
notify registry.NotifyListener) error {
+func (s *ServiceDiscoveryRegistry) LoadSubscribeInstances(url *common.URL,
notify registry.NotifyListener) error {
appName := url.GetParam(constant.ApplicationKey, url.Username)
instances := s.serviceDiscovery.GetInstances(appName)
for _, instance := range instances {
@@ -248,7 +262,7 @@ func (s *serviceDiscoveryRegistry)
LoadSubscribeInstances(url *common.URL, notif
logger.Infof("Find instance without valid service
metadata: %s", instance.GetHost())
continue
}
- metadataInfo, err := event.GetMetadataInfo(instance, revision)
+ metadataInfo, err := GetMetadataInfo(instance, revision)
if err != nil {
return err
}
@@ -288,7 +302,7 @@ func appendParam(buffer bytes.Buffer, paramKey string, url
*common.URL) {
buffer.WriteString(url.GetParam(paramKey, ""))
}
-func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL
*common.URL, serviceInstances []registry.ServiceInstance) []*common.URL {
+func (s *ServiceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL
*common.URL, serviceInstances []registry.ServiceInstance) []*common.URL {
var urls []*common.URL
for _, syn := range s.subscribedURLsSynthesizers {
if syn.Support(subscribedURL) {
@@ -302,14 +316,14 @@ func shouldSubscribe(url *common.URL) bool {
return !shouldRegister(url)
}
-func (s *serviceDiscoveryRegistry) getServices(url *common.URL) *gxset.HashSet
{
+func (s *ServiceDiscoveryRegistry) getServices(url *common.URL, listener
registry.MappingListener) *gxset.HashSet {
services := gxset.NewSet()
serviceNames := url.GetParam(constant.ProvidedBy, "")
if len(serviceNames) > 0 {
services = parseServices(serviceNames)
}
if services.Empty() {
- services = s.findMappedServices(url)
+ services = s.findMappedServices(url, listener)
if services.Empty() {
return s.subscribedServices
}
@@ -317,15 +331,28 @@ func (s *serviceDiscoveryRegistry) getServices(url
*common.URL) *gxset.HashSet {
return services
}
-func (s *serviceDiscoveryRegistry) findMappedServices(url *common.URL)
*gxset.HashSet {
- serviceNames, err := s.serviceNameMapping.Get(url)
+func (s *ServiceDiscoveryRegistry) findMappedServices(url *common.URL,
listener registry.MappingListener) *gxset.HashSet {
+ serviceNames, err := s.serviceNameMapping.Get(url, listener)
if err != nil {
logger.Errorf("get service names catch error, url:%s, err:%s ",
url.String(), err.Error())
return gxset.NewSet()
}
+ if listener != nil {
+ protocolServiceKey := url.ServiceKey() + ":" + url.Protocol
+ s.serviceMappingListeners[protocolServiceKey] = listener
+ }
return serviceNames
}
var (
exporting = &atomic.Bool{}
)
+
+func (s *ServiceDiscoveryRegistry) stopListen(url *common.URL) {
+ protocolServiceKey := url.ServiceKey() + ":" + url.Protocol
+ listener := s.serviceMappingListeners[protocolServiceKey]
+ if listener != nil {
+ delete(s.serviceMappingListeners, protocolServiceKey)
+ listener.Stop()
+ }
+}
diff --git a/registry/event/service_instances_changed_listener_impl.go
b/registry/servicediscovery/service_instances_changed_listener_impl.go
similarity index 99%
rename from registry/event/service_instances_changed_listener_impl.go
rename to registry/servicediscovery/service_instances_changed_listener_impl.go
index b8de36e77..899b55075 100644
--- a/registry/event/service_instances_changed_listener_impl.go
+++ b/registry/servicediscovery/service_instances_changed_listener_impl.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package event
+package servicediscovery
import (
"reflect"
diff --git a/registry/servicediscovery/service_mapping_change_listener_impl.go
b/registry/servicediscovery/service_mapping_change_listener_impl.go
new file mode 100644
index 000000000..5450d81f5
--- /dev/null
+++ b/registry/servicediscovery/service_mapping_change_listener_impl.go
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+ "sync"
+)
+
+import (
+ gxset "github.com/dubbogo/gost/container/set"
+ "github.com/dubbogo/gost/gof/observer"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/registry"
+)
+
+type ServiceMappingChangedListenerImpl struct {
+ oldServiceNames *gxset.HashSet
+ listener registry.NotifyListener
+ registryUrl *common.URL
+ serviceUrl *common.URL
+ mappingCache *sync.Map
+ stop int
+}
+
+const (
+ ServiceMappingListenerStart = iota
+ ServiceMappingListenerStop
+)
+
+func NewMappingListener(registryUrl *common.URL, serviceUrl *common.URL,
oldServiceNames *gxset.HashSet, listener registry.NotifyListener)
*ServiceMappingChangedListenerImpl {
+ return &ServiceMappingChangedListenerImpl{
+ listener: listener,
+ oldServiceNames: oldServiceNames,
+ registryUrl: registryUrl,
+ serviceUrl: serviceUrl,
+ stop: ServiceMappingListenerStart,
+ mappingCache: &sync.Map{},
+ }
+}
+
+// OnEvent handles a ServiceMappingChangeEvent and re-subscribes when the mapped service names change
+func (lstn *ServiceMappingChangedListenerImpl) OnEvent(e observer.Event) error
{
+ var (
+ err error
+ reg registry.Registry
+ )
+ if lstn.stop == ServiceMappingListenerStop {
+ return nil
+ }
+ sm, ok := e.(*registry.ServiceMappingChangeEvent)
+ if !ok {
+ return nil
+ }
+ newServiceNames := sm.GetServiceNames()
+ oldServiceNames := lstn.oldServiceNames
+ // serviceMapping is orderly
+ if newServiceNames.Empty() || oldServiceNames.String() ==
newServiceNames.String() {
+ return nil
+ }
+ if newServiceNames.Size() > 0 && oldServiceNames.Empty() {
+ if reg, err = extension.GetRegistry(lstn.registryUrl.Protocol,
lstn.registryUrl); err != nil {
+ return err
+ }
+ if sdreg, ok := reg.(*ServiceDiscoveryRegistry); ok {
+ sdreg.SubscribeURL(lstn.serviceUrl, lstn.listener,
newServiceNames)
+ }
+ lstn.oldServiceNames = newServiceNames
+ return nil
+ }
+ for _, service := range newServiceNames.Values() {
+ if !oldServiceNames.Contains(service) {
+ lstn.mappingCache.Delete(oldServiceNames.String())
+ lstn.mappingCache.Store(newServiceNames.String(),
newServiceNames)
+ if reg, err =
extension.GetRegistry(lstn.registryUrl.Protocol, lstn.registryUrl); err != nil {
+ return err
+ }
+ if sdreg, ok := reg.(*ServiceDiscoveryRegistry); ok {
+ sdreg.SubscribeURL(lstn.serviceUrl,
lstn.listener, newServiceNames)
+ }
+ lstn.oldServiceNames = newServiceNames
+ }
+ }
+ return err
+}
+
+// Stop marks the listener as stopped so that later events are ignored by OnEvent
+func (lstn *ServiceMappingChangedListenerImpl) Stop() {
+ lstn.stop = ServiceMappingListenerStop
+}