[ https://issues.apache.org/jira/browse/SCB-159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16315583#comment-16315583 ]
ASF GitHub Bot commented on SCB-159: ------------------------------------ little-cui closed pull request #245: SCB-159 Watcher will get a wrong event sequence when SC modify resource concurrently. URL: https://github.com/apache/incubator-servicecomb-service-center/pull/245 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pkg/util/log.go b/pkg/util/log.go index cc8e462a..00de0b8f 100644 --- a/pkg/util/log.go +++ b/pkg/util/log.go @@ -119,12 +119,12 @@ func getCalleeFuncName() string { for i := 2; i <= 4; i++ { pc, file, _, ok := runtime.Caller(i) - if strings.Index(file, "log.go") > 0 { + if strings.Index(file, "/log.go") > 0 { continue } if ok { - idx := strings.LastIndex(file, "src") + idx := strings.LastIndex(file, "/src/") switch { case idx >= 0: fullName = file[idx+4:] diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go index b1ab8677..7ca82cf2 100644 --- a/server/core/backend/store/cacher.go +++ b/server/core/backend/store/cacher.go @@ -41,6 +41,7 @@ type Cache interface { Version() int64 Data(interface{}) interface{} Have(interface{}) bool + Size() int } type Cacher interface { @@ -65,6 +66,10 @@ func (n *nullCache) Have(interface{}) bool { return false } +func (n *nullCache) Size() int { + return 0 +} + type nullCacher struct { } @@ -128,15 +133,15 @@ func (c *KvCache) Unlock() { if c.size >= l && c.lastMaxSize > c.size*DEFAULT_COMPACT_TIMES && time.Now().Sub(c.lastRefresh) >= DEFAULT_COMPACT_TIMEOUT { - util.Logger().Infof("cache is empty and not in use over %s, compact capacity to size %d->%d", - DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size) + util.Logger().Infof("cache %s is not in use over %s, compact capacity to size %d->%d", + c.owner.Cfg.Key, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size) // gc newCache := make(map[string]*mvccpb.KeyValue, c.size) for k, v := range c.store { newCache[k] = v } c.store = newCache - c.lastMaxSize = c.size + c.lastMaxSize = l c.lastRefresh = time.Now() } c.rwMux.Unlock() @@ -313,7 +318,7 @@ func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event { max = nc } - newStore := make(map[string]*mvccpb.KeyValue) + newStore := make(map[string]*mvccpb.KeyValue, nc) for _, kv := range items { newStore[util.BytesToStringWithNoCopy(kv.Key)] = kv } @@ -532,7 +537,6 @@ func NewKvCache(c *KvCacher, size int) *KvCache { return &KvCache{ owner: c, size: size, - lastMaxSize: size, store: make(map[string]*mvccpb.KeyValue, size), lastRefresh: time.Now(), } diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go index 1c45a54b..7f7bbfae 100644 --- a/server/core/backend/store/defer.go +++ b/server/core/backend/store/defer.go @@ -46,12 +46,7 @@ func (iedh *InstanceEventDeferHandler) deferMode(total int, del int) bool { } func (iedh *InstanceEventDeferHandler) needDefer(cache Cache, evts []*Event) bool { - kvCache, ok := cache.(*KvCache) - if !ok { - return false - } - - if !iedh.deferMode(kvCache.Size(), len(evts)) { + if !iedh.deferMode(cache.Size(), len(evts)) { return false } diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go index 65349bdf..528922da 100644 --- a/server/core/backend/store/indexer.go +++ b/server/core/backend/store/indexer.go @@ -224,11 +224,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) { } for key := range keysRef { - var childs *[]string = nil - if arr != nil { - childs = &[]string{} - } - n := i.getPrefixKey(childs, key) + n := i.getPrefixKey(arr, key) if n == 0 { count += len(keysRef) if arr != nil { @@ -238,10 +234,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) { } break } - /*count += n - if arr != nil { - *arr = append(*arr, *childs...) - }*/ + count += n } return count } diff --git a/server/govern/service.go b/server/govern/service.go index e059aabd..15e36d11 100644 --- a/server/govern/service.go +++ b/server/govern/service.go @@ -256,7 +256,7 @@ func getSchemaInfoUtil(ctx context.Context, domainProject string, serviceId stri registry.WithStrKey(key), registry.WithPrefix()) if err != nil { - util.Logger().Errorf(err, "Get schema failed,%s") + util.Logger().Errorf(err, "Get schema failed") return make([]*pb.Schema, 0), err } schemas := make([]*pb.Schema, 0, len(resp.Kvs)) diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go index f90eefc1..8a340760 100644 --- a/server/service/notification/listwatcher.go +++ b/server/service/notification/listwatcher.go @@ -19,6 +19,7 @@ package notification import ( "github.com/apache/incubator-servicecomb-service-center/pkg/util" pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" + "time" ) // 状态变化推送 @@ -64,7 +65,14 @@ func (w *ListWatcher) OnMessage(job NotifyJob) { return } - <-w.listCh + select { + case <-w.listCh: + case <-time.After(DEFAULT_ON_MESSAGE_TIMEOUT): + util.Logger().Errorf(nil, + "the %s listwatcher %s %s is not ready[over %s], drop the event %v", + w.Type(), w.Id(), w.Subject(), DEFAULT_ON_MESSAGE_TIMEOUT, job) + return + } if job.(*WatchJob).Revision <= w.ListRevision { util.Logger().Warnf(nil, @@ -76,10 +84,16 @@ func (w *ListWatcher) OnMessage(job NotifyJob) { } func (w *ListWatcher) sendMessage(job NotifyJob) { - util.Logger().Debugf("start notify %s watcher %s %s, job is %v, current revision is %v", w.Type(), + util.Logger().Debugf("start to notify %s watcher %s %s, job is %v, current revision is %v", w.Type(), w.Id(), w.Subject(), job, w.ListRevision) defer util.RecoverAndReport() - w.Job <- job + select { + case w.Job <- job: + case <-time.After(DEFAULT_ON_MESSAGE_TIMEOUT): + util.Logger().Errorf(nil, + "the %s watcher %s %s event queue is full[over %s], drop the event %v", + w.Type(), w.Id(), w.Subject(), DEFAULT_ON_MESSAGE_TIMEOUT, job) + } } func (w *ListWatcher) Close() { diff --git a/server/service/notification/notification_healthchecker.go b/server/service/notification/notification_healthchecker.go index e3b8b740..310275b7 100644 --- a/server/service/notification/notification_healthchecker.go +++ b/server/service/notification/notification_healthchecker.go @@ -36,8 +36,15 @@ type NotifyServiceHealthCheckJob struct { func (s *NotifyServiceHealthChecker) OnMessage(job NotifyJob) { j := job.(*NotifyServiceHealthCheckJob) err := j.ErrorSubscriber.Err() - util.Logger().Warnf(err, "notify server remove watcher %s %s", - j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id()) + + if j.ErrorSubscriber.Type() == NOTIFTY { + util.Logger().Errorf(nil, "remove %s watcher %s %s failed, here cause a dead lock", + j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id()) + return + } + + util.Logger().Warnf(err, "notification service remove %s watcher %s %s", + j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id()) s.Service().RemoveSubscriber(j.ErrorSubscriber) } diff --git a/server/service/notification/notification_service.go b/server/service/notification/notification_service.go index c76d22d9..e2d78640 100644 --- a/server/service/notification/notification_service.go +++ b/server/service/notification/notification_service.go @@ -73,15 +73,15 @@ func (s *NotifyService) AddSubscriber(n Subscriber) error { sr, ok := ss[n.Subject()] if !ok { - sr = make(subscriberIndex) - ss[n.Subject()] = sr + sr = make(subscriberIndex, DEFAULT_INIT_SUBSCRIBERS) + ss[n.Subject()] = sr // add a subscriber } ns, ok := sr[n.Id()] if !ok { ns = list.New() } - ns.PushBack(n) + ns.PushBack(n) // add a connection sr[n.Id()] = ns n.SetService(s) @@ -170,7 +170,7 @@ func (s *NotifyService) publish2Subscriber(t NotifyType) { ns, ok := m[job.SubscriberId()] if ok { for n := ns.Front(); n != nil; n = n.Next() { - go n.Value.(Subscriber).OnMessage(job) + n.Value.(Subscriber).OnMessage(job) } } s.mutexes[t].Unlock() @@ -179,7 +179,7 @@ func (s *NotifyService) publish2Subscriber(t NotifyType) { for key := range m { ns := m[key] for n := ns.Front(); n != nil; n = n.Next() { - go n.Value.(Subscriber).OnMessage(job) + n.Value.(Subscriber).OnMessage(job) } } } @@ -199,12 +199,12 @@ func (s *NotifyService) init() { s.Config.MaxQueue = DEFAULT_MAX_QUEUE } - s.services = make(serviceIndex) + s.services = make(serviceIndex, typeEnd) s.err = make(chan error, 1) - s.queues = make(map[NotifyType]chan NotifyJob) - s.mutexes = make(map[NotifyType]*sync.Mutex) + s.queues = make(map[NotifyType]chan NotifyJob, typeEnd) + s.mutexes = make(map[NotifyType]*sync.Mutex, typeEnd) for i := NotifyType(0); i != typeEnd; i++ { - s.services[i] = make(subscriberSubjectIndex) + s.services[i] = make(subscriberSubjectIndex, DEFAULT_INIT_SUBSCRIBERS) s.queues[i] = make(chan NotifyJob, s.Config.MaxQueue) s.mutexes[i] = &sync.Mutex{} s.waits.Add(1) diff --git a/server/service/notification/struct.go b/server/service/notification/struct.go index be53e6b5..f6564ffa 100644 --- a/server/service/notification/struct.go +++ b/server/service/notification/struct.go @@ -24,8 +24,10 @@ import ( ) const ( - DEFAULT_MAX_QUEUE = 1000 - DEFAULT_TIMEOUT = 30 * time.Second + DEFAULT_MAX_QUEUE = 1000 + DEFAULT_INIT_SUBSCRIBERS = 1000 + DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond + DEFAULT_TIMEOUT = 30 * time.Second NOTIFTY NotifyType = iota INSTANCE @@ -61,6 +63,7 @@ type Subscriber interface { Service() *NotifyService SetService(*NotifyService) OnAccept() + // The event bus will callback this function, so it must be non-blocked. OnMessage(job NotifyJob) Close() } diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go index 77e91f7c..117ce388 100644 --- a/server/service/util/instance_util.go +++ b/server/service/util/instance_util.go @@ -245,7 +245,7 @@ func QueryAllProvidersInstances(ctx context.Context, selfServiceId string) (resu } results = append(results, &pb.WatchInstanceResponse{ Response: pb.CreateResponse(pb.Response_SUCCESS, "List instance successfully."), - Action: string(pb.EVT_CREATE), + Action: string(pb.EVT_INIT), Key: &pb.MicroServiceKey{ Environment: service.Environment, AppId: service.AppId, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Watcher will get a wrong event sequence when SC modify resource concurrently > ---------------------------------------------------------------------------- > > Key: SCB-159 > URL: https://issues.apache.org/jira/browse/SCB-159 > Project: Apache ServiceComb > Issue Type: Bug > Components: Service-Center > Reporter: little-cui > Fix For: service-center-1.0.0-m1 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)