This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 188b9938d add polaris subscribe (#2100)
188b9938d is described below

commit 188b9938dee6524eaf635a0a83c116fe9a043fe1
Author: Jason Deng <[email protected]>
AuthorDate: Fri Nov 11 21:13:52 2022 +0800

    add polaris subscribe (#2100)
    
    * Merge remote-tracking branch 'origin/3.0' into 3.0
    
    * Merge remote-tracking branch 'origin/3.0' into 3.0
    
    * Merge remote-tracking branch 'origin/3.0' into 3.0
    
    * Merge remote-tracking branch 'origin/3.0' into 3.0
---
 registry/polaris/listener.go  | 27 ++++++++++-------
 registry/polaris/registry.go  | 68 ++++++++++++++++++++++++++++++++++++-------
 remoting/polaris/polaris.yaml | 13 ++-------
 3 files changed, 76 insertions(+), 32 deletions(-)

diff --git a/registry/polaris/listener.go b/registry/polaris/listener.go
index c949f292c..49ef6dcdd 100644
--- a/registry/polaris/listener.go
+++ b/registry/polaris/listener.go
@@ -35,32 +35,39 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common"
        "dubbo.apache.org/dubbo-go/v3/config_center"
        "dubbo.apache.org/dubbo-go/v3/registry"
+       "dubbo.apache.org/dubbo-go/v3/remoting"
 )
 
 type polarisListener struct {
-       watcher   *PolarisServiceWatcher
-       listenUrl *common.URL
-       events    *gxchan.UnboundedChan
-       closeCh   chan struct{}
+       watcher *PolarisServiceWatcher
+       events  *gxchan.UnboundedChan
+       closeCh chan struct{}
 }
 
 // NewPolarisListener new polaris listener
-func NewPolarisListener(url *common.URL) (*polarisListener, error) {
+func NewPolarisListener(watcher *PolarisServiceWatcher) (*polarisListener, error) {
        listener := &polarisListener{
-               listenUrl: url,
-               events:    gxchan.NewUnboundedChan(32),
-               closeCh:   make(chan struct{}),
+               watcher: watcher,
+               events:  gxchan.NewUnboundedChan(32),
+               closeCh: make(chan struct{}),
        }
-
+       listener.startListen()
        return listener, nil
 }
+func (pl *polarisListener) startListen() {
+       pl.watcher.AddSubscriber(func(et remoting.EventType, ins []model.Instance) {
+               for i := range ins {
+                       pl.events.In() <- &config_center.ConfigChangeEvent{Value: generateUrl(ins[i]), ConfigType: et}
+               }
+       })
+}
 
 // Next returns next service event once received
 func (pl *polarisListener) Next() (*registry.ServiceEvent, error) {
        for {
                select {
                case <-pl.closeCh:
-                       logger.Warnf("polaris listener is close!listenUrl:%+v", pl.listenUrl)
+                       logger.Warnf("polaris listener is close")
                        return nil, perrors.New("listener stopped")
                case val := <-pl.events.Out():
                        e, _ := val.(*config_center.ConfigChangeEvent)
diff --git a/registry/polaris/registry.go b/registry/polaris/registry.go
index cdfc361da..7b0526a77 100644
--- a/registry/polaris/registry.go
+++ b/registry/polaris/registry.go
@@ -52,12 +52,14 @@ func init() {
 
 // newPolarisRegistry will create new instance
 func newPolarisRegistry(url *common.URL) (registry.Registry, error) {
-       sdkCtx, _, err := polaris.GetPolarisConfig(url)
+       sdkCtx, ns, err := polaris.GetPolarisConfig(url)
        if err != nil {
                return &polarisRegistry{}, err
        }
        pRegistry := &polarisRegistry{
+               namespace:    ns,
                provider:     api.NewProviderAPIByContext(sdkCtx),
+               consumer:     api.NewConsumerAPIByContext(sdkCtx),
                lock:         &sync.RWMutex{},
                registryUrls: make(map[string]*PolarisHeartbeat),
                listenerLock: &sync.RWMutex{},
@@ -67,11 +69,13 @@ func newPolarisRegistry(url *common.URL) (registry.Registry, error) {
 }
 
 type polarisRegistry struct {
+       consumer     api.ConsumerAPI
+       namespace    string
        url          *common.URL
        provider     api.ProviderAPI
        lock         *sync.RWMutex
        registryUrls map[string]*PolarisHeartbeat
-
+       watchers     map[string]*PolarisServiceWatcher
        listenerLock *sync.RWMutex
 }
 
@@ -147,33 +151,49 @@ func (pr *polarisRegistry) UnRegister(conf *common.URL) error {
 
 // Subscribe returns nil if subscribing registry successfully. If not returns an error.
 func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
-       var (
-               newParam    api.WatchServiceRequest
-               newConsumer api.ConsumerAPI
-       )
 
        role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
        if role != common.CONSUMER {
                return nil
        }
+       timer := time.NewTimer(time.Duration(RegistryConnDelay) * time.Second)
+       defer timer.Stop()
+
+       req := api.WatchServiceRequest{
+               WatchServiceRequest: model.WatchServiceRequest{
+                       Key: model.ServiceKey{
+                               Service:   common.GetSubscribeName(url),
+                               Namespace: pr.namespace,
+                       },
+               },
+       }
 
        for {
-               listener, err := NewPolarisListener(url)
+               watcher, err := newPolarisWatcher(&req, pr.consumer)
+
+               if err != nil {
+                       logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err))
+                       <-timer.C
+                       timer.Reset(time.Duration(RegistryConnDelay) * time.Second)
+                       continue
+               }
+               listener, err := NewPolarisListener(watcher)
+
                if err != nil {
                        logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
-                       <-time.After(time.Duration(RegistryConnDelay) * time.Second)
+                       <-timer.C
+                       timer.Reset(time.Duration(RegistryConnDelay) * time.Second)
                        continue
                }
 
-               watcher, err := newPolarisWatcher(&newParam, newConsumer)
                if err != nil {
                        logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err))
                        timer := time.NewTimer(time.Duration(RegistryConnDelay) * time.Second)
                        timer.Reset(time.Duration(RegistryConnDelay) * time.Second)
                        continue
                }
-               for {
 
+               for {
                        serviceEvent, err := listener.Next()
 
                        if err != nil {
@@ -199,6 +219,31 @@ func (pr *polarisRegistry) GetURL() *common.URL {
        return pr.url
 }
 
+func (pr *polarisRegistry) createPolarisWatcher(serviceName string) (*PolarisServiceWatcher, error) {
+
+       pr.listenerLock.Lock()
+       defer pr.listenerLock.Unlock()
+
+       if _, exist := pr.watchers[serviceName]; !exist {
+               subscribeParam := &api.WatchServiceRequest{
+                       WatchServiceRequest: model.WatchServiceRequest{
+                               Key: model.ServiceKey{
+                                       Namespace: pr.namespace,
+                                       Service:   serviceName,
+                               },
+                       },
+               }
+
+               watcher, err := newPolarisWatcher(subscribeParam, pr.consumer)
+               if err != nil {
+                       return nil, err
+               }
+               pr.watchers[serviceName] = watcher
+       }
+
+       return pr.watchers[serviceName], nil
+}
+
 // Destroy stop polaris registry.
 func (pr *polarisRegistry) Destroy() {
        for _, val := range pr.registryUrls {
@@ -218,7 +263,8 @@ func (pr *polarisRegistry) IsAvailable() bool {
 }
 
 // doHeartbeat Since polaris does not support automatic reporting of instance heartbeats, separate logic is
-//  needed to implement it
+//
+//     needed to implement it
 func (pr *polarisRegistry) doHeartbeat(ctx context.Context, ins *api.InstanceRegisterRequest) {
        ticker := time.NewTicker(time.Duration(4) * time.Second)
 
diff --git a/remoting/polaris/polaris.yaml b/remoting/polaris/polaris.yaml
index 98741f216..3d794ec55 100644
--- a/remoting/polaris/polaris.yaml
+++ b/remoting/polaris/polaris.yaml
@@ -31,16 +31,7 @@ global:
       grpc:
         maxCallRecvMsgSize: 52428800
   statReporter:
-    enable: true
-    chain:
-      - stat2Monitor
-      - serviceCache
-    plugin:
-      stat2Monitor:
-        metricsReportWindow: 1m
-        metricsNumBuckets: 12
-      serviceCache:
-        reportInterval: 3m
+    enable: false
 consumer:
   localCache:
     type: inmemory
@@ -92,4 +83,4 @@ consumer:
     type: subscribeLocalChannel
     plugin:
       subscribeLocalChannel:
-        channelBufferSize: 50
+        channelBufferSize: 50
\ No newline at end of file

Reply via email to