AlexStocks commented on a change in pull request #708:
URL: https://github.com/apache/dubbo-go/pull/708#discussion_r471106948
##########
File path: cluster/router/chain/chain.go
##########
@@ -79,6 +105,104 @@ func (c *RouterChain) AddRouters(routers
[]router.PriorityRouter) {
c.routers = newRouters
}
+// SetInvokers receives updated invokers from registry center. If the times of
notification exceeds countThreshold and
+// time interval exceeds timeThreshold since last cache update, then notify to
update the cache.
+func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
+ c.mutex.Lock()
+ c.invokers = invokers
+ c.mutex.Unlock()
+
+ c.count++
+ now := time.Now()
+ if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
+ c.last = now
+ c.count = 0
+ go func() {
+ c.notify <- struct{}{}
+ }()
+ }
+}
+
+// loop listens on events to update the address cache when it's necessary,
either when it receives notification
+// from address update, or when timeInterval exceeds.
+func (c *RouterChain) loop() {
+ for {
+ select {
+ case <-time.Tick(timeInterval):
Review comment:
The right way to use `time.Tick` is to call it once, outside the loop, as follows:
```Go
ch := time.Tick(3 * time.Second)
for {
select {
case <-ch:
}
}
```
From the time.Tick doc (https://godoc.org/time#Tick):
```
Tick is a convenience wrapper for NewTicker providing access to the ticking
channel only. While Tick is useful for clients that have no need to shut down
the Ticker, be aware that without a way to shut it down the underlying Ticker
cannot be recovered by the garbage collector; it "leaks".
```
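As the doc says, the Ticker behind the time.Tick return value cannot be
collected by the GC. Because your code calls time.Tick on every iteration of
the loop, each iteration creates a new Ticker, so it will cause many "leaks".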
##########
File path: cluster/router/chain/chain.go
##########
@@ -79,6 +105,104 @@ func (c *RouterChain) AddRouters(routers
[]router.PriorityRouter) {
c.routers = newRouters
}
+// SetInvokers receives updated invokers from registry center. If the times of
notification exceeds countThreshold and
+// time interval exceeds timeThreshold since last cache update, then notify to
update the cache.
+func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
+ c.mutex.Lock()
+ c.invokers = invokers
+ c.mutex.Unlock()
+
+ c.count++
+ now := time.Now()
+ if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
+ c.last = now
+ c.count = 0
+ go func() {
+ c.notify <- struct{}{}
+ }()
+ }
+}
+
+// loop listens on events to update the address cache when it's necessary,
either when it receives notification
+// from address update, or when timeInterval exceeds.
+func (c *RouterChain) loop() {
+ for {
+ select {
+ case <-time.Tick(timeInterval):
+ c.buildCache()
+ case <-c.notify:
+ c.buildCache()
+ }
+ }
+}
+
+// copyRouters make a snapshot copy from RouterChain's router list.
+func (c *RouterChain) copyRouters() []router.PriorityRouter {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ ret := copySlice(c.routers)
+ return ret.([]router.PriorityRouter)
+}
+
+// copyInvokers copies a snapshot of the received invokers.
+func (c *RouterChain) copyInvokers() []protocol.Invoker {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ if c.invokers == nil || len(c.invokers) == 0 {
+ return nil
+ }
+ ret := copySlice(c.invokers)
+ return ret.([]protocol.Invoker)
+}
+
+func copySlice(s interface{}) interface{} {
+ t, v := reflect.TypeOf(s), reflect.ValueOf(s)
+ c := reflect.MakeSlice(t, v.Len(), v.Len())
+ reflect.Copy(c, v)
+ return c.Interface()
+}
+
+// loadCache loads cache from sync.Value to guarantee the visibility
+func (c *RouterChain) loadCache() *InvokerCache {
+ v := c.cache.Load()
+ if v == nil {
+ return nil
+ }
+
+ return v.(*InvokerCache)
+}
+
+// buildCache builds address cache with the new invokers for all poolable
routers.
+func (c *RouterChain) buildCache() {
+ invokers := c.copyInvokers()
+ if invokers == nil || len(c.invokers) == 0 {
Review comment:
Please delete the condition `len(c.invokers) == 0` because you have already
checked it in c.copyInvokers().
##########
File path: cluster/router/chain/chain.go
##########
@@ -79,6 +105,104 @@ func (c *RouterChain) AddRouters(routers
[]router.PriorityRouter) {
c.routers = newRouters
}
+// SetInvokers receives updated invokers from registry center. If the times of
notification exceeds countThreshold and
+// time interval exceeds timeThreshold since last cache update, then notify to
update the cache.
+func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
+ c.mutex.Lock()
+ c.invokers = invokers
+ c.mutex.Unlock()
+
+ c.count++
+ now := time.Now()
+ if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
+ c.last = now
+ c.count = 0
+ go func() {
+ c.notify <- struct{}{}
+ }()
+ }
+}
+
+// loop listens on events to update the address cache when it's necessary,
either when it receives notification
+// from address update, or when timeInterval exceeds.
+func (c *RouterChain) loop() {
+ for {
+ select {
+ case <-time.Tick(timeInterval):
+ c.buildCache()
+ case <-c.notify:
+ c.buildCache()
+ }
+ }
+}
+
+// copyRouters make a snapshot copy from RouterChain's router list.
+func (c *RouterChain) copyRouters() []router.PriorityRouter {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ ret := copySlice(c.routers)
+ return ret.([]router.PriorityRouter)
+}
+
+// copyInvokers copies a snapshot of the received invokers.
+func (c *RouterChain) copyInvokers() []protocol.Invoker {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ if c.invokers == nil || len(c.invokers) == 0 {
+ return nil
+ }
+ ret := copySlice(c.invokers)
+ return ret.([]protocol.Invoker)
+}
+
+func copySlice(s interface{}) interface{} {
Review comment:
delete this func.
##########
File path: cluster/router/chain/chain.go
##########
@@ -109,14 +233,62 @@ func NewRouterChain(url *common.URL) (*RouterChain,
error) {
chain := &RouterChain{
builtinRouters: routers,
routers: newRouters,
+ last: time.Now(),
+ notify: make(chan struct{}),
}
if url != nil {
chain.url = *url
}
+ go chain.loop()
return chain, nil
}
+// poolRouter calls poolable router's Pool() to create new address pool and
address metadata if necessary.
+// If the corresponding cache entry exists, and the poolable router answers no
need to re-pool (possibly because its
+// rule doesn't change), and the address list doesn't change, then the
existing data will be re-used.
+func poolRouter(p router.Poolable, origin *InvokerCache, invokers
[]protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
+ name := p.Name()
+ if isCacheMiss(origin, name) || p.ShouldPool() ||
isInvokersChanged(origin.invokers, invokers) {
+ logger.Debugf("build address cache for router %q", name)
+ return p.Pool(invokers)
+ }
+
+ logger.Debugf("reuse existing address cache for router %q", name)
+ return origin.pools[name], origin.metadatas[name]
+}
+
+// isCacheMiss checks if the corresponding cache entry for a poolable router
has already existed.
+// False returns when the cache is nil, or cache's pool is nil, or cache's
invokers snapshot is nil, or the entry
+// doesn't exist.
+func isCacheMiss(cache *InvokerCache, key string) bool {
+ if cache == nil || cache.pools == nil || cache.invokers == nil ||
cache.pools[key] == nil {
Review comment:
You should hold a lock when reading cache.pools/cache.invokers/cache.pools[key].
##########
File path: cluster/router/healthcheck/health_check_route.go
##########
@@ -51,25 +58,48 @@ func NewHealthCheckRouter(url *common.URL)
(router.PriorityRouter, error) {
}
// Route gets a list of healthy invoker
-func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url
*common.URL, invocation protocol.Invocation) []protocol.Invoker {
+func (r *HealthCheckRouter) Route(invokers *roaring.Bitmap, cache
router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if !r.enabled {
return invokers
}
- healthyInvokers := make([]protocol.Invoker, 0, len(invokers))
+
+ addrPool := cache.FindAddrPool(r)
// Add healthy invoker to the list
- for _, invoker := range invokers {
- if r.checker.IsHealthy(invoker) {
- healthyInvokers = append(healthyInvokers, invoker)
- }
- }
- // If all Invoke are considered unhealthy, downgrade to all inovker
- if len(healthyInvokers) == 0 {
+ healthyInvokers := utils.JoinIfNotEqual(addrPool[healthy], invokers)
+ // If all Invoke are considered unhealthy, downgrade to all invoker
Review comment:
'all Invoke' --> 'all invokers'?
##########
File path: cluster/router/chain/chain.go
##########
@@ -79,6 +105,104 @@ func (c *RouterChain) AddRouters(routers
[]router.PriorityRouter) {
c.routers = newRouters
}
+// SetInvokers receives updated invokers from registry center. If the times of
notification exceeds countThreshold and
+// time interval exceeds timeThreshold since last cache update, then notify to
update the cache.
+func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
+ c.mutex.Lock()
+ c.invokers = invokers
+ c.mutex.Unlock()
+
+ c.count++
+ now := time.Now()
+ if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
+ c.last = now
+ c.count = 0
+ go func() {
+ c.notify <- struct{}{}
+ }()
+ }
+}
+
+// loop listens on events to update the address cache when it's necessary,
either when it receives notification
+// from address update, or when timeInterval exceeds.
+func (c *RouterChain) loop() {
+ for {
+ select {
+ case <-time.Tick(timeInterval):
+ c.buildCache()
+ case <-c.notify:
+ c.buildCache()
+ }
+ }
+}
+
+// copyRouters make a snapshot copy from RouterChain's router list.
+func (c *RouterChain) copyRouters() []router.PriorityRouter {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ ret := copySlice(c.routers)
+ return ret.([]router.PriorityRouter)
+}
+
+// copyInvokers copies a snapshot of the received invokers.
+func (c *RouterChain) copyInvokers() []protocol.Invoker {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ if c.invokers == nil || len(c.invokers) == 0 {
+ return nil
+ }
+ ret := copySlice(c.invokers)
Review comment:
Since you know the type of `c.invokers`, please rewrite the code as follows:
```Go
ret := make([]protocol.Invoker, 0, len(c.invokers))
ret = append(ret, c.invokers...)
return ret
```
Please remember to avoid using reflect as much as you can.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]