This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 02a1d5b93 add serviceMappingListener for application service discovery (#2205)
02a1d5b93 is described below

commit 02a1d5b93591391a737d9c380e5199e04d47993e
Author: bobtthp <[email protected]>
AuthorDate: Sat Mar 25 09:01:22 2023 +0800

    add serviceMappingListener for application service discovery (#2205)
    
    * add serviceMappingListener
    
    * rollback loadbalance change
    
    * resolve cycle ref
    
    * add ref
    
    * fix lint
    
    * fix ref
    
    * fix protocol prom
    
    * fix ci prom
    
    * fix service mapping duplicated
    
    * retry ci
    
    * delete hc zip
    
    ---------
    
    Co-authored-by: bobtthp <[email protected]>
---
 config/instance/metadata_report_test.go            |   7 +-
 metadata/mapping/memory/service_name_mapping.go    |   7 +-
 metadata/mapping/metadata/service_name_mapping.go  |  11 ++-
 metadata/mapping/mock_service_name_mapping.go      |   7 +-
 metadata/mapping/service_name_mapping.go           |   4 +-
 metadata/report/etcd/report.go                     |   7 +-
 metadata/report/nacos/report.go                    |  51 +++++++++-
 metadata/report/report.go                          |   6 +-
 metadata/report/zookeeper/report.go                |   7 +-
 metadata/service/remote/service_test.go            |   7 +-
 registry/event.go                                  |  27 ++++++
 registry/nacos/service_discovery_test.go           |   4 +-
 .../service_mapping_changed_listener.go            |  23 +----
 .../servicediscovery/service_discovery_registry.go |  79 ++++++++++-----
 .../service_instances_changed_listener_impl.go     |   2 +-
 .../service_mapping_change_listener_impl.go        | 108 +++++++++++++++++++++
 16 files changed, 297 insertions(+), 60 deletions(-)

diff --git a/config/instance/metadata_report_test.go 
b/config/instance/metadata_report_test.go
index 4a56883b4..8d49009f7 100644
--- a/config/instance/metadata_report_test.go
+++ b/config/instance/metadata_report_test.go
@@ -33,6 +33,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/metadata/identifier"
        "dubbo.apache.org/dubbo-go/v3/metadata/report"
        "dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
+       "dubbo.apache.org/dubbo-go/v3/registry"
 )
 
 func TestGetMetadataReportInstance(t *testing.T) {
@@ -56,7 +57,11 @@ func (m mockMetadataReport) 
RegisterServiceAppMapping(string, string, string) er
        panic("implement me")
 }
 
-func (m mockMetadataReport) GetServiceAppMapping(string, string) 
(*gxset.HashSet, error) {
+func (m mockMetadataReport) GetServiceAppMapping(string, string, 
registry.MappingListener) (*gxset.HashSet, error) {
+       panic("implement me")
+}
+
+func (m mockMetadataReport) RemoveServiceAppMappingListener(string, string) 
error {
        panic("implement me")
 }
 
diff --git a/metadata/mapping/memory/service_name_mapping.go 
b/metadata/mapping/memory/service_name_mapping.go
index 73b0603ea..9d7a72010 100644
--- a/metadata/mapping/memory/service_name_mapping.go
+++ b/metadata/mapping/memory/service_name_mapping.go
@@ -30,6 +30,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/config"
        "dubbo.apache.org/dubbo-go/v3/metadata/mapping"
+       "dubbo.apache.org/dubbo-go/v3/registry"
 )
 
 func init() {
@@ -42,10 +43,14 @@ func (i *InMemoryServiceNameMapping) Map(url *common.URL) 
error {
        return nil
 }
 
-func (i *InMemoryServiceNameMapping) Get(url *common.URL) (*gxset.HashSet, 
error) {
+func (i *InMemoryServiceNameMapping) Get(url *common.URL, listener 
registry.MappingListener) (*gxset.HashSet, error) {
        return gxset.NewSet(config.GetApplicationConfig().Name), nil
 }
 
+func (i *InMemoryServiceNameMapping) Remove(url *common.URL) error {
+       return nil
+}
+
 var (
        serviceNameMappingInstance *InMemoryServiceNameMapping
        serviceNameMappingOnce     sync.Once
diff --git a/metadata/mapping/metadata/service_name_mapping.go 
b/metadata/mapping/metadata/service_name_mapping.go
index ae934ae48..8c0104000 100644
--- a/metadata/mapping/metadata/service_name_mapping.go
+++ b/metadata/mapping/metadata/service_name_mapping.go
@@ -36,6 +36,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/config/instance"
        "dubbo.apache.org/dubbo-go/v3/metadata/mapping"
        "dubbo.apache.org/dubbo-go/v3/metadata/report"
+       "dubbo.apache.org/dubbo-go/v3/registry"
 )
 
 const (
@@ -75,10 +76,16 @@ func (d *MetadataServiceNameMapping) Map(url *common.URL) 
error {
 }
 
 // Get will return the application-level services. If not found, the empty set 
will be returned.
-func (d *MetadataServiceNameMapping) Get(url *common.URL) (*gxset.HashSet, 
error) {
+func (d *MetadataServiceNameMapping) Get(url *common.URL, listener 
registry.MappingListener) (*gxset.HashSet, error) {
        serviceInterface := url.GetParam(constant.InterfaceKey, "")
        metadataReport := instance.GetMetadataReportInstance()
-       return metadataReport.GetServiceAppMapping(serviceInterface, 
defaultGroup)
+       return metadataReport.GetServiceAppMapping(serviceInterface, 
defaultGroup, listener)
+}
+
+func (d *MetadataServiceNameMapping) Remove(url *common.URL) error {
+       serviceInterface := url.GetParam(constant.InterfaceKey, "")
+       metadataReport := instance.GetMetadataReportInstance()
+       return metadataReport.RemoveServiceAppMappingListener(serviceInterface, 
defaultGroup)
 }
 
 // buildMappingKey will return mapping key, it looks like 
defaultGroup/serviceInterface
diff --git a/metadata/mapping/mock_service_name_mapping.go 
b/metadata/mapping/mock_service_name_mapping.go
index c3aae3dab..0a5a437d5 100644
--- a/metadata/mapping/mock_service_name_mapping.go
+++ b/metadata/mapping/mock_service_name_mapping.go
@@ -23,6 +23,7 @@ import (
 
 import (
        "dubbo.apache.org/dubbo-go/v3/common"
+       "dubbo.apache.org/dubbo-go/v3/registry"
 )
 
 type MockServiceNameMapping struct{}
@@ -35,6 +36,10 @@ func (m *MockServiceNameMapping) Map(*common.URL) error {
        return nil
 }
 
-func (m *MockServiceNameMapping) Get(*common.URL) (*gxset.HashSet, error) {
+func (m *MockServiceNameMapping) Get(*common.URL, registry.MappingListener) 
(*gxset.HashSet, error) {
+       panic("implement me")
+}
+
+func (m *MockServiceNameMapping) Remove(*common.URL) error {
        panic("implement me")
 }
diff --git a/metadata/mapping/service_name_mapping.go 
b/metadata/mapping/service_name_mapping.go
index 75c9b679d..ed76d0f5d 100644
--- a/metadata/mapping/service_name_mapping.go
+++ b/metadata/mapping/service_name_mapping.go
@@ -23,6 +23,7 @@ import (
 
 import (
        "dubbo.apache.org/dubbo-go/v3/common"
+       "dubbo.apache.org/dubbo-go/v3/registry"
 )
 
 // ServiceNameMapping  is the interface which trys to build the mapping 
between application-level service and interface-level service.
@@ -32,5 +33,6 @@ import (
 // Get method will return the application-level services
 type ServiceNameMapping interface {
        Map(url *common.URL) error
-       Get(url *common.URL) (*gxset.HashSet, error)
+       Get(url *common.URL, listener registry.MappingListener) 
(*gxset.HashSet, error)
+       Remove(url *common.URL) error
 }
diff --git a/metadata/report/etcd/report.go b/metadata/report/etcd/report.go
index 6d066b17f..4483a2a91 100644
--- a/metadata/report/etcd/report.go
+++ b/metadata/report/etcd/report.go
@@ -37,6 +37,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/metadata/identifier"
        "dubbo.apache.org/dubbo-go/v3/metadata/report"
        "dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
+       "dubbo.apache.org/dubbo-go/v3/registry"
 )
 
 const DEFAULT_ROOT = "dubbo"
@@ -161,7 +162,7 @@ func (e *etcdMetadataReport) RegisterServiceAppMapping(key 
string, group string,
 }
 
 // GetServiceAppMapping get the app names from the specified Dubbo service 
interface
-func (e *etcdMetadataReport) GetServiceAppMapping(key string, group string) 
(*gxset.HashSet, error) {
+func (e *etcdMetadataReport) GetServiceAppMapping(key string, group string, 
listener registry.MappingListener) (*gxset.HashSet, error) {
        path := e.root + constant.PathSeparator + group + 
constant.PathSeparator + key
        v, err := e.client.Get(path)
        if err != nil {
@@ -175,6 +176,10 @@ func (e *etcdMetadataReport) GetServiceAppMapping(key 
string, group string) (*gx
        return set, nil
 }
 
+func (e *etcdMetadataReport) RemoveServiceAppMappingListener(key string, group 
string) error {
+       return nil
+}
+
 type etcdMetadataReportFactory struct{}
 
 // CreateMetadataReport get the MetadataReport instance of etcd
diff --git a/metadata/report/nacos/report.go b/metadata/report/nacos/report.go
index 655b57704..c4487f878 100644
--- a/metadata/report/nacos/report.go
+++ b/metadata/report/nacos/report.go
@@ -40,6 +40,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/metadata/identifier"
        "dubbo.apache.org/dubbo-go/v3/metadata/report"
        "dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
+       "dubbo.apache.org/dubbo-go/v3/registry"
        "dubbo.apache.org/dubbo-go/v3/remoting/nacos"
 )
 
@@ -209,6 +210,34 @@ func (n *nacosMetadataReport) getConfig(param 
vo.ConfigParam) (string, error) {
        return cfg, nil
 }
 
+func (n *nacosMetadataReport) addListener(key string, group string, notify 
registry.MappingListener) error {
+       return n.client.Client().ListenConfig(vo.ConfigParam{
+               DataId: key,
+               Group:  group,
+               OnChange: func(namespace, group, dataId, data string) {
+                       go callback(notify, dataId, data)
+               },
+       })
+}
+
+func callback(notify registry.MappingListener, dataId, data string) {
+       appNames := strings.Split(data, constant.CommaSeparator)
+       set := gxset.NewSet()
+       for _, app := range appNames {
+               set.Add(app)
+       }
+       if err := notify.OnEvent(registry.NewServiceMappingChangedEvent(dataId, 
set)); err != nil {
+               logger.Errorf("serviceMapping callback err: %s", err.Error())
+       }
+}
+
+func (n *nacosMetadataReport) removeServiceMappingListener(key string, group 
string) error {
+       return n.client.Client().CancelListenConfig(vo.ConfigParam{
+               DataId: key,
+               Group:  group,
+       })
+}
+
 // RegisterServiceAppMapping map the specified Dubbo service interface to 
current Dubbo app name
 func (n *nacosMetadataReport) RegisterServiceAppMapping(key string, group 
string, value string) error {
        oldVal, err := n.getConfig(vo.ConfigParam{
@@ -218,8 +247,13 @@ func (n *nacosMetadataReport) 
RegisterServiceAppMapping(key string, group string
        if err != nil {
                return err
        }
-       if strings.Contains(oldVal, value) {
-               return nil
+       oldApps := strings.Split(oldVal, constant.CommaSeparator)
+       if len(oldApps) > 0 {
+               for _, app := range oldApps {
+                       if app == value {
+                               return nil
+                       }
+               }
        }
        if oldVal != "" {
                value = oldVal + constant.CommaSeparator + value
@@ -232,7 +266,13 @@ func (n *nacosMetadataReport) 
RegisterServiceAppMapping(key string, group string
 }
 
 // GetServiceAppMapping get the app names from the specified Dubbo service 
interface
-func (n *nacosMetadataReport) GetServiceAppMapping(key string, group string) 
(*gxset.HashSet, error) {
+func (n *nacosMetadataReport) GetServiceAppMapping(key string, group string, 
listener registry.MappingListener) (*gxset.HashSet, error) {
+       // add service mapping listener
+       if listener != nil {
+               if err := n.addListener(key, group, listener); err != nil {
+                       logger.Errorf("add serviceMapping listener err: %s", 
err.Error())
+               }
+       }
        v, err := n.getConfig(vo.ConfigParam{
                DataId: key,
                Group:  group,
@@ -251,6 +291,11 @@ func (n *nacosMetadataReport) GetServiceAppMapping(key 
string, group string) (*g
        return set, nil
 }
 
+// RemoveServiceAppMappingListener removes the serviceMapping listener from the metadata center
+func (n *nacosMetadataReport) RemoveServiceAppMappingListener(key string, 
group string) error {
+       return n.removeServiceMappingListener(key, group)
+}
+
 type nacosMetadataReportFactory struct{}
 
 // nolint
diff --git a/metadata/report/report.go b/metadata/report/report.go
index bfba6382c..0995d1780 100644
--- a/metadata/report/report.go
+++ b/metadata/report/report.go
@@ -24,6 +24,7 @@ import (
 import (
        "dubbo.apache.org/dubbo-go/v3/common"
        "dubbo.apache.org/dubbo-go/v3/metadata/identifier"
+       "dubbo.apache.org/dubbo-go/v3/registry"
 )
 
 // MetadataReport is an interface of remote metadata report.
@@ -71,5 +72,8 @@ type MetadataReport interface {
        RegisterServiceAppMapping(string, string, string) error
 
        // GetServiceAppMapping get the app names from the specified Dubbo 
service interface
-       GetServiceAppMapping(string, string) (*gxset.HashSet, error)
+       GetServiceAppMapping(string, string, registry.MappingListener) 
(*gxset.HashSet, error)
+
+       // RemoveServiceAppMappingListener removes the serviceMapping listener by key and group
+       RemoveServiceAppMappingListener(string, string) error
 }
diff --git a/metadata/report/zookeeper/report.go 
b/metadata/report/zookeeper/report.go
index e93c9153a..6e2525da9 100644
--- a/metadata/report/zookeeper/report.go
+++ b/metadata/report/zookeeper/report.go
@@ -37,6 +37,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/metadata/identifier"
        "dubbo.apache.org/dubbo-go/v3/metadata/report"
        "dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
+       "dubbo.apache.org/dubbo-go/v3/registry"
 )
 
 var emptyStrSlice = make([]string, 0)
@@ -161,7 +162,7 @@ func (m *zookeeperMetadataReport) 
RegisterServiceAppMapping(key string, group st
 }
 
 // GetServiceAppMapping get the app names from the specified Dubbo service 
interface
-func (m *zookeeperMetadataReport) GetServiceAppMapping(key string, group 
string) (*gxset.HashSet, error) {
+func (m *zookeeperMetadataReport) GetServiceAppMapping(key string, group 
string, listener registry.MappingListener) (*gxset.HashSet, error) {
        path := m.rootDir + group + constant.PathSeparator + key
        v, _, err := m.client.GetContent(path)
        if err != nil {
@@ -175,6 +176,10 @@ func (m *zookeeperMetadataReport) GetServiceAppMapping(key 
string, group string)
        return set, nil
 }
 
+func (m *zookeeperMetadataReport) RemoveServiceAppMappingListener(key string, 
group string) error {
+       return nil
+}
+
 type zookeeperMetadataReportFactory struct{}
 
 // nolint
diff --git a/metadata/service/remote/service_test.go 
b/metadata/service/remote/service_test.go
index e435ef7a4..2ed4511f1 100644
--- a/metadata/service/remote/service_test.go
+++ b/metadata/service/remote/service_test.go
@@ -38,6 +38,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/metadata/report"
        "dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
        "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
+       "dubbo.apache.org/dubbo-go/v3/registry"
 )
 
 var (
@@ -61,7 +62,11 @@ func (mr metadataReport) RegisterServiceAppMapping(string, 
string, string) error
        panic("implement me")
 }
 
-func (mr metadataReport) GetServiceAppMapping(string, string) (*gxset.HashSet, 
error) {
+func (mr metadataReport) GetServiceAppMapping(string, string, 
registry.MappingListener) (*gxset.HashSet, error) {
+       panic("implement me")
+}
+
+func (mr metadataReport) RemoveServiceAppMappingListener(string, string) error 
{
        panic("implement me")
 }
 
diff --git a/registry/event.go b/registry/event.go
index 13e066e90..81e78781e 100644
--- a/registry/event.go
+++ b/registry/event.go
@@ -24,6 +24,7 @@ import (
 )
 
 import (
+       gxset "github.com/dubbogo/gost/container/set"
        "github.com/dubbogo/gost/gof/observer"
 )
 
@@ -101,3 +102,29 @@ func NewServiceInstancesChangedEvent(serviceName string, 
instances []ServiceInst
                Instances:   instances,
        }
 }
+
+type ServiceMappingChangeEvent struct {
+       observer.BaseEvent
+       ServiceKey   string
+       ServiceNames *gxset.HashSet
+}
+
+// NewServiceMappingChangedEvent will create the ServiceMappingChangeEvent
+func NewServiceMappingChangedEvent(serviceKey string, serviceNames 
*gxset.HashSet) *ServiceMappingChangeEvent {
+       return &ServiceMappingChangeEvent{
+               BaseEvent: observer.BaseEvent{
+                       Source:    serviceKey,
+                       Timestamp: time.Now(),
+               },
+               ServiceKey:   serviceKey,
+               ServiceNames: serviceNames,
+       }
+}
+
+func (sm *ServiceMappingChangeEvent) GetServiceNames() *gxset.HashSet {
+       return sm.ServiceNames
+}
+
+func (sm *ServiceMappingChangeEvent) GetServiceKey() string {
+       return sm.ServiceKey
+}
diff --git a/registry/nacos/service_discovery_test.go 
b/registry/nacos/service_discovery_test.go
index 7eb18a626..99fc4984f 100644
--- a/registry/nacos/service_discovery_test.go
+++ b/registry/nacos/service_discovery_test.go
@@ -41,7 +41,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/protocol"
        "dubbo.apache.org/dubbo-go/v3/registry"
-       "dubbo.apache.org/dubbo-go/v3/registry/event"
+       "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery"
        "dubbo.apache.org/dubbo-go/v3/remoting/nacos"
 )
 
@@ -95,7 +95,7 @@ func TestFunction(t *testing.T) {
        hs := gxset.NewSet()
        hs.Add(testName)
 
-       sicl := event.NewServiceInstancesChangedListener(hs)
+       sicl := servicediscovery.NewServiceInstancesChangedListener(hs)
        sicl.AddListenerAndNotify(testName, tn)
        err = sd.AddListener(sicl)
        assert.NoError(t, err)
diff --git a/metadata/mapping/mock_service_name_mapping.go 
b/registry/service_mapping_changed_listener.go
similarity index 66%
copy from metadata/mapping/mock_service_name_mapping.go
copy to registry/service_mapping_changed_listener.go
index c3aae3dab..3c75ac115 100644
--- a/metadata/mapping/mock_service_name_mapping.go
+++ b/registry/service_mapping_changed_listener.go
@@ -15,26 +15,13 @@
  * limitations under the License.
  */
 
-package mapping
+package registry
 
 import (
-       gxset "github.com/dubbogo/gost/container/set"
+       "github.com/dubbogo/gost/gof/observer"
 )
 
-import (
-       "dubbo.apache.org/dubbo-go/v3/common"
-)
-
-type MockServiceNameMapping struct{}
-
-func NewMockServiceNameMapping() *MockServiceNameMapping {
-       return &MockServiceNameMapping{}
-}
-
-func (m *MockServiceNameMapping) Map(*common.URL) error {
-       return nil
-}
-
-func (m *MockServiceNameMapping) Get(*common.URL) (*gxset.HashSet, error) {
-       panic("implement me")
+type MappingListener interface {
+       OnEvent(e observer.Event) error
+       Stop()
 }
diff --git a/registry/servicediscovery/service_discovery_registry.go 
b/registry/servicediscovery/service_discovery_registry.go
index 4bb4cf84b..6854439fa 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -40,7 +40,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/metadata/service"
        "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
        "dubbo.apache.org/dubbo-go/v3/registry"
-       "dubbo.apache.org/dubbo-go/v3/registry/event"
+       _ "dubbo.apache.org/dubbo-go/v3/registry/event"
        "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/synthesizer"
        "dubbo.apache.org/dubbo-go/v3/remoting"
 )
@@ -49,12 +49,12 @@ func init() {
        extension.SetRegistry(constant.ServiceRegistryProtocol, 
newServiceDiscoveryRegistry)
 }
 
-// serviceDiscoveryRegistry is the implementation of application-level 
registry.
+// ServiceDiscoveryRegistry is the implementation of application-level 
registry.
 // It's completely different from other registry implementations
 // This implementation is based on ServiceDiscovery abstraction and 
ServiceNameMapping
 // In order to keep compatible with interface-level registry,
 // this implementation is
-type serviceDiscoveryRegistry struct {
+type ServiceDiscoveryRegistry struct {
        lock                             sync.RWMutex
        url                              *common.URL
        serviceDiscovery                 registry.ServiceDiscovery
@@ -65,6 +65,7 @@ type serviceDiscoveryRegistry struct {
        subscribedURLsSynthesizers       []synthesizer.SubscribedURLsSynthesizer
        serviceRevisionExportedURLsCache map[string]map[string][]*common.URL
        serviceListeners                 
map[string]registry.ServiceInstancesChangedListener
+       serviceMappingListeners          map[string]registry.MappingListener
 }
 
 func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
@@ -79,7 +80,7 @@ func newServiceDiscoveryRegistry(url *common.URL) 
(registry.Registry, error) {
        if err != nil {
                return nil, perrors.WithMessage(err, "could not init metadata 
service")
        }
-       return &serviceDiscoveryRegistry{
+       return &ServiceDiscoveryRegistry{
                url:                              url,
                serviceDiscovery:                 serviceDiscovery,
                subscribedServices:               subscribedServices,
@@ -89,17 +90,19 @@ func newServiceDiscoveryRegistry(url *common.URL) 
(registry.Registry, error) {
                serviceNameMapping:               serviceNameMapping,
                metaDataService:                  metaDataService,
                serviceListeners:                 
make(map[string]registry.ServiceInstancesChangedListener),
+               // cache for mapping listener
+               serviceMappingListeners: 
make(map[string]registry.MappingListener),
        }, nil
 }
 
-func (s *serviceDiscoveryRegistry) UnRegister(url *common.URL) error {
+func (s *ServiceDiscoveryRegistry) UnRegister(url *common.URL) error {
        if !shouldRegister(url) {
                return nil
        }
        return s.metaDataService.UnexportURL(url)
 }
 
-func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener 
registry.NotifyListener) error {
+func (s *ServiceDiscoveryRegistry) UnSubscribe(url *common.URL, listener 
registry.NotifyListener) error {
        if !shouldSubscribe(url) {
                return nil
        }
@@ -107,7 +110,7 @@ func (s *serviceDiscoveryRegistry) UnSubscribe(url 
*common.URL, listener registr
        if err != nil {
                return err
        }
-       services := s.getServices(url)
+       services := s.getServices(url, nil)
        if services == nil {
                return nil
        }
@@ -115,6 +118,11 @@ func (s *serviceDiscoveryRegistry) UnSubscribe(url 
*common.URL, listener registr
        serviceNamesKey := services.String()
        l := s.serviceListeners[serviceNamesKey]
        l.RemoveListener(url.ServiceKey())
+       s.stopListen(url)
+       err = s.serviceNameMapping.Remove(url)
+       if err != nil {
+               return err
+       }
        return nil
 }
 
@@ -140,29 +148,29 @@ func parseServices(literalServices string) *gxset.HashSet 
{
        return set
 }
 
-func (s *serviceDiscoveryRegistry) GetServiceDiscovery() 
registry.ServiceDiscovery {
+func (s *ServiceDiscoveryRegistry) GetServiceDiscovery() 
registry.ServiceDiscovery {
        return s.serviceDiscovery
 }
 
-func (s *serviceDiscoveryRegistry) GetURL() *common.URL {
+func (s *ServiceDiscoveryRegistry) GetURL() *common.URL {
        return s.url
 }
 
-func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+func (s *ServiceDiscoveryRegistry) IsAvailable() bool {
        if s.serviceDiscovery.GetServices() == nil {
                return false
        }
        return len(s.serviceDiscovery.GetServices().Values()) > 0
 }
 
-func (s *serviceDiscoveryRegistry) Destroy() {
+func (s *ServiceDiscoveryRegistry) Destroy() {
        err := s.serviceDiscovery.Destroy()
        if err != nil {
                logger.Errorf("destroy serviceDiscovery catch error:%s", 
err.Error())
        }
 }
 
-func (s *serviceDiscoveryRegistry) Register(url *common.URL) error {
+func (s *ServiceDiscoveryRegistry) Register(url *common.URL) error {
        if !shouldRegister(url) {
                return nil
        }
@@ -189,7 +197,7 @@ func shouldRegister(url *common.URL) bool {
        return false
 }
 
-func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify 
registry.NotifyListener) error {
+func (s *ServiceDiscoveryRegistry) Subscribe(url *common.URL, notify 
registry.NotifyListener) error {
        if !shouldSubscribe(url) {
                return nil
        }
@@ -197,17 +205,25 @@ func (s *serviceDiscoveryRegistry) Subscribe(url 
*common.URL, notify registry.No
        if err != nil {
                return perrors.WithMessage(err, "subscribe url error: 
"+url.String())
        }
-       services := s.getServices(url)
+
+       mappingListener := NewMappingListener(s.url, url, s.subscribedServices, 
notify)
+       services := s.getServices(url, mappingListener)
        if services.Empty() {
-               return perrors.Errorf("Should has at least one way to know 
which services this interface belongs to, "+
-                       "subscription url:%s", url.String())
+               return nil
        }
+       // first notify
+       
mappingListener.OnEvent(registry.NewServiceMappingChangedEvent(url.ServiceKey(),
 services))
+       return nil
+}
+
+func (s *ServiceDiscoveryRegistry) SubscribeURL(url *common.URL, notify 
registry.NotifyListener, services *gxset.HashSet) {
        // FIXME ServiceNames.String() is not good
+       var err error
        serviceNamesKey := services.String()
        protocolServiceKey := url.ServiceKey() + ":" + url.Protocol
        listener := s.serviceListeners[serviceNamesKey]
        if listener == nil {
-               listener = event.NewServiceInstancesChangedListener(services)
+               listener = NewServiceInstancesChangedListener(services)
                for _, serviceNameTmp := range services.Values() {
                        serviceName := serviceNameTmp.(string)
                        instances := 
s.serviceDiscovery.GetInstances(serviceName)
@@ -222,16 +238,14 @@ func (s *serviceDiscoveryRegistry) Subscribe(url 
*common.URL, notify registry.No
        }
        s.serviceListeners[serviceNamesKey] = listener
        listener.AddListenerAndNotify(protocolServiceKey, notify)
-
        err = s.serviceDiscovery.AddListener(listener)
        if err != nil {
                logger.Errorf("add instance listener catch error,url:%s 
err:%s", url.String(), err.Error())
        }
-       return nil
 }
 
 // LoadSubscribeInstances load subscribe instance
-func (s *serviceDiscoveryRegistry) LoadSubscribeInstances(url *common.URL, 
notify registry.NotifyListener) error {
+func (s *ServiceDiscoveryRegistry) LoadSubscribeInstances(url *common.URL, 
notify registry.NotifyListener) error {
        appName := url.GetParam(constant.ApplicationKey, url.Username)
        instances := s.serviceDiscovery.GetInstances(appName)
        for _, instance := range instances {
@@ -248,7 +262,7 @@ func (s *serviceDiscoveryRegistry) 
LoadSubscribeInstances(url *common.URL, notif
                        logger.Infof("Find instance without valid service 
metadata: %s", instance.GetHost())
                        continue
                }
-               metadataInfo, err := event.GetMetadataInfo(instance, revision)
+               metadataInfo, err := GetMetadataInfo(instance, revision)
                if err != nil {
                        return err
                }
@@ -288,7 +302,7 @@ func appendParam(buffer bytes.Buffer, paramKey string, url 
*common.URL) {
        buffer.WriteString(url.GetParam(paramKey, ""))
 }
 
-func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL 
*common.URL, serviceInstances []registry.ServiceInstance) []*common.URL {
+func (s *ServiceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL 
*common.URL, serviceInstances []registry.ServiceInstance) []*common.URL {
        var urls []*common.URL
        for _, syn := range s.subscribedURLsSynthesizers {
                if syn.Support(subscribedURL) {
@@ -302,14 +316,14 @@ func shouldSubscribe(url *common.URL) bool {
        return !shouldRegister(url)
 }
 
-func (s *serviceDiscoveryRegistry) getServices(url *common.URL) *gxset.HashSet 
{
+func (s *ServiceDiscoveryRegistry) getServices(url *common.URL, listener 
registry.MappingListener) *gxset.HashSet {
        services := gxset.NewSet()
        serviceNames := url.GetParam(constant.ProvidedBy, "")
        if len(serviceNames) > 0 {
                services = parseServices(serviceNames)
        }
        if services.Empty() {
-               services = s.findMappedServices(url)
+               services = s.findMappedServices(url, listener)
                if services.Empty() {
                        return s.subscribedServices
                }
@@ -317,15 +331,28 @@ func (s *serviceDiscoveryRegistry) getServices(url 
*common.URL) *gxset.HashSet {
        return services
 }
 
-func (s *serviceDiscoveryRegistry) findMappedServices(url *common.URL) 
*gxset.HashSet {
-       serviceNames, err := s.serviceNameMapping.Get(url)
+func (s *ServiceDiscoveryRegistry) findMappedServices(url *common.URL, 
listener registry.MappingListener) *gxset.HashSet {
+       serviceNames, err := s.serviceNameMapping.Get(url, listener)
        if err != nil {
                logger.Errorf("get service names catch error, url:%s, err:%s ", 
url.String(), err.Error())
                return gxset.NewSet()
        }
+       if listener != nil {
+               protocolServiceKey := url.ServiceKey() + ":" + url.Protocol
+               s.serviceMappingListeners[protocolServiceKey] = listener
+       }
        return serviceNames
 }
 
 var (
        exporting = &atomic.Bool{}
 )
+
+func (s *ServiceDiscoveryRegistry) stopListen(url *common.URL) {
+       protocolServiceKey := url.ServiceKey() + ":" + url.Protocol
+       listener := s.serviceMappingListeners[protocolServiceKey]
+       if listener != nil {
+               delete(s.serviceMappingListeners, protocolServiceKey)
+               listener.Stop()
+       }
+}
diff --git a/registry/event/service_instances_changed_listener_impl.go 
b/registry/servicediscovery/service_instances_changed_listener_impl.go
similarity index 99%
rename from registry/event/service_instances_changed_listener_impl.go
rename to registry/servicediscovery/service_instances_changed_listener_impl.go
index b8de36e77..899b55075 100644
--- a/registry/event/service_instances_changed_listener_impl.go
+++ b/registry/servicediscovery/service_instances_changed_listener_impl.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package event
+package servicediscovery
 
 import (
        "reflect"
diff --git a/registry/servicediscovery/service_mapping_change_listener_impl.go 
b/registry/servicediscovery/service_mapping_change_listener_impl.go
new file mode 100644
index 000000000..5450d81f5
--- /dev/null
+++ b/registry/servicediscovery/service_mapping_change_listener_impl.go
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+       "sync"
+)
+
+import (
+       gxset "github.com/dubbogo/gost/container/set"
+       "github.com/dubbogo/gost/gof/observer"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/common"
+       "dubbo.apache.org/dubbo-go/v3/common/extension"
+       "dubbo.apache.org/dubbo-go/v3/registry"
+)
+
+type ServiceMappingChangedListenerImpl struct {
+       oldServiceNames *gxset.HashSet
+       listener        registry.NotifyListener
+       registryUrl     *common.URL
+       serviceUrl      *common.URL
+       mappingCache    *sync.Map
+       stop            int
+}
+
+const (
+       ServiceMappingListenerStart = iota
+       ServiceMappingListenerStop
+)
+
+func NewMappingListener(registryUrl *common.URL, serviceUrl *common.URL, 
oldServiceNames *gxset.HashSet, listener registry.NotifyListener) 
*ServiceMappingChangedListenerImpl {
+       return &ServiceMappingChangedListenerImpl{
+               listener:        listener,
+               oldServiceNames: oldServiceNames,
+               registryUrl:     registryUrl,
+               serviceUrl:      serviceUrl,
+               stop:            ServiceMappingListenerStart,
+               mappingCache:    &sync.Map{},
+       }
+}
+
+// OnEvent handles a ServiceMappingChangeEvent: when the mapped service names change, it subscribes the service URL again
+func (lstn *ServiceMappingChangedListenerImpl) OnEvent(e observer.Event) error 
{
+       var (
+               err error
+               reg registry.Registry
+       )
+       if lstn.stop == ServiceMappingListenerStop {
+               return nil
+       }
+       sm, ok := e.(*registry.ServiceMappingChangeEvent)
+       if !ok {
+               return nil
+       }
+       newServiceNames := sm.GetServiceNames()
+       oldServiceNames := lstn.oldServiceNames
+       // serviceMapping is orderly
+       if newServiceNames.Empty() || oldServiceNames.String() == 
newServiceNames.String() {
+               return nil
+       }
+       if newServiceNames.Size() > 0 && oldServiceNames.Empty() {
+               if reg, err = extension.GetRegistry(lstn.registryUrl.Protocol, 
lstn.registryUrl); err != nil {
+                       return err
+               }
+               if sdreg, ok := reg.(*ServiceDiscoveryRegistry); ok {
+                       sdreg.SubscribeURL(lstn.serviceUrl, lstn.listener, 
newServiceNames)
+               }
+               lstn.oldServiceNames = newServiceNames
+               return nil
+       }
+       for _, service := range newServiceNames.Values() {
+               if !oldServiceNames.Contains(service) {
+                       lstn.mappingCache.Delete(oldServiceNames.String())
+                       lstn.mappingCache.Store(newServiceNames.String(), 
newServiceNames)
+                       if reg, err = 
extension.GetRegistry(lstn.registryUrl.Protocol, lstn.registryUrl); err != nil {
+                               return err
+                       }
+                       if sdreg, ok := reg.(*ServiceDiscoveryRegistry); ok {
+                               sdreg.SubscribeURL(lstn.serviceUrl, 
lstn.listener, newServiceNames)
+                       }
+                       lstn.oldServiceNames = newServiceNames
+               }
+       }
+       return err
+}
+
+// Stop marks the listener as stopped; subsequent OnEvent calls return without acting
+func (lstn *ServiceMappingChangedListenerImpl) Stop() {
+       lstn.stop = ServiceMappingListenerStop
+}

Reply via email to