This is an automated email from the ASF dual-hosted git repository. littlecui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new 37ae980 SCB-544 Convenient store extension (#339) 37ae980 is described below commit 37ae980a8d01a88194c9edf02707e5345e810f5e Author: little-cui <sure_0...@qq.com> AuthorDate: Mon May 7 15:47:45 2018 +0800 SCB-544 Convenient store extension (#339) * SCB-544 Convenient store extension (cherry picked from commit d2dbe7c) * SCB-544 Convenient store extension --- server/broker/broker_suite_test.go | 45 ++++++ server/broker/service.go | 2 +- server/broker/service_test.go | 3 - server/broker/store.go | 171 ++++----------------- server/core/backend/store/cache_kv.go | 34 ++-- server/core/backend/store/cacher.go | 1 + server/core/backend/store/common.go | 21 ++- server/core/backend/store/{opt.go => config.go} | 52 ++++--- server/core/backend/store/event.go | 37 ----- .../backend/store/{event.go => event_proxy.go} | 42 ++--- server/core/backend/store/extend.go | 84 ++++++++++ .../backend/store/{cacher.go => extend_test.go} | 39 +++-- server/core/backend/store/indexer.go | 25 ++- server/core/backend/store/listwatch.go | 34 ++-- server/core/backend/store/store.go | 33 +++- server/error/error.go | 14 +- server/rest/controller/rest_util.go | 17 +- server/rest/controller/v3/main_controller.go | 24 ++- server/rest/controller/v4/main_controller.go | 11 +- server/service/schema.go | 1 - 20 files changed, 353 insertions(+), 337 deletions(-) diff --git a/server/broker/broker_suite_test.go b/server/broker/broker_suite_test.go new file mode 100644 index 0000000..8bc652e --- /dev/null +++ b/server/broker/broker_suite_test.go @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package broker + +import ( + pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" + _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/quota/buildin" + _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/registry/etcd" + _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/tracing/buildin" + _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/uuid/buildin" + "github.com/apache/incubator-servicecomb-service-center/server/service" + . "github.com/onsi/ginkgo" + "github.com/onsi/ginkgo/reporters" + . "github.com/onsi/gomega" + "testing" +) + +var serviceResource pb.ServiceCtrlServer +var instanceResource pb.SerivceInstanceCtrlServerEx +var brokerResource = BrokerServiceAPI + +var _ = BeforeSuite(func() { + //init plugin + serviceResource, instanceResource = service.AssembleResources() +}) + +func TestBroker(t *testing.T) { + RegisterFailHandler(Fail) + junitReporter := reporters.NewJUnitReporter("model.junit.xml") + RunSpecsWithDefaultAndCustomReporters(t, "model Suite", []Reporter{junitReporter}) +} diff --git a/server/broker/service.go b/server/broker/service.go index 3ae63c6..f4ea526 100644 --- a/server/broker/service.go +++ b/server/broker/service.go @@ -62,7 +62,7 @@ func (*BrokerService) GetPactsOfProvider(ctx context.Context, PactLogger.Errorf(nil, "Get pacts of provider failed: %s\n", resp.Response.Message) return &GetProviderConsumerVersionPactResponse{ - Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()), + Response: resp.GetResponse(), }, err } diff --git a/server/broker/service_test.go b/server/broker/service_test.go index 516b8c3..ac910a1 100644 --- a/server/broker/service_test.go +++ b/server/broker/service_test.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/apache/incubator-servicecomb-service-center/pkg/util" - "github.com/apache/incubator-servicecomb-service-center/server/core" pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -41,8 +40,6 @@ const ( TEST_BROKER_PROVIDER_APP = "broker_group_provider" ) -var brokerResource = BrokerServiceAPI -var serviceResource = core.ServiceAPI var consumerServiceId string var providerServiceId string diff --git a/server/broker/store.go b/server/broker/store.go index 7ed9c02..3bfe982 100644 --- a/server/broker/store.go +++ b/server/broker/store.go @@ -17,166 +17,63 @@ package broker import ( - "sync" - - "github.com/apache/incubator-servicecomb-service-center/pkg/util" - sstore "github.com/apache/incubator-servicecomb-service-center/server/core/backend/store" - "golang.org/x/net/context" + "github.com/apache/incubator-servicecomb-service-center/server/core/backend/store" ) -const ( - PARTICIPANT sstore.StoreType = iota - VERSION - PACT - PACT_VERSION - PACT_TAG - VERIFICATION - PACT_LATEST - typeEnd +var ( + PARTICIPANT store.StoreType + VERSION store.StoreType + PACT store.StoreType + PACT_VERSION store.StoreType + PACT_TAG store.StoreType + VERIFICATION store.StoreType + PACT_LATEST store.StoreType ) -var TypeNames = []string{ - PARTICIPANT: "PARTICIPANT", - VERSION: "VERSION", - PACT: "PACT", - PACT_VERSION: "PACT_VERSION", - PACT_TAG: "PACT_TAG", - VERIFICATION: "VERIFICATION", - PACT_LATEST: "PACT_LATEST", -} - -var TypeRoots = map[sstore.StoreType]string{ - PARTICIPANT: GetBrokerParticipantKey(""), - VERSION: GetBrokerVersionKey(""), - PACT: GetBrokerPactKey(""), - PACT_VERSION: GetBrokerPactVersionKey(""), - PACT_TAG: GetBrokerTagKey(""), - VERIFICATION: GetBrokerVerificationKey(""), - PACT_LATEST: GetBrokerLatestKey(""), -} - -var store = &BKvStore{} - -func Store() *BKvStore { - return store -} - -func (s *BKvStore) StoreSize(t sstore.StoreType) int { - return 100 -} - -func (s *BKvStore) newStore(t sstore.StoreType, opts ...sstore.KvCacherCfgOption) { - opts = append(opts, - sstore.WithKey(TypeRoots[t]), - sstore.WithInitSize(s.StoreSize(t)), - ) - s.newIndexer(t, sstore.NewKvCacher(t.String(), opts...)) -} - -func (s *BKvStore) store(ctx context.Context) { - for t := sstore.StoreType(0); t != typeEnd; t++ { - s.newStore(t) - } - for _, i := range s.bindexers { - select { - case <-ctx.Done(): - return - case <-i.Ready(): - } - } - util.SafeCloseChan(s.bready) - - util.Logger().Debugf("all indexers are ready") -} +var brokerKvStore = &BKvStore{} func init() { - store.Initialize() - store.Run() - store.Ready() -} + PARTICIPANT = store.Store().MustInstall(store.NewEntity("PARTICIPANT", GetBrokerParticipantKey(""))) + VERSION = store.Store().MustInstall(store.NewEntity("VERSION", GetBrokerVersionKey(""))) + PACT = store.Store().MustInstall(store.NewEntity("PACT", GetBrokerPactKey(""))) + PACT_VERSION = store.Store().MustInstall(store.NewEntity("PACT_VERSION", GetBrokerPactVersionKey(""))) + PACT_TAG = store.Store().MustInstall(store.NewEntity("PACT_TAG", GetBrokerTagKey(""))) + VERIFICATION = store.Store().MustInstall(store.NewEntity("VERIFICATION", GetBrokerVerificationKey(""))) + PACT_LATEST = store.Store().MustInstall(store.NewEntity("PACT_LATEST", GetBrokerLatestKey(""))) -type BKvStore struct { - *sstore.KvStore - bindexers map[sstore.StoreType]*sstore.Indexer - block sync.RWMutex - bready chan struct{} - bisClose bool -} - -func (s *BKvStore) Initialize() { - s.KvStore = sstore.Store() - s.KvStore.Initialize() - s.bindexers = make(map[sstore.StoreType]*sstore.Indexer) - s.bready = make(chan struct{}) - - for i := sstore.StoreType(0); i != typeEnd; i++ { - store.newNullStore(i) - } -} - -func (s *BKvStore) newNullStore(t sstore.StoreType) { - s.newIndexer(t, sstore.NullCacher) -} - -func (s *BKvStore) newIndexer(t sstore.StoreType, cacher sstore.Cacher) { - indexer := sstore.NewCacheIndexer(cacher) - s.bindexers[t] = indexer - indexer.Run() } -func (s *BKvStore) Run() { - util.Go(func(ctx context.Context) { - s.store(ctx) - select { - case <-ctx.Done(): - s.Stop() - } - }) -} - -func (s *BKvStore) Ready() <-chan struct{} { - return s.bready +type BKvStore struct { } -func (s *BKvStore) Participant() *sstore.Indexer { - return s.bindexers[PARTICIPANT] +func (s *BKvStore) Participant() *store.Indexer { + return store.Store().Entity(PARTICIPANT) } -func (s *BKvStore) Version() *sstore.Indexer { - return s.bindexers[VERSION] +func (s *BKvStore) Version() *store.Indexer { + return store.Store().Entity(VERSION) } -func (s *BKvStore) Pact() *sstore.Indexer { - return s.bindexers[PACT] +func (s *BKvStore) Pact() *store.Indexer { + return store.Store().Entity(PACT) } -func (s *BKvStore) PactVersion() *sstore.Indexer { - return s.bindexers[PACT_VERSION] +func (s *BKvStore) PactVersion() *store.Indexer { + return store.Store().Entity(PACT_VERSION) } -func (s *BKvStore) PactTag() *sstore.Indexer { - return s.bindexers[PACT_TAG] +func (s *BKvStore) PactTag() *store.Indexer { + return store.Store().Entity(PACT_TAG) } -func (s *BKvStore) Verification() *sstore.Indexer { - return s.bindexers[VERIFICATION] +func (s *BKvStore) Verification() *store.Indexer { + return store.Store().Entity(VERIFICATION) } -func (s *BKvStore) PactLatest() *sstore.Indexer { - return s.bindexers[PACT_LATEST] +func (s *BKvStore) PactLatest() *store.Indexer { + return store.Store().Entity(PACT_LATEST) } -func (s *BKvStore) Stop() { - if s.bisClose { - return - } - s.bisClose = true - - for _, i := range s.bindexers { - i.Stop() - } - - util.SafeCloseChan(s.bready) - - util.Logger().Debugf("broker store daemon stopped") +func Store() *BKvStore { + return brokerKvStore } diff --git a/server/core/backend/store/cache_kv.go b/server/core/backend/store/cache_kv.go index fcced8f..9fb347e 100644 --- a/server/core/backend/store/cache_kv.go +++ b/server/core/backend/store/cache_kv.go @@ -95,7 +95,7 @@ func (c *KvCache) compact() { c.store = newCache util.Logger().Infof("cache %s is not in use over %s, compact capacity to size %d->%d", - c.owner.Cfg.Key, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size) + c.owner.Cfg.Prefix, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size) } @@ -107,7 +107,7 @@ func (c *KvCache) Size() (l int) { } type KvCacher struct { - Cfg KvCacherCfg + Cfg Config name string lastRev int64 @@ -143,13 +143,13 @@ func (c *KvCacher) needList() bool { } util.Logger().Debugf("no events come in more then %s, need to list key %s, rev: %d", - time.Duration(c.noEventCount)*c.Cfg.Timeout, c.Cfg.Key, rev) + time.Duration(c.noEventCount)*c.Cfg.Timeout, c.Cfg.Prefix, rev) c.noEventCount = 0 return true } -func (c *KvCacher) doList(listOps ListOptions) error { - kvs, err := c.lw.List(listOps) +func (c *KvCacher) doList(cfg ListWatchConfig) error { + kvs, err := c.lw.List(cfg) if err != nil { return err } @@ -161,13 +161,13 @@ func (c *KvCacher) doList(listOps ListOptions) error { util.Logger().Warnf(nil, "most of the protected data(%d/%d) are recovered", kc, c.cache.Size()) } c.sync(evts) - util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: %d", c.Cfg.Key, len(kvs), c.lw.Revision()) + util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: %d", c.Cfg.Prefix, len(kvs), c.lw.Revision()) return nil } -func (c *KvCacher) doWatch(listOps ListOptions) error { - watcher := c.lw.Watch(listOps) +func (c *KvCacher) doWatch(cfg ListWatchConfig) error { + watcher := c.lw.Watch(cfg) return c.handleWatcher(watcher) } @@ -175,19 +175,19 @@ func (c *KvCacher) ListAndWatch(ctx context.Context) error { c.mux.Lock() defer c.mux.Unlock() - listOps := ListOptions{ + cfg := ListWatchConfig{ Timeout: c.Cfg.Timeout, Context: ctx, } if c.needList() { - if err := c.doList(listOps); err != nil { + if err := c.doList(cfg); err != nil { return err } } util.SafeCloseChan(c.ready) - return c.doWatch(listOps) + return c.doWatch(cfg) } func (c *KvCacher) handleWatcher(watcher *Watcher) error { @@ -336,7 +336,7 @@ func (c *KvCacher) filterDelete(store map[string]*mvccpb.KeyValue, newStore map[ block[i] = KvEvent{ Revision: rev, Type: proto.EVT_DELETE, - Prefix: c.Cfg.Key, + Prefix: c.Cfg.Prefix, Object: v, } i++ @@ -365,7 +365,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt block[i] = KvEvent{ Revision: rev, Type: proto.EVT_CREATE, - Prefix: c.Cfg.Key, + Prefix: c.Cfg.Prefix, Object: v, } i++ @@ -385,7 +385,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt block[i] = KvEvent{ Revision: rev, Type: proto.EVT_UPDATE, - Prefix: c.Cfg.Key, + Prefix: c.Cfg.Prefix, Object: v, } i++ @@ -492,8 +492,8 @@ func NewKvCache(c *KvCacher, size int) *KvCache { } } -func NewKvCacher(name string, opts ...KvCacherCfgOption) *KvCacher { - cfg := DefaultKvCacherConfig() +func NewKvCacher(name string, opts ...ConfigOption) *KvCacher { + cfg := DefaultConfig() for _, opt := range opts { opt(&cfg) } @@ -504,7 +504,7 @@ func NewKvCacher(name string, opts ...KvCacherCfgOption) *KvCacher { ready: make(chan struct{}), lw: ListWatcher{ Client: backend.Registry(), - Key: cfg.Key, + Prefix: cfg.Prefix, }, goroutine: util.NewGo(context.Background()), } diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go index 94fbe46..898964c 100644 --- a/server/core/backend/store/cacher.go +++ b/server/core/backend/store/cacher.go @@ -24,6 +24,7 @@ type Cache interface { } type Cacher interface { + // Name is the cache size metric name Name() string Cache() Cache Run() diff --git a/server/core/backend/store/common.go b/server/core/backend/store/common.go index fc04898..0de9e05 100644 --- a/server/core/backend/store/common.go +++ b/server/core/backend/store/common.go @@ -25,12 +25,17 @@ import ( type StoreType int func (st StoreType) String() string { + if int(st) < 0 { + return "NONEXIST" + } if int(st) < len(TypeNames) { return TypeNames[st] } return "TYPE" + strconv.Itoa(int(st)) } +const NONEXIST = StoreType(-1) + const ( DOMAIN StoreType = iota PROJECT @@ -48,25 +53,25 @@ const ( INSTANCE LEASE ENDPOINTS - typeEnd + typeEnd // end of the base store types ) var TypeNames = []string{ - SERVICE: "SERVICE", - INSTANCE: "INSTANCE", DOMAIN: "DOMAIN", - SCHEMA: "SCHEMA", - SCHEMA_SUMMARY: "SCHEMA_SUMMARY", - RULE: "RULE", - LEASE: "LEASE", + PROJECT: "PROJECT", + SERVICE: "SERVICE", SERVICE_INDEX: "SERVICE_INDEX", SERVICE_ALIAS: "SERVICE_ALIAS", SERVICE_TAG: "SERVICE_TAG", + RULE: "RULE", RULE_INDEX: "RULE_INDEX", DEPENDENCY: "DEPENDENCY", DEPENDENCY_RULE: "DEPENDENCY_RULE", DEPENDENCY_QUEUE: "DEPENDENCY_QUEUE", - PROJECT: "PROJECT", + SCHEMA: "SCHEMA", + SCHEMA_SUMMARY: "SCHEMA_SUMMARY", + INSTANCE: "INSTANCE", + LEASE: "LEASE", ENDPOINTS: "ENDPOINTS", } diff --git a/server/core/backend/store/opt.go b/server/core/backend/store/config.go similarity index 55% rename from server/core/backend/store/opt.go rename to server/core/backend/store/config.go index 59fbabb..9f0b2dd 100644 --- a/server/core/backend/store/opt.go +++ b/server/core/backend/store/config.go @@ -18,11 +18,12 @@ package store import ( "fmt" + "golang.org/x/net/context" "time" ) -type KvCacherCfg struct { - Key string +type Config struct { + Prefix string InitSize int NoEventMaxInterval int Timeout time.Duration @@ -31,43 +32,52 @@ type KvCacherCfg struct { DeferHandler DeferHandler } -func (cfg KvCacherCfg) String() string { - return fmt.Sprintf("{key: %s, timeout: %s, period: %s}", - cfg.Key, cfg.Timeout, cfg.Period) +func (cfg Config) String() string { + return fmt.Sprintf("{prefix: %s, timeout: %s, period: %s}", + cfg.Prefix, cfg.Timeout, cfg.Period) } -type KvCacherCfgOption func(*KvCacherCfg) +type ConfigOption func(*Config) -func WithKey(key string) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.Key = key } +func WithPrefix(key string) ConfigOption { + return func(cfg *Config) { cfg.Prefix = key } } -func WithInitSize(size int) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.InitSize = size } +func WithInitSize(size int) ConfigOption { + return func(cfg *Config) { cfg.InitSize = size } } -func WithTimeout(ot time.Duration) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.Timeout = ot } +func WithTimeout(ot time.Duration) ConfigOption { + return func(cfg *Config) { cfg.Timeout = ot } } -func WithPeriod(ot time.Duration) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.Period = ot } +func WithPeriod(ot time.Duration) ConfigOption { + return func(cfg *Config) { cfg.Period = ot } } -func WithEventFunc(f KvEventFunc) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.OnEvent = f } +func WithEventFunc(f KvEventFunc) ConfigOption { + return func(cfg *Config) { cfg.OnEvent = f } } -func WithDeferHandler(h DeferHandler) KvCacherCfgOption { - return func(cfg *KvCacherCfg) { cfg.DeferHandler = h } +func WithDeferHandler(h DeferHandler) ConfigOption { + return func(cfg *Config) { cfg.DeferHandler = h } } -func DefaultKvCacherConfig() KvCacherCfg { - return KvCacherCfg{ - Key: "/", +func DefaultConfig() Config { + return Config{ + Prefix: "/", Timeout: DEFAULT_LISTWATCH_TIMEOUT, Period: time.Second, NoEventMaxInterval: DEFAULT_MAX_NO_EVENT_INTERVAL, InitSize: DEFAULT_CACHE_INIT_SIZE, } } + +type ListWatchConfig struct { + Timeout time.Duration + Context context.Context +} + +func (lo *ListWatchConfig) String() string { + return fmt.Sprintf("{timeout: %s}", lo.Timeout) +} diff --git a/server/core/backend/store/event.go b/server/core/backend/store/event.go index 504aba1..67714a0 100644 --- a/server/core/backend/store/event.go +++ b/server/core/backend/store/event.go @@ -18,22 +18,8 @@ package store import ( "github.com/apache/incubator-servicecomb-service-center/server/core/proto" - "sync" ) -var ( - evtProxies map[StoreType]*KvEventProxy -) - -func init() { - evtProxies = make(map[StoreType]*KvEventProxy, typeEnd) - for i := StoreType(0); i != typeEnd; i++ { - evtProxies[i] = &KvEventProxy{ - evtHandleFuncs: make([]KvEventFunc, 0, 5), - } - } -} - type KvEventFunc func(evt KvEvent) type KvEvent struct { @@ -48,29 +34,6 @@ type KvEventHandler interface { OnEvent(evt KvEvent) } -type KvEventProxy struct { - evtHandleFuncs []KvEventFunc - lock sync.RWMutex -} - -func (h *KvEventProxy) AddHandleFunc(f KvEventFunc) { - h.lock.Lock() - h.evtHandleFuncs = append(h.evtHandleFuncs, f) - h.lock.Unlock() -} - -func (h *KvEventProxy) OnEvent(evt KvEvent) { - h.lock.RLock() - for _, f := range h.evtHandleFuncs { - f(evt) - } - h.lock.RUnlock() -} - -func EventProxy(t StoreType) *KvEventProxy { - return evtProxies[t] -} - // the event handler/func must be good performance, or will block the event bus. func AddEventHandleFunc(t StoreType, f KvEventFunc) { EventProxy(t).AddHandleFunc(f) diff --git a/server/core/backend/store/event.go b/server/core/backend/store/event_proxy.go similarity index 62% copy from server/core/backend/store/event.go copy to server/core/backend/store/event_proxy.go index 504aba1..e66583d 100644 --- a/server/core/backend/store/event.go +++ b/server/core/backend/store/event_proxy.go @@ -16,38 +16,19 @@ */ package store -import ( - "github.com/apache/incubator-servicecomb-service-center/server/core/proto" - "sync" -) +import "sync" var ( - evtProxies map[StoreType]*KvEventProxy + EventProxies map[StoreType]*KvEventProxy ) func init() { - evtProxies = make(map[StoreType]*KvEventProxy, typeEnd) + EventProxies = make(map[StoreType]*KvEventProxy, typeEnd) for i := StoreType(0); i != typeEnd; i++ { - evtProxies[i] = &KvEventProxy{ - evtHandleFuncs: make([]KvEventFunc, 0, 5), - } + EventProxies[i] = NewEventProxy() } } -type KvEventFunc func(evt KvEvent) - -type KvEvent struct { - Revision int64 - Type proto.EventType - Prefix string - Object interface{} -} - -type KvEventHandler interface { - Type() StoreType - OnEvent(evt KvEvent) -} - type KvEventProxy struct { evtHandleFuncs []KvEventFunc lock sync.RWMutex @@ -67,15 +48,12 @@ func (h *KvEventProxy) OnEvent(evt KvEvent) { h.lock.RUnlock() } -func EventProxy(t StoreType) *KvEventProxy { - return evtProxies[t] -} - -// the event handler/func must be good performance, or will block the event bus. -func AddEventHandleFunc(t StoreType, f KvEventFunc) { - EventProxy(t).AddHandleFunc(f) +func NewEventProxy() *KvEventProxy { + return &KvEventProxy{ + evtHandleFuncs: make([]KvEventFunc, 0, 5), + } } -func AddEventHandler(h KvEventHandler) { - AddEventHandleFunc(h.Type(), h.OnEvent) +func EventProxy(t StoreType) *KvEventProxy { + return EventProxies[t] } diff --git a/server/core/backend/store/extend.go b/server/core/backend/store/extend.go new file mode 100644 index 0000000..bffd0c1 --- /dev/null +++ b/server/core/backend/store/extend.go @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package store + +import ( + "errors" + "fmt" +) + +type Entity interface { + Name() string + Prefix() string + InitSize() int +} + +type entity struct { + name string + prefix string + initSize int +} + +func (e *entity) Name() string { + return e.name +} + +func (e *entity) Prefix() string { + return e.prefix +} + +func (e *entity) InitSize() int { + return e.initSize +} + +func InstallType(e Entity) (id StoreType, err error) { + if e == nil { + return NONEXIST, errors.New("invalid parameter") + } + for _, n := range TypeNames { + if n == e.Name() { + return NONEXIST, fmt.Errorf("redeclare store type '%s'", n) + } + } + for _, r := range TypeRoots { + if r == e.Prefix() { + return NONEXIST, fmt.Errorf("redeclare store root '%s'", r) + } + } + + TypeNames = append(TypeNames, e.Name()) + id = StoreType(len(TypeNames) + 1) // +1 for typeEnd + + TypeRoots[id] = e.Prefix() + TypeInitSize[id] = e.InitSize() + + EventProxies[id] = NewEventProxy() + return +} + +func NewEntity(name, prefix string, opts ...ConfigOption) Entity { + cfg := DefaultConfig() + for _, opt := range opts { + opt(&cfg) + } + cfg.Prefix = prefix + return &entity{ + name: name, + prefix: cfg.Prefix, + initSize: cfg.InitSize, + } +} diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/extend_test.go similarity index 64% copy from server/core/backend/store/cacher.go copy to server/core/backend/store/extend_test.go index 94fbe46..8359dd2 100644 --- a/server/core/backend/store/cacher.go +++ b/server/core/backend/store/extend_test.go @@ -16,17 +16,34 @@ */ package store -type Cache interface { - Version() int64 - Data(interface{}) interface{} - Have(interface{}) bool - Size() int +import "testing" + +type extend struct { +} + +func (e *extend) Name() string { + return "test" +} + +func (e *extend) Prefix() string { + return "/test" +} + +func (e *extend) InitSize() int { + return 0 } -type Cacher interface { - Name() string - Cache() Cache - Run() - Stop() - Ready() <-chan struct{} +func TestInstallType(t *testing.T) { + id, err := InstallType(&extend{}) + if err != nil { + t.Fatal(err) + } + if id == NONEXIST { + t.Fatal(err) + } + + id, err = InstallType(&extend{}) + if id != NONEXIST || err == nil { + t.Fatal("InstallType fail", err) + } } diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go index 3f8cd7d..2b35a97 100644 --- a/server/core/backend/store/indexer.go +++ b/server/core/backend/store/indexer.go @@ -28,23 +28,16 @@ import ( "time" ) -var defaultRootKeys map[string]struct{} - -func init() { - defaultRootKeys = make(map[string]struct{}, len(defaultRootKeys)) - for _, root := range TypeRoots { - defaultRootKeys[root] = struct{}{} - } -} - type Indexer struct { - BuildTimeout time.Duration + BuildTimeout time.Duration + Root string + cacher Cacher - prefixIndex map[string]map[string]struct{} - prefixLock sync.RWMutex - prefixBuildQueue chan KvEvent goroutine *util.GoRoutine ready chan struct{} + prefixIndex map[string]map[string]struct{} + prefixBuildQueue chan KvEvent + prefixLock sync.RWMutex isClose bool } @@ -266,8 +259,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) { } func (i *Indexer) addPrefixKey(prefix, key string) { - _, ok := defaultRootKeys[key] - if ok { + if i.Root == key { return } @@ -342,9 +334,10 @@ func (i *Indexer) Ready() <-chan struct{} { return i.ready } -func NewCacheIndexer(cr Cacher) *Indexer { +func NewCacheIndexer(root string, cr Cacher) *Indexer { return &Indexer{ BuildTimeout: DEFAULT_ADD_QUEUE_TIMEOUT, + Root: root, cacher: cr, prefixIndex: make(map[string]map[string]struct{}, DEFAULT_CACHE_INIT_SIZE), prefixBuildQueue: make(chan KvEvent, DEFAULT_MAX_EVENT_COUNT), diff --git a/server/core/backend/store/listwatch.go b/server/core/backend/store/listwatch.go index 3cf1fe7..42fbde5 100644 --- a/server/core/backend/store/listwatch.go +++ b/server/core/backend/store/listwatch.go @@ -24,30 +24,20 @@ import ( "github.com/coreos/etcd/mvcc/mvccpb" "golang.org/x/net/context" "sync" - "time" ) -type ListOptions struct { - Timeout time.Duration - Context context.Context -} - -func (lo *ListOptions) String() string { - return fmt.Sprintf("{timeout: %s}", lo.Timeout) -} - type ListWatcher struct { Client registry.Registry - Key string + Prefix string rev int64 } -func (lw *ListWatcher) List(op ListOptions) ([]*mvccpb.KeyValue, error) { +func (lw *ListWatcher) List(op ListWatchConfig) ([]*mvccpb.KeyValue, error) { otCtx, _ := context.WithTimeout(op.Context, op.Timeout) - resp, err := lw.Client.Do(otCtx, registry.WatchPrefixOpOptions(lw.Key)...) + resp, err := lw.Client.Do(otCtx, registry.WatchPrefixOpOptions(lw.Prefix)...) if err != nil { - util.Logger().Errorf(err, "list key %s failed, rev: %d->0", lw.Key, lw.Revision()) + util.Logger().Errorf(err, "list prefix %s failed, rev: %d->0", lw.Prefix, lw.Revision()) lw.setRevision(0) return nil, err } @@ -66,14 +56,14 @@ func (lw *ListWatcher) setRevision(rev int64) { lw.rev = rev } -func (lw *ListWatcher) Watch(op ListOptions) *Watcher { +func (lw *ListWatcher) Watch(op ListWatchConfig) *Watcher { return newWatcher(lw, op) } func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []KvEvent)) error { rev := lw.Revision() opts := append( - registry.WatchPrefixOpOptions(lw.Key), + registry.WatchPrefixOpOptions(lw.Prefix), registry.WithRev(rev+1), registry.WithWatchCallback( func(message string, resp *registry.PluginResponse) error { @@ -81,13 +71,13 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []KvEvent)) error return fmt.Errorf("unknown event %s", resp) } - util.Logger().Infof("watch prefix key %s, start rev %d+1, event: %s", lw.Key, rev, resp) + util.Logger().Infof("watch prefix %s, start rev %d+1, event: %s", lw.Prefix, rev, resp) lw.setRevision(resp.Revision) evts := make([]KvEvent, len(resp.Kvs)) for i, kv := range resp.Kvs { - evt := KvEvent{Prefix: lw.Key, Revision: kv.ModRevision} + evt := KvEvent{Prefix: lw.Prefix, Revision: kv.ModRevision} switch { case resp.Action == registry.Put && kv.Version == 1: evt.Type, evt.Object = proto.EVT_CREATE, kv @@ -106,16 +96,16 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []KvEvent)) error err := lw.Client.Watch(ctx, opts...) if err != nil { // compact可能会导致watch失败 or message body size lager than 4MB - util.Logger().Errorf(err, "watch key %s failed, start rev: %d+1->%d->0", lw.Key, rev, lw.Revision()) + util.Logger().Errorf(err, "watch prefix %s failed, start rev: %d+1->%d->0", lw.Prefix, rev, lw.Revision()) lw.setRevision(0) - f([]KvEvent{errEvent(lw.Key, err)}) + f([]KvEvent{errEvent(lw.Prefix, err)}) } return err } type Watcher struct { - ListOps ListOptions + ListOps ListWatchConfig lw *ListWatcher bus chan []KvEvent stopCh chan struct{} @@ -169,7 +159,7 @@ func errEvent(key string, err error) KvEvent { } } -func newWatcher(lw *ListWatcher, listOps ListOptions) *Watcher { +func newWatcher(lw *ListWatcher, listOps ListWatchConfig) *Watcher { w := &Watcher{ ListOps: listOps, lw: lw, diff --git a/server/core/backend/store/store.go b/server/core/backend/store/store.go index e549cd4..6684946 100644 --- a/server/core/backend/store/store.go +++ b/server/core/backend/store/store.go @@ -61,7 +61,7 @@ func (s *KvStore) dispatchEvent(t StoreType, evt KvEvent) { } func (s *KvStore) newIndexBuilder(t StoreType, cacher Cacher) { - indexer := NewCacheIndexer(cacher) + indexer := NewCacheIndexer(TypeRoots[t], cacher) s.indexers[t] = indexer indexer.Run() } @@ -71,7 +71,7 @@ func (s *KvStore) Run() { s.taskService.Run() } -func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts []KvCacherCfgOption) { +func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts []ConfigOption) { switch t { case INSTANCE: opts = append(opts, WithDeferHandler(s.SelfPreservationHandler())) @@ -81,7 +81,7 @@ func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts []KvCacherCfgOption) opts = append(opts, WithInitSize(sz)) } opts = append(opts, - WithKey(TypeRoots[t]), + WithPrefix(TypeRoots[t]), WithEventFunc(func(evt KvEvent) { s.dispatchEvent(t, evt) })) return } @@ -246,6 +246,33 @@ func (s *KvStore) KeepAlive(ctx context.Context, opts ...registry.PluginOpOption return pt.TTL, pt.Err() } +func (s *KvStore) Entity(id StoreType) *Indexer { + return s.indexers[id] +} + +func (s *KvStore) Install(e Entity) (id StoreType, err error) { + if id, err = InstallType(e); err != nil { + return + } + + util.Logger().Infof("install new store entity %d:%s->%s", id, e.Name(), e.Prefix()) + + if !core.ServerInfo.Config.EnableCache { + s.newIndexBuilder(id, NullCacher) + return + } + s.newIndexBuilder(id, NewKvCacher(id.String(), s.getKvCacherCfgOptions(id)...)) + return +} + +func (s *KvStore) MustInstall(e Entity) StoreType { + id, err := s.Install(e) + if err != nil { + panic(err) + } + return id +} + func Store() *KvStore { return store } diff --git a/server/error/error.go b/server/error/error.go index 5ee5bf1..b30356e 100644 --- a/server/error/error.go +++ b/server/error/error.go @@ -18,8 +18,6 @@ package error import ( "encoding/json" - "fmt" - "github.com/apache/incubator-servicecomb-service-center/pkg/util" "net/http" ) @@ -104,9 +102,9 @@ func (e Error) Error() string { return e.Message + "(" + e.Detail + ")" } -func (e Error) toJson() string { +func (e Error) Marshal() []byte { bs, _ := json.Marshal(e) - return util.BytesToStringWithNoCopy(bs) + return bs } func (e Error) StatusCode() int { @@ -123,14 +121,6 @@ func (e Error) InternalError() bool { return false } -func (e Error) HttpWrite(w http.ResponseWriter) { - status := e.StatusCode() - w.Header().Add("X-Response-Status", fmt.Sprint(status)) - w.Header().Set("Content-Type", "application/json; charset=UTF-8") - w.WriteHeader(status) - fmt.Fprintln(w, e.toJson()) -} - func NewError(code int32, detail string) *Error { return &Error{ Code: code, diff --git a/server/rest/controller/rest_util.go b/server/rest/controller/rest_util.go index be626d5..00f93f7 100644 --- a/server/rest/controller/rest_util.go +++ b/server/rest/controller/rest_util.go @@ -25,15 +25,23 @@ import ( "net/http" ) +const ( + contentTypeJson = "application/json; charset=UTF-8" + contentTypeText = "text/plain; charset=UTF-8" +) + func WriteError(w http.ResponseWriter, code int32, detail string) { err := error.NewError(code, detail) - err.HttpWrite(w) + w.Header().Add("X-Response-Status", fmt.Sprint(err.StatusCode())) + w.Header().Set("Content-Type", contentTypeJson) + w.WriteHeader(err.StatusCode()) + fmt.Fprintln(w, util.BytesToStringWithNoCopy(err.Marshal())) } func WriteJsonObject(w http.ResponseWriter, obj interface{}) { if obj == nil { w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK)) - w.Header().Set("Content-Type", "text/plain; charset=UTF-8") + w.Header().Set("Content-Type", contentTypeText) w.WriteHeader(http.StatusOK) return } @@ -44,7 +52,7 @@ func WriteJsonObject(w http.ResponseWriter, obj interface{}) { return } w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK)) - w.Header().Set("Content-Type", "application/json; charset=UTF-8") + w.Header().Set("Content-Type", contentTypeJson) w.WriteHeader(http.StatusOK) fmt.Fprintln(w, util.BytesToStringWithNoCopy(objJson)) } @@ -61,11 +69,10 @@ func WriteResponse(w http.ResponseWriter, resp *pb.Response, obj interface{}) { func WriteBytes(w http.ResponseWriter, resp *pb.Response, json []byte) { if resp.GetCode() == pb.Response_SUCCESS { w.Header().Add("X-Response-Status", fmt.Sprint(http.StatusOK)) - w.Header().Set("Content-Type", "application/json; charset=UTF-8") + w.Header().Set("Content-Type", contentTypeJson) w.WriteHeader(http.StatusOK) w.Write(json) return } WriteError(w, resp.GetCode(), resp.GetMessage()) } - diff --git a/server/rest/controller/v3/main_controller.go b/server/rest/controller/v3/main_controller.go index 5bb4410..e20e323 100644 --- a/server/rest/controller/v3/main_controller.go +++ b/server/rest/controller/v3/main_controller.go @@ -19,13 +19,29 @@ package v3 import ( "encoding/json" "github.com/apache/incubator-servicecomb-service-center/pkg/rest" + pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" + "github.com/apache/incubator-servicecomb-service-center/server/rest/controller" "github.com/apache/incubator-servicecomb-service-center/server/rest/controller/v4" "github.com/apache/incubator-servicecomb-service-center/version" "net/http" ) +var ( + versionJsonCache []byte + versionResp *pb.Response +) + const API_VERSION = "3.0.0" +func init() { + result := v4.Result{ + VersionSet: version.Ver(), + ApiVersion: API_VERSION, + } + versionJsonCache, _ = json.Marshal(result) + versionResp = pb.CreateResponse(pb.Response_SUCCESS, "get version successfully") +} + type MainService struct { v4.MainService } @@ -38,11 +54,5 @@ func (this *MainService) URLPatterns() []rest.Route { } func (this *MainService) GetVersion(w http.ResponseWriter, r *http.Request) { - result := v4.Result{ - VersionSet: version.Ver(), - ApiVersion: API_VERSION, - } - resultJSON, _ := json.Marshal(result) - w.Header().Set("Content-Type", "application/json;charset=utf-8") - w.Write(resultJSON) + controller.WriteBytes(w, versionResp, versionJsonCache) } diff --git a/server/rest/controller/v4/main_controller.go b/server/rest/controller/v4/main_controller.go index d2af4d4..cbf5ed5 100644 --- a/server/rest/controller/v4/main_controller.go +++ b/server/rest/controller/v4/main_controller.go @@ -28,7 +28,10 @@ import ( "net/http" ) -var resultJSON []byte +var ( + versionJsonCache []byte + versionResp *pb.Response +) const API_VERSION = "4.0.0" @@ -48,7 +51,8 @@ func init() { API_VERSION, core.ServerInfo.Config, } - resultJSON, _ = json.Marshal(result) + versionJsonCache, _ = json.Marshal(result) + versionResp = pb.CreateResponse(pb.Response_SUCCESS, "get version successfully") } func (this *MainService) URLPatterns() []rest.Route { @@ -72,6 +76,5 @@ func (this *MainService) ClusterHealth(w http.ResponseWriter, r *http.Request) { } func (this *MainService) GetVersion(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json;charset=utf-8") - w.Write(resultJSON) + controller.WriteBytes(w, versionResp, versionJsonCache) } diff --git a/server/service/schema.go b/server/service/schema.go index 3d7da42..dd9c4d9 100644 --- a/server/service/schema.go +++ b/server/service/schema.go @@ -122,7 +122,6 @@ func (s *MicroServiceService) GetAllSchemaInfo(ctx context.Context, in *pb.GetAl schemasList := service.Schemas if schemasList == nil || len(schemasList) == 0 { - util.Logger().Infof("service %s schemaId set is empty.", in.ServiceId) return &pb.GetAllSchemaResponse{ Response: pb.CreateResponse(pb.Response_SUCCESS, "Do not have this schema info."), Schemas: []*pb.Schema{}, -- To stop receiving notification emails like this one, please contact little...@apache.org.