[ 
https://issues.apache.org/jira/browse/SCB-159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16315583#comment-16315583
 ] 

ASF GitHub Bot commented on SCB-159:
------------------------------------

little-cui closed pull request #245: SCB-159 Watcher will get a wrong event sequence when SC modify resource concurrently.
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/245
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pkg/util/log.go b/pkg/util/log.go
index cc8e462a..00de0b8f 100644
--- a/pkg/util/log.go
+++ b/pkg/util/log.go
@@ -119,12 +119,12 @@ func getCalleeFuncName() string {
        for i := 2; i <= 4; i++ {
                pc, file, _, ok := runtime.Caller(i)
 
-               if strings.Index(file, "log.go") > 0 {
+               if strings.Index(file, "/log.go") > 0 {
                        continue
                }
 
                if ok {
-                       idx := strings.LastIndex(file, "src")
+                       idx := strings.LastIndex(file, "/src/")
                        switch {
                        case idx >= 0:
                                fullName = file[idx+4:]
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go
index b1ab8677..7ca82cf2 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -41,6 +41,7 @@ type Cache interface {
        Version() int64
        Data(interface{}) interface{}
        Have(interface{}) bool
+       Size() int
 }
 
 type Cacher interface {
@@ -65,6 +66,10 @@ func (n *nullCache) Have(interface{}) bool {
        return false
 }
 
+func (n *nullCache) Size() int {
+       return 0
+}
+
 type nullCacher struct {
 }
 
@@ -128,15 +133,15 @@ func (c *KvCache) Unlock() {
        if c.size >= l &&
                c.lastMaxSize > c.size*DEFAULT_COMPACT_TIMES &&
                time.Now().Sub(c.lastRefresh) >= DEFAULT_COMPACT_TIMEOUT {
-               util.Logger().Infof("cache is empty and not in use over %s, compact capacity to size %d->%d",
-                       DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size)
+               util.Logger().Infof("cache %s is not in use over %s, compact capacity to size %d->%d",
+                       c.owner.Cfg.Key, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size)
                // gc
                newCache := make(map[string]*mvccpb.KeyValue, c.size)
                for k, v := range c.store {
                        newCache[k] = v
                }
                c.store = newCache
-               c.lastMaxSize = c.size
+               c.lastMaxSize = l
                c.lastRefresh = time.Now()
        }
        c.rwMux.Unlock()
@@ -313,7 +318,7 @@ func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event {
                max = nc
        }
 
-       newStore := make(map[string]*mvccpb.KeyValue)
+       newStore := make(map[string]*mvccpb.KeyValue, nc)
        for _, kv := range items {
                newStore[util.BytesToStringWithNoCopy(kv.Key)] = kv
        }
@@ -532,7 +537,6 @@ func NewKvCache(c *KvCacher, size int) *KvCache {
        return &KvCache{
                owner:       c,
                size:        size,
-               lastMaxSize: size,
                store:       make(map[string]*mvccpb.KeyValue, size),
                lastRefresh: time.Now(),
        }
diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go
index 1c45a54b..7f7bbfae 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -46,12 +46,7 @@ func (iedh *InstanceEventDeferHandler) deferMode(total int, del int) bool {
 }
 
 func (iedh *InstanceEventDeferHandler) needDefer(cache Cache, evts []*Event) bool {
-       kvCache, ok := cache.(*KvCache)
-       if !ok {
-               return false
-       }
-
-       if !iedh.deferMode(kvCache.Size(), len(evts)) {
+       if !iedh.deferMode(cache.Size(), len(evts)) {
                return false
        }
 
diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go
index 65349bdf..528922da 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -224,11 +224,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) {
        }
 
        for key := range keysRef {
-               var childs *[]string = nil
-               if arr != nil {
-                       childs = &[]string{}
-               }
-               n := i.getPrefixKey(childs, key)
+               n := i.getPrefixKey(arr, key)
                if n == 0 {
                        count += len(keysRef)
                        if arr != nil {
@@ -238,10 +234,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) {
                        }
                        break
                }
-               /*count += n
-               if arr != nil {
-                       *arr = append(*arr, *childs...)
-               }*/
+               count += n
        }
        return count
 }
diff --git a/server/govern/service.go b/server/govern/service.go
index e059aabd..15e36d11 100644
--- a/server/govern/service.go
+++ b/server/govern/service.go
@@ -256,7 +256,7 @@ func getSchemaInfoUtil(ctx context.Context, domainProject string, serviceId stri
                registry.WithStrKey(key),
                registry.WithPrefix())
        if err != nil {
-               util.Logger().Errorf(err, "Get schema failed,%s")
+               util.Logger().Errorf(err, "Get schema failed")
                return make([]*pb.Schema, 0), err
        }
        schemas := make([]*pb.Schema, 0, len(resp.Kvs))
diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go
index f90eefc1..8a340760 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/service/notification/listwatcher.go
@@ -19,6 +19,7 @@ package notification
 import (
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       "time"
 )
 
 // 状态变化推送
@@ -64,7 +65,14 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
                return
        }
 
-       <-w.listCh
+       select {
+       case <-w.listCh:
+       case <-time.After(DEFAULT_ON_MESSAGE_TIMEOUT):
+               util.Logger().Errorf(nil,
+                       "the %s listwatcher %s %s is not ready[over %s], drop the event %v",
+                       w.Type(), w.Id(), w.Subject(), DEFAULT_ON_MESSAGE_TIMEOUT, job)
+               return
+       }
 
        if job.(*WatchJob).Revision <= w.ListRevision {
                util.Logger().Warnf(nil,
@@ -76,10 +84,16 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
 }
 
 func (w *ListWatcher) sendMessage(job NotifyJob) {
-       util.Logger().Debugf("start notify %s watcher %s %s, job is %v, current revision is %v", w.Type(),
+       util.Logger().Debugf("start to notify %s watcher %s %s, job is %v, current revision is %v", w.Type(),
                w.Id(), w.Subject(), job, w.ListRevision)
        defer util.RecoverAndReport()
-       w.Job <- job
+       select {
+       case w.Job <- job:
+       case <-time.After(DEFAULT_ON_MESSAGE_TIMEOUT):
+               util.Logger().Errorf(nil,
+                       "the %s watcher %s %s event queue is full[over %s], drop the event %v",
+                       w.Type(), w.Id(), w.Subject(), DEFAULT_ON_MESSAGE_TIMEOUT, job)
+       }
 }
 
 func (w *ListWatcher) Close() {
diff --git a/server/service/notification/notification_healthchecker.go b/server/service/notification/notification_healthchecker.go
index e3b8b740..310275b7 100644
--- a/server/service/notification/notification_healthchecker.go
+++ b/server/service/notification/notification_healthchecker.go
@@ -36,8 +36,15 @@ type NotifyServiceHealthCheckJob struct {
 func (s *NotifyServiceHealthChecker) OnMessage(job NotifyJob) {
        j := job.(*NotifyServiceHealthCheckJob)
        err := j.ErrorSubscriber.Err()
-       util.Logger().Warnf(err, "notify server remove watcher %s %s",
-               j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id())
+
+       if j.ErrorSubscriber.Type() == NOTIFTY {
+               util.Logger().Errorf(nil, "remove %s watcher %s %s failed, here cause a dead lock",
+                       j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id())
+               return
+       }
+
+       util.Logger().Warnf(err, "notification service remove %s watcher %s %s",
+               j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id())
        s.Service().RemoveSubscriber(j.ErrorSubscriber)
 }
 
diff --git a/server/service/notification/notification_service.go b/server/service/notification/notification_service.go
index c76d22d9..e2d78640 100644
--- a/server/service/notification/notification_service.go
+++ b/server/service/notification/notification_service.go
@@ -73,15 +73,15 @@ func (s *NotifyService) AddSubscriber(n Subscriber) error {
 
        sr, ok := ss[n.Subject()]
        if !ok {
-               sr = make(subscriberIndex)
-               ss[n.Subject()] = sr
+               sr = make(subscriberIndex, DEFAULT_INIT_SUBSCRIBERS)
+               ss[n.Subject()] = sr // add a subscriber
        }
 
        ns, ok := sr[n.Id()]
        if !ok {
                ns = list.New()
        }
-       ns.PushBack(n)
+       ns.PushBack(n) // add a connection
        sr[n.Id()] = ns
 
        n.SetService(s)
@@ -170,7 +170,7 @@ func (s *NotifyService) publish2Subscriber(t NotifyType) {
                                ns, ok := m[job.SubscriberId()]
                                if ok {
                                        for n := ns.Front(); n != nil; n = n.Next() {
-                                               go n.Value.(Subscriber).OnMessage(job)
+                                               n.Value.(Subscriber).OnMessage(job)
                                        }
                                }
                                s.mutexes[t].Unlock()
@@ -179,7 +179,7 @@ func (s *NotifyService) publish2Subscriber(t NotifyType) {
                        for key := range m {
                                ns := m[key]
                                for n := ns.Front(); n != nil; n = n.Next() {
-                                       go n.Value.(Subscriber).OnMessage(job)
+                                       n.Value.(Subscriber).OnMessage(job)
                                }
                        }
                }
@@ -199,12 +199,12 @@ func (s *NotifyService) init() {
                s.Config.MaxQueue = DEFAULT_MAX_QUEUE
        }
 
-       s.services = make(serviceIndex)
+       s.services = make(serviceIndex, typeEnd)
        s.err = make(chan error, 1)
-       s.queues = make(map[NotifyType]chan NotifyJob)
-       s.mutexes = make(map[NotifyType]*sync.Mutex)
+       s.queues = make(map[NotifyType]chan NotifyJob, typeEnd)
+       s.mutexes = make(map[NotifyType]*sync.Mutex, typeEnd)
        for i := NotifyType(0); i != typeEnd; i++ {
-               s.services[i] = make(subscriberSubjectIndex)
+               s.services[i] = make(subscriberSubjectIndex, DEFAULT_INIT_SUBSCRIBERS)
                s.queues[i] = make(chan NotifyJob, s.Config.MaxQueue)
                s.mutexes[i] = &sync.Mutex{}
                s.waits.Add(1)
diff --git a/server/service/notification/struct.go b/server/service/notification/struct.go
index be53e6b5..f6564ffa 100644
--- a/server/service/notification/struct.go
+++ b/server/service/notification/struct.go
@@ -24,8 +24,10 @@ import (
 )
 
 const (
-       DEFAULT_MAX_QUEUE = 1000
-       DEFAULT_TIMEOUT   = 30 * time.Second
+       DEFAULT_MAX_QUEUE          = 1000
+       DEFAULT_INIT_SUBSCRIBERS   = 1000
+       DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond
+       DEFAULT_TIMEOUT            = 30 * time.Second
 
        NOTIFTY NotifyType = iota
        INSTANCE
@@ -61,6 +63,7 @@ type Subscriber interface {
        Service() *NotifyService
        SetService(*NotifyService)
        OnAccept()
+       // The event bus will callback this function, so it must be non-blocked.
        OnMessage(job NotifyJob)
        Close()
 }
diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go
index 77e91f7c..117ce388 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -245,7 +245,7 @@ func QueryAllProvidersInstances(ctx context.Context, selfServiceId string) (resu
                        }
                        results = append(results, &pb.WatchInstanceResponse{
                                Response: pb.CreateResponse(pb.Response_SUCCESS, "List instance successfully."),
-                               Action:   string(pb.EVT_CREATE),
+                               Action:   string(pb.EVT_INIT),
                                Key: &pb.MicroServiceKey{
                                        Environment: service.Environment,
                                        AppId:       service.AppId,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Watcher will get a wrong event sequence when SC modify resource concurrently
> ----------------------------------------------------------------------------
>
>                 Key: SCB-159
>                 URL: https://issues.apache.org/jira/browse/SCB-159
>             Project: Apache ServiceComb
>          Issue Type: Bug
>          Components: Service-Center
>            Reporter: little-cui
>             Fix For: service-center-1.0.0-m1
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to