This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 188b9938d add polaris subscribe (#2100)
188b9938d is described below
commit 188b9938dee6524eaf635a0a83c116fe9a043fe1
Author: Jason Deng <[email protected]>
AuthorDate: Fri Nov 11 21:13:52 2022 +0800
add polaris subscribe (#2100)
* Merge remote-tracking branch 'origin/3.0' into 3.0
* Merge remote-tracking branch 'origin/3.0' into 3.0
* Merge remote-tracking branch 'origin/3.0' into 3.0
* Merge remote-tracking branch 'origin/3.0' into 3.0
---
registry/polaris/listener.go | 27 ++++++++++-------
registry/polaris/registry.go | 68 ++++++++++++++++++++++++++++++++++++-------
remoting/polaris/polaris.yaml | 13 ++-------
3 files changed, 76 insertions(+), 32 deletions(-)
diff --git a/registry/polaris/listener.go b/registry/polaris/listener.go
index c949f292c..49ef6dcdd 100644
--- a/registry/polaris/listener.go
+++ b/registry/polaris/listener.go
@@ -35,32 +35,39 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/registry"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
)
type polarisListener struct {
- watcher *PolarisServiceWatcher
- listenUrl *common.URL
- events *gxchan.UnboundedChan
- closeCh chan struct{}
+ watcher *PolarisServiceWatcher
+ events *gxchan.UnboundedChan
+ closeCh chan struct{}
}
// NewPolarisListener new polaris listener
-func NewPolarisListener(url *common.URL) (*polarisListener, error) {
+func NewPolarisListener(watcher *PolarisServiceWatcher) (*polarisListener,
error) {
listener := &polarisListener{
- listenUrl: url,
- events: gxchan.NewUnboundedChan(32),
- closeCh: make(chan struct{}),
+ watcher: watcher,
+ events: gxchan.NewUnboundedChan(32),
+ closeCh: make(chan struct{}),
}
-
+ listener.startListen()
return listener, nil
}
+func (pl *polarisListener) startListen() {
+ pl.watcher.AddSubscriber(func(et remoting.EventType, ins
[]model.Instance) {
+ for i := range ins {
+ pl.events.In() <-
&config_center.ConfigChangeEvent{Value: generateUrl(ins[i]), ConfigType: et}
+ }
+ })
+}
// Next returns next service event once received
func (pl *polarisListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-pl.closeCh:
- logger.Warnf("polaris listener is close!listenUrl:%+v",
pl.listenUrl)
+ logger.Warnf("polaris listener is close")
return nil, perrors.New("listener stopped")
case val := <-pl.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
diff --git a/registry/polaris/registry.go b/registry/polaris/registry.go
index cdfc361da..7b0526a77 100644
--- a/registry/polaris/registry.go
+++ b/registry/polaris/registry.go
@@ -52,12 +52,14 @@ func init() {
// newPolarisRegistry will create new instance
func newPolarisRegistry(url *common.URL) (registry.Registry, error) {
- sdkCtx, _, err := polaris.GetPolarisConfig(url)
+ sdkCtx, ns, err := polaris.GetPolarisConfig(url)
if err != nil {
return &polarisRegistry{}, err
}
pRegistry := &polarisRegistry{
+ namespace: ns,
provider: api.NewProviderAPIByContext(sdkCtx),
+ consumer: api.NewConsumerAPIByContext(sdkCtx),
lock: &sync.RWMutex{},
registryUrls: make(map[string]*PolarisHeartbeat),
listenerLock: &sync.RWMutex{},
@@ -67,11 +69,13 @@ func newPolarisRegistry(url *common.URL)
(registry.Registry, error) {
}
type polarisRegistry struct {
+ consumer api.ConsumerAPI
+ namespace string
url *common.URL
provider api.ProviderAPI
lock *sync.RWMutex
registryUrls map[string]*PolarisHeartbeat
-
+ watchers map[string]*PolarisServiceWatcher
listenerLock *sync.RWMutex
}
@@ -147,33 +151,49 @@ func (pr *polarisRegistry) UnRegister(conf *common.URL)
error {
// Subscribe returns nil if subscribing registry successfully. If not returns
an error.
func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener
registry.NotifyListener) error {
- var (
- newParam api.WatchServiceRequest
- newConsumer api.ConsumerAPI
- )
role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
if role != common.CONSUMER {
return nil
}
+ timer := time.NewTimer(time.Duration(RegistryConnDelay) * time.Second)
+ defer timer.Stop()
+
+ req := api.WatchServiceRequest{
+ WatchServiceRequest: model.WatchServiceRequest{
+ Key: model.ServiceKey{
+ Service: common.GetSubscribeName(url),
+ Namespace: pr.namespace,
+ },
+ },
+ }
for {
- listener, err := NewPolarisListener(url)
+ watcher, err := newPolarisWatcher(&req, pr.consumer)
+
+ if err != nil {
+ logger.Warnf("getwatcher() = err:%v",
perrors.WithStack(err))
+ <-timer.C
+ timer.Reset(time.Duration(RegistryConnDelay) *
time.Second)
+ continue
+ }
+ listener, err := NewPolarisListener(watcher)
+
if err != nil {
logger.Warnf("getListener() = err:%v",
perrors.WithStack(err))
- <-time.After(time.Duration(RegistryConnDelay) *
time.Second)
+ <-timer.C
+ timer.Reset(time.Duration(RegistryConnDelay) *
time.Second)
continue
}
- watcher, err := newPolarisWatcher(&newParam, newConsumer)
if err != nil {
logger.Warnf("getwatcher() = err:%v",
perrors.WithStack(err))
timer := time.NewTimer(time.Duration(RegistryConnDelay)
* time.Second)
timer.Reset(time.Duration(RegistryConnDelay) *
time.Second)
continue
}
- for {
+ for {
serviceEvent, err := listener.Next()
if err != nil {
@@ -199,6 +219,31 @@ func (pr *polarisRegistry) GetURL() *common.URL {
return pr.url
}
+func (pr *polarisRegistry) createPolarisWatcher(serviceName string)
(*PolarisServiceWatcher, error) {
+
+ pr.listenerLock.Lock()
+ defer pr.listenerLock.Unlock()
+
+ if _, exist := pr.watchers[serviceName]; !exist {
+ subscribeParam := &api.WatchServiceRequest{
+ WatchServiceRequest: model.WatchServiceRequest{
+ Key: model.ServiceKey{
+ Namespace: pr.namespace,
+ Service: serviceName,
+ },
+ },
+ }
+
+ watcher, err := newPolarisWatcher(subscribeParam, pr.consumer)
+ if err != nil {
+ return nil, err
+ }
+ pr.watchers[serviceName] = watcher
+ }
+
+ return pr.watchers[serviceName], nil
+}
+
// Destroy stop polaris registry.
func (pr *polarisRegistry) Destroy() {
for _, val := range pr.registryUrls {
@@ -218,7 +263,8 @@ func (pr *polarisRegistry) IsAvailable() bool {
}
// doHeartbeat Since polaris does not support automatic reporting of instance
heartbeats, separate logic is
-// needed to implement it
+//
+// needed to implement it
func (pr *polarisRegistry) doHeartbeat(ctx context.Context, ins
*api.InstanceRegisterRequest) {
ticker := time.NewTicker(time.Duration(4) * time.Second)
diff --git a/remoting/polaris/polaris.yaml b/remoting/polaris/polaris.yaml
index 98741f216..3d794ec55 100644
--- a/remoting/polaris/polaris.yaml
+++ b/remoting/polaris/polaris.yaml
@@ -31,16 +31,7 @@ global:
grpc:
maxCallRecvMsgSize: 52428800
statReporter:
- enable: true
- chain:
- - stat2Monitor
- - serviceCache
- plugin:
- stat2Monitor:
- metricsReportWindow: 1m
- metricsNumBuckets: 12
- serviceCache:
- reportInterval: 3m
+ enable: false
consumer:
localCache:
type: inmemory
@@ -92,4 +83,4 @@ consumer:
type: subscribeLocalChannel
plugin:
subscribeLocalChannel:
- channelBufferSize: 50
+ channelBufferSize: 50
\ No newline at end of file