This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 37ae980  SCB-544 Convenient store extension (#339)
37ae980 is described below

commit 37ae980a8d01a88194c9edf02707e5345e810f5e
Author: little-cui <sure_0...@qq.com>
AuthorDate: Mon May 7 15:47:45 2018 +0800

    SCB-544 Convenient store extension (#339)
    
    * SCB-544 Convenient store extension
    
    (cherry picked from commit d2dbe7c)
    
    * SCB-544 Convenient store extension
---
 server/broker/broker_suite_test.go                 |  45 ++++++
 server/broker/service.go                           |   2 +-
 server/broker/service_test.go                      |   3 -
 server/broker/store.go                             | 171 ++++-----------------
 server/core/backend/store/cache_kv.go              |  34 ++--
 server/core/backend/store/cacher.go                |   1 +
 server/core/backend/store/common.go                |  21 ++-
 server/core/backend/store/{opt.go => config.go}    |  52 ++++---
 server/core/backend/store/event.go                 |  37 -----
 .../backend/store/{event.go => event_proxy.go}     |  42 ++---
 server/core/backend/store/extend.go                |  84 ++++++++++
 .../backend/store/{cacher.go => extend_test.go}    |  39 +++--
 server/core/backend/store/indexer.go               |  25 ++-
 server/core/backend/store/listwatch.go             |  34 ++--
 server/core/backend/store/store.go                 |  33 +++-
 server/error/error.go                              |  14 +-
 server/rest/controller/rest_util.go                |  17 +-
 server/rest/controller/v3/main_controller.go       |  24 ++-
 server/rest/controller/v4/main_controller.go       |  11 +-
 server/service/schema.go                           |   1 -
 20 files changed, 353 insertions(+), 337 deletions(-)

diff --git a/server/broker/broker_suite_test.go b/server/broker/broker_suite_test.go
new file mode 100644
index 0000000..8bc652e
--- /dev/null
+++ b/server/broker/broker_suite_test.go
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package broker
+
+import (
+       pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       _ 
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/quota/buildin"
+       _ 
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/registry/etcd"
+       _ 
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/tracing/buildin"
+       _ 
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/uuid/buildin"
+       "github.com/apache/incubator-servicecomb-service-center/server/service"
+       . "github.com/onsi/ginkgo"
+       "github.com/onsi/ginkgo/reporters"
+       . "github.com/onsi/gomega"
+       "testing"
+)
+
+var serviceResource pb.ServiceCtrlServer
+var instanceResource pb.SerivceInstanceCtrlServerEx
+var brokerResource = BrokerServiceAPI
+
+var _ = BeforeSuite(func() {
+       //init plugin
+       serviceResource, instanceResource = service.AssembleResources()
+})
+
+func TestBroker(t *testing.T) {
+       RegisterFailHandler(Fail)
+       junitReporter := reporters.NewJUnitReporter("model.junit.xml")
+       RunSpecsWithDefaultAndCustomReporters(t, "model Suite", 
[]Reporter{junitReporter})
+}
diff --git a/server/broker/service.go b/server/broker/service.go
index 3ae63c6..f4ea526 100644
--- a/server/broker/service.go
+++ b/server/broker/service.go
@@ -62,7 +62,7 @@ func (*BrokerService) GetPactsOfProvider(ctx context.Context,
                PactLogger.Errorf(nil, "Get pacts of provider failed: %s\n",
                        resp.Response.Message)
                return &GetProviderConsumerVersionPactResponse{
-                       Response: pb.CreateResponse(scerr.ErrInvalidParams, 
err.Error()),
+                       Response: resp.GetResponse(),
                }, err
        }
 
diff --git a/server/broker/service_test.go b/server/broker/service_test.go
index 516b8c3..ac910a1 100644
--- a/server/broker/service_test.go
+++ b/server/broker/service_test.go
@@ -20,7 +20,6 @@ import (
        "fmt"
 
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
-       "github.com/apache/incubator-servicecomb-service-center/server/core"
        pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
        . "github.com/onsi/ginkgo"
        . "github.com/onsi/gomega"
@@ -41,8 +40,6 @@ const (
        TEST_BROKER_PROVIDER_APP     = "broker_group_provider"
 )
 
-var brokerResource = BrokerServiceAPI
-var serviceResource = core.ServiceAPI
 var consumerServiceId string
 var providerServiceId string
 
diff --git a/server/broker/store.go b/server/broker/store.go
index 7ed9c02..3bfe982 100644
--- a/server/broker/store.go
+++ b/server/broker/store.go
@@ -17,166 +17,63 @@
 package broker
 
 import (
-       "sync"
-
-       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
-       sstore 
"github.com/apache/incubator-servicecomb-service-center/server/core/backend/store"
-       "golang.org/x/net/context"
+       
"github.com/apache/incubator-servicecomb-service-center/server/core/backend/store"
 )
 
-const (
-       PARTICIPANT sstore.StoreType = iota
-       VERSION
-       PACT
-       PACT_VERSION
-       PACT_TAG
-       VERIFICATION
-       PACT_LATEST
-       typeEnd
+var (
+       PARTICIPANT  store.StoreType
+       VERSION      store.StoreType
+       PACT         store.StoreType
+       PACT_VERSION store.StoreType
+       PACT_TAG     store.StoreType
+       VERIFICATION store.StoreType
+       PACT_LATEST  store.StoreType
 )
 
-var TypeNames = []string{
-       PARTICIPANT:  "PARTICIPANT",
-       VERSION:      "VERSION",
-       PACT:         "PACT",
-       PACT_VERSION: "PACT_VERSION",
-       PACT_TAG:     "PACT_TAG",
-       VERIFICATION: "VERIFICATION",
-       PACT_LATEST:  "PACT_LATEST",
-}
-
-var TypeRoots = map[sstore.StoreType]string{
-       PARTICIPANT:  GetBrokerParticipantKey(""),
-       VERSION:      GetBrokerVersionKey(""),
-       PACT:         GetBrokerPactKey(""),
-       PACT_VERSION: GetBrokerPactVersionKey(""),
-       PACT_TAG:     GetBrokerTagKey(""),
-       VERIFICATION: GetBrokerVerificationKey(""),
-       PACT_LATEST:  GetBrokerLatestKey(""),
-}
-
-var store = &BKvStore{}
-
-func Store() *BKvStore {
-       return store
-}
-
-func (s *BKvStore) StoreSize(t sstore.StoreType) int {
-       return 100
-}
-
-func (s *BKvStore) newStore(t sstore.StoreType, opts 
...sstore.KvCacherCfgOption) {
-       opts = append(opts,
-               sstore.WithKey(TypeRoots[t]),
-               sstore.WithInitSize(s.StoreSize(t)),
-       )
-       s.newIndexer(t, sstore.NewKvCacher(t.String(), opts...))
-}
-
-func (s *BKvStore) store(ctx context.Context) {
-       for t := sstore.StoreType(0); t != typeEnd; t++ {
-               s.newStore(t)
-       }
-       for _, i := range s.bindexers {
-               select {
-               case <-ctx.Done():
-                       return
-               case <-i.Ready():
-               }
-       }
-       util.SafeCloseChan(s.bready)
-
-       util.Logger().Debugf("all indexers are ready")
-}
+var brokerKvStore = &BKvStore{}
 
 func init() {
-       store.Initialize()
-       store.Run()
-       store.Ready()
-}
+       PARTICIPANT = store.Store().MustInstall(store.NewEntity("PARTICIPANT", 
GetBrokerParticipantKey("")))
+       VERSION = store.Store().MustInstall(store.NewEntity("VERSION", 
GetBrokerVersionKey("")))
+       PACT = store.Store().MustInstall(store.NewEntity("PACT", 
GetBrokerPactKey("")))
+       PACT_VERSION = 
store.Store().MustInstall(store.NewEntity("PACT_VERSION", 
GetBrokerPactVersionKey("")))
+       PACT_TAG = store.Store().MustInstall(store.NewEntity("PACT_TAG", 
GetBrokerTagKey("")))
+       VERIFICATION = 
store.Store().MustInstall(store.NewEntity("VERIFICATION", 
GetBrokerVerificationKey("")))
+       PACT_LATEST = store.Store().MustInstall(store.NewEntity("PACT_LATEST", 
GetBrokerLatestKey("")))
 
-type BKvStore struct {
-       *sstore.KvStore
-       bindexers map[sstore.StoreType]*sstore.Indexer
-       block     sync.RWMutex
-       bready    chan struct{}
-       bisClose  bool
-}
-
-func (s *BKvStore) Initialize() {
-       s.KvStore = sstore.Store()
-       s.KvStore.Initialize()
-       s.bindexers = make(map[sstore.StoreType]*sstore.Indexer)
-       s.bready = make(chan struct{})
-
-       for i := sstore.StoreType(0); i != typeEnd; i++ {
-               store.newNullStore(i)
-       }
-}
-
-func (s *BKvStore) newNullStore(t sstore.StoreType) {
-       s.newIndexer(t, sstore.NullCacher)
-}
-
-func (s *BKvStore) newIndexer(t sstore.StoreType, cacher sstore.Cacher) {
-       indexer := sstore.NewCacheIndexer(cacher)
-       s.bindexers[t] = indexer
-       indexer.Run()
 }
 
-func (s *BKvStore) Run() {
-       util.Go(func(ctx context.Context) {
-               s.store(ctx)
-               select {
-               case <-ctx.Done():
-                       s.Stop()
-               }
-       })
-}
-
-func (s *BKvStore) Ready() <-chan struct{} {
-       return s.bready
+type BKvStore struct {
 }
 
-func (s *BKvStore) Participant() *sstore.Indexer {
-       return s.bindexers[PARTICIPANT]
+func (s *BKvStore) Participant() *store.Indexer {
+       return store.Store().Entity(PARTICIPANT)
 }
 
-func (s *BKvStore) Version() *sstore.Indexer {
-       return s.bindexers[VERSION]
+func (s *BKvStore) Version() *store.Indexer {
+       return store.Store().Entity(VERSION)
 }
 
-func (s *BKvStore) Pact() *sstore.Indexer {
-       return s.bindexers[PACT]
+func (s *BKvStore) Pact() *store.Indexer {
+       return store.Store().Entity(PACT)
 }
 
-func (s *BKvStore) PactVersion() *sstore.Indexer {
-       return s.bindexers[PACT_VERSION]
+func (s *BKvStore) PactVersion() *store.Indexer {
+       return store.Store().Entity(PACT_VERSION)
 }
 
-func (s *BKvStore) PactTag() *sstore.Indexer {
-       return s.bindexers[PACT_TAG]
+func (s *BKvStore) PactTag() *store.Indexer {
+       return store.Store().Entity(PACT_TAG)
 }
 
-func (s *BKvStore) Verification() *sstore.Indexer {
-       return s.bindexers[VERIFICATION]
+func (s *BKvStore) Verification() *store.Indexer {
+       return store.Store().Entity(VERIFICATION)
 }
 
-func (s *BKvStore) PactLatest() *sstore.Indexer {
-       return s.bindexers[PACT_LATEST]
+func (s *BKvStore) PactLatest() *store.Indexer {
+       return store.Store().Entity(PACT_LATEST)
 }
 
-func (s *BKvStore) Stop() {
-       if s.bisClose {
-               return
-       }
-       s.bisClose = true
-
-       for _, i := range s.bindexers {
-               i.Stop()
-       }
-
-       util.SafeCloseChan(s.bready)
-
-       util.Logger().Debugf("broker store daemon stopped")
+func Store() *BKvStore {
+       return brokerKvStore
 }
diff --git a/server/core/backend/store/cache_kv.go b/server/core/backend/store/cache_kv.go
index fcced8f..9fb347e 100644
--- a/server/core/backend/store/cache_kv.go
+++ b/server/core/backend/store/cache_kv.go
@@ -95,7 +95,7 @@ func (c *KvCache) compact() {
        c.store = newCache
 
        util.Logger().Infof("cache %s is not in use over %s, compact capacity 
to size %d->%d",
-               c.owner.Cfg.Key, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size)
+               c.owner.Cfg.Prefix, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, 
c.size)
 
 }
 
@@ -107,7 +107,7 @@ func (c *KvCache) Size() (l int) {
 }
 
 type KvCacher struct {
-       Cfg KvCacherCfg
+       Cfg Config
 
        name         string
        lastRev      int64
@@ -143,13 +143,13 @@ func (c *KvCacher) needList() bool {
        }
 
        util.Logger().Debugf("no events come in more then %s, need to list key 
%s, rev: %d",
-               time.Duration(c.noEventCount)*c.Cfg.Timeout, c.Cfg.Key, rev)
+               time.Duration(c.noEventCount)*c.Cfg.Timeout, c.Cfg.Prefix, rev)
        c.noEventCount = 0
        return true
 }
 
-func (c *KvCacher) doList(listOps ListOptions) error {
-       kvs, err := c.lw.List(listOps)
+func (c *KvCacher) doList(cfg ListWatchConfig) error {
+       kvs, err := c.lw.List(cfg)
        if err != nil {
                return err
        }
@@ -161,13 +161,13 @@ func (c *KvCacher) doList(listOps ListOptions) error {
                util.Logger().Warnf(nil, "most of the protected data(%d/%d) are 
recovered", kc, c.cache.Size())
        }
        c.sync(evts)
-       util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: 
%d", c.Cfg.Key, len(kvs), c.lw.Revision())
+       util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: 
%d", c.Cfg.Prefix, len(kvs), c.lw.Revision())
 
        return nil
 }
 
-func (c *KvCacher) doWatch(listOps ListOptions) error {
-       watcher := c.lw.Watch(listOps)
+func (c *KvCacher) doWatch(cfg ListWatchConfig) error {
+       watcher := c.lw.Watch(cfg)
        return c.handleWatcher(watcher)
 }
 
@@ -175,19 +175,19 @@ func (c *KvCacher) ListAndWatch(ctx context.Context) error {
        c.mux.Lock()
        defer c.mux.Unlock()
 
-       listOps := ListOptions{
+       cfg := ListWatchConfig{
                Timeout: c.Cfg.Timeout,
                Context: ctx,
        }
        if c.needList() {
-               if err := c.doList(listOps); err != nil {
+               if err := c.doList(cfg); err != nil {
                        return err
                }
        }
 
        util.SafeCloseChan(c.ready)
 
-       return c.doWatch(listOps)
+       return c.doWatch(cfg)
 }
 
 func (c *KvCacher) handleWatcher(watcher *Watcher) error {
@@ -336,7 +336,7 @@ func (c *KvCacher) filterDelete(store map[string]*mvccpb.KeyValue, newStore map[
                block[i] = KvEvent{
                        Revision: rev,
                        Type:     proto.EVT_DELETE,
-                       Prefix:   c.Cfg.Key,
+                       Prefix:   c.Cfg.Prefix,
                        Object:   v,
                }
                i++
@@ -365,7 +365,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
                        block[i] = KvEvent{
                                Revision: rev,
                                Type:     proto.EVT_CREATE,
-                               Prefix:   c.Cfg.Key,
+                               Prefix:   c.Cfg.Prefix,
                                Object:   v,
                        }
                        i++
@@ -385,7 +385,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
                block[i] = KvEvent{
                        Revision: rev,
                        Type:     proto.EVT_UPDATE,
-                       Prefix:   c.Cfg.Key,
+                       Prefix:   c.Cfg.Prefix,
                        Object:   v,
                }
                i++
@@ -492,8 +492,8 @@ func NewKvCache(c *KvCacher, size int) *KvCache {
        }
 }
 
-func NewKvCacher(name string, opts ...KvCacherCfgOption) *KvCacher {
-       cfg := DefaultKvCacherConfig()
+func NewKvCacher(name string, opts ...ConfigOption) *KvCacher {
+       cfg := DefaultConfig()
        for _, opt := range opts {
                opt(&cfg)
        }
@@ -504,7 +504,7 @@ func NewKvCacher(name string, opts ...KvCacherCfgOption) *KvCacher {
                ready: make(chan struct{}),
                lw: ListWatcher{
                        Client: backend.Registry(),
-                       Key:    cfg.Key,
+                       Prefix: cfg.Prefix,
                },
                goroutine: util.NewGo(context.Background()),
        }
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go
index 94fbe46..898964c 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -24,6 +24,7 @@ type Cache interface {
 }
 
 type Cacher interface {
+       // Name is the cache size metric name
        Name() string
        Cache() Cache
        Run()
diff --git a/server/core/backend/store/common.go b/server/core/backend/store/common.go
index fc04898..0de9e05 100644
--- a/server/core/backend/store/common.go
+++ b/server/core/backend/store/common.go
@@ -25,12 +25,17 @@ import (
 type StoreType int
 
 func (st StoreType) String() string {
+       if int(st) < 0 {
+               return "NONEXIST"
+       }
        if int(st) < len(TypeNames) {
                return TypeNames[st]
        }
        return "TYPE" + strconv.Itoa(int(st))
 }
 
+const NONEXIST = StoreType(-1)
+
 const (
        DOMAIN StoreType = iota
        PROJECT
@@ -48,25 +53,25 @@ const (
        INSTANCE
        LEASE
        ENDPOINTS
-       typeEnd
+       typeEnd // end of the base store types
 )
 
 var TypeNames = []string{
-       SERVICE:          "SERVICE",
-       INSTANCE:         "INSTANCE",
        DOMAIN:           "DOMAIN",
-       SCHEMA:           "SCHEMA",
-       SCHEMA_SUMMARY:   "SCHEMA_SUMMARY",
-       RULE:             "RULE",
-       LEASE:            "LEASE",
+       PROJECT:          "PROJECT",
+       SERVICE:          "SERVICE",
        SERVICE_INDEX:    "SERVICE_INDEX",
        SERVICE_ALIAS:    "SERVICE_ALIAS",
        SERVICE_TAG:      "SERVICE_TAG",
+       RULE:             "RULE",
        RULE_INDEX:       "RULE_INDEX",
        DEPENDENCY:       "DEPENDENCY",
        DEPENDENCY_RULE:  "DEPENDENCY_RULE",
        DEPENDENCY_QUEUE: "DEPENDENCY_QUEUE",
-       PROJECT:          "PROJECT",
+       SCHEMA:           "SCHEMA",
+       SCHEMA_SUMMARY:   "SCHEMA_SUMMARY",
+       INSTANCE:         "INSTANCE",
+       LEASE:            "LEASE",
        ENDPOINTS:        "ENDPOINTS",
 }
 
diff --git a/server/core/backend/store/opt.go b/server/core/backend/store/config.go
similarity index 55%
rename from server/core/backend/store/opt.go
rename to server/core/backend/store/config.go
index 59fbabb..9f0b2dd 100644
--- a/server/core/backend/store/opt.go
+++ b/server/core/backend/store/config.go
@@ -18,11 +18,12 @@ package store
 
 import (
        "fmt"
+       "golang.org/x/net/context"
        "time"
 )
 
-type KvCacherCfg struct {
-       Key                string
+type Config struct {
+       Prefix             string
        InitSize           int
        NoEventMaxInterval int
        Timeout            time.Duration
@@ -31,43 +32,52 @@ type KvCacherCfg struct {
        DeferHandler       DeferHandler
 }
 
-func (cfg KvCacherCfg) String() string {
-       return fmt.Sprintf("{key: %s, timeout: %s, period: %s}",
-               cfg.Key, cfg.Timeout, cfg.Period)
+func (cfg Config) String() string {
+       return fmt.Sprintf("{prefix: %s, timeout: %s, period: %s}",
+               cfg.Prefix, cfg.Timeout, cfg.Period)
 }
 
-type KvCacherCfgOption func(*KvCacherCfg)
+type ConfigOption func(*Config)
 
-func WithKey(key string) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.Key = key }
+func WithPrefix(key string) ConfigOption {
+       return func(cfg *Config) { cfg.Prefix = key }
 }
 
-func WithInitSize(size int) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.InitSize = size }
+func WithInitSize(size int) ConfigOption {
+       return func(cfg *Config) { cfg.InitSize = size }
 }
 
-func WithTimeout(ot time.Duration) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.Timeout = ot }
+func WithTimeout(ot time.Duration) ConfigOption {
+       return func(cfg *Config) { cfg.Timeout = ot }
 }
 
-func WithPeriod(ot time.Duration) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.Period = ot }
+func WithPeriod(ot time.Duration) ConfigOption {
+       return func(cfg *Config) { cfg.Period = ot }
 }
 
-func WithEventFunc(f KvEventFunc) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.OnEvent = f }
+func WithEventFunc(f KvEventFunc) ConfigOption {
+       return func(cfg *Config) { cfg.OnEvent = f }
 }
 
-func WithDeferHandler(h DeferHandler) KvCacherCfgOption {
-       return func(cfg *KvCacherCfg) { cfg.DeferHandler = h }
+func WithDeferHandler(h DeferHandler) ConfigOption {
+       return func(cfg *Config) { cfg.DeferHandler = h }
 }
 
-func DefaultKvCacherConfig() KvCacherCfg {
-       return KvCacherCfg{
-               Key:                "/",
+func DefaultConfig() Config {
+       return Config{
+               Prefix:             "/",
                Timeout:            DEFAULT_LISTWATCH_TIMEOUT,
                Period:             time.Second,
                NoEventMaxInterval: DEFAULT_MAX_NO_EVENT_INTERVAL,
                InitSize:           DEFAULT_CACHE_INIT_SIZE,
        }
 }
+
+type ListWatchConfig struct {
+       Timeout time.Duration
+       Context context.Context
+}
+
+func (lo *ListWatchConfig) String() string {
+       return fmt.Sprintf("{timeout: %s}", lo.Timeout)
+}
diff --git a/server/core/backend/store/event.go b/server/core/backend/store/event.go
index 504aba1..67714a0 100644
--- a/server/core/backend/store/event.go
+++ b/server/core/backend/store/event.go
@@ -18,22 +18,8 @@ package store
 
 import (
        
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
-       "sync"
 )
 
-var (
-       evtProxies map[StoreType]*KvEventProxy
-)
-
-func init() {
-       evtProxies = make(map[StoreType]*KvEventProxy, typeEnd)
-       for i := StoreType(0); i != typeEnd; i++ {
-               evtProxies[i] = &KvEventProxy{
-                       evtHandleFuncs: make([]KvEventFunc, 0, 5),
-               }
-       }
-}
-
 type KvEventFunc func(evt KvEvent)
 
 type KvEvent struct {
@@ -48,29 +34,6 @@ type KvEventHandler interface {
        OnEvent(evt KvEvent)
 }
 
-type KvEventProxy struct {
-       evtHandleFuncs []KvEventFunc
-       lock           sync.RWMutex
-}
-
-func (h *KvEventProxy) AddHandleFunc(f KvEventFunc) {
-       h.lock.Lock()
-       h.evtHandleFuncs = append(h.evtHandleFuncs, f)
-       h.lock.Unlock()
-}
-
-func (h *KvEventProxy) OnEvent(evt KvEvent) {
-       h.lock.RLock()
-       for _, f := range h.evtHandleFuncs {
-               f(evt)
-       }
-       h.lock.RUnlock()
-}
-
-func EventProxy(t StoreType) *KvEventProxy {
-       return evtProxies[t]
-}
-
 // the event handler/func must be good performance, or will block the event 
bus.
 func AddEventHandleFunc(t StoreType, f KvEventFunc) {
        EventProxy(t).AddHandleFunc(f)
diff --git a/server/core/backend/store/event.go b/server/core/backend/store/event_proxy.go
similarity index 62%
copy from server/core/backend/store/event.go
copy to server/core/backend/store/event_proxy.go
index 504aba1..e66583d 100644
--- a/server/core/backend/store/event.go
+++ b/server/core/backend/store/event_proxy.go
@@ -16,38 +16,19 @@
  */
 package store
 
-import (
-       
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
-       "sync"
-)
+import "sync"
 
 var (
-       evtProxies map[StoreType]*KvEventProxy
+       EventProxies map[StoreType]*KvEventProxy
 )
 
 func init() {
-       evtProxies = make(map[StoreType]*KvEventProxy, typeEnd)
+       EventProxies = make(map[StoreType]*KvEventProxy, typeEnd)
        for i := StoreType(0); i != typeEnd; i++ {
-               evtProxies[i] = &KvEventProxy{
-                       evtHandleFuncs: make([]KvEventFunc, 0, 5),
-               }
+               EventProxies[i] = NewEventProxy()
        }
 }
 
-type KvEventFunc func(evt KvEvent)
-
-type KvEvent struct {
-       Revision int64
-       Type     proto.EventType
-       Prefix   string
-       Object   interface{}
-}
-
-type KvEventHandler interface {
-       Type() StoreType
-       OnEvent(evt KvEvent)
-}
-
 type KvEventProxy struct {
        evtHandleFuncs []KvEventFunc
        lock           sync.RWMutex
@@ -67,15 +48,12 @@ func (h *KvEventProxy) OnEvent(evt KvEvent) {
        h.lock.RUnlock()
 }
 
-func EventProxy(t StoreType) *KvEventProxy {
-       return evtProxies[t]
-}
-
-// the event handler/func must be good performance, or will block the event 
bus.
-func AddEventHandleFunc(t StoreType, f KvEventFunc) {
-       EventProxy(t).AddHandleFunc(f)
+func NewEventProxy() *KvEventProxy {
+       return &KvEventProxy{
+               evtHandleFuncs: make([]KvEventFunc, 0, 5),
+       }
 }
 
-func AddEventHandler(h KvEventHandler) {
-       AddEventHandleFunc(h.Type(), h.OnEvent)
+func EventProxy(t StoreType) *KvEventProxy {
+       return EventProxies[t]
 }
diff --git a/server/core/backend/store/extend.go b/server/core/backend/store/extend.go
new file mode 100644
index 0000000..bffd0c1
--- /dev/null
+++ b/server/core/backend/store/extend.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import (
+       "errors"
+       "fmt"
+)
+
+type Entity interface {
+       Name() string
+       Prefix() string
+       InitSize() int
+}
+
+type entity struct {
+       name     string
+       prefix   string
+       initSize int
+}
+
+func (e *entity) Name() string {
+       return e.name
+}
+
+func (e *entity) Prefix() string {
+       return e.prefix
+}
+
+func (e *entity) InitSize() int {
+       return e.initSize
+}
+
+func InstallType(e Entity) (id StoreType, err error) {
+       if e == nil {
+               return NONEXIST, errors.New("invalid parameter")
+       }
+       for _, n := range TypeNames {
+               if n == e.Name() {
+                       return NONEXIST, fmt.Errorf("redeclare store type 
'%s'", n)
+               }
+       }
+       for _, r := range TypeRoots {
+               if r == e.Prefix() {
+                       return NONEXIST, fmt.Errorf("redeclare store root 
'%s'", r)
+               }
+       }
+
+       TypeNames = append(TypeNames, e.Name())
+       id = StoreType(len(TypeNames) + 1) // +1 for typeEnd
+
+       TypeRoots[id] = e.Prefix()
+       TypeInitSize[id] = e.InitSize()
+
+       EventProxies[id] = NewEventProxy()
+       return
+}
+
+func NewEntity(name, prefix string, opts ...ConfigOption) Entity {
+       cfg := DefaultConfig()
+       for _, opt := range opts {
+               opt(&cfg)
+       }
+       cfg.Prefix = prefix
+       return &entity{
+               name:     name,
+               prefix:   cfg.Prefix,
+               initSize: cfg.InitSize,
+       }
+}
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/extend_test.go
similarity index 64%
copy from server/core/backend/store/cacher.go
copy to server/core/backend/store/extend_test.go
index 94fbe46..8359dd2 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/extend_test.go
@@ -16,17 +16,34 @@
  */
 package store
 
-type Cache interface {
-       Version() int64
-       Data(interface{}) interface{}
-       Have(interface{}) bool
-       Size() int
+import "testing"
+
+type extend struct {
+}
+
+func (e *extend) Name() string {
+       return "test"
+}
+
+func (e *extend) Prefix() string {
+       return "/test"
+}
+
+func (e *extend) InitSize() int {
+       return 0
 }
 
-type Cacher interface {
-       Name() string
-       Cache() Cache
-       Run()
-       Stop()
-       Ready() <-chan struct{}
+func TestInstallType(t *testing.T) {
+       id, err := InstallType(&extend{})
+       if err != nil {
+               t.Fatal(err)
+       }
+       if id == NONEXIST {
+               t.Fatal(err)
+       }
+
+       id, err = InstallType(&extend{})
+       if id != NONEXIST || err == nil {
+               t.Fatal("InstallType fail", err)
+       }
 }
diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go
index 3f8cd7d..2b35a97 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -28,23 +28,16 @@ import (
        "time"
 )
 
-var defaultRootKeys map[string]struct{}
-
-func init() {
-       defaultRootKeys = make(map[string]struct{}, len(defaultRootKeys))
-       for _, root := range TypeRoots {
-               defaultRootKeys[root] = struct{}{}
-       }
-}
-
 type Indexer struct {
-       BuildTimeout     time.Duration
+       BuildTimeout time.Duration
+       Root         string
+
        cacher           Cacher
-       prefixIndex      map[string]map[string]struct{}
-       prefixLock       sync.RWMutex
-       prefixBuildQueue chan KvEvent
        goroutine        *util.GoRoutine
        ready            chan struct{}
+       prefixIndex      map[string]map[string]struct{}
+       prefixBuildQueue chan KvEvent
+       prefixLock       sync.RWMutex
        isClose          bool
 }
 
@@ -266,8 +259,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) {
 }
 
 func (i *Indexer) addPrefixKey(prefix, key string) {
-       _, ok := defaultRootKeys[key]
-       if ok {
+       if i.Root == key {
                return
        }
 
@@ -342,9 +334,10 @@ func (i *Indexer) Ready() <-chan struct{} {
        return i.ready
 }
 
-func NewCacheIndexer(cr Cacher) *Indexer {
+func NewCacheIndexer(root string, cr Cacher) *Indexer {
        return &Indexer{
                BuildTimeout:     DEFAULT_ADD_QUEUE_TIMEOUT,
+               Root:             root,
                cacher:           cr,
                prefixIndex:      make(map[string]map[string]struct{}, 
DEFAULT_CACHE_INIT_SIZE),
                prefixBuildQueue: make(chan KvEvent, DEFAULT_MAX_EVENT_COUNT),
diff --git a/server/core/backend/store/listwatch.go b/server/core/backend/store/listwatch.go
index 3cf1fe7..42fbde5 100644
--- a/server/core/backend/store/listwatch.go
+++ b/server/core/backend/store/listwatch.go
@@ -24,30 +24,20 @@ import (
        "github.com/coreos/etcd/mvcc/mvccpb"
        "golang.org/x/net/context"
        "sync"
-       "time"
 )
 
-type ListOptions struct {
-       Timeout time.Duration
-       Context context.Context
-}
-
-func (lo *ListOptions) String() string {
-       return fmt.Sprintf("{timeout: %s}", lo.Timeout)
-}
-
 type ListWatcher struct {
        Client registry.Registry
-       Key    string
+       Prefix string
 
        rev int64
 }
 
-func (lw *ListWatcher) List(op ListOptions) ([]*mvccpb.KeyValue, error) {
+func (lw *ListWatcher) List(op ListWatchConfig) ([]*mvccpb.KeyValue, error) {
        otCtx, _ := context.WithTimeout(op.Context, op.Timeout)
-       resp, err := lw.Client.Do(otCtx, 
registry.WatchPrefixOpOptions(lw.Key)...)
+       resp, err := lw.Client.Do(otCtx, 
registry.WatchPrefixOpOptions(lw.Prefix)...)
        if err != nil {
-               util.Logger().Errorf(err, "list key %s failed, rev: %d->0", 
lw.Key, lw.Revision())
+               util.Logger().Errorf(err, "list prefix %s failed, rev: %d->0", 
lw.Prefix, lw.Revision())
                lw.setRevision(0)
                return nil, err
        }
@@ -66,14 +56,14 @@ func (lw *ListWatcher) setRevision(rev int64) {
        lw.rev = rev
 }
 
-func (lw *ListWatcher) Watch(op ListOptions) *Watcher {
+func (lw *ListWatcher) Watch(op ListWatchConfig) *Watcher {
        return newWatcher(lw, op)
 }
 
 func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []KvEvent)) 
error {
        rev := lw.Revision()
        opts := append(
-               registry.WatchPrefixOpOptions(lw.Key),
+               registry.WatchPrefixOpOptions(lw.Prefix),
                registry.WithRev(rev+1),
                registry.WithWatchCallback(
                        func(message string, resp *registry.PluginResponse) 
error {
@@ -81,13 +71,13 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f 
func(evt []KvEvent)) error
                                        return fmt.Errorf("unknown event %s", 
resp)
                                }
 
-                               util.Logger().Infof("watch prefix key %s, start 
rev %d+1, event: %s", lw.Key, rev, resp)
+                               util.Logger().Infof("watch prefix %s, start rev 
%d+1, event: %s", lw.Prefix, rev, resp)
 
                                lw.setRevision(resp.Revision)
 
                                evts := make([]KvEvent, len(resp.Kvs))
                                for i, kv := range resp.Kvs {
-                                       evt := KvEvent{Prefix: lw.Key, 
Revision: kv.ModRevision}
+                                       evt := KvEvent{Prefix: lw.Prefix, 
Revision: kv.ModRevision}
                                        switch {
                                        case resp.Action == registry.Put && 
kv.Version == 1:
                                                evt.Type, evt.Object = 
proto.EVT_CREATE, kv
@@ -106,16 +96,16 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f 
func(evt []KvEvent)) error
 
        err := lw.Client.Watch(ctx, opts...)
        if err != nil { // compact可能会导致watch失败 or message body size lager than 
4MB
-               util.Logger().Errorf(err, "watch key %s failed, start rev: 
%d+1->%d->0", lw.Key, rev, lw.Revision())
+               util.Logger().Errorf(err, "watch prefix %s failed, start rev: 
%d+1->%d->0", lw.Prefix, rev, lw.Revision())
 
                lw.setRevision(0)
-               f([]KvEvent{errEvent(lw.Key, err)})
+               f([]KvEvent{errEvent(lw.Prefix, err)})
        }
        return err
 }
 
 type Watcher struct {
-       ListOps ListOptions
+       ListOps ListWatchConfig
        lw      *ListWatcher
        bus     chan []KvEvent
        stopCh  chan struct{}
@@ -169,7 +159,7 @@ func errEvent(key string, err error) KvEvent {
        }
 }
 
-func newWatcher(lw *ListWatcher, listOps ListOptions) *Watcher {
+func newWatcher(lw *ListWatcher, listOps ListWatchConfig) *Watcher {
        w := &Watcher{
                ListOps: listOps,
                lw:      lw,
diff --git a/server/core/backend/store/store.go 
b/server/core/backend/store/store.go
index e549cd4..6684946 100644
--- a/server/core/backend/store/store.go
+++ b/server/core/backend/store/store.go
@@ -61,7 +61,7 @@ func (s *KvStore) dispatchEvent(t StoreType, evt KvEvent) {
 }
 
 func (s *KvStore) newIndexBuilder(t StoreType, cacher Cacher) {
-       indexer := NewCacheIndexer(cacher)
+       indexer := NewCacheIndexer(TypeRoots[t], cacher)
        s.indexers[t] = indexer
        indexer.Run()
 }
@@ -71,7 +71,7 @@ func (s *KvStore) Run() {
        s.taskService.Run()
 }
 
-func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts 
[]KvCacherCfgOption) {
+func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts []ConfigOption) {
        switch t {
        case INSTANCE:
                opts = append(opts, 
WithDeferHandler(s.SelfPreservationHandler()))
@@ -81,7 +81,7 @@ func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts 
[]KvCacherCfgOption)
                opts = append(opts, WithInitSize(sz))
        }
        opts = append(opts,
-               WithKey(TypeRoots[t]),
+               WithPrefix(TypeRoots[t]),
                WithEventFunc(func(evt KvEvent) { s.dispatchEvent(t, evt) }))
        return
 }
@@ -246,6 +246,33 @@ func (s *KvStore) KeepAlive(ctx context.Context, opts 
...registry.PluginOpOption
        return pt.TTL, pt.Err()
 }
 
+func (s *KvStore) Entity(id StoreType) *Indexer {
+       return s.indexers[id]
+}
+
+func (s *KvStore) Install(e Entity) (id StoreType, err error) {
+       if id, err = InstallType(e); err != nil {
+               return
+       }
+
+       util.Logger().Infof("install new store entity %d:%s->%s", id, e.Name(), 
e.Prefix())
+
+       if !core.ServerInfo.Config.EnableCache {
+               s.newIndexBuilder(id, NullCacher)
+               return
+       }
+       s.newIndexBuilder(id, NewKvCacher(id.String(), 
s.getKvCacherCfgOptions(id)...))
+       return
+}
+
+func (s *KvStore) MustInstall(e Entity) StoreType {
+       id, err := s.Install(e)
+       if err != nil {
+               panic(err)
+       }
+       return id
+}
+
 func Store() *KvStore {
        return store
 }
diff --git a/server/error/error.go b/server/error/error.go
index 5ee5bf1..b30356e 100644
--- a/server/error/error.go
+++ b/server/error/error.go
@@ -18,8 +18,6 @@ package error
 
 import (
        "encoding/json"
-       "fmt"
-       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        "net/http"
 )
 
@@ -104,9 +102,9 @@ func (e Error) Error() string {
        return e.Message + "(" + e.Detail + ")"
 }
 
-func (e Error) toJson() string {
+func (e Error) Marshal() []byte {
        bs, _ := json.Marshal(e)
-       return util.BytesToStringWithNoCopy(bs)
+       return bs
 }
 
 func (e Error) StatusCode() int {
@@ -123,14 +121,6 @@ func (e Error) InternalError() bool {
        return false
 }
 
-func (e Error) HttpWrite(w http.ResponseWriter) {
-       status := e.StatusCode()
-       w.Header().Add("X-Response-Status", fmt.Sprint(status))
-       w.Header().Set("Content-Type", "application/json; charset=UTF-8")
-       w.WriteHeader(status)
-       fmt.Fprintln(w, e.toJson())
-}
-
 func NewError(code int32, detail string) *Error {
        return &Error{
                Code:    code,
diff --git a/server/rest/controller/rest_util.go 
b/server/rest/controller/rest_util.go
index be626d5..00f93f7 100644
--- a/server/rest/controller/rest_util.go
+++ b/server/rest/controller/rest_util.go
@@ -25,15 +25,23 @@ import (
        "net/http"
 )
 
+const (
+       contentTypeJson = "application/json; charset=UTF-8"
+       contentTypeText = "text/plain; charset=UTF-8"
+)
+
 func WriteError(w http.ResponseWriter, code int32, detail string) {
        err := error.NewError(code, detail)
-       err.HttpWrite(w)
+       w.Header().Add("X-Response-Status", fmt.Sprint(err.StatusCode()))
+       w.Header().Set("Content-Type", contentTypeJson)
+       w.WriteHeader(err.StatusCode())
+       fmt.Fprintln(w, util.BytesToStringWithNoCopy(err.Marshal()))
 }
 
 func WriteJsonObject(w http.ResponseWriter, obj interface{}) {
        if obj == nil {
                w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK))
-               w.Header().Set("Content-Type", "text/plain; charset=UTF-8")
+               w.Header().Set("Content-Type", contentTypeText)
                w.WriteHeader(http.StatusOK)
                return
        }
@@ -44,7 +52,7 @@ func WriteJsonObject(w http.ResponseWriter, obj interface{}) {
                return
        }
        w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK))
-       w.Header().Set("Content-Type", "application/json; charset=UTF-8")
+       w.Header().Set("Content-Type", contentTypeJson)
        w.WriteHeader(http.StatusOK)
        fmt.Fprintln(w, util.BytesToStringWithNoCopy(objJson))
 }
@@ -61,11 +69,10 @@ func WriteResponse(w http.ResponseWriter, resp 
*pb.Response, obj interface{}) {
 func WriteBytes(w http.ResponseWriter, resp *pb.Response, json []byte) {
        if resp.GetCode() == pb.Response_SUCCESS {
                w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK))
-               w.Header().Set("Content-Type", "application/json; 
charset=UTF-8")
+               w.Header().Set("Content-Type", contentTypeJson)
                w.WriteHeader(http.StatusOK)
                w.Write(json)
                return
        }
        WriteError(w, resp.GetCode(), resp.GetMessage())
 }
-
diff --git a/server/rest/controller/v3/main_controller.go 
b/server/rest/controller/v3/main_controller.go
index 5bb4410..e20e323 100644
--- a/server/rest/controller/v3/main_controller.go
+++ b/server/rest/controller/v3/main_controller.go
@@ -19,13 +19,29 @@ package v3
 import (
        "encoding/json"
        "github.com/apache/incubator-servicecomb-service-center/pkg/rest"
+       pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       
"github.com/apache/incubator-servicecomb-service-center/server/rest/controller"
        
"github.com/apache/incubator-servicecomb-service-center/server/rest/controller/v4"
        "github.com/apache/incubator-servicecomb-service-center/version"
        "net/http"
 )
 
+var (
+       versionJsonCache []byte
+       versionResp      *pb.Response
+)
+
 const API_VERSION = "3.0.0"
 
+func init() {
+       result := v4.Result{
+               VersionSet: version.Ver(),
+               ApiVersion: API_VERSION,
+       }
+       versionJsonCache, _ = json.Marshal(result)
+       versionResp = pb.CreateResponse(pb.Response_SUCCESS, "get version 
successfully")
+}
+
 type MainService struct {
        v4.MainService
 }
@@ -38,11 +54,5 @@ func (this *MainService) URLPatterns() []rest.Route {
 }
 
 func (this *MainService) GetVersion(w http.ResponseWriter, r *http.Request) {
-       result := v4.Result{
-               VersionSet: version.Ver(),
-               ApiVersion: API_VERSION,
-       }
-       resultJSON, _ := json.Marshal(result)
-       w.Header().Set("Content-Type", "application/json;charset=utf-8")
-       w.Write(resultJSON)
+       controller.WriteBytes(w, versionResp, versionJsonCache)
 }
diff --git a/server/rest/controller/v4/main_controller.go 
b/server/rest/controller/v4/main_controller.go
index d2af4d4..cbf5ed5 100644
--- a/server/rest/controller/v4/main_controller.go
+++ b/server/rest/controller/v4/main_controller.go
@@ -28,7 +28,10 @@ import (
        "net/http"
 )
 
-var resultJSON []byte
+var (
+       versionJsonCache []byte
+       versionResp      *pb.Response
+)
 
 const API_VERSION = "4.0.0"
 
@@ -48,7 +51,8 @@ func init() {
                API_VERSION,
                core.ServerInfo.Config,
        }
-       resultJSON, _ = json.Marshal(result)
+       versionJsonCache, _ = json.Marshal(result)
+       versionResp = pb.CreateResponse(pb.Response_SUCCESS, "get version 
successfully")
 }
 
 func (this *MainService) URLPatterns() []rest.Route {
@@ -72,6 +76,5 @@ func (this *MainService) ClusterHealth(w http.ResponseWriter, 
r *http.Request) {
 }
 
 func (this *MainService) GetVersion(w http.ResponseWriter, r *http.Request) {
-       w.Header().Set("Content-Type", "application/json;charset=utf-8")
-       w.Write(resultJSON)
+       controller.WriteBytes(w, versionResp, versionJsonCache)
 }
diff --git a/server/service/schema.go b/server/service/schema.go
index 3d7da42..dd9c4d9 100644
--- a/server/service/schema.go
+++ b/server/service/schema.go
@@ -122,7 +122,6 @@ func (s *MicroServiceService) GetAllSchemaInfo(ctx 
context.Context, in *pb.GetAl
 
        schemasList := service.Schemas
        if schemasList == nil || len(schemasList) == 0 {
-               util.Logger().Infof("service %s schemaId set is empty.", 
in.ServiceId)
                return &pb.GetAllSchemaResponse{
                        Response: pb.CreateResponse(pb.Response_SUCCESS, "Do 
not have this schema info."),
                        Schemas:  []*pb.Schema{},

-- 
To stop receiving notification emails like this one, please contact
little...@apache.org.

Reply via email to