[ 
https://issues.apache.org/jira/browse/SCB-993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16680690#comment-16680690
 ] 

ASF GitHub Bot commented on SCB-993:
------------------------------------

little-cui closed pull request #481: SCB-993 print warning log when aggregate 
conflict
URL: https://github.com/apache/servicecomb-service-center/pull/481
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub does not keep
its diff visible after the merge, so the full diff is reproduced below:

diff --git a/pkg/client/sc/client.go b/pkg/client/sc/client.go
index 38ab11bd..18ee9db2 100644
--- a/pkg/client/sc/client.go
+++ b/pkg/client/sc/client.go
@@ -24,18 +24,18 @@ func NewSCClient(cfg Config) (*SCClient, error) {
        if err != nil {
                return nil, err
        }
-       return &SCClient{LBClient: client, Token: cfg.Token}, nil
+       return &SCClient{LBClient: client, Cfg: cfg}, nil
 }
 
 type SCClient struct {
        *LBClient
-       Token string
+       Cfg Config
 }
 
 func (c *SCClient) CommonHeaders() http.Header {
        var headers = make(http.Header)
-       if len(c.Token) > 0 {
-               headers.Set("X-Auth-Token", c.Token)
+       if len(c.Cfg.Token) > 0 {
+               headers.Set("X-Auth-Token", c.Cfg.Token)
        }
        return headers
 }
diff --git a/pkg/client/sc/config.go b/pkg/client/sc/config.go
index f78845b2..9cbc0c60 100644
--- a/pkg/client/sc/config.go
+++ b/pkg/client/sc/config.go
@@ -22,8 +22,11 @@ import (
        "time"
 )
 
+const defaultRequestTimeout = 10 * time.Second
+
 type Config struct {
        rest.URLClientOption
+       Name      string
        Endpoints []string
        // TODO Expandable header not only token header
        Token          string
@@ -38,7 +41,7 @@ func (cfg *Config) Merge() rest.URLClientOption {
        }
        cfg.SSLEnabled = ssl
        if cfg.RequestTimeout == 0 {
-               cfg.RequestTimeout = 10 * time.Second
+               cfg.RequestTimeout = defaultRequestTimeout
        }
        return cfg.URLClientOption
 }
diff --git a/server/admin/model/dump.go b/server/admin/model/dump.go
index 461be7ec..6165c53b 100644
--- a/server/admin/model/dump.go
+++ b/server/admin/model/dump.go
@@ -161,9 +161,10 @@ type Cache struct {
 }
 
 type KV struct {
-       Key   string      `json:"key"`
-       Rev   int64       `json:"rev"`
-       Value interface{} `json:"-"`
+       Key         string      `json:"key"`
+       Rev         int64       `json:"rev"`
+       Value       interface{} `json:"-"`
+       ClusterName string      `json:"cluster"`
 }
 
 type Microservice struct {
diff --git a/server/admin/service.go b/server/admin/service.go
index 6255f382..90c6d9e3 100644
--- a/server/admin/service.go
+++ b/server/admin/service.go
@@ -95,13 +95,13 @@ func (service *AdminService) dumpAll(ctx context.Context, 
cache *model.Cache) {
 }
 
 func setValue(e discovery.Adaptor, setter model.Setter) {
-       exists := make(map[string]struct{})
        e.Cache().ForEach(func(k string, kv *discovery.KeyValue) (next bool) {
-               if _, ok := exists[k]; ok {
-                       return true
-               }
-               exists[k] = struct{}{}
-               setter.SetValue(&model.KV{Key: k, Rev: kv.ModRevision, Value: 
kv.Value})
+               setter.SetValue(&model.KV{
+                       Key:         k,
+                       Rev:         kv.ModRevision,
+                       Value:       kv.Value,
+                       ClusterName: kv.ClusterName,
+               })
                return true
        })
 }
diff --git a/server/core/key_convertor.go b/server/core/key_convertor.go
index 296b727f..d0730986 100644
--- a/server/core/key_convertor.go
+++ b/server/core/key_convertor.go
@@ -108,6 +108,10 @@ func GetInfoFromSvcIndexKV(key []byte) *pb.MicroServiceKey 
{
        }
 }
 
+func GetInfoFromSvcAliasKV(key []byte) *pb.MicroServiceKey {
+       return GetInfoFromSvcIndexKV(key)
+}
+
 func GetInfoFromSchemaSummaryKV(key []byte) (domainProject, serviceId, 
schemaId string) {
        keys := KvToResponse(key)
        l := len(keys)
diff --git a/server/plugin/pkg/discovery/aggregate/adaptor.go 
b/server/plugin/pkg/discovery/aggregate/adaptor.go
index cb669806..d48e7d5b 100644
--- a/server/plugin/pkg/discovery/aggregate/adaptor.go
+++ b/server/plugin/pkg/discovery/aggregate/adaptor.go
@@ -16,69 +16,86 @@
 package aggregate
 
 import (
-       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       "github.com/apache/incubator-servicecomb-service-center/pkg/log"
+       "github.com/apache/incubator-servicecomb-service-center/server/core"
+       
"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
        mgr 
"github.com/apache/incubator-servicecomb-service-center/server/plugin"
        
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
-       
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/registry"
-       "golang.org/x/net/context"
 )
 
-// Aggregator is a discovery service adaptor implement of one kubernetes 
cluster
-type Aggregator []discovery.Adaptor
-
-func (as Aggregator) Search(ctx context.Context, opts 
...registry.PluginOpOption) (*discovery.Response, error) {
-       var (
-               response discovery.Response
-               exists   = make(map[string]struct{})
-       )
-       for _, a := range as {
-               resp, err := a.Search(ctx, opts...)
-               if err != nil {
-                       continue
-               }
-               for _, kv := range resp.Kvs {
-                       key := util.BytesToStringWithNoCopy(kv.Key)
-                       if _, ok := exists[key]; !ok {
-                               exists[key] = struct{}{}
-                               response.Kvs = append(response.Kvs, kv)
-                       }
-               }
-               response.Count += resp.Count
-       }
-       return &response, nil
+// Aggregator is a discovery service adaptor implementation of one registry cluster
+type Aggregator struct {
+       discovery.Indexer
+       Adaptors []discovery.Adaptor
 }
 
-func (as Aggregator) Cache() discovery.Cache {
+func (as *Aggregator) Cache() discovery.Cache {
        var cache Cache
-       for _, a := range as {
+       for _, a := range as.Adaptors {
                cache = append(cache, a.Cache())
        }
        return cache
 }
 
-func (as Aggregator) Run() {
-       for _, a := range as {
+func (as *Aggregator) Run() {
+       for _, a := range as.Adaptors {
                a.Run()
        }
 }
 
-func (as Aggregator) Stop() {
-       for _, a := range as {
+func (as *Aggregator) Stop() {
+       for _, a := range as.Adaptors {
                a.Stop()
        }
 }
 
-func (as Aggregator) Ready() <-chan struct{} {
-       for _, a := range as {
+func (as *Aggregator) Ready() <-chan struct{} {
+       for _, a := range as.Adaptors {
                <-a.Ready()
        }
        return closedCh
 }
 
-func NewAggregator(t discovery.Type, cfg *discovery.Config) (as Aggregator) {
+func getLogConflictFunc(t discovery.Type) func(origin, conflict 
*discovery.KeyValue) {
+       switch t {
+       case backend.SERVICE_INDEX:
+               return func(origin, conflict *discovery.KeyValue) {
+                       if serviceId, conflictId := origin.Value.(string), 
conflict.Value.(string); conflictId != serviceId {
+                               key := core.GetInfoFromSvcIndexKV(conflict.Key)
+                               log.Warnf("conflict! can not merge microservice 
index[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
+                                       conflict.ClusterName, conflictId, 
key.Environment, key.AppId, key.ServiceName, key.Version,
+                                       serviceId, origin.ClusterName)
+                       }
+               }
+       case backend.SERVICE_ALIAS:
+               return func(origin, conflict *discovery.KeyValue) {
+                       if serviceId, conflictId := origin.Value.(string), 
conflict.Value.(string); conflictId != serviceId {
+                               key := core.GetInfoFromSvcAliasKV(conflict.Key)
+                               log.Warnf("conflict! can not merge microservice 
alias[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
+                                       conflict.ClusterName, conflictId, 
key.Environment, key.AppId, key.ServiceName, key.Version,
+                                       serviceId, origin.ClusterName)
+                       }
+               }
+       }
+       return nil
+}
+
+func NewAggregator(t discovery.Type, cfg *discovery.Config) *Aggregator {
+       as := &Aggregator{}
        for _, name := range repos {
                repo := mgr.Plugins().Get(mgr.DISCOVERY, 
name).New().(discovery.AdaptorRepository)
-               as = append(as, repo.New(t, cfg))
+               as.Adaptors = append(as.Adaptors, repo.New(t, cfg))
+       }
+
+       switch t {
+       case backend.SCHEMA:
+               // schemas are not cached, so create an adaptors indexer instead
+               as.Indexer = NewAdaptorsIndexer(as.Adaptors)
+       case backend.SERVICE_INDEX, backend.SERVICE_ALIAS:
+               NewConflictChecker(as.Cache(), getLogConflictFunc(t))
+               fallthrough
+       default:
+               as.Indexer = discovery.NewCacheIndexer(as.Cache())
        }
        return as
 }
diff --git a/server/plugin/pkg/discovery/aggregate/cache.go 
b/server/plugin/pkg/discovery/aggregate/cache.go
index 28c65ad5..b12fbe93 100644
--- a/server/plugin/pkg/discovery/aggregate/cache.go
+++ b/server/plugin/pkg/discovery/aggregate/cache.go
@@ -15,11 +15,20 @@
 
 package aggregate
 
-import 
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
+import (
+       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
+)
 
 type Cache []discovery.Cache
 
-func (c Cache) Name() string { return c[0].Name() }
+func (c Cache) Name() string {
+       if len(c) == 0 {
+               return ""
+       }
+       return c[0].Name()
+}
+
 func (c Cache) Size() (s int) {
        for _, cache := range c {
                s += cache.Size()
@@ -35,20 +44,54 @@ func (c Cache) Get(k string) (kv *discovery.KeyValue) {
        return
 }
 func (c Cache) GetAll(arr *[]*discovery.KeyValue) (s int) {
+       exists := make(map[string]struct{})
        for _, cache := range c {
-               s += cache.GetAll(arr)
+               var tmp []*discovery.KeyValue
+               if l := cache.GetAll(&tmp); l == 0 {
+                       continue
+               }
+               s += c.append(tmp, arr, exists)
        }
        return
 }
 func (c Cache) GetPrefix(prefix string, arr *[]*discovery.KeyValue) (s int) {
+       exists := make(map[string]struct{})
        for _, cache := range c {
-               s += cache.GetPrefix(prefix, arr)
+               var tmp []*discovery.KeyValue
+               if l := cache.GetPrefix(prefix, &tmp); l == 0 {
+                       continue
+               }
+               s += c.append(tmp, arr, exists)
+       }
+       return
+}
+
+func (c Cache) append(tmp []*discovery.KeyValue, arr *[]*discovery.KeyValue,
+       exists map[string]struct{}) (s int) {
+       for _, kv := range tmp {
+               key := util.BytesToStringWithNoCopy(kv.Key)
+               if _, ok := exists[key]; ok {
+                       continue
+               }
+               exists[key] = struct{}{}
+               if arr != nil {
+                       *arr = append(*arr, kv)
+               }
+               s += 1
        }
        return
 }
+
 func (c Cache) ForEach(iter func(k string, v *discovery.KeyValue) (next bool)) 
{
+       exists := make(map[string]struct{})
        for _, cache := range c {
-               cache.ForEach(iter)
+               cache.ForEach(func(k string, v *discovery.KeyValue) bool {
+                       if _, ok := exists[k]; ok {
+                               return true
+                       }
+                       exists[k] = struct{}{}
+                       return iter(k, v)
+               })
        }
 }
 func (c Cache) Put(k string, v *discovery.KeyValue) { return }
diff --git a/server/plugin/pkg/discovery/aggregate/conflict_checker.go 
b/server/plugin/pkg/discovery/aggregate/conflict_checker.go
new file mode 100644
index 00000000..3eabae96
--- /dev/null
+++ b/server/plugin/pkg/discovery/aggregate/conflict_checker.go
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate
+
+import (
+       "github.com/apache/incubator-servicecomb-service-center/pkg/gopool"
+       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
+       
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/registry"
+       "golang.org/x/net/context"
+       "time"
+)
+
+type ConflictChecker struct {
+       Cache              discovery.Cache
+       ConflictHandleFunc func(origin, conflict *discovery.KeyValue)
+}
+
+func (c *ConflictChecker) Run(ctx context.Context) {
+       d := registry.Configuration().AutoSyncInterval
+       if d == 0 || c.Cache == nil {
+               return
+       }
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-time.After(d):
+                       c.Check()
+               }
+       }
+}
+
+func (c *ConflictChecker) Check() {
+       caches, ok := c.Cache.(Cache)
+       if !ok {
+               return
+       }
+
+       var arr []*discovery.KeyValue
+       for _, cache := range caches {
+               cache.GetAll(&arr)
+       }
+
+       exists := make(map[string]*discovery.KeyValue)
+       for _, v := range arr {
+               key := util.BytesToStringWithNoCopy(v.Key)
+               if kv, ok := exists[key]; ok {
+                       c.ConflictHandleFunc(kv, v)
+                       continue
+               }
+               exists[key] = v
+       }
+}
+
+func NewConflictChecker(cache discovery.Cache, f func(origin, conflict 
*discovery.KeyValue)) *ConflictChecker {
+       checker := &ConflictChecker{Cache: cache, ConflictHandleFunc: f}
+       gopool.Go(checker.Run)
+       return checker
+}
diff --git a/server/plugin/pkg/discovery/aggregate/indexer.go 
b/server/plugin/pkg/discovery/aggregate/indexer.go
new file mode 100644
index 00000000..22cc217e
--- /dev/null
+++ b/server/plugin/pkg/discovery/aggregate/indexer.go
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate
+
+import (
+       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
+       
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/registry"
+       "golang.org/x/net/context"
+)
+
+type AdaptorsIndexer struct {
+       Adaptors []discovery.Adaptor
+}
+
+func (i *AdaptorsIndexer) Search(ctx context.Context, opts 
...registry.PluginOpOption) (*discovery.Response, error) {
+       var (
+               response discovery.Response
+               exists   = make(map[string]struct{})
+       )
+       for _, a := range i.Adaptors {
+               resp, err := a.Search(ctx, opts...)
+               if err != nil {
+                       continue
+               }
+               for _, kv := range resp.Kvs {
+                       key := util.BytesToStringWithNoCopy(kv.Key)
+                       if _, ok := exists[key]; !ok {
+                               exists[key] = struct{}{}
+                               response.Kvs = append(response.Kvs, kv)
+                               response.Count += 1
+                       }
+               }
+       }
+       return &response, nil
+}
+
+func NewAdaptorsIndexer(as []discovery.Adaptor) *AdaptorsIndexer {
+       return &AdaptorsIndexer{Adaptors: as}
+}
diff --git a/server/plugin/pkg/discovery/etcd/cacher_kv.go 
b/server/plugin/pkg/discovery/etcd/cacher_kv.go
index 81362440..b8ab28e3 100644
--- a/server/plugin/pkg/discovery/etcd/cacher_kv.go
+++ b/server/plugin/pkg/discovery/etcd/cacher_kv.go
@@ -412,7 +412,7 @@ func (c *KvCacher) notify(evts []discovery.KvEvent) {
 }
 
 func (c *KvCacher) doParse(src *mvccpb.KeyValue) (kv *discovery.KeyValue) {
-       kv = new(discovery.KeyValue)
+       kv = discovery.NewKeyValue()
        if err := FromEtcdKeyValue(kv, src, c.Cfg.Parser); err != nil {
                log.Errorf(err, "parse %s value failed", 
util.BytesToStringWithNoCopy(src.Key))
                return nil
diff --git a/server/plugin/pkg/discovery/indexer_cache.go 
b/server/plugin/pkg/discovery/indexer_cache.go
index f10b755f..170265f1 100644
--- a/server/plugin/pkg/discovery/indexer_cache.go
+++ b/server/plugin/pkg/discovery/indexer_cache.go
@@ -21,13 +21,11 @@ import (
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/registry"
        "golang.org/x/net/context"
-       "sync"
        "time"
 )
 
 type CacheIndexer struct {
        Cache Cache
-       lock  sync.Mutex
 }
 
 func (i *CacheIndexer) Search(ctx context.Context, opts 
...registry.PluginOpOption) (resp *Response, _ error) {
diff --git a/server/plugin/pkg/discovery/k8s/adaptor/convertor.go 
b/server/plugin/pkg/discovery/k8s/adaptor/convertor.go
index 8bc2a08d..b191df2e 100644
--- a/server/plugin/pkg/discovery/k8s/adaptor/convertor.go
+++ b/server/plugin/pkg/discovery/k8s/adaptor/convertor.go
@@ -113,9 +113,9 @@ func FromK8sService(domainProject string, svc *v1.Service) 
(ms *pb.MicroService)
 
 func AsKeyValue(key string, v interface{}, resourceVersion string) 
*discovery.KeyValue {
        rev, _ := strconv.ParseInt(resourceVersion, 10, 64)
-       return &discovery.KeyValue{
-               Key:         util.StringToBytesWithNoCopy(key),
-               Value:       v,
-               ModRevision: rev,
-       }
+       kv := discovery.NewKeyValue()
+       kv.Key = util.StringToBytesWithNoCopy(key)
+       kv.Value = v
+       kv.ModRevision = rev
+       return kv
 }
diff --git a/server/plugin/pkg/discovery/servicecenter/adaptor.go 
b/server/plugin/pkg/discovery/servicecenter/adaptor.go
index 99726a52..0cf123e5 100644
--- a/server/plugin/pkg/discovery/servicecenter/adaptor.go
+++ b/server/plugin/pkg/discovery/servicecenter/adaptor.go
@@ -48,7 +48,7 @@ func (se *ServiceCenterAdaptor) Ready() <-chan struct{} {
 func NewServiceCenterAdaptor(t discovery.Type, cfg *discovery.Config) 
*ServiceCenterAdaptor {
        if t == backend.SCHEMA {
                return &ServiceCenterAdaptor{
-                       Indexer: ServiceCenter(),
+                       Indexer: GetOrCreateClusterIndexer(),
                        Cacher:  discovery.NullCacher,
                }
        }
diff --git a/server/plugin/pkg/discovery/servicecenter/aggregate.go 
b/server/plugin/pkg/discovery/servicecenter/aggregate.go
index 6c654e55..a53010fa 100644
--- a/server/plugin/pkg/discovery/servicecenter/aggregate.go
+++ b/server/plugin/pkg/discovery/servicecenter/aggregate.go
@@ -40,25 +40,40 @@ func getClientTLS() (*tls.Config, error) {
        return clientTLS, err
 }
 
-func (c *SCClientAggregate) GetScCache() (*model.Cache, error) {
-       var caches model.Cache
+func (c *SCClientAggregate) GetScCache() (*model.Cache, map[string]error) {
+       var caches *model.Cache
+       errs := make(map[string]error)
        for _, client := range *c {
                cache, err := client.GetScCache()
                if err != nil {
-                       log.Errorf(err, "get service center cache failed")
+                       errs[client.Cfg.Name] = err
                        continue
                }
-               caches.Microservices = append(caches.Microservices, 
cache.Microservices...)
-               caches.Indexes = append(caches.Indexes, cache.Indexes...)
-               caches.Aliases = append(caches.Aliases, cache.Aliases...)
-               caches.Tags = append(caches.Tags, cache.Tags...)
-               caches.Rules = append(caches.Rules, cache.Rules...)
-               caches.RuleIndexes = append(caches.RuleIndexes, 
cache.RuleIndexes...)
-               caches.DependencyRules = append(caches.DependencyRules, 
cache.DependencyRules...)
-               caches.Summaries = append(caches.Summaries, cache.Summaries...)
-               caches.Instances = append(caches.Instances, cache.Instances...)
+
+               if caches == nil {
+                       caches = &model.Cache{}
+               }
+
+               c.cacheAppend(client.Cfg.Name, &caches.Microservices, 
&cache.Microservices)
+               c.cacheAppend(client.Cfg.Name, &caches.Indexes, &cache.Indexes)
+               c.cacheAppend(client.Cfg.Name, &caches.Aliases, &cache.Aliases)
+               c.cacheAppend(client.Cfg.Name, &caches.Tags, &cache.Tags)
+               c.cacheAppend(client.Cfg.Name, &caches.Rules, &cache.Rules)
+               c.cacheAppend(client.Cfg.Name, &caches.RuleIndexes, 
&cache.RuleIndexes)
+               c.cacheAppend(client.Cfg.Name, &caches.DependencyRules, 
&cache.DependencyRules)
+               c.cacheAppend(client.Cfg.Name, &caches.Summaries, 
&cache.Summaries)
+               c.cacheAppend(client.Cfg.Name, &caches.Instances, 
&cache.Instances)
        }
-       return &caches, nil
+       return caches, errs
+}
+
+func (c *SCClientAggregate) cacheAppend(name string, setter model.Setter, 
getter model.Getter) {
+       getter.ForEach(func(_ int, v *model.KV) bool {
+               // overwrite the remote cluster name with the locally configured one
+               v.ClusterName = name
+               setter.SetValue(v)
+               return true
+       })
 }
 
 func (c *SCClientAggregate) GetSchemasByServiceId(domainProject, serviceId 
string) ([]*pb.Schema, *scerr.Error) {
@@ -97,7 +112,7 @@ func NewSCClientAggregate() *SCClientAggregate {
                if len(name) == 0 || name == 
registry.Configuration().ClusterName {
                        continue
                }
-               client, err := sc.NewSCClient(sc.Config{Endpoints: endpoints})
+               client, err := sc.NewSCClient(sc.Config{Name: name, Endpoints: 
endpoints})
                if err != nil {
                        log.Errorf(err, "new service center[%s]%v client 
failed", name, endpoints)
                        continue
diff --git a/server/plugin/pkg/discovery/servicecenter/cacher.go 
b/server/plugin/pkg/discovery/servicecenter/cacher.go
index 01eb9b4e..d8acd29f 100644
--- a/server/plugin/pkg/discovery/servicecenter/cacher.go
+++ b/server/plugin/pkg/discovery/servicecenter/cacher.go
@@ -24,7 +24,7 @@ type ServiceCenterCacher struct {
 }
 
 func (c *ServiceCenterCacher) Ready() <-chan struct{} {
-       return ServiceCenter().Ready()
+       return GetOrCreateClusterIndexer().Ready()
 }
 
 func NewServiceCenterCacher(cfg *discovery.Config, cache discovery.Cache) 
*ServiceCenterCacher {
@@ -35,6 +35,6 @@ func NewServiceCenterCacher(cfg *discovery.Config, cache 
discovery.Cache) *Servi
 
 func BuildCacher(t discovery.Type, cfg *discovery.Config, cache 
discovery.Cache) discovery.Cacher {
        cr := NewServiceCenterCacher(cfg, cache)
-       ServiceCenter().AddCacher(t, cr)
+       GetOrCreateClusterIndexer().AddCacher(t, cr)
        return cr
 }
diff --git a/server/plugin/pkg/discovery/servicecenter/cluster.go 
b/server/plugin/pkg/discovery/servicecenter/indexer.go
similarity index 53%
rename from server/plugin/pkg/discovery/servicecenter/cluster.go
rename to server/plugin/pkg/discovery/servicecenter/indexer.go
index e6f4be93..4cf6563c 100644
--- a/server/plugin/pkg/discovery/servicecenter/cluster.go
+++ b/server/plugin/pkg/discovery/servicecenter/indexer.go
@@ -33,22 +33,22 @@ import (
 )
 
 var (
-       cluster     *ServiceCenterCluster
+       cluster     *ClusterIndexer
        clusterOnce sync.Once
 )
 
-type ServiceCenterCluster struct {
+type ClusterIndexer struct {
        Client *SCClientAggregate
 
        cachers map[discovery.Type]*ServiceCenterCacher
 }
 
-func (c *ServiceCenterCluster) Initialize() {
+func (c *ClusterIndexer) Initialize() {
        c.cachers = make(map[discovery.Type]*ServiceCenterCacher)
        c.Client = NewSCClientAggregate()
 }
 
-func (c *ServiceCenterCluster) Search(ctx context.Context, opts 
...registry.PluginOpOption) (r *discovery.Response, err error) {
+func (c *ClusterIndexer) Search(ctx context.Context, opts 
...registry.PluginOpOption) (r *discovery.Response, err error) {
        op := registry.OpGet(opts...)
        key := util.BytesToStringWithNoCopy(op.Key)
        switch {
@@ -85,9 +85,10 @@ func (c *ServiceCenterCluster) Search(ctx context.Context, 
opts ...registry.Plug
        }
 }
 
-func (c *ServiceCenterCluster) Sync(ctx context.Context) error {
-       cache, err := c.Client.GetScCache()
-       if err != nil {
+func (c *ClusterIndexer) Sync(ctx context.Context) error {
+       cache, errs := c.Client.GetScCache()
+       if cache == nil && len(errs) > 0 {
+               err := fmt.Errorf("%v", errs)
                log.Errorf(err, "sync failed")
                return err
        }
@@ -95,60 +96,76 @@ func (c *ServiceCenterCluster) Sync(ctx context.Context) 
error {
        // microservice
        serviceCacher, ok := c.cachers[backend.SERVICE]
        if ok {
-               c.check(serviceCacher, &cache.Microservices)
+               c.check(serviceCacher, &cache.Microservices, errs)
        }
        aliasCacher, ok := c.cachers[backend.SERVICE_ALIAS]
        if ok {
-               c.check(aliasCacher, &cache.Aliases)
+               c.checkWithConflictHandleFunc(aliasCacher, &cache.Aliases, 
errs, c.logConflictFunc)
        }
        indexCacher, ok := c.cachers[backend.SERVICE_INDEX]
        if ok {
-               c.check(indexCacher, &cache.Indexes)
+               c.checkWithConflictHandleFunc(indexCacher, &cache.Indexes, 
errs, c.logConflictFunc)
        }
        // instance
        instCacher, ok := c.cachers[backend.INSTANCE]
        if ok {
-               c.check(instCacher, &cache.Instances)
+               c.check(instCacher, &cache.Instances, errs)
        }
        // microservice meta
        tagCacher, ok := c.cachers[backend.SERVICE_TAG]
        if ok {
-               c.check(tagCacher, &cache.Tags)
+               c.check(tagCacher, &cache.Tags, errs)
        }
        ruleCacher, ok := c.cachers[backend.RULE]
        if ok {
-               c.check(ruleCacher, &cache.Rules)
+               c.check(ruleCacher, &cache.Rules, errs)
        }
        ruleIndexCacher, ok := c.cachers[backend.RULE_INDEX]
        if ok {
-               c.check(ruleIndexCacher, &cache.RuleIndexes)
+               c.check(ruleIndexCacher, &cache.RuleIndexes, errs)
        }
        depRuleCacher, ok := c.cachers[backend.DEPENDENCY_RULE]
        if ok {
-               c.check(depRuleCacher, &cache.DependencyRules)
+               c.check(depRuleCacher, &cache.DependencyRules, errs)
        }
        return nil
 }
 
-func (c *ServiceCenterCluster) check(local *ServiceCenterCacher, remote 
model.Getter) {
-       exists := make(map[string]struct{})
-       remote.ForEach(func(_ int, v *model.KV) bool {
-               if _, ok := exists[v.Key]; ok {
+func (c *ClusterIndexer) check(local *ServiceCenterCacher, remote 
model.Getter, skipClusters map[string]error) {
+       c.checkWithConflictHandleFunc(local, remote, skipClusters, 
c.skipHandleFunc)
+}
+
+func (c *ClusterIndexer) checkWithConflictHandleFunc(local 
*ServiceCenterCacher, remote model.Getter, skipClusters map[string]error,
+       conflictHandleFunc func(origin *model.KV, conflict model.Getter, index 
int)) {
+       exists := make(map[string]*model.KV)
+       remote.ForEach(func(i int, v *model.KV) bool {
+               if kv, ok := exists[v.Key]; ok {
+                       conflictHandleFunc(kv, remote, i)
                        return true
                }
-               exists[v.Key] = struct{}{}
+               exists[v.Key] = v
                kv := local.Cache().Get(v.Key)
                newKv := &discovery.KeyValue{
-                       Key:            util.StringToBytesWithNoCopy(v.Key),
-                       Value:          v.Value,
-                       Version:        v.Rev,
-                       CreateRevision: v.Rev,
-                       ModRevision:    v.Rev,
+                       Key:         util.StringToBytesWithNoCopy(v.Key),
+                       Value:       v.Value,
+                       ModRevision: v.Rev,
+                       ClusterName: v.ClusterName,
                }
                switch {
                case kv == nil:
+                       newKv.Version = 1
+                       newKv.CreateRevision = v.Rev
                        local.Notify(pb.EVT_CREATE, v.Key, newKv)
                case kv.ModRevision != v.Rev:
+                       // if some cluster kvs were lost, skip notifying 
changes of this cluster
+                       // to avoid publishing wrong change events for its kvs
+                       if err, ok := skipClusters[kv.ClusterName]; ok {
+                               log.Errorf(err, "cluster[%s] temporarily 
unavailable, skip cluster[%s] event %s %s",
+                                       kv.ClusterName, v.ClusterName, 
pb.EVT_UPDATE, v.Key)
+                               break
+                       }
+                       newKv.Version = kv.ModRevision - kv.ModRevision
+                       newKv.CreateRevision = kv.CreateRevision
                        local.Notify(pb.EVT_UPDATE, v.Key, newKv)
                }
                return true
@@ -162,6 +179,11 @@ func (c *ServiceCenterCluster) check(local 
*ServiceCenterCacher, remote model.Ge
                        return !exist
                })
                if !exist {
+                       if err, ok := skipClusters[v.ClusterName]; ok {
+                               log.Errorf(err, "cluster[%s] temporarily 
unavailable, skip event %s %s",
+                                       v.ClusterName, pb.EVT_DELETE, v.Key)
+                               return true
+                       }
                        deletes = append(deletes, v)
                }
                return true
@@ -171,17 +193,47 @@ func (c *ServiceCenterCluster) check(local 
*ServiceCenterCacher, remote model.Ge
        }
 }
 
-func (c *ServiceCenterCluster) loop(ctx context.Context) {
+// skipHandleFunc takes the same parameters as logConflictFunc but has an empty
+// body: all arguments are ignored.
+// NOTE(review): presumably used where conflicts should be skipped silently;
+// confirm against the callers.
+func (c *ClusterIndexer) skipHandleFunc(origin *model.KV, conflict 
model.Getter, index int) {
+}
+
+// logConflictFunc logs a warning when the entry at position index of conflict
+// holds a value different from origin's value (a service ID), i.e. the two
+// entries cannot be merged. Only *model.MicroserviceIndexSlice and
+// *model.MicroserviceAliasSlice are inspected; any other Getter is ignored.
+//
+// origin.Value must hold a string, otherwise the type assertion panics.
+func (c *ClusterIndexer) logConflictFunc(origin *model.KV, conflict model.Getter, index int) {
+       // Bind the concrete slice in the type switch instead of asserting the
+       // type a second time inside every case.
+       switch slice := conflict.(type) {
+       case *model.MicroserviceIndexSlice:
+               kv := (*slice)[index]
+               if serviceID := origin.Value.(string); kv.Value != serviceID {
+                       key := core.GetInfoFromSvcIndexKV(util.StringToBytesWithNoCopy(kv.Key))
+                       log.Warnf("conflict! can not merge microservice index[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
+                               kv.ClusterName, kv.Value, key.Environment, key.AppId, key.ServiceName, key.Version,
+                               serviceID, origin.ClusterName)
+               }
+       case *model.MicroserviceAliasSlice:
+               kv := (*slice)[index]
+               if serviceID := origin.Value.(string); kv.Value != serviceID {
+                       key := core.GetInfoFromSvcAliasKV(util.StringToBytesWithNoCopy(kv.Key))
+                       log.Warnf("conflict! can not merge microservice alias[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]",
+                               kv.ClusterName, kv.Value, key.Environment, key.AppId, key.ServiceName, key.Version,
+                               serviceID, origin.ClusterName)
+               }
+       }
+}
+
+func (c *ClusterIndexer) loop(ctx context.Context) {
        select {
        case <-ctx.Done():
        case <-time.After(minWaitInterval):
                c.Sync(ctx)
+               d := registry.Configuration().AutoSyncInterval
+               if d == 0 {
+                       return
+               }
        loop:
                for {
                        select {
                        case <-ctx.Done():
                                break loop
-                       case 
<-time.After(registry.Configuration().AutoSyncInterval):
+                       case <-time.After(d):
                                // TODO support watching sc
                                c.Sync(ctx)
                        }
@@ -192,24 +244,24 @@ func (c *ServiceCenterCluster) loop(ctx context.Context) {
 }
 
 // unsafe
-func (c *ServiceCenterCluster) AddCacher(t discovery.Type, cacher 
*ServiceCenterCacher) {
+func (c *ClusterIndexer) AddCacher(t discovery.Type, cacher 
*ServiceCenterCacher) {
        c.cachers[t] = cacher
 }
 
-func (c *ServiceCenterCluster) Run() {
+func (c *ClusterIndexer) Run() {
        c.Initialize()
        gopool.Go(c.loop)
 }
 
-func (c *ServiceCenterCluster) Stop() {}
+func (c *ClusterIndexer) Stop() {}
 
-func (c *ServiceCenterCluster) Ready() <-chan struct{} {
+func (c *ClusterIndexer) Ready() <-chan struct{} {
        return closedCh
 }
 
-func ServiceCenter() *ServiceCenterCluster {
+func GetOrCreateClusterIndexer() *ClusterIndexer {
        clusterOnce.Do(func() {
-               cluster = &ServiceCenterCluster{}
+               cluster = &ClusterIndexer{}
                cluster.Run()
        })
        return cluster
diff --git a/server/plugin/pkg/discovery/types.go 
b/server/plugin/pkg/discovery/types.go
index fd918af6..9ca1f408 100644
--- a/server/plugin/pkg/discovery/types.go
+++ b/server/plugin/pkg/discovery/types.go
@@ -20,6 +20,7 @@ import (
        "fmt"
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/registry"
        "strconv"
 )
 
@@ -62,12 +63,17 @@ type KeyValue struct {
        Version        int64
        CreateRevision int64
        ModRevision    int64
+       ClusterName    string
 }
 
 func (kv *KeyValue) String() string {
        b, _ := json.Marshal(kv.Value)
-       return fmt.Sprintf("{key: '%s', value: %s, version: %d}",
-               util.BytesToStringWithNoCopy(kv.Key), 
util.BytesToStringWithNoCopy(b), kv.Version)
+       return fmt.Sprintf("{key: '%s', value: %s, version: %d, cluster: '%s'}",
+               util.BytesToStringWithNoCopy(kv.Key), 
util.BytesToStringWithNoCopy(b), kv.Version, kv.ClusterName)
+}
+
+// NewKeyValue returns a new KeyValue whose ClusterName is set to the cluster
+// name taken from registry.Configuration(); all other fields are left at
+// their zero values.
+func NewKeyValue() *KeyValue {
+       return &KeyValue{ClusterName: registry.Configuration().ClusterName}
 }
 
 type Response struct {
diff --git a/server/plugin/pkg/discovery/types_test.go 
b/server/plugin/pkg/discovery/types_test.go
index e99f764f..b1e25952 100644
--- a/server/plugin/pkg/discovery/types_test.go
+++ b/server/plugin/pkg/discovery/types_test.go
@@ -42,8 +42,8 @@ func TestTypes(t *testing.T) {
                t.Fatalf("TestTypes failed")
        }
 
-       var kv KeyValue
-       if kv.String() != "{key: '', value: null, version: 0}" {
+       kv := NewKeyValue()
+       if kv.String() != "{key: '', value: null, version: 0, cluster: 
'default'}" {
                t.Fatalf("TestTypes failed, %v", kv.String())
        }
 }
diff --git a/server/service/event/service_event_handler.go 
b/server/service/event/service_event_handler.go
index 8daf89c5..a18ce632 100644
--- a/server/service/event/service_event_handler.go
+++ b/server/service/event/service_event_handler.go
@@ -60,8 +60,8 @@ func (h *ServiceEventHandler) OnEvent(evt discovery.KvEvent) {
                return
        }
 
-       log.Infof("caught [%s] service[%s/%s/%s/%s] event",
-               evt.Type, ms.Environment, ms.AppId, ms.ServiceName, ms.Version)
+       log.Infof("caught [%s] service[%s][%s/%s/%s/%s] event",
+               evt.Type, ms.ServiceId, ms.Environment, ms.AppId, 
ms.ServiceName, ms.Version)
 
        // cache
        providerKey := pb.MicroServiceToKey(domainProject, ms)
diff --git a/server/service/instance.go b/server/service/instance.go
index 893bfb81..533f67b4 100644
--- a/server/service/instance.go
+++ b/server/service/instance.go
@@ -66,9 +66,8 @@ func (s *InstanceService) preProcessRegisterInstance(ctx 
context.Context, instan
                // Health 
check对象仅用于呈现服务健康检查逻辑,如果CHECK_BY_PLATFORM类型,表明由sidecar代发心跳,实例120s超时
                switch instance.HealthCheck.Mode {
                case pb.CHECK_BY_HEARTBEAT:
-                       if instance.HealthCheck.Interval <= 0 || 
instance.HealthCheck.Interval >= math.MaxInt32 ||
-                               instance.HealthCheck.Times <= 0 || 
instance.HealthCheck.Times >= math.MaxInt32 ||
-                               
instance.HealthCheck.Interval*(instance.HealthCheck.Times+1) >= math.MaxInt32 {
+                       d := instance.HealthCheck.Interval * 
(instance.HealthCheck.Times + 1)
+                       if d <= 0 || d >= math.MaxInt32 {
                                return scerr.NewError(scerr.ErrInvalidParams, 
"Invalid 'healthCheck' settings in request body.")
                        }
                case pb.CHECK_BY_PLATFORM:
diff --git a/server/service/instance_test.go b/server/service/instance_test.go
index 23f6abe2..4fb6cd21 100644
--- a/server/service/instance_test.go
+++ b/server/service/instance_test.go
@@ -264,6 +264,24 @@ var _ = Describe("'Instance' service", func() {
                                Expect(err).To(BeNil())
                                
Expect(resp.Response.Code).To(Equal(pb.Response_SUCCESS))
 
+                               resp, err = 
instanceResource.Register(getContext(), &pb.RegisterInstanceRequest{
+                                       Instance: &pb.MicroServiceInstance{
+                                               ServiceId: serviceId1,
+                                               Endpoints: []string{
+                                                       
"checkpush:127.0.0.1:8081",
+                                               },
+                                               HostName: "UT-HOST",
+                                               Status:   pb.MSI_UP,
+                                               HealthCheck: &pb.HealthCheck{
+                                                       Mode:     "push",
+                                                       Interval: 30,
+                                                       Times:    0,
+                                               },
+                                       },
+                               })
+                               Expect(err).To(BeNil())
+                               
Expect(resp.Response.Code).To(Equal(pb.Response_SUCCESS))
+
                                By("check normal pull healthChceck")
                                resp, err = 
instanceResource.Register(getContext(), &pb.RegisterInstanceRequest{
                                        Instance: &pb.MicroServiceInstance{
@@ -296,7 +314,7 @@ var _ = Describe("'Instance' service", func() {
                                                HealthCheck: &pb.HealthCheck{
                                                        Mode:     "push",
                                                        Interval: 30,
-                                                       Times:    0,
+                                                       Times:    -1,
                                                },
                                        },
                                })


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Bug fixes
> ---------
>
>                 Key: SCB-993
>                 URL: https://issues.apache.org/jira/browse/SCB-993
>             Project: Apache ServiceComb
>          Issue Type: Bug
>          Components: Service-Center
>            Reporter: little-cui
>            Assignee: little-cui
>            Priority: Major
>             Fix For: service-center-1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to