This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new fcde10e  restructure cache (#911)
fcde10e is described below

commit fcde10e1fe3598666cabe28353922c8753568895
Author: panqian <[email protected]>
AuthorDate: Fri Apr 16 10:36:32 2021 +0800

    restructure cache (#911)
---
 datasource/cache/ms_cache.go                       | 147 ++++++++++
 datasource/mongo/client/dao/dep.go                 |  74 +++++
 datasource/mongo/client/dao/instance.go            |   9 +
 datasource/mongo/client/dao/microservice.go        |  46 ++-
 datasource/mongo/client/dao/rule.go                |   8 +
 datasource/mongo/dep_util.go                       |  42 +--
 datasource/mongo/dependency_query.go               |   4 +-
 datasource/mongo/event/instance_event_handler.go   |  46 ++-
 .../mongo/event/instance_event_handler_test.go     |   2 -
 datasource/mongo/ms.go                             | 149 ++++------
 datasource/mongo/rule_util.go                      |  12 +-
 datasource/mongo/sd/dep_cache.go                   | 132 +++++++++
 datasource/mongo/sd/depcache_test.go               |  93 ++++++
 datasource/mongo/sd/{cache.go => doc_cache.go}     |  50 ++--
 .../mongo/sd/{options_test.go => docc_test.go}     |  51 ++--
 datasource/mongo/sd/event_proxy_test.go            |   1 -
 datasource/mongo/sd/{cache.go => hset.go}          |  62 ++--
 .../mongo/sd/{options_test.go => hset_test.go}     |  47 +--
 datasource/mongo/sd/index_cache.go                 |  71 +++++
 datasource/mongo/sd/indexc_test.go                 |  21 ++
 datasource/mongo/sd/instance_cache.go              | 129 +++++++++
 datasource/mongo/sd/instancec_test.go              |  85 ++++++
 datasource/mongo/sd/listwatch_inner.go             |  58 +---
 datasource/mongo/sd/listwatch_test.go              |  27 +-
 datasource/mongo/sd/mongo_cache.go                 | 177 ++----------
 datasource/mongo/sd/mongo_cacher.go                |  83 ++----
 datasource/mongo/sd/mongo_cacher_test.go           | 317 ---------------------
 datasource/mongo/sd/options_test.go                |   1 -
 datasource/mongo/sd/rule_cache.go                  | 124 ++++++++
 datasource/mongo/sd/service_cache.go               | 133 +++++++++
 datasource/mongo/sd/servicec_test.go               |  94 ++++++
 datasource/mongo/sd/types.go                       |  29 +-
 datasource/mongo/sd/typestore.go                   |  25 +-
 datasource/sdcommon/types.go                       |   2 -
 34 files changed, 1475 insertions(+), 876 deletions(-)

diff --git a/datasource/cache/ms_cache.go b/datasource/cache/ms_cache.go
new file mode 100644
index 0000000..fa51f0c
--- /dev/null
+++ b/datasource/cache/ms_cache.go
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cache
+
+import (
+       "context"
+       
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+       "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/go-chassis/cari/discovery"
+       "strings"
+)
+
+const (
+       Provider = "p"
+)
+
+func GetProviderServiceOfDeps(provider *discovery.MicroService) 
(*discovery.MicroServiceDependency, bool) {
+       res := sd.Store().Dep().Cache().GetValue(genDepserivceKey(Provider, 
provider))
+       deps, ok := transCacheToDep(res)
+       if !ok {
+               return nil, false
+       }
+       return deps[0], true
+}
+
+func transCacheToDep(cache []interface{}) 
([]*discovery.MicroServiceDependency, bool) {
+       res := make([]*discovery.MicroServiceDependency, 0, len(cache))
+       for _, v := range cache {
+               t, ok := v.(model.DependencyRule)
+               if !ok {
+                       return nil, false
+               }
+               res = append(res, t.Dep)
+       }
+       if len(res) == 0 {
+               return nil, false
+       }
+       return res, true
+}
+
+func genDepserivceKey(ruleType string, service *discovery.MicroService) string 
{
+       return strings.Join([]string{ruleType, service.AppId, 
service.ServiceName, service.Version}, "/")
+}
+
+func GetMicroServiceInstancesByID(serviceID string) 
([]*discovery.MicroServiceInstance, bool) {
+       cacheInstances := sd.Store().Instance().Cache().GetValue(serviceID)
+       insts, ok := transCacheToInsts(cacheInstances)
+       if !ok {
+               return nil, false
+       }
+       return insts, true
+}
+
+func transCacheToInsts(cache []interface{}) 
([]*discovery.MicroServiceInstance, bool) {
+       res := make([]*discovery.MicroServiceInstance, 0, len(cache))
+       for _, iter := range cache {
+               inst, ok := iter.(model.Instance)
+               if !ok {
+                       return nil, false
+               }
+               res = append(res, inst.Instance)
+       }
+       if len(res) == 0 {
+               return nil, false
+       }
+       return res, true
+}
+
+func GetRulesByServiceID(serviceID string) ([]*model.Rule, bool) {
+       cacheRes := sd.Store().Rule().Cache().GetValue(serviceID) // rule cache is looked up by service ID
+       return transCacheToRules(cacheRes)
+}
+
+func transCacheToRules(cacheRules []interface{}) ([]*model.Rule, bool) {
+       res := make([]*model.Rule, 0, len(cacheRules))
+       for _, v := range cacheRules {
+               t, ok := v.(model.Rule)
+               if !ok { // any entry of an unexpected type invalidates the whole result
+                       return nil, false
+               }
+               res = append(res, &model.Rule{
+                       Domain:    t.Domain,
+                       Project:   t.Project,
+                       ServiceID: t.ServiceID,
+                       Rule:      t.Rule,
+               })
+       }
+       if len(res) == 0 { // an empty result is reported as a cache miss
+               return nil, false
+       }
+       return res, true
+}
+
+func GetServiceByID(serviceID string) (*model.Service, bool) {
+       cacheRes := sd.Store().Service().Cache().GetValue(serviceID)
+       res, ok := transCacheToService(cacheRes)
+       if !ok {
+               return nil, false
+       }
+       return res[0], true // transCacheToService never reports ok with an empty slice
+}
+
+func GetServiceID(ctx context.Context, key *discovery.MicroServiceKey) (serviceID string, exist bool) {
+       cacheIndex := strings.Join([]string{util.ParseDomain(ctx), util.ParseProject(ctx), key.AppId, key.ServiceName, key.Version}, "/")
+       res := sd.Store().Service().Cache().GetValue(cacheIndex)
+       cacheService, ok := transCacheToService(res)
+       if !ok {
+               return // zero values: "", false
+       }
+       return cacheService[0].Service.ServiceId, true
+}
+
+func transCacheToService(services []interface{}) ([]*model.Service, bool) {
+       res := make([]*model.Service, 0, len(services))
+       for _, v := range services {
+               t, ok := v.(model.Service)
+               if !ok { // any entry of an unexpected type invalidates the whole result
+                       return nil, false
+               }
+               res = append(res, &model.Service{
+                       Domain:  t.Domain,
+                       Project: t.Project,
+                       Tags:    t.Tags,
+                       Service: t.Service,
+               })
+       }
+       if len(res) == 0 { // an empty result is reported as a cache miss
+               return nil, false
+       }
+       return res, true
+}
\ No newline at end of file
diff --git a/datasource/mongo/client/dao/dep.go 
b/datasource/mongo/client/dao/dep.go
new file mode 100644
index 0000000..16bc4c4
--- /dev/null
+++ b/datasource/mongo/client/dao/dep.go
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dao
+
+import (
+       "context"
+       "strings"
+
+       "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+       
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+       mutil 
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/go-chassis/cari/discovery"
+)
+
+const (
+       Provider = "p"
+       Consumer = "c"
+)
+
+func GetProviderDeps(ctx context.Context, provider *discovery.MicroService) 
(*discovery.MicroServiceDependency, error) {
+       return getServiceofDeps(ctx, Provider, provider)
+}
+
+func getServiceofDeps(ctx context.Context, ruleType string, provider 
*discovery.MicroService) (*discovery.MicroServiceDependency, error) {
+       filter := mutil.NewFilter(
+               mutil.ServiceType(ruleType),
+               mutil.ServiceKeyTenant(util.ParseDomainProject(ctx)),
+               mutil.ServiceKeyAppID(provider.AppId),
+               mutil.ServiceKeyServiceName(provider.ServiceName),
+               mutil.ServiceKeyServiceVersion(provider.Version),
+       )
+       depRule, err := getDeps(ctx, filter)
+       if err != nil {
+               return nil, err
+       }
+       return depRule.Dep, nil
+}
+
+func genDepserivceKey(ruleType string, service *discovery.MicroService) string 
{
+       return strings.Join([]string{ruleType, service.AppId, 
service.ServiceName, service.Version}, "/")
+}
+
+func getDeps(ctx context.Context, filter interface{}) (*model.DependencyRule, 
error) {
+       findRes, err := client.GetMongoClient().FindOne(ctx, 
model.CollectionDep, filter)
+       if err != nil {
+               return nil, err
+       }
+       var depRule *model.DependencyRule
+       if findRes.Err() != nil {
+               return nil, datasource.ErrNoData
+       }
+       err = findRes.Decode(&depRule)
+       if err != nil {
+               return nil, err
+       }
+       return depRule, nil
+}
diff --git a/datasource/mongo/client/dao/instance.go 
b/datasource/mongo/client/dao/instance.go
index a59be3e..47a7efa 100644
--- a/datasource/mongo/client/dao/instance.go
+++ b/datasource/mongo/client/dao/instance.go
@@ -20,6 +20,9 @@ package dao
 import (
        "context"
 
+       mutil 
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
+       "go.mongodb.org/mongo-driver/bson"
+
        "github.com/go-chassis/cari/discovery"
        "go.mongodb.org/mongo-driver/mongo/options"
 
@@ -61,6 +64,12 @@ func GetInstances(ctx context.Context, filter interface{}) 
([]*model.Instance, e
        return instances, nil
 }
 
+func GetMicroServiceInstancesByID(ctx context.Context, serviceID string) 
([]*discovery.MicroServiceInstance, error) {
+       filter := mutil.NewFilter(mutil.InstanceServiceID(serviceID))
+       option := &options.FindOptions{Sort: 
bson.M{mutil.ConnectWithDot([]string{model.ColumnInstance, 
model.ColumnVersion}): -1}}
+       return GetMicroServiceInstances(ctx, filter, option)
+}
+
 func GetMicroServiceInstances(ctx context.Context, filter interface{}, opts 
...*options.FindOptions) ([]*discovery.MicroServiceInstance, error) {
        res, err := client.GetMongoClient().Find(ctx, model.CollectionInstance, 
filter, opts...)
        if err != nil {
diff --git a/datasource/mongo/client/dao/microservice.go 
b/datasource/mongo/client/dao/microservice.go
index 51b1c58..7cd39ec 100644
--- a/datasource/mongo/client/dao/microservice.go
+++ b/datasource/mongo/client/dao/microservice.go
@@ -19,7 +19,11 @@ package dao
 
 import (
        "context"
+       "errors"
+
+       mutil 
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
        "github.com/go-chassis/cari/discovery"
+       "go.mongodb.org/mongo-driver/bson"
        "go.mongodb.org/mongo-driver/mongo/options"
 
        "github.com/apache/servicecomb-service-center/datasource"
@@ -27,6 +31,10 @@ import (
        
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
 )
 
+func GetServiceByID(ctx context.Context, serviceID string) (*model.Service, 
error) {
+       return GetService(ctx, mutil.NewBasicFilter(ctx, 
mutil.ServiceServiceID(serviceID)))
+}
+
 func GetService(ctx context.Context, filter interface{}, opts 
...*options.FindOneOptions) (*model.Service, error) {
        result, err := client.GetMongoClient().FindOne(ctx, 
model.CollectionService, filter, opts...)
        if err != nil {
@@ -34,7 +42,6 @@ func GetService(ctx context.Context, filter interface{}, opts 
...*options.FindOn
        }
        var svc *model.Service
        if result.Err() != nil {
-               //not get any service,not db err
                return nil, datasource.ErrNoData
        }
        err = result.Decode(&svc)
@@ -44,6 +51,43 @@ func GetService(ctx context.Context, filter interface{}, 
opts ...*options.FindOn
        return svc, nil
 }
 
+func GetServiceID(ctx context.Context, key *discovery.MicroServiceKey) 
(string, error) {
+       filter := mutil.NewBasicFilter(
+               ctx,
+               mutil.ServiceEnv(key.Environment),
+               mutil.ServiceAppID(key.AppId),
+               mutil.ServiceServiceName(key.ServiceName),
+               mutil.ServiceVersion(key.Version),
+       )
+       id, err := getServiceID(ctx, filter)
+       if err != nil && !errors.Is(err, datasource.ErrNoData) {
+               return "", err
+       }
+       if len(id) == 0 && len(key.Alias) != 0 {
+               filter = mutil.NewBasicFilter(
+                       ctx,
+                       mutil.ServiceEnv(key.Environment),
+                       mutil.ServiceAppID(key.AppId),
+                       mutil.ServiceAlias(key.Alias),
+                       mutil.ServiceVersion(key.Version),
+               )
+               return getServiceID(ctx, filter)
+       }
+       return id, nil
+}
+
+func getServiceID(ctx context.Context, filter bson.M) (serviceID string, err 
error) {
+       svc, err := GetService(ctx, filter)
+       if err != nil {
+               return
+       }
+       if svc != nil {
+               serviceID = svc.Service.ServiceId
+               return
+       }
+       return
+}
+
 func GetServices(ctx context.Context, filter interface{}, opts 
...*options.FindOptions) ([]*model.Service, error) {
        res, err := client.GetMongoClient().Find(ctx, model.CollectionService, 
filter, opts...)
        if err != nil {
diff --git a/datasource/mongo/client/dao/rule.go 
b/datasource/mongo/client/dao/rule.go
index 522a119..4179ae6 100644
--- a/datasource/mongo/client/dao/rule.go
+++ b/datasource/mongo/client/dao/rule.go
@@ -20,6 +20,9 @@ package dao
 import (
        "context"
 
+       mutil 
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+
        "github.com/go-chassis/cari/discovery"
        "go.mongodb.org/mongo-driver/bson"
        "go.mongodb.org/mongo-driver/mongo/options"
@@ -29,6 +32,11 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/log"
 )
 
+func GetRulesByServiceID(ctx context.Context, serviceID string) ([]*model.Rule, error) {
+       filter := mutil.NewDomainProjectFilter(util.ParseDomain(ctx), util.ParseProject(ctx), mutil.ServiceID(serviceID)) // second argument is the project, not the domain again
+       return GetRules(ctx, filter)
+}
+
 func GetRules(ctx context.Context, filter interface{}) ([]*model.Rule, error) {
        cursor, err := client.GetMongoClient().Find(ctx, model.CollectionRule, 
filter)
        if err != nil {
diff --git a/datasource/mongo/dep_util.go b/datasource/mongo/dep_util.go
index 6a39b4e..f2b7c3a 100644
--- a/datasource/mongo/dep_util.go
+++ b/datasource/mongo/dep_util.go
@@ -20,28 +20,25 @@ package mongo
 import (
        "context"
        "fmt"
-
+       "github.com/apache/servicecomb-service-center/datasource/cache"
        pb "github.com/go-chassis/cari/discovery"
 
        
"github.com/apache/servicecomb-service-center/datasource/mongo/client/dao"
        
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
-       mutil 
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
-       "github.com/apache/servicecomb-service-center/pkg/log"
-       "github.com/apache/servicecomb-service-center/pkg/util"
 )
 
-func GetAllConsumerIds(ctx context.Context, provider *pb.MicroService) (allow 
[]string, deny []string, _ error) {
+func GetAllConsumerIds(ctx context.Context, provider *pb.MicroService) (allow 
[]string, deny []string, err error) {
        if provider == nil || len(provider.ServiceId) == 0 {
                return nil, nil, fmt.Errorf("invalid provider")
        }
 
        //todo 删除服务,最后实例推送有误差
-       domain := util.ParseDomainProject(ctx)
-       project := util.ParseProject(ctx)
-       filter := mutil.NewDomainProjectFilter(domain, project, 
mutil.ServiceID(provider.ServiceId))
-       providerRules, err := dao.GetRules(ctx, filter)
-       if err != nil {
-               return nil, nil, err
+       providerRules, ok := cache.GetRulesByServiceID(provider.ServiceId)
+       if !ok {
+               providerRules, err = dao.GetRulesByServiceID(ctx, 
provider.ServiceId)
+               if err != nil {
+                       return nil, nil, err
+               }
        }
 
        allow, deny, err = GetConsumerIDsWithFilter(ctx, provider, 
providerRules)
@@ -52,12 +49,23 @@ func GetAllConsumerIds(ctx context.Context, provider 
*pb.MicroService) (allow []
 }
 
 func GetConsumerIDsWithFilter(ctx context.Context, provider *pb.MicroService, rules []*model.Rule) (allow []string, deny []string, err error) {
-       domainProject := util.ParseDomainProject(ctx)
-       dr := NewProviderDependencyRelation(ctx, domainProject, provider)
-       consumerIDs, err := dr.GetDependencyConsumerIds()
-       if err != nil {
-               log.Error(fmt.Sprintf("get service[%s]'s consumerIds failed", provider.ServiceId), err)
-               return nil, nil, err
+       serviceDeps, ok := cache.GetProviderServiceOfDeps(provider)
+       if !ok {
+               serviceDeps, err = dao.GetProviderDeps(ctx, provider)
+               if err != nil {
+                       return nil, nil, err
+               }
+       }
+       consumerIDs := make([]string, 0, len(serviceDeps.Dependency)) // length 0: append below must not follow pre-filled empty IDs
+       for _, serviceKeys := range serviceDeps.Dependency {
+               id, ok := cache.GetServiceID(ctx, serviceKeys)
+               if !ok {
+                       id, err = dao.GetServiceID(ctx, serviceKeys)
+                       if err != nil {
+                               return nil, nil, err
+                       }
+               }
+               consumerIDs = append(consumerIDs, id)
        }
        return FilterAll(ctx, consumerIDs, rules)
 }
diff --git a/datasource/mongo/dependency_query.go 
b/datasource/mongo/dependency_query.go
index f307680..7c60355 100644
--- a/datasource/mongo/dependency_query.go
+++ b/datasource/mongo/dependency_query.go
@@ -635,7 +635,7 @@ func GenerateRuleKeyWithSameServiceNameAndAppID(serviceType 
string, domainProjec
                util.ServiceType(serviceType),
                util.ServiceKeyTenant(domainProject),
                util.ServiceKeyAppID(in.AppId),
-               util.ServiceServiceName(in.ServiceName),
+               util.ServiceKeyServiceName(in.ServiceName),
        )
 }
 
@@ -660,6 +660,6 @@ func GenerateServiceDependencyRuleKey(serviceType string, 
domainProject string,
                util.ServiceKeyServiceEnv(in.Environment),
                util.ServiceKeyAppID(in.AppId),
                util.ServiceKeyServiceVersion(in.Version),
-               util.ServiceServiceName(in.ServiceName),
+               util.ServiceKeyServiceName(in.ServiceName),
        )
 }
diff --git a/datasource/mongo/event/instance_event_handler.go 
b/datasource/mongo/event/instance_event_handler.go
index aaad685..f367c15 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -19,13 +19,10 @@ package event
 
 import (
        "context"
-       "errors"
        "fmt"
+       "github.com/apache/servicecomb-service-center/datasource/cache"
        "time"
 
-       "github.com/go-chassis/cari/discovery"
-       "go.mongodb.org/mongo-driver/bson"
-
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/datasource/mongo"
        
"github.com/apache/servicecomb-service-center/datasource/mongo/client/dao"
@@ -38,6 +35,7 @@ import (
        "github.com/apache/servicecomb-service-center/server/metrics"
        "github.com/apache/servicecomb-service-center/server/notify"
        "github.com/apache/servicecomb-service-center/server/syncernotify"
+       "github.com/go-chassis/cari/discovery"
 )
 
 // InstanceEventHandler is the handler to handle events
@@ -51,15 +49,27 @@ func (h InstanceEventHandler) Type() string {
 
 func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
        action := evt.Type
+       if evt.Type == discovery.EVT_UPDATE {
+               return
+       }
        instance := evt.Value.(model.Instance)
        providerID := instance.Instance.ServiceId
        providerInstanceID := instance.Instance.InstanceId
        domainProject := instance.Domain + "/" + instance.Project
-       cacheService := sd.Store().Service().Cache().Get(providerID)
-       var microService *discovery.MicroService
-       if cacheService != nil {
-               microService = cacheService.(model.Service).Service
+       ctx := util.SetDomainProject(context.Background(), instance.Domain, 
instance.Project)
+       res, ok := cache.GetServiceByID(providerID)
+       var err error
+       if !ok {
+               res, err = dao.GetServiceByID(ctx, providerID)
+               if err != nil {
+                       log.Error(fmt.Sprintf("caught [%s] instance[%s/%s] 
event, endpoints %v, get provider's file failed from db\n",
+                               action, providerID, providerInstanceID, 
instance.Instance.Endpoints), err)
+               }
        }
+       if res == nil {
+               return
+       }
+       microService := res.Service
        switch action {
        case discovery.EVT_INIT:
                metrics.ReportInstances(instance.Domain, increaseOne)
@@ -69,30 +79,10 @@ func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
                metrics.ReportInstances(instance.Domain, increaseOne)
        case discovery.EVT_DELETE:
                metrics.ReportInstances(instance.Domain, decreaseOne)
-               // to report quota
-       }
-       if microService == nil {
-               log.Info("get cached service failed, then get from database")
-               service, err := dao.GetService(context.Background(), 
bson.M{"serviceinfo.serviceid": providerID})
-               if err != nil {
-                       if errors.Is(err, datasource.ErrNoData) {
-                               log.Warn(fmt.Sprintf("there is no service with 
id [%s] in the database", providerID))
-                       } else {
-                               log.Error("query database error", err)
-                       }
-                       return
-               }
-               microService = service.Service // service in the cache may not 
ready, query from db once
-               if microService == nil {
-                       log.Warn(fmt.Sprintf("caught [%s] instance[%s/%s] 
event, endpoints %v, get provider's file failed from db\n",
-                               action, providerID, providerInstanceID, 
instance.Instance.Endpoints))
-                       return
-               }
        }
        if !syncernotify.GetSyncerNotifyCenter().Closed() {
                NotifySyncerInstanceEvent(evt, microService)
        }
-       ctx := util.SetDomainProject(context.Background(), instance.Domain, 
instance.Project)
        consumerIDS, _, err := mongo.GetAllConsumerIds(ctx, microService)
        if err != nil {
                log.Error(fmt.Sprintf("get service[%s][%s/%s/%s/%s]'s 
consumerIDs failed",
diff --git a/datasource/mongo/event/instance_event_handler_test.go 
b/datasource/mongo/event/instance_event_handler_test.go
index a701795..c802c5f 100644
--- a/datasource/mongo/event/instance_event_handler_test.go
+++ b/datasource/mongo/event/instance_event_handler_test.go
@@ -92,7 +92,6 @@ func mongoAssign() sd.MongoEvent {
        mongoEvent.DocumentID = "5fdc483b4a885f69317e3505"
        mongoEvent.Value = mongoInstance
        mongoEvent.Type = discovery.EVT_CREATE
-       mongoEvent.ResourceID = "f73dceb440f711eba63ffa163e7cdcb8"
        return mongoEvent
 }
 
@@ -112,7 +111,6 @@ func mongoEventWronServiceId() sd.MongoEvent {
        mongoEvent.DocumentID = "5fdc483b4a885f69317e3505"
        mongoEvent.Value = mongoInstance
        mongoEvent.Type = discovery.EVT_CREATE
-       mongoEvent.ResourceID = "f73dceb440f711eba63ffa163e7cdcb8"
        return mongoEvent
 }
 
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index 3d224b8..4d6c508 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -23,6 +23,7 @@ import (
        "encoding/json"
        "errors"
        "fmt"
+       "github.com/apache/servicecomb-service-center/datasource/cache"
        "reflect"
        "regexp"
        "sort"
@@ -41,7 +42,6 @@ import (
        
"github.com/apache/servicecomb-service-center/datasource/mongo/client/dao"
        
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
        
"github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat"
-       "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
        mutil 
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
        "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
@@ -197,18 +197,22 @@ func (ds *DataSource) GetApplications(ctx 
context.Context, request *discovery.Ge
 }
 
 func (ds *DataSource) GetService(ctx context.Context, request 
*discovery.GetServiceRequest) (*discovery.GetServiceResponse, error) {
-       svc, err := GetServiceByID(ctx, request.ServiceId)
-       if err != nil {
-               if errors.Is(err, datasource.ErrNoData) {
-                       log.Debug(fmt.Sprintf("service %s not exist in db", 
request.ServiceId))
+       svc, ok := cache.GetServiceByID(request.ServiceId)
+       if !ok {
+               var err error
+               svc, err = dao.GetServiceByID(ctx, request.ServiceId)
+               if err != nil {
+                       if errors.Is(err, datasource.ErrNoData) {
+                               log.Debug(fmt.Sprintf("service %s not exist in 
db", request.ServiceId))
+                               return &discovery.GetServiceResponse{
+                                       Response: 
discovery.CreateResponse(discovery.ErrServiceNotExists, "Service not exist."),
+                               }, nil
+                       }
+                       log.Error(fmt.Sprintf("failed to get single service %s 
from mongo", request.ServiceId), err)
                        return &discovery.GetServiceResponse{
-                               Response: 
discovery.CreateResponse(discovery.ErrServiceNotExists, "Service not exist."),
-                       }, nil
+                               Response: 
discovery.CreateResponse(discovery.ErrInternal, "get service data from mongodb 
failed."),
+                       }, err
                }
-               log.Error(fmt.Sprintf("failed to get single service %s from 
mongo", request.ServiceId), err)
-               return &discovery.GetServiceResponse{
-                       Response: 
discovery.CreateResponse(discovery.ErrInternal, "get service data from mongodb 
failed."),
-               }, err
        }
        return &discovery.GetServiceResponse{
                Response: discovery.CreateResponse(discovery.ResponseSuccess, 
"Get service successfully."),
@@ -1521,13 +1525,17 @@ func (ds *DataSource) GetInstance(ctx context.Context, 
request *discovery.GetOne
                        Response: 
discovery.CreateResponse(discovery.ErrInstanceNotExists, mes.Error()),
                }, nil
        }
-       instances, err := GetInstancesByServiceID(ctx, 
request.ProviderServiceId)
-       if err != nil {
-               log.Error(fmt.Sprintf("get instance failed %s", findFlag()), 
err)
-               return &discovery.GetOneInstanceResponse{
-                       Response: 
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
-               }, err
+       instances, ok := 
cache.GetMicroServiceInstancesByID(request.ProviderServiceId)
+       if !ok {
+               instances, err = dao.GetMicroServiceInstancesByID(ctx, 
request.ProviderServiceId)
+               if err != nil {
+                       log.Error(fmt.Sprintf("get instance failed %s", 
findFlag()), err)
+                       return &discovery.GetOneInstanceResponse{
+                               Response: 
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
+                       }, err
+               }
        }
+
        instance := instances[0]
        // use explicit instanceId to query
        if len(request.ProviderInstanceId) != 0 {
@@ -1566,17 +1574,41 @@ func (ds *DataSource) GetInstances(ctx context.Context, 
request *discovery.GetIn
        var err error
 
        if len(request.ConsumerServiceId) > 0 {
-               service, err = GetServiceByID(ctx, request.ConsumerServiceId)
+               var exist bool
+               service, exist = cache.GetServiceByID(request.ConsumerServiceId)
+               if !exist {
+                       service, err = dao.GetServiceByID(ctx, 
request.ConsumerServiceId)
+                       if err != nil {
+                               if errors.Is(err, datasource.ErrNoData) {
+                                       log.Debug(fmt.Sprintf("consumer does 
not exist, consumer %s find provider %s instances",
+                                               request.ConsumerServiceId, 
request.ProviderServiceId))
+                                       return &discovery.GetInstancesResponse{
+                                               Response: 
discovery.CreateResponse(discovery.ErrServiceNotExists,
+                                                       
fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId)),
+                                       }, nil
+                               }
+                               log.Error(fmt.Sprintf("get consumer failed, 
consumer %s find provider %s instances",
+                                       request.ConsumerServiceId, 
request.ProviderServiceId), err)
+                               return &discovery.GetInstancesResponse{
+                                       Response: 
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
+                               }, err
+                       }
+               }
+       }
+
+       provider, ok := cache.GetServiceByID(request.ProviderServiceId)
+       if !ok {
+               provider, err = dao.GetServiceByID(ctx, 
request.ProviderServiceId)
                if err != nil {
                        if errors.Is(err, datasource.ErrNoData) {
-                               log.Debug(fmt.Sprintf("consumer does not exist, 
consumer %s find provider %s instances",
+                               log.Debug(fmt.Sprintf("provider does not exist, 
consumer %s find provider %s  instances",
                                        request.ConsumerServiceId, 
request.ProviderServiceId))
                                return &discovery.GetInstancesResponse{
                                        Response: 
discovery.CreateResponse(discovery.ErrServiceNotExists,
-                                               fmt.Sprintf("Consumer[%s] does 
not exist.", request.ConsumerServiceId)),
+                                               fmt.Sprintf("provider[%s] does 
not exist.", request.ProviderServiceId)),
                                }, nil
                        }
-                       log.Error(fmt.Sprintf("get consumer failed, consumer %s 
find provider %s instances",
+                       log.Error(fmt.Sprintf("get provider failed, consumer %s 
find provider instances %s",
                                request.ConsumerServiceId, 
request.ProviderServiceId), err)
                        return &discovery.GetInstancesResponse{
                                Response: 
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
@@ -1584,23 +1616,6 @@ func (ds *DataSource) GetInstances(ctx context.Context, 
request *discovery.GetIn
                }
        }
 
-       provider, err := GetServiceByID(ctx, request.ProviderServiceId)
-       if err != nil {
-               if errors.Is(err, datasource.ErrNoData) {
-                       log.Debug(fmt.Sprintf("provider does not exist, 
consumer %s find provider %s  instances",
-                               request.ConsumerServiceId, 
request.ProviderServiceId))
-                       return &discovery.GetInstancesResponse{
-                               Response: 
discovery.CreateResponse(discovery.ErrServiceNotExists,
-                                       fmt.Sprintf("provider[%s] does not 
exist.", request.ProviderServiceId)),
-                       }, nil
-               }
-               log.Error(fmt.Sprintf("get provider failed, consumer %s find 
provider instances %s",
-                       request.ConsumerServiceId, request.ProviderServiceId), 
err)
-               return &discovery.GetInstancesResponse{
-                       Response: 
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
-               }, err
-       }
-
        findFlag := func() string {
                return fmt.Sprintf("consumer[%s][%s/%s/%s/%s] find 
provider[%s][%s/%s/%s/%s] instances",
                        request.ConsumerServiceId, service.Service.Environment, 
service.Service.AppId, service.Service.ServiceName, service.Service.Version,
@@ -1616,12 +1631,15 @@ func (ds *DataSource) GetInstances(ctx context.Context, 
request *discovery.GetIn
                        Response: 
discovery.CreateResponse(discovery.ErrServiceNotExists, mes.Error()),
                }, nil
        }
-       instances, err := GetInstancesByServiceID(ctx, 
request.ProviderServiceId)
-       if err != nil {
-               log.Error(fmt.Sprintf("get instances failed %s", findFlag()), 
err)
-               return &discovery.GetInstancesResponse{
-                       Response: 
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
-               }, err
+       instances, ok := 
cache.GetMicroServiceInstancesByID(request.ProviderServiceId)
+       if !ok {
+               instances, err = dao.GetMicroServiceInstancesByID(ctx, 
request.ProviderServiceId)
+               if err != nil {
+                       log.Error(fmt.Sprintf("get instances failed %s", 
findFlag()), err)
+                       return &discovery.GetInstancesResponse{
+                               Response: 
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
+                       }, err
+               }
        }
        newRev, _ := formatRevision(request.ConsumerServiceId, instances)
        if rev == newRev {
@@ -2603,49 +2621,6 @@ func allowAcrossDimension(ctx context.Context, 
providerService *model.Service, c
        return nil
 }
 
-func GetServiceByID(ctx context.Context, serviceID string) (*model.Service, 
error) {
-       cacheService, ok := 
sd.Store().Service().Cache().Get(serviceID).(model.Service)
-       if !ok {
-               //no service in cache,get it from mongodb
-               filter := mutil.NewBasicFilter(ctx, 
mutil.ServiceServiceID(serviceID))
-               return dao.GetService(ctx, filter)
-       }
-       return cacheToService(cacheService), nil
-}
-
-func cacheToService(service model.Service) *model.Service {
-       return &model.Service{
-               Domain:  service.Domain,
-               Project: service.Project,
-               Tags:    service.Tags,
-               Service: service.Service,
-       }
-}
-
-func GetInstancesByServiceID(ctx context.Context, serviceID string) 
([]*discovery.MicroServiceInstance, error) {
-       var res []*discovery.MicroServiceInstance
-       var cacheUnavailable bool
-       cacheInstances := sd.Store().Instance().Cache().GetIndexData(serviceID)
-       for _, instID := range cacheInstances {
-               inst, ok := 
sd.Store().Instance().Cache().Get(instID).(model.Instance)
-               if !ok {
-                       cacheUnavailable = true
-                       break
-               }
-               res = append(res, inst.Instance)
-       }
-       if cacheUnavailable || len(res) == 0 {
-               filter := mutil.NewFilter(mutil.InstanceServiceID(serviceID))
-               option := &options.FindOptions{Sort: 
bson.M{mutil.ConnectWithDot([]string{model.ColumnInstance, 
model.ColumnVersion}): -1}}
-               res, err := dao.GetMicroServiceInstances(ctx, filter, option)
-               if err != nil {
-                       return nil, err
-               }
-               return res, nil
-       }
-       return res, nil
-}
-
 func DeleteDependencyForDeleteService(domainProject string, serviceID string, 
service *discovery.MicroServiceKey) error {
        conDep := new(discovery.ConsumerDependency)
        conDep.Consumer = service
diff --git a/datasource/mongo/rule_util.go b/datasource/mongo/rule_util.go
index e4bd435..a6fbf2c 100644
--- a/datasource/mongo/rule_util.go
+++ b/datasource/mongo/rule_util.go
@@ -19,7 +19,7 @@ package mongo
 
 import (
        "context"
-
+       "github.com/apache/servicecomb-service-center/datasource/cache"
        "github.com/go-chassis/cari/discovery"
 
        
"github.com/apache/servicecomb-service-center/datasource/mongo/client/dao"
@@ -29,9 +29,13 @@ import (
 )
 
 func Filter(ctx context.Context, rules []*model.Rule, consumerID string) 
(bool, error) {
-       consumer, err := GetServiceByID(ctx, consumerID)
-       if consumer == nil {
-               return false, err
+       consumer, ok := cache.GetServiceByID(consumerID)
+       if !ok {
+               var err error
+               consumer, err = dao.GetServiceByID(ctx, consumerID)
+               if err != nil {
+                       return false, err
+               }
        }
 
        if len(rules) == 0 {
diff --git a/datasource/mongo/sd/dep_cache.go b/datasource/mongo/sd/dep_cache.go
new file mode 100644
index 0000000..9619c5f
--- /dev/null
+++ b/datasource/mongo/sd/dep_cache.go
@@ -0,0 +1,132 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+import (
+       "strings"
+
+       
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+       "go.mongodb.org/mongo-driver/bson"
+)
+
+// depStore is the cache backing the dep table. d holds documents keyed by
+// document ID; indexCache maps the key built by genDepServiceKey to the IDs
+// of the documents sharing that key; dirty is toggled by MarkDirty and Clear.
+type depStore struct {
+       dirty      bool
+       d          *DocStore
+       indexCache *indexCache
+}
+
+// init registers newDepStore as the cacher constructor for the dep table.
+func init() {
+       RegisterCacher(dep, newDepStore)
+}
+
+// newDepStore wires an empty depStore and a BSON decoder for dependency
+// rules into a MongoCacher configured for the dep table.
+func newDepStore() *MongoCacher {
+       options := DefaultOptions().SetTable(dep)
+       store := &depStore{
+               d:          NewDocStore(),
+               indexCache: NewIndexCache(),
+       }
+       // decode returns the zero Resource when either unmarshal step fails.
+       decode := func(raw bson.Raw) (resource sdcommon.Resource) {
+               var meta MongoDocument
+               if err := bson.Unmarshal(raw, &meta); err != nil {
+                       return
+               }
+               var rule model.DependencyRule
+               if err := bson.Unmarshal(raw, &rule); err != nil {
+                       return
+               }
+               resource.Key = meta.ID.Hex()
+               resource.Value = rule
+               return
+       }
+       return NewMongoCacher(options, store, decode)
+}
+
+// Name returns the dep table name.
+func (s *depStore) Name() string {
+       return dep
+}
+
+// Size returns the size reported by the underlying document store.
+func (s *depStore) Size() int {
+       return s.d.Size()
+}
+
+// Get looks up a document by its document ID in the underlying store.
+func (s *depStore) Get(key string) interface{} {
+       return s.d.Get(key)
+}
+
+// ForEach delegates iteration to the underlying document store.
+func (s *depStore) ForEach(iter func(k string, v interface{}) (next bool)) {
+       s.d.ForEach(iter)
+}
+
+// GetValue returns the cached documents whose IDs are listed under index.
+// The index and the document store are updated in separate steps, so an ID
+// may not resolve to a document; such IDs are skipped instead of yielding a
+// nil element. The result is never nil, only possibly empty.
+func (s *depStore) GetValue(index string) []interface{} {
+       ids := s.indexCache.Get(index)
+       res := make([]interface{}, 0, len(ids))
+       for _, id := range ids {
+               if doc := s.d.Get(id); doc != nil {
+                       res = append(res, doc)
+               }
+       }
+       return res
+}
+
+// Dirty reports whether MarkDirty has been called since the last Clear.
+func (s *depStore) Dirty() bool {
+       return s.dirty
+}
+
+// MarkDirty sets the dirty flag.
+func (s *depStore) MarkDirty() {
+       s.dirty = true
+}
+
+// Clear resets the dirty flag and drops every cached document together with
+// the index entries pointing at them, so the index cannot reference
+// documents that no longer exist.
+func (s *depStore) Clear() {
+       s.dirty = false
+       s.d.store.Flush()
+       s.indexCache.store.Flush()
+}
+
+// ProcessUpdate stores the event's dependency rule under its document ID and
+// indexes that ID under the rule's service key. Events whose value is not a
+// model.DependencyRule, or whose rule has no ServiceKey, are ignored.
+func (s *depStore) ProcessUpdate(event MongoEvent) {
+       rule, isRule := event.Value.(model.DependencyRule)
+       if !isRule || rule.ServiceKey == nil {
+               return
+       }
+       s.d.Put(event.DocumentID, event.Value)
+       s.indexCache.Put(genDepServiceKey(rule), event.DocumentID)
+}
+
+// ProcessDelete removes the document named by the event and its index entry.
+// The index key is derived from the cached copy of the rule, so nothing
+// happens when the document is not cached as a model.DependencyRule or the
+// cached rule has no ServiceKey.
+func (s *depStore) ProcessDelete(event MongoEvent) {
+       cached, isRule := s.d.Get(event.DocumentID).(model.DependencyRule)
+       if !isRule || cached.ServiceKey == nil {
+               return
+       }
+       s.d.DeleteDoc(event.DocumentID)
+       s.indexCache.Delete(genDepServiceKey(cached), event.DocumentID)
+}
+
+// isValueNotUpdated always returns false; both arguments are ignored.
+// NOTE(review): presumably this makes the caller treat every dep event as a
+// real change; confirm against the MongoCacher code that calls it.
+func (s *depStore) isValueNotUpdated(value interface{}, newValue interface{}) 
bool {
+       return false
+}
+
+// genDepServiceKey builds the index key "type/appId/serviceName/version" for
+// a dependency rule. dep.ServiceKey must be non-nil.
+func genDepServiceKey(dep model.DependencyRule) string {
+       key := dep.ServiceKey
+       parts := []string{dep.Type, key.AppId, key.ServiceName, key.Version}
+       return strings.Join(parts, "/")
+}
diff --git a/datasource/mongo/sd/depcache_test.go 
b/datasource/mongo/sd/depcache_test.go
new file mode 100644
index 0000000..fa7f47e
--- /dev/null
+++ b/datasource/mongo/sd/depcache_test.go
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package sd
+
+import (
+       "testing"
+
+       
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+       "github.com/go-chassis/cari/discovery"
+       "github.com/stretchr/testify/assert"
+)
+
+// depCache is the package-level cacher shared by the dep cache tests; it is
+// assigned in init.
+var depCache *MongoCacher
+
+// depRule1 is a test fixture whose index key is "p/appid1/svc1/1.0".
+var depRule1 = model.DependencyRule{
+       Type:    "p",
+       Domain:  "default",
+       Project: "default",
+       ServiceKey: &discovery.MicroServiceKey{
+               Tenant:      "default/default",
+               Environment: "prod",
+               AppId:       "appid1",
+               ServiceName: "svc1",
+               Alias:       "alias1",
+               Version:     "1.0",
+       },
+}
+
+// depRule2 has the same field values as depRule1, so both rules land under
+// the same index key when stored under different document IDs.
+var depRule2 = model.DependencyRule{
+       Type:    "p",
+       Domain:  "default",
+       Project: "default",
+       ServiceKey: &discovery.MicroServiceKey{
+               Tenant:      "default/default",
+               Environment: "prod",
+               AppId:       "appid1",
+               ServiceName: "svc1",
+               Alias:       "alias1",
+               Version:     "1.0",
+       },
+}
+
+// init creates the shared dep cacher used by the tests below.
+func init() {
+       depCache = newDepStore()
+}
+
+// TestDepCacheBasicFunc covers construction of the dep cacher and the
+// update/delete path, including the index kept per service key.
+func TestDepCacheBasicFunc(t *testing.T) {
+       t.Run("init depcache, should pass", func(t *testing.T) {
+               // this local shadows the package-level depCache on purpose
+               depCache := newDepStore()
+               assert.NotNil(t, depCache)
+               assert.Equal(t, dep, depCache.cache.Name())
+       })
+       event1 := MongoEvent{
+               DocumentID: "id1",
+               Value:      depRule1,
+       }
+       event2 := MongoEvent{
+               DocumentID: "id2",
+               Value:      depRule2,
+       }
+       t.Run("update&&delete depcache, should pass", func(t *testing.T) {
+               depCache.cache.ProcessUpdate(event1)
+               // testify expects (expected, actual); the swapped order produced misleading failure output
+               assert.Equal(t, 1, depCache.cache.Size())
+               assert.Nil(t, depCache.cache.Get("id_not_exist"))
+               assert.Equal(t, depRule1.ServiceKey.ServiceName, depCache.cache.Get("id1").(model.DependencyRule).ServiceKey.ServiceName)
+               assert.Len(t, depCache.cache.GetValue("p/appid1/svc1/1.0"), 1)
+               depCache.cache.ProcessUpdate(event2)
+               assert.Len(t, depCache.cache.GetValue("p/appid1/svc1/1.0"), 2)
+               depCache.cache.ProcessDelete(event1)
+               assert.Nil(t, depCache.cache.Get("id1"))
+               assert.Len(t, depCache.cache.GetValue("p/appid1/svc1/1.0"), 1)
+               depCache.cache.ProcessDelete(event2)
+               assert.Len(t, depCache.cache.GetValue("p/appid1/svc1/1.0"), 0)
+               assert.Nil(t, depCache.cache.Get("id2"))
+       })
+}
diff --git a/datasource/mongo/sd/cache.go b/datasource/mongo/sd/doc_cache.go
similarity index 53%
copy from datasource/mongo/sd/cache.go
copy to datasource/mongo/sd/doc_cache.go
index 9819041..d594238 100644
--- a/datasource/mongo/sd/cache.go
+++ b/datasource/mongo/sd/doc_cache.go
@@ -17,30 +17,44 @@
 
 package sd
 
-// Cache stores db data.
-type Cache interface {
-       CacheReader
+import "github.com/patrickmn/go-cache"
 
-       Put(id string, v interface{})
-
-       Remove(id string)
-
-       MarkDirty()
+// DocStore is a key/value document store backed by a go-cache instance.
+type DocStore struct {
+       store *cache.Cache
+}
 
-       Dirty() bool
+// NewDocStore returns an empty DocStore whose entries have no expiration.
+func NewDocStore() *DocStore {
+       return &DocStore{store: cache.New(cache.NoExpiration, 0)}
+}
 
-       Clear()
+// Get returns the document stored under key, or nil when there is none.
+func (c *DocStore) Get(key string) (v interface{}) {
+       if doc, found := c.store.Get(key); found {
+               return doc
+       }
+       return nil
 }
 
-// CacheReader reads k-v data.
-type CacheReader interface {
-       Name() string // The name of implementation
+// Put stores v under key without an expiration, replacing any previous value.
+func (c *DocStore) Put(key string, v interface{}) {
+       c.store.Set(key, v, cache.NoExpiration)
+}
 
-       Size() int // the bytes size of the cache
+// DeleteDoc removes the document stored under key, if any.
+func (c *DocStore) DeleteDoc(key string) {
+       c.store.Delete(key)
+}
 
-       // Get gets a value by id
-       Get(id string) interface{}
+// ForEach calls iter for every stored document until iter returns false.
+// iter receives the document itself (the same value Get returns), not the
+// go-cache Item wrapper around it. Entries holding a nil object are skipped.
+// Iteration order is unspecified.
+func (c *DocStore) ForEach(iter func(k string, v interface{}) (next bool)) {
+       for key, item := range c.store.Items() {
+               if item.Object == nil {
+                       continue
+               }
+               if !iter(key, item.Object) {
+                       break
+               }
+       }
+}
 
-       // ForEach executes the given function for each of the k-v
-       ForEach(iter func(k string, v interface{}) (next bool))
+// Size returns the item count reported by the underlying go-cache store.
+func (c *DocStore) Size() int {
+       return c.store.ItemCount()
 }
diff --git a/datasource/mongo/sd/options_test.go 
b/datasource/mongo/sd/docc_test.go
similarity index 51%
copy from datasource/mongo/sd/options_test.go
copy to datasource/mongo/sd/docc_test.go
index 58fc401..47ce9a6 100644
--- a/datasource/mongo/sd/options_test.go
+++ b/datasource/mongo/sd/docc_test.go
@@ -17,45 +17,34 @@
  * under the License.
  */
 
-package sd
+package sd_test
 
 import (
        "testing"
 
-       "github.com/apache/servicecomb-service-center/pkg/log"
-       "github.com/go-chassis/cari/discovery"
+       "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
        "github.com/stretchr/testify/assert"
 )
 
-func TestOptions(t *testing.T) {
-       options := Options{
-               Key: "",
-       }
-       assert.Empty(t, options, "config is empty")
+// docCache is the store shared by the subtests of TestDocCache.
+var docCache *sd.DocStore
 
-       options1 := options.SetTable("configKey")
-       assert.Equal(t, "configKey", options1.Key,
-               "contain key after method WithTable")
-
-       assert.Equal(t, 0, options1.InitSize,
-               "init size is zero")
-
-       mongoEventFunc = mongoEventFuncGet()
-
-       out := options1.String()
-       assert.NotNil(t, out,
-               "method String return not after methods")
+// init creates the shared DocStore before any test runs.
+func init() {
+       docCache = sd.NewDocStore()
 }
 
-var mongoEventFunc MongoEventFunc
-
-func mongoEventFuncGet() MongoEventFunc {
-       fun := func(evt MongoEvent) {
-               evt.DocumentID = "DocumentID has changed"
-               evt.ResourceID = "BusinessID has changed"
-               evt.Value = 2
-               evt.Type = discovery.EVT_UPDATE
-               log.Info("in event func")
-       }
-       return fun
+// TestDocCache checks that a fresh DocStore is empty and that Put, Get,
+// DeleteDoc and Size agree with each other.
+func TestDocCache(t *testing.T) {
+       t.Run("init docCache,should pass", func(t *testing.T) {
+               docCache = sd.NewDocStore()
+               assert.NotNil(t, docCache)
+               assert.Nil(t, docCache.Get("id1"))
+       })
+       t.Run("update&&delete docCache, should pass", func(t *testing.T) {
+               docCache.Put("id1", "doc1")
+               assert.Equal(t, "doc1", docCache.Get("id1").(string))
+               assert.Equal(t, 1, docCache.Size())
+               docCache.Put("id2", "doc2")
+               assert.Equal(t, 2, docCache.Size())
+               docCache.DeleteDoc("id2")
+               assert.Equal(t, 1, docCache.Size())
+       })
 }
diff --git a/datasource/mongo/sd/event_proxy_test.go 
b/datasource/mongo/sd/event_proxy_test.go
index 7fb47e5..4b9b577 100644
--- a/datasource/mongo/sd/event_proxy_test.go
+++ b/datasource/mongo/sd/event_proxy_test.go
@@ -34,7 +34,6 @@ func TestAddHandleFuncAndOnEvent(t *testing.T) {
        }
        mongoEvent := MongoEvent{
                DocumentID: "",
-               ResourceID: "",
                Type:       discovery.EVT_CREATE,
                Value:      1,
        }
diff --git a/datasource/mongo/sd/cache.go b/datasource/mongo/sd/hset.go
similarity index 53%
rename from datasource/mongo/sd/cache.go
rename to datasource/mongo/sd/hset.go
index 9819041..0ee909c 100644
--- a/datasource/mongo/sd/cache.go
+++ b/datasource/mongo/sd/hset.go
@@ -17,30 +17,54 @@
 
 package sd
 
-// Cache stores db data.
-type Cache interface {
-       CacheReader
+import (
+       "sync"
+)
 
-       Put(id string, v interface{})
-
-       Remove(id string)
-
-       MarkDirty()
-
-       Dirty() bool
+// Hset is a set of strings: m holds the members and l is the lock taken by
+// the methods that access m.
+type Hset struct {
+       m map[string]struct{}
+       l sync.RWMutex
+}
 
-       Clear()
+// Newhset returns a set that initially contains only key.
+func Newhset(key string) *Hset {
+       // the zero sync.RWMutex is ready to use, so only the map needs a value
+       return &Hset{m: map[string]struct{}{key: {}}}
 }
 
-// CacheReader reads k-v data.
-type CacheReader interface {
-       Name() string // The name of implementation
+// Insert adds key to the set; inserting an existing key changes nothing.
+func (h *Hset) Insert(key string) {
+       h.l.Lock()
+       defer h.l.Unlock()
+       // assigning an existing key is a no-op for a struct{} value
+       h.m[key] = struct{}{}
+}
 
-       Size() int // the bytes size of the cache
+// Del removes key from the set; removing an absent key changes nothing.
+func (h *Hset) Del(key string) {
+       h.l.Lock()
+       defer h.l.Unlock()
+       // delete on a missing key is a no-op
+       delete(h.m, key)
+}
 
-       // Get gets a value by id
-       Get(id string) interface{}
+// Len returns the number of keys in the set. It takes the read lock so it
+// does not race with concurrent Insert or Del calls.
+func (h *Hset) Len() int {
+       h.l.RLock()
+       defer h.l.RUnlock()
+       return len(h.m)
+}
 
-       // ForEach executes the given function for each of the k-v
-       ForEach(iter func(k string, v interface{}) (next bool))
+// Iter returns a snapshot of the keys in unspecified order.
+func (h *Hset) Iter() []string {
+       h.l.RLock()
+       defer h.l.RUnlock()
+       // len(h.m) is read directly: calling Len here would take the read lock
+       // a second time, which sync.RWMutex forbids
+       res := make([]string, 0, len(h.m))
+       for k := range h.m {
+               res = append(res, k)
+       }
+       return res
 }
diff --git a/datasource/mongo/sd/options_test.go 
b/datasource/mongo/sd/hset_test.go
similarity index 51%
copy from datasource/mongo/sd/options_test.go
copy to datasource/mongo/sd/hset_test.go
index 58fc401..b0ec440 100644
--- a/datasource/mongo/sd/options_test.go
+++ b/datasource/mongo/sd/hset_test.go
@@ -17,45 +17,24 @@
  * under the License.
  */
 
-package sd
+package sd_test
 
 import (
        "testing"
 
-       "github.com/apache/servicecomb-service-center/pkg/log"
-       "github.com/go-chassis/cari/discovery"
+       "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
        "github.com/stretchr/testify/assert"
 )
 
-func TestOptions(t *testing.T) {
-       options := Options{
-               Key: "",
-       }
-       assert.Empty(t, options, "config is empty")
-
-       options1 := options.SetTable("configKey")
-       assert.Equal(t, "configKey", options1.Key,
-               "contain key after method WithTable")
-
-       assert.Equal(t, 0, options1.InitSize,
-               "init size is zero")
-
-       mongoEventFunc = mongoEventFuncGet()
-
-       out := options1.String()
-       assert.NotNil(t, out,
-               "method String return not after methods")
-}
-
-var mongoEventFunc MongoEventFunc
-
-func mongoEventFuncGet() MongoEventFunc {
-       fun := func(evt MongoEvent) {
-               evt.DocumentID = "DocumentID has changed"
-               evt.ResourceID = "BusinessID has changed"
-               evt.Value = 2
-               evt.Type = discovery.EVT_UPDATE
-               log.Info("in event func")
-       }
-       return fun
+// TestHset checks that Insert is idempotent, that Del removes a key, and
+// that Len and Iter reflect the remaining members.
+func TestHset(t *testing.T) {
+       hset := sd.Newhset("key1")
+       assert.NotNil(t, hset)
+       assert.Equal(t, 1, hset.Len())
+       hset.Insert("key1")
+       assert.Equal(t, 1, hset.Len())
+       hset.Insert("key2")
+       assert.Equal(t, 2, hset.Len())
+       hset.Del("key1")
+       assert.Equal(t, 1, hset.Len())
+       assert.Equal(t, []string{"key2"}, hset.Iter())
 }
diff --git a/datasource/mongo/sd/index_cache.go 
b/datasource/mongo/sd/index_cache.go
new file mode 100644
index 0000000..1977bb8
--- /dev/null
+++ b/datasource/mongo/sd/index_cache.go
@@ -0,0 +1,71 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+import (
+       "github.com/patrickmn/go-cache"
+)
+
+// indexCache maps an index key to the set (*Hset) of document IDs filed
+// under that key.
+type indexCache struct {
+       store *cache.Cache
+}
+
+// NewIndexCache returns an empty index whose entries have no expiration.
+func NewIndexCache() *indexCache {
+       return &indexCache{
+               store: cache.New(cache.NoExpiration, 0),
+       }
+}
+
+// Get returns the document IDs filed under key, or nil when the key is
+// absent or does not hold an *Hset.
+func (i *indexCache) Get(key string) []string {
+       entry, found := i.store.Get(key)
+       if !found {
+               return nil
+       }
+       ids, isSet := entry.(*Hset)
+       if !isSet {
+               return nil
+       }
+       return ids.Iter()
+}
+
+// Put files value under key, creating the key's set on first use.
+//
+// The set is created with the store's atomic Add, so two concurrent Puts on
+// a new key can no longer overwrite each other's set and lose a value.
+// todo Put and Delete are still not atomic with respect to each other
+func (i *indexCache) Put(key string, value string) {
+       for {
+               if v, found := i.store.Get(key); found {
+                       // an entry that is not an *Hset is left untouched
+                       if set, ok := v.(*Hset); ok {
+                               set.Insert(value)
+                       }
+                       return
+               }
+               // Add fails only when another goroutine created the key in
+               // the meantime; loop and insert into that set instead
+               if err := i.store.Add(key, Newhset(value), cache.NoExpiration); err == nil {
+                       return
+               }
+       }
+}
+
+// Delete removes value from the set filed under key and drops the key once
+// its set is empty. Absent keys and non-*Hset entries are ignored.
+func (i *indexCache) Delete(key string, value string) {
+       entry, found := i.store.Get(key)
+       if !found {
+               return
+       }
+       if ids, isSet := entry.(*Hset); isSet {
+               ids.Del(value)
+               if ids.Len() == 0 {
+                       i.store.Delete(key)
+               }
+       }
+}
diff --git a/datasource/mongo/sd/indexc_test.go 
b/datasource/mongo/sd/indexc_test.go
new file mode 100644
index 0000000..169a7d7
--- /dev/null
+++ b/datasource/mongo/sd/indexc_test.go
@@ -0,0 +1,21 @@
+package sd_test
+
+import (
+       "testing"
+
+       "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
+       "github.com/stretchr/testify/assert"
+)
+
+// TestIndexCache checks that Put accumulates values under one key and that
+// Get returns nil once Delete has removed the last value.
+func TestIndexCache(t *testing.T) {
+       indexCache := sd.NewIndexCache()
+       assert.NotNil(t, indexCache)
+       indexCache.Put("index1", "doc1")
+       assert.Equal(t, []string{"doc1"}, indexCache.Get("index1"))
+       indexCache.Put("index1", "doc2")
+       assert.Len(t, indexCache.Get("index1"), 2)
+       indexCache.Delete("index1", "doc1")
+       assert.Len(t, indexCache.Get("index1"), 1)
+       indexCache.Delete("index1", "doc2")
+       assert.Nil(t, indexCache.Get("index1"))
+}
diff --git a/datasource/mongo/sd/instance_cache.go 
b/datasource/mongo/sd/instance_cache.go
new file mode 100644
index 0000000..a053cec
--- /dev/null
+++ b/datasource/mongo/sd/instance_cache.go
@@ -0,0 +1,129 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+import (
+       
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+       rmodel "github.com/go-chassis/cari/discovery"
+       "go.mongodb.org/mongo-driver/bson"
+)
+
+// instanceStore is the cache backing the instance table. d holds documents
+// keyed by document ID; indexCache maps a service ID to the IDs of its
+// instance documents; dirty is toggled by MarkDirty and Clear.
+type instanceStore struct {
+       dirty      bool
+       d          *DocStore
+       indexCache *indexCache
+}
+
+// init registers newInstanceStore as the cacher constructor for the
+// instance table.
+func init() {
+       RegisterCacher(instance, newInstanceStore)
+}
+
+// newInstanceStore wires an empty instanceStore and a BSON decoder for
+// instance documents into a MongoCacher configured for the instance table.
+func newInstanceStore() *MongoCacher {
+       options := DefaultOptions().SetTable(instance)
+       store := &instanceStore{
+               d:          NewDocStore(),
+               indexCache: NewIndexCache(),
+       }
+       // decode returns the zero Resource when either unmarshal step fails.
+       decode := func(raw bson.Raw) (resource sdcommon.Resource) {
+               var meta MongoDocument
+               if err := bson.Unmarshal(raw, &meta); err != nil {
+                       return
+               }
+               var doc model.Instance
+               if err := bson.Unmarshal(raw, &doc); err != nil {
+                       return
+               }
+               resource.Key = meta.ID.Hex()
+               resource.Value = doc
+               return
+       }
+       return NewMongoCacher(options, store, decode)
+}
+
+// Name returns the instance table name.
+func (s *instanceStore) Name() string {
+       return instance
+}
+
+// Size returns the size reported by the underlying document store.
+func (s *instanceStore) Size() int {
+       return s.d.Size()
+}
+
+// Get looks up a document by its document ID in the underlying store.
+func (s *instanceStore) Get(key string) interface{} {
+       return s.d.Get(key)
+}
+
+// ForEach delegates iteration to the underlying document store.
+func (s *instanceStore) ForEach(iter func(k string, v interface{}) (next 
bool)) {
+       s.d.ForEach(iter)
+}
+
+// GetValue returns the cached documents whose IDs are listed under index.
+// The index and the document store are updated in separate steps, so an ID
+// may not resolve to a document; such IDs are skipped instead of yielding a
+// nil element. The result is never nil, only possibly empty.
+func (s *instanceStore) GetValue(index string) []interface{} {
+       ids := s.indexCache.Get(index)
+       res := make([]interface{}, 0, len(ids))
+       for _, id := range ids {
+               if doc := s.d.Get(id); doc != nil {
+                       res = append(res, doc)
+               }
+       }
+       return res
+}
+
+// Dirty reports whether MarkDirty has been called since the last Clear.
+func (s *instanceStore) Dirty() bool {
+       return s.dirty
+}
+
+// MarkDirty sets the dirty flag.
+func (s *instanceStore) MarkDirty() {
+       s.dirty = true
+}
+
+// Clear resets the dirty flag and drops every cached document together with
+// the index entries pointing at them, so the index cannot reference
+// documents that no longer exist.
+func (s *instanceStore) Clear() {
+       s.dirty = false
+       s.d.store.Flush()
+       s.indexCache.store.Flush()
+}
+
+// ProcessUpdate stores the event's instance under its document ID and
+// indexes that ID under the instance's service ID. Ignored events: those of
+// type EVT_UPDATE, those whose value is not a model.Instance, and those
+// whose Instance payload is nil (which has no service ID to index under).
+func (s *instanceStore) ProcessUpdate(event MongoEvent) {
+       //instance only process create and del event.
+       if event.Type == rmodel.EVT_UPDATE {
+               return
+       }
+       inst, ok := event.Value.(model.Instance)
+       if !ok || inst.Instance == nil {
+               return
+       }
+       s.d.Put(event.DocumentID, event.Value)
+       s.indexCache.Put(genInstServiceID(inst), event.DocumentID)
+}
+
+// ProcessDelete removes the document named by the event and its index entry.
+// The service ID is read from the cached copy, so nothing happens when the
+// document is not cached as a model.Instance. A cached instance with a nil
+// Instance payload is removed from the document store only, since it has no
+// service ID to look up in the index.
+func (s *instanceStore) ProcessDelete(event MongoEvent) {
+       inst, ok := s.d.Get(event.DocumentID).(model.Instance)
+       if !ok {
+               return
+       }
+       s.d.DeleteDoc(event.DocumentID)
+       if inst.Instance == nil {
+               return
+       }
+       s.indexCache.Delete(genInstServiceID(inst), event.DocumentID)
+}
+
+// isValueNotUpdated always returns true; both arguments are ignored.
+// NOTE(review): presumably this makes the caller treat instance values as
+// never changed; confirm against the MongoCacher code that calls it.
+func (s *instanceStore) isValueNotUpdated(value interface{}, newValue 
interface{}) bool {
+       return true
+}
+
+// genInstServiceID returns the service ID used as the index key for inst.
+// inst.Instance must be non-nil.
+func genInstServiceID(inst model.Instance) string {
+       return inst.Instance.ServiceId
+}
diff --git a/datasource/mongo/sd/instancec_test.go 
b/datasource/mongo/sd/instancec_test.go
new file mode 100644
index 0000000..9b2501d
--- /dev/null
+++ b/datasource/mongo/sd/instancec_test.go
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package sd
+
+import (
+       "testing"
+
+       
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+       "github.com/go-chassis/cari/discovery"
+       "github.com/stretchr/testify/assert"
+)
+
+// instanceCache is the cacher shared by the tests in this file; it is created
+// in init.
+var instanceCache *MongoCacher
+
+// inst1 and inst2 are two distinct instances of the same service ("svcid"),
+// so both end up under the same index key.
+var inst1 = model.Instance{
+       Domain:  "default",
+       Project: "default",
+       Instance: &discovery.MicroServiceInstance{
+               InstanceId: "123456789",
+               ServiceId:  "svcid",
+       },
+}
+
+var inst2 = model.Instance{
+       Domain:  "default",
+       Project: "default",
+       Instance: &discovery.MicroServiceInstance{
+               InstanceId: "987654321",
+               ServiceId:  "svcid",
+       },
+}
+
+// init builds the shared instance cacher before any test runs.
+func init() {
+       instanceCache = newInstanceStore()
+}
+
+// TestInstCacheBasicFunc covers construction of the instance cacher and the
+// create/delete life cycle of two instances that share one service ID.
+func TestInstCacheBasicFunc(t *testing.T) {
+       t.Run("init instCache,should pass", func(t *testing.T) {
+               // Use a distinct local name so the package-level instanceCache
+               // is not shadowed.
+               cacher := newInstanceStore()
+               assert.NotNil(t, cacher)
+               assert.Equal(t, instance, cacher.cache.Name())
+       })
+       event1 := MongoEvent{
+               DocumentID: "id1",
+               Value:      inst1,
+       }
+       event2 := MongoEvent{
+               DocumentID: "id2",
+               Value:      inst2,
+       }
+       t.Run("update&&delete instCache, should pass", func(t *testing.T) {
+               instanceCache.cache.ProcessUpdate(event1)
+               // assert.Equal takes (expected, actual); keep that order so
+               // failure messages are reported the right way round.
+               assert.Equal(t, 1, instanceCache.cache.Size())
+               assert.Nil(t, instanceCache.cache.Get("id_not_exist"))
+               assert.Equal(t, inst1.Instance.InstanceId, 
instanceCache.cache.Get("id1").(model.Instance).Instance.InstanceId)
+               assert.Len(t, instanceCache.cache.GetValue("svcid"), 1)
+               instanceCache.cache.ProcessUpdate(event2)
+               assert.Equal(t, 2, instanceCache.cache.Size())
+               assert.Len(t, instanceCache.cache.GetValue("svcid"), 2)
+               instanceCache.cache.ProcessDelete(event1)
+               assert.Nil(t, instanceCache.cache.Get("id1"))
+               assert.Len(t, instanceCache.cache.GetValue("svcid"), 1)
+               instanceCache.cache.ProcessDelete(event2)
+               assert.Len(t, instanceCache.cache.GetValue("svcid"), 0)
+               assert.Nil(t, instanceCache.cache.Get("id2"))
+               assert.Len(t, instanceCache.cache.GetValue("svcid"), 0)
+       })
+}
diff --git a/datasource/mongo/sd/listwatch_inner.go 
b/datasource/mongo/sd/listwatch_inner.go
index da4612e..3d567aa 100644
--- a/datasource/mongo/sd/listwatch_inner.go
+++ b/datasource/mongo/sd/listwatch_inner.go
@@ -26,15 +26,16 @@ import (
        "go.mongodb.org/mongo-driver/mongo/options"
 
        "github.com/apache/servicecomb-service-center/datasource/mongo/client"
-       
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
        "github.com/apache/servicecomb-service-center/datasource/sdcommon"
        "github.com/apache/servicecomb-service-center/pkg/log"
-       "github.com/apache/servicecomb-service-center/pkg/util"
 )
 
+// parsefunc converts one raw BSON document into an sdcommon.Resource. The
+// list/watch code calls it for every listed document and for the full
+// document of insert, update and replace change events.
+type parsefunc func(doc bson.Raw) (resource sdcommon.Resource)
+
 type mongoListWatch struct {
        Key         string
        resumeToken bson.Raw
+       // parseFunc turns a raw document into a Resource; it is supplied by
+       // the caller of NewMongoCacher.
+       parseFunc   parsefunc
 }
 
 func (lw *mongoListWatch) List(op sdcommon.ListWatchConfig) 
(*sdcommon.ListWatchResp, error) {
@@ -52,7 +53,7 @@ func (lw *mongoListWatch) List(op sdcommon.ListWatchConfig) 
(*sdcommon.ListWatch
        lwRsp.Resources = make([]*sdcommon.Resource, 0)
 
        for resp.Next(context.Background()) {
-               info := lw.doParseDocumentToResource(resp.Current)
+               info := lw.parseFunc(resp.Current)
                lwRsp.Resources = append(lwRsp.Resources, &info)
        }
 
@@ -118,53 +119,6 @@ func (lw *mongoListWatch) DoWatch(ctx context.Context, f 
func(*sdcommon.ListWatc
        return err
 }
 
-func (lw *mongoListWatch) doParseDocumentToResource(fullDocument bson.Raw) 
(resource sdcommon.Resource) {
-       var err error
-
-       documentID := MongoDocument{}
-       err = bson.Unmarshal(fullDocument, &documentID)
-       if err != nil {
-               return
-       }
-
-       resource.DocumentID = documentID.ID.Hex()
-
-       switch lw.Key {
-       case instance:
-               instance := model.Instance{}
-               err = bson.Unmarshal(fullDocument, &instance)
-               if err != nil {
-                       log.Error("error to parse bson raw to documentInfo", 
err)
-                       return
-               }
-               if instance.Instance == nil {
-                       log.Error(fmt.Sprintf("unexpect instance value,the 
documentID is %s", resource.DocumentID), nil)
-                       return
-               }
-               resource.Key = instance.Instance.InstanceId
-               resource.Value = instance
-               resource.Index = instance.Instance.ServiceId
-       case service:
-               service := model.Service{}
-               err := bson.Unmarshal(fullDocument, &service)
-               if err != nil {
-                       log.Error("error to parse bson raw to documentInfo", 
err)
-                       return
-               }
-               if service.Service == nil {
-                       log.Error(fmt.Sprintf("unexpect service value,the 
documentID is %s", resource.DocumentID), nil)
-                       return
-               }
-               resource.Key = service.Service.ServiceId
-               resource.Value = service
-               resource.Index = util.StringJoin([]string{service.Domain, 
service.Project, service.Service.ServiceName, service.Service.Version, 
service.Service.AppId, service.Service.Environment}, "/")
-       default:
-               return
-       }
-
-       return
-}
-
 func (lw *mongoListWatch) ResumeToken() bson.Raw {
        return lw.resumeToken
 }
@@ -177,10 +131,10 @@ func (lw *mongoListWatch) doParseWatchRspToResource(wRsp 
*MongoWatchResponse) (r
        switch wRsp.OperationType {
        case deleteOp:
                //delete operation has no fullDocumentValue
-               resource.DocumentID = wRsp.DocumentKey.ID.Hex()
+               resource.Key = wRsp.DocumentKey.ID.Hex()
                return
        case insertOp, updateOp, replaceOp:
-               return lw.doParseDocumentToResource(wRsp.FullDocument)
+               return lw.parseFunc(wRsp.FullDocument)
        default:
                log.Warn(fmt.Sprintf("unrecognized operation:%s", 
wRsp.OperationType))
        }
diff --git a/datasource/mongo/sd/listwatch_test.go 
b/datasource/mongo/sd/listwatch_test.go
index f4e8232..a7a681c 100644
--- a/datasource/mongo/sd/listwatch_test.go
+++ b/datasource/mongo/sd/listwatch_test.go
@@ -61,23 +61,35 @@ func TestDoParseWatchRspToMongoInfo(t *testing.T) {
        }
        ilw := mongoListWatch{
                Key: instance,
+               parseFunc: func(doc bson.Raw) (resource sdcommon.Resource) {
+                       docID := MongoDocument{}
+                       err := bson.Unmarshal(doc, &docID)
+                       if err != nil {
+                               return
+                       }
+                       service := model.Instance{}
+                       err = bson.Unmarshal(doc, &service)
+                       if err != nil {
+                               return
+                       }
+                       resource.Value = service
+                       resource.Key = docID.ID.Hex()
+                       return
+               },
        }
        info := ilw.doParseWatchRspToResource(mockWatchRsp)
-       assert.Equal(t, documentID.Hex(), info.DocumentID)
-       assert.Equal(t, "8064a600438511eb8584fa163e8a81c9", info.Key)
+       assert.Equal(t, documentID.Hex(), info.Key)
 
        // case updateOp
        mockWatchRsp.OperationType = updateOp
        info = ilw.doParseWatchRspToResource(mockWatchRsp)
-       assert.Equal(t, documentID.Hex(), info.DocumentID)
-       assert.Equal(t, "8064a600438511eb8584fa163e8a81c9", info.Key)
+       assert.Equal(t, documentID.Hex(), info.Key)
        assert.Equal(t, "1608552622", 
info.Value.(model.Instance).Instance.ModTimestamp)
 
        // case delete
        mockWatchRsp.OperationType = deleteOp
        info = ilw.doParseWatchRspToResource(mockWatchRsp)
-       assert.Equal(t, documentID.Hex(), info.DocumentID)
-       assert.Equal(t, "", info.Key)
+       assert.Equal(t, documentID.Hex(), info.Key)
 
        // case service insertOp
        mockWatchRsp = &MongoWatchResponse{OperationType: insertOp,
@@ -86,8 +98,7 @@ func TestDoParseWatchRspToMongoInfo(t *testing.T) {
        }
        ilw.Key = service
        info = ilw.doParseWatchRspToResource(mockWatchRsp)
-       assert.Equal(t, documentID.Hex(), info.DocumentID)
-       assert.Equal(t, "91afbe0faa9dc1594689139f099eb293b0cd048d", info.Key)
+       assert.Equal(t, documentID.Hex(), info.Key)
 }
 
 func TestInnerListWatch_ResumeToken(t *testing.T) {
diff --git a/datasource/mongo/sd/mongo_cache.go 
b/datasource/mongo/sd/mongo_cache.go
index 1b6a895..7395d4e 100644
--- a/datasource/mongo/sd/mongo_cache.go
+++ b/datasource/mongo/sd/mongo_cache.go
@@ -17,170 +17,27 @@
 
 package sd
 
-import (
-       "github.com/patrickmn/go-cache"
-)
-
-// MongoCache implements Cache.
-// MongoCache is dedicated to stores service discovery data,
-// e.g. service, instance, lease.
-// the docStore consists of two parts.
-// 1. documentID --> bussinessID
-// 2. bussinessID --> documentID
-// the store consists of two parts.
-// 1. index --> bussinessID list
-// 2. bussinessID --> index
-type MongoCache struct {
-       Options    *Options
-       name       string
-       store      *cache.Cache
-       docStore   *cache.Cache
-       indexStore *cache.Cache
-       dirty      bool
-}
-
-func (c *MongoCache) Name() string {
-       return c.name
-}
-
-func (c *MongoCache) Size() (l int) {
-       return c.store.ItemCount()
-}
-
-func (c *MongoCache) Get(id string) (v interface{}) {
-       v, _ = c.store.Get(id)
-       return
-}
-
-func (c *MongoCache) GetKeyByDocumentID(documentKey string) (id string) {
-       if v, f := c.docStore.Get(documentKey); f {
-               t, ok := v.(string)
-               if ok {
-                       id = t
-               }
-       }
-       return
-}
-
-func (c *MongoCache) GetDocumentIDByBussinessID(id string) (documentID string) 
{
-       v, f := c.docStore.Get(id)
-       if f {
-               if id, ok := v.(string); ok {
-                       documentID = id
-               }
-       }
-       return
-}
-
-func (c *MongoCache) Put(id string, v interface{}) {
-       c.store.Set(id, v, cache.NoExpiration)
-}
-
-func (c *MongoCache) PutDocumentID(id string, documentID string) {
-       //store docID-->ID&ID-->docID
-       c.docStore.Set(documentID, id, cache.NoExpiration)
-       c.docStore.Set(id, documentID, cache.NoExpiration)
+// MongoCache is the cache contract MongoCacher works against: read access
+// through MongoCacheReader, plus dirty tracking and event-driven mutation.
+type MongoCache interface {
+       MongoCacheReader
+       // Dirty reports whether the cache has been marked dirty.
+       Dirty() bool
+       // Clear resets the cache contents and the dirty mark.
+       Clear()
+       // MarkDirty flags the cache as dirty.
+       MarkDirty()
+       // ProcessUpdate applies an event carrying a new or changed document;
+       // MongoCacher.buildCache calls it instead of writing to the cache
+       // directly.
+       ProcessUpdate(event MongoEvent)
+       // ProcessDelete applies a delete event; buildCache only calls it when
+       // the document is currently cached.
+       ProcessDelete(event MongoEvent)
+       // isValueNotUpdated reports whether two values should be treated as
+       // unchanged; filterCreateOrUpdate emits no update event when it
+       // returns true.
+       isValueNotUpdated(value interface{}, newValue interface{}) bool
 }
 
-func (c *MongoCache) Remove(id string) {
-       c.store.Delete(id)
-       c.docStore.Delete(id)
-       c.indexStore.Delete(id)
-}
-
-func (c *MongoCache) RemoveDocumentID(documentID string) {
-       c.docStore.Delete(documentID)
-}
-
-func (c *MongoCache) GetIndexData(index string) (res []string) {
-       if p, found := c.indexStore.Get(index); found {
-               res, ok := p.([]string)
-               if ok {
-                       return res
-               }
-       }
-       return
-}
+type MongoCacheReader interface {
+       Name() string // The name of implementation
 
-func (c *MongoCache) GetIndexByBussinessID(id string) (index string) {
-       if v, found := c.indexStore.Get(id); found {
-               if t, ok := v.(string); ok {
-                       index = t
-               }
-       }
-       return
-}
+       Size() int // the number of entries held in the cache
 
-func (c *MongoCache) PutIndex(index string, newID string) {
-       v, found := c.indexStore.Get(index)
-       if !found {
-               c.indexStore.Set(index, []string{newID}, cache.NoExpiration)
-       } else {
-               if ids, ok := v.([]string); ok {
-                       for _, id := range ids {
-                               if id == newID {
-                                       return
-                               }
-                       }
-                       ids = append(ids, newID)
-                       c.indexStore.Set(index, ids, cache.NoExpiration)
-               }
-       }
-       //set id-->index for filterdelete
-       c.indexStore.Set(newID, index, cache.NoExpiration)
-}
+       // Get gets a value by docid
+       Get(id string) interface{}
 
-func (c *MongoCache) RemoveIndex(index string, oldID string) {
-       if v, found := c.indexStore.Get(index); found {
-               ids, ok := v.([]string)
-               if ok {
-                       var newIDs []string
-                       for _, id := range ids {
-                               if id == oldID {
-                                       continue
-                               }
-                               newIDs = append(newIDs, id)
-                       }
-                       if len(newIDs) == 0 {
-                               c.indexStore.Delete(index)
-                       } else {
-                               c.indexStore.Set(index, newIDs, 
cache.NoExpiration)
-                       }
-               }
-       }
-}
-
-func (c *MongoCache) MarkDirty() {
-       c.dirty = true
-}
-
-func (c *MongoCache) Dirty() bool { return c.dirty }
-
-func (c *MongoCache) Clear() {
-       c.dirty = false
-       c.store.Flush()
-       c.docStore.Flush()
-       c.indexStore.Flush()
-}
-
-func (c *MongoCache) ForEach(iter func(k string, v interface{}) (next bool)) {
-       items := c.store.Items()
-       for k, v := range items {
-               if v.Object == nil {
-                       continue
-               }
-               if !iter(k, v) {
-                       break
-               }
-       }
-}
+       // GetValue gets a value by index
+       GetValue(index string) []interface{}
 
-func NewMongoCache(name string, options *Options) *MongoCache {
-       return &MongoCache{
-               Options:    options,
-               name:       name,
-               store:      cache.New(cache.NoExpiration, 0),
-               docStore:   cache.New(cache.NoExpiration, 0),
-               indexStore: cache.New(cache.NoExpiration, 0),
-       }
+       // ForEach executes the given function for each of the k-v
+       ForEach(iter func(k string, v interface{}) (next bool))
 }
diff --git a/datasource/mongo/sd/mongo_cacher.go 
b/datasource/mongo/sd/mongo_cacher.go
index 0faeeac..986d15f 100644
--- a/datasource/mongo/sd/mongo_cacher.go
+++ b/datasource/mongo/sd/mongo_cacher.go
@@ -26,7 +26,6 @@ import (
 
        rmodel "github.com/go-chassis/cari/discovery"
 
-       
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
        "github.com/apache/servicecomb-service-center/datasource/sdcommon"
        "github.com/apache/servicecomb-service-center/pkg/backoff"
        "github.com/apache/servicecomb-service-center/pkg/gopool"
@@ -43,7 +42,7 @@ type MongoCacher struct {
        Options     *Options
        reListCount int
        isFirstTime bool
-       cache       *MongoCache
+       cache       MongoCache
        ready       chan struct{}
        lw          sdcommon.ListWatch
        mux         sync.Mutex
@@ -51,7 +50,7 @@ type MongoCacher struct {
        goroutine   *gopool.Pool
 }
 
-func (c *MongoCacher) Cache() *MongoCache {
+// Cache returns the cache that backs this cacher.
+func (c *MongoCacher) Cache() MongoCache {
        return c.cache
 }
 
@@ -202,7 +201,6 @@ func (c *MongoCacher) handleEventBus(eventbus 
*sdcommon.EventBus) error {
                        case sdcommon.ActionUpdate:
                                event = NewMongoEventByResource(resource, 
rmodel.EVT_UPDATE)
                        case sdcommon.ActionDelete:
-                               resource.Key = 
c.cache.GetKeyByDocumentID(resource.DocumentID)
                                resource.Value = c.cache.Get(resource.Key)
                                event = NewMongoEventByResource(resource, 
rmodel.EVT_DELETE)
                        }
@@ -247,28 +245,21 @@ func (c *MongoCacher) sync(evts []MongoEvent) {
                return
        }
 
-       c.onEvents(evts)
+       go c.onEvents(evts)
 }
 
 func (c *MongoCacher) filter(infos []*sdcommon.Resource) []MongoEvent {
        nc := len(infos)
        newStore := make(map[string]interface{}, nc)
-       documentIDRecord := make(map[string]string, nc)
-       indexRecord := make(map[string]string, nc)
-
        for _, info := range infos {
-               event := NewMongoEventByResource(info, rmodel.EVT_CREATE)
-               newStore[event.ResourceID] = info.Value
-               documentIDRecord[event.ResourceID] = info.DocumentID
-               indexRecord[event.ResourceID] = info.Index
+               newStore[info.Key] = info.Value
        }
 
        filterStopCh := make(chan struct{})
        eventsCh := make(chan [sdcommon.EventBlockSize]MongoEvent, 2)
-
        go c.filterDelete(newStore, eventsCh, filterStopCh)
 
-       go c.filterCreateOrUpdate(newStore, documentIDRecord, indexRecord, 
eventsCh, filterStopCh)
+       go c.filterCreateOrUpdate(newStore, eventsCh, filterStopCh)
 
        events := make([]MongoEvent, 0, nc)
        for block := range eventsCh {
@@ -303,9 +294,7 @@ func (c *MongoCacher) filterDelete(newStore 
map[string]interface{},
                        i = 0
                }
 
-               documentID := c.cache.GetDocumentIDByBussinessID(k)
-               index := c.cache.GetIndexByBussinessID(k)
-               block[i] = NewMongoEvent(k, documentID, index, 
rmodel.EVT_DELETE, v)
+               block[i] = NewMongoEvent(k, rmodel.EVT_DELETE, v)
                i++
                return
        })
@@ -317,8 +306,7 @@ func (c *MongoCacher) filterDelete(newStore 
map[string]interface{},
        close(filterStopCh)
 }
 
-func (c *MongoCacher) filterCreateOrUpdate(newStore map[string]interface{}, 
newDocumentStore map[string]string, indexRecord map[string]string,
-       eventsCh chan [sdcommon.EventBlockSize]MongoEvent, filterStopCh chan 
struct{}) {
+func (c *MongoCacher) filterCreateOrUpdate(newStore map[string]interface{}, 
eventsCh chan [sdcommon.EventBlockSize]MongoEvent, filterStopCh chan struct{}) {
        var block [sdcommon.EventBlockSize]MongoEvent
        i := 0
 
@@ -331,13 +319,13 @@ func (c *MongoCacher) filterCreateOrUpdate(newStore 
map[string]interface{}, newD
                                i = 0
                        }
 
-                       block[i] = NewMongoEvent(k, newDocumentStore[k], 
indexRecord[k], rmodel.EVT_CREATE, v)
+                       block[i] = NewMongoEvent(k, rmodel.EVT_CREATE, v)
                        i++
 
                        continue
                }
 
-               if c.isValueNotUpdated(v, ov) {
+               if c.cache.isValueNotUpdated(v, ov) {
                        continue
                }
 
@@ -348,8 +336,7 @@ func (c *MongoCacher) filterCreateOrUpdate(newStore 
map[string]interface{}, newD
                        block = [sdcommon.EventBlockSize]MongoEvent{}
                        i = 0
                }
-
-               block[i] = NewMongoEvent(k, newDocumentStore[k], 
indexRecord[k], rmodel.EVT_UPDATE, v)
+               block[i] = NewMongoEvent(k, rmodel.EVT_UPDATE, v)
                i++
        }
 
@@ -362,36 +349,6 @@ func (c *MongoCacher) filterCreateOrUpdate(newStore 
map[string]interface{}, newD
        close(eventsCh)
 }
 
-func (c *MongoCacher) isValueNotUpdated(value interface{}, newValue 
interface{}) bool {
-       var modTime string
-       var newModTime string
-
-       switch c.Options.Key {
-       case instance:
-               instance := value.(model.Instance)
-               newInstance := newValue.(model.Instance)
-               if instance.Instance == nil || newInstance.Instance == nil {
-                       return true
-               }
-               modTime = instance.Instance.ModTimestamp
-               newModTime = newInstance.Instance.ModTimestamp
-       case service:
-               service := value.(model.Service)
-               newService := newValue.(model.Service)
-               if service.Service == nil || newService.Service == nil {
-                       return true
-               }
-               modTime = service.Service.ModTimestamp
-               newModTime = newService.Service.ModTimestamp
-       }
-
-       if newModTime == "" || modTime == newModTime {
-               return true
-       }
-
-       return false
-}
-
 func (c *MongoCacher) onEvents(events []MongoEvent) {
        c.buildCache(events)
 
@@ -400,7 +357,7 @@ func (c *MongoCacher) onEvents(events []MongoEvent) {
 
 func (c *MongoCacher) buildCache(events []MongoEvent) {
        for i, evt := range events {
-               key := evt.ResourceID
+               key := evt.DocumentID
                value := c.cache.Get(key)
                ok := value != nil
 
@@ -418,11 +375,7 @@ func (c *MongoCacher) buildCache(events []MongoEvent) {
                                        evt.Type, rmodel.EVT_UPDATE, key))
                                evt.Type = rmodel.EVT_UPDATE
                        }
-
-                       c.cache.Put(key, evt.Value)
-                       c.cache.PutDocumentID(key, evt.DocumentID)
-                       c.cache.PutIndex(evt.Index, evt.ResourceID)
-
+                       c.cache.ProcessUpdate(evt)
                        events[i] = evt
                case rmodel.EVT_DELETE:
                        if !ok {
@@ -430,10 +383,7 @@ func (c *MongoCacher) buildCache(events []MongoEvent) {
                                        evt.Type, key))
                        } else {
                                evt.Value = value
-
-                               c.cache.Remove(key)
-                               c.cache.RemoveDocumentID(evt.DocumentID)
-                               c.cache.RemoveIndex(evt.Index, evt.ResourceID)
+                               c.cache.ProcessDelete(evt)
                        }
                        events[i] = evt
                }
@@ -451,21 +401,22 @@ func (c *MongoCacher) notify(evts []MongoEvent) {
 
        for _, evt := range evts {
                if evt.Type == rmodel.EVT_DELETE && evt.Value == nil {
-                       log.Warn(fmt.Sprintf("caught delete event:%s, but value 
can't get from caches, it may be deleted by last list", evt.ResourceID))
+                       log.Warn(fmt.Sprintf("caught delete event:%s, but value 
can't get from caches, it may be deleted by last list", evt.DocumentID))
                        continue
                }
                eventProxy.OnEvent(evt)
        }
 }
 
-func NewMongoCacher(options *Options, cache *MongoCache) *MongoCacher {
+func NewMongoCacher(options *Options, cache MongoCache, pf parsefunc) 
*MongoCacher {
        return &MongoCacher{
                Options:     options,
                isFirstTime: true,
                cache:       cache,
                ready:       make(chan struct{}),
                lw: &mongoListWatch{
-                       Key: options.Key,
+                       Key:       options.Key,
+                       parseFunc: pf,
                },
                goroutine: gopool.New(context.Background()),
        }
diff --git a/datasource/mongo/sd/mongo_cacher_test.go 
b/datasource/mongo/sd/mongo_cacher_test.go
deleted file mode 100644
index 6fc9870..0000000
--- a/datasource/mongo/sd/mongo_cacher_test.go
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package sd
-
-import (
-       "context"
-       "fmt"
-       "testing"
-
-       pb "github.com/go-chassis/cari/discovery"
-       "github.com/stretchr/testify/assert"
-       "go.mongodb.org/mongo-driver/bson"
-
-       
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
-       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
-       "github.com/apache/servicecomb-service-center/pkg/gopool"
-)
-
-type MockListWatch struct {
-       ListResponse  *sdcommon.ListWatchResp
-       WatchResponse *sdcommon.ListWatchResp
-       resumeToken   bson.Raw
-}
-
-func (lw *MockListWatch) List(sdcommon.ListWatchConfig) 
(*sdcommon.ListWatchResp, error) {
-       if lw.ListResponse == nil {
-               return nil, fmt.Errorf("list error")
-       }
-       return lw.ListResponse, nil
-}
-
-func (lw *MockListWatch) DoWatch(ctx context.Context, f 
func(*sdcommon.ListWatchResp)) error {
-       if lw.WatchResponse == nil {
-               return fmt.Errorf("error")
-       }
-
-       f(lw.WatchResponse)
-       <-ctx.Done()
-       return nil
-}
-
-func (lw *MockListWatch) EventBus(op sdcommon.ListWatchConfig) 
*sdcommon.EventBus {
-       return sdcommon.NewEventBus(lw, op)
-}
-
-func TestNewMongoCacher(t *testing.T) {
-       mockMongoCache := NewMongoCache("test", DefaultOptions())
-       lw := &MockListWatch{}
-
-       cr := &MongoCacher{
-               Options:     DefaultOptions(),
-               ready:       make(chan struct{}),
-               isFirstTime: true,
-               lw:          lw,
-               goroutine:   gopool.New(context.Background()),
-               cache:       mockMongoCache,
-       }
-
-       ctx, cancel := context.WithCancel(context.Background())
-       cancel()
-
-       // case: cause list internal error before initialized
-       t.Run("case list: internal error before initialized", func(t 
*testing.T) {
-               cr.refresh(ctx)
-               if cr.IsReady() {
-                       t.Fatalf("TestNewKvCacher failed")
-               }
-
-       })
-
-       // prepare mock data
-       var evt MongoEvent
-       cr = &MongoCacher{
-               Options:   DefaultOptions().SetTable(instance),
-               ready:     make(chan struct{}),
-               lw:        lw,
-               goroutine: gopool.New(context.Background()),
-               cache:     mockMongoCache,
-       }
-
-       EventProxy(instance).AddHandleFunc(func(e MongoEvent) {
-               evt = e
-       })
-
-       mockDocumentID := "5fcf2f1a4ea1e6d2f4c61d47"
-       mockResourceID := "95cd8dbf3e8411eb92d8fa163e8a81c9"
-       mockResumeToken, _ :=
-               bson.Marshal(bson.M{"_data": 
"825FDB4272000000012B022C0100296E5A10043E2D15AC82D9484C8090E68AF36FED2A46645F696400645FD76265066A6D2DF2AAC8D80004"})
-
-       var resources []*sdcommon.Resource
-       resource := &sdcommon.Resource{Key: mockResourceID, DocumentID: 
mockDocumentID, Value: model.Instance{Domain: "default", Project: "default",
-               Instance: &pb.MicroServiceInstance{InstanceId: mockResourceID, 
ModTimestamp: "100000"}}}
-       resources = append(resources, resource)
-       test := &sdcommon.ListWatchResp{
-               Action:    sdcommon.ActionCreate,
-               Resources: resources,
-       }
-
-       evtExpect := MongoEvent{
-               DocumentID: mockDocumentID,
-               ResourceID: mockResourceID,
-               Value:      resource.Value,
-               Type:       pb.EVT_INIT,
-       }
-
-       // case: list 1 resp and watch 0 event
-       t.Run("case list: first time list init cache", func(t *testing.T) {
-               // case: resume token is nil, first time list event is init
-               cr.isFirstTime = true
-               cr.cache.Remove(mockResourceID)
-               cr.cache.RemoveDocumentID(mockDocumentID)
-
-               lw.ListResponse = test
-               lw.resumeToken = nil
-               lw.WatchResponse = nil
-
-               cr.refresh(ctx)
-
-               // check ready
-               assert.Equal(t, true, cr.IsReady())
-
-               //check config
-               assert.Equal(t, instance, cr.Options.Key)
-
-               // check event
-               assert.Equal(t, evtExpect, evt)
-
-               // check cache
-               cache := cr.cache.Get(mockResourceID)
-               assert.Equal(t, resource.Value, cache)
-       })
-
-       t.Run("case list: re-list and updateOp cache", func(t *testing.T) {
-               // case: re-list and should be no event
-               lw.WatchResponse = nil
-               evt.Value = nil
-               cr.refresh(ctx)
-
-               //check events
-               assert.Nil(t, evt.Value)
-
-               // check cache
-               cache := cr.cache.Get(mockResourceID)
-               assert.Equal(t, resource.Value, cache)
-
-               // prepare updateOp data
-               dataUpdate := &sdcommon.Resource{Key: mockResourceID, 
DocumentID: mockDocumentID,
-                       Value: model.Instance{Domain: "default", Project: 
"default",
-                               Instance: &pb.MicroServiceInstance{InstanceId: 
mockResourceID, HostName: "test", ModTimestamp: "100001"}}}
-
-               var mongoUpdateResources []*sdcommon.Resource
-               mongoUpdateResources = append(mongoUpdateResources, dataUpdate)
-               testUpdate := &sdcommon.ListWatchResp{
-                       Action:    sdcommon.ActionUpdate,
-                       Resources: mongoUpdateResources,
-               }
-
-               lw.ListResponse = testUpdate
-               lw.resumeToken = mockResumeToken
-
-               // case: re-list and over no event times, and then event should 
be updateOp
-               for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
-                       lw.WatchResponse = nil
-               }
-
-               evt.Value = nil
-               for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
-                       cr.refresh(ctx)
-               }
-               // check event
-               evtExpect.Type = pb.EVT_UPDATE
-               evtExpect.Value = dataUpdate.Value
-               assert.Equal(t, evtExpect, evt)
-
-               // check cache
-               cache = cr.cache.Get(mockResourceID)
-               assert.Equal(t, dataUpdate.Value, cache)
-       })
-
-       t.Run("case list: no infos list and delete cache", func(t *testing.T) {
-               // case: no infos list, event should be delete
-               lw.ListResponse = &sdcommon.ListWatchResp{}
-               for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
-                       lw.WatchResponse = nil
-               }
-               evt.Value = nil
-               for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
-                       cr.refresh(ctx)
-               }
-
-               // check event
-               evtExpect.Type = pb.EVT_DELETE
-               assert.Equal(t, evtExpect, evt)
-
-               // check cache
-               cache := cr.cache.Get(mockResourceID)
-               assert.Nil(t, nil, cache)
-       })
-
-       t.Run("case list: mark cache dirty and reset cache", func(t *testing.T) 
{
-               lw.ListResponse = test
-               cr.isFirstTime = true
-               evt.Value = nil
-
-               cr.cache.MarkDirty()
-               cr.refresh(ctx)
-
-               // check event
-               if evt.Value != nil {
-                       t.Fatalf("TestNewMongoCacher failed, %v", evt)
-               }
-
-               // check cache
-               cache := cr.cache.Get(mockResourceID)
-               assert.Equal(t, resource.Value, cache)
-       })
-
-       t.Run("case watch: caught create event", func(t *testing.T) {
-               cr.cache.Remove(mockResourceID)
-               cr.cache.RemoveDocumentID(mockDocumentID)
-               lw.WatchResponse = test
-               lw.ListResponse = nil
-               lw.resumeToken = mockResumeToken
-
-               cr.refresh(ctx)
-
-               // check event
-               evtExpect.Type = pb.EVT_CREATE
-               evtExpect.Value = resource.Value
-               assert.Equal(t, evtExpect, evt)
-
-               // check cache
-               cache := cr.cache.Get(mockResourceID)
-               assert.Equal(t, resource.Value, cache)
-
-       })
-
-       t.Run("case watch: caught updateOp event", func(t *testing.T) {
-               // prepare updateOp data
-               dataUpdate := &sdcommon.Resource{Key: mockResourceID, 
DocumentID: mockDocumentID,
-                       Value: model.Instance{Domain: "default", Project: 
"default",
-                               Instance: &pb.MicroServiceInstance{InstanceId: 
mockResourceID, HostName: "test", ModTimestamp: "100001"}}}
-
-               var mongoUpdateResources []*sdcommon.Resource
-               mongoUpdateResources = append(mongoUpdateResources, dataUpdate)
-               testUpdate := &sdcommon.ListWatchResp{
-                       Action:    sdcommon.ActionUpdate,
-                       Resources: mongoUpdateResources,
-               }
-               lw.WatchResponse = testUpdate
-               lw.ListResponse = nil
-
-               cr.refresh(ctx)
-
-               // check event
-               evtExpect.Type = pb.EVT_UPDATE
-               evtExpect.Value = dataUpdate.Value
-               assert.Equal(t, evtExpect, evt)
-
-               // check cache
-               cache := cr.cache.Get(mockResourceID)
-               assert.Equal(t, dataUpdate.Value, cache)
-       })
-
-       t.Run("case watch: caught delete event but value is nil", func(t 
*testing.T) {
-               test.Action = sdcommon.ActionDelete
-               lw.WatchResponse = test
-               lw.ListResponse = nil
-
-               cr.refresh(ctx)
-
-               // check event
-               evtExpect.Type = pb.EVT_DELETE
-               assert.Equal(t, evtExpect, evt)
-
-               // check cache
-               cache := cr.cache.Get(mockResourceID)
-               assert.Nil(t, cache)
-
-       })
-}
-
-func TestMongoCacher_Run(t *testing.T) {
-       lw := &MockListWatch{}
-
-       mockMongoCache := NewMongoCache("test", DefaultOptions())
-       cr := &MongoCacher{
-               Options:   DefaultOptions().SetTable(instance),
-               ready:     make(chan struct{}),
-               lw:        lw,
-               goroutine: gopool.New(context.Background()),
-               cache:     mockMongoCache,
-       }
-
-       cr.Run()
-
-       // check cache
-       cache := cr.cache
-       assert.NotNil(t, cache)
-
-       cr.Stop()
-}
diff --git a/datasource/mongo/sd/options_test.go 
b/datasource/mongo/sd/options_test.go
index 58fc401..7051d2a 100644
--- a/datasource/mongo/sd/options_test.go
+++ b/datasource/mongo/sd/options_test.go
@@ -52,7 +52,6 @@ var mongoEventFunc MongoEventFunc
 func mongoEventFuncGet() MongoEventFunc {
        fun := func(evt MongoEvent) {
                evt.DocumentID = "DocumentID has changed"
-               evt.ResourceID = "BusinessID has changed"
                evt.Value = 2
                evt.Type = discovery.EVT_UPDATE
                log.Info("in event func")
diff --git a/datasource/mongo/sd/rule_cache.go 
b/datasource/mongo/sd/rule_cache.go
new file mode 100644
index 0000000..5ccbe61
--- /dev/null
+++ b/datasource/mongo/sd/rule_cache.go
@@ -0,0 +1,124 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+import (
+       
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+       "go.mongodb.org/mongo-driver/bson"
+)
+
+type ruleStore struct {
+       dirty      bool
+       d          *DocStore
+       indexCache *indexCache
+}
+
+func init() {
+       RegisterCacher(rule, newRuleStore)
+}
+
+func newRuleStore() *MongoCacher {
+       options := DefaultOptions().SetTable(rule)
+       cache := &ruleStore{
+               dirty:      false,
+               d:          NewDocStore(),
+               indexCache: NewIndexCache(),
+       }
+       ruleUnmarshal := func(doc bson.Raw) (resource sdcommon.Resource) {
+               docID := MongoDocument{}
+               err := bson.Unmarshal(doc, &docID)
+               if err != nil {
+                       return
+               }
+               rule := model.Rule{}
+               err = bson.Unmarshal(doc, &rule)
+               if err != nil {
+                       return
+               }
+               resource.Value = rule
+               resource.Key = docID.ID.Hex()
+               return
+       }
+       return NewMongoCacher(options, cache, ruleUnmarshal)
+}
+
+func (s *ruleStore) Name() string {
+       return rule
+}
+
+func (s *ruleStore) Size() int {
+       return s.d.Size()
+}
+
+func (s *ruleStore) Get(key string) interface{} {
+       return s.d.Get(key)
+}
+
+func (s *ruleStore) ForEach(iter func(k string, v interface{}) (next bool)) {
+       s.d.ForEach(iter)
+}
+
+func (s *ruleStore) GetValue(index string) []interface{} {
+       docs := s.indexCache.Get(index)
+       res := make([]interface{}, 0, len(docs))
+       for _, v := range docs {
+               res = append(res, s.d.Get(v))
+       }
+       return res
+}
+
+func (s *ruleStore) Dirty() bool {
+       return s.dirty
+}
+
+func (s *ruleStore) MarkDirty() {
+       s.dirty = true
+}
+
+func (s *ruleStore) Clear() {
+       s.dirty = false
+       s.d.store.Flush()
+}
+
+func (s *ruleStore) ProcessUpdate(event MongoEvent) {
+       rule, ok := event.Value.(model.Rule)
+       if !ok {
+               return
+       }
+       s.d.Put(event.DocumentID, event.Value)
+       s.indexCache.Put(genRuleServiceID(rule), event.DocumentID)
+       return
+}
+
+func (s *ruleStore) ProcessDelete(event MongoEvent) {
+       rule, ok := s.d.Get(event.DocumentID).(model.Rule)
+       if !ok {
+               return
+       }
+       s.d.DeleteDoc(event.DocumentID)
+       s.indexCache.Delete(genRuleServiceID(rule), event.DocumentID)
+}
+
+func (s *ruleStore) isValueNotUpdated(value interface{}, newValue interface{}) 
bool {
+       return false
+}
+
+func genRuleServiceID(rule model.Rule) string {
+       return rule.ServiceID
+}
diff --git a/datasource/mongo/sd/service_cache.go 
b/datasource/mongo/sd/service_cache.go
new file mode 100644
index 0000000..3aba3c0
--- /dev/null
+++ b/datasource/mongo/sd/service_cache.go
@@ -0,0 +1,133 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+import (
+       "strings"
+
+       
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+       "go.mongodb.org/mongo-driver/bson"
+)
+
+type serviceStore struct {
+       dirty      bool
+       docCache   *DocStore
+       indexCache *indexCache
+}
+
+func init() {
+       RegisterCacher(service, newServiceStore)
+}
+
+func newServiceStore() *MongoCacher {
+       options := DefaultOptions().SetTable(service)
+       cache := &serviceStore{
+               dirty:      false,
+               docCache:   NewDocStore(),
+               indexCache: NewIndexCache(),
+       }
+       serviceUnmarshal := func(doc bson.Raw) (resource sdcommon.Resource) {
+               docID := MongoDocument{}
+               err := bson.Unmarshal(doc, &docID)
+               if err != nil {
+                       return
+               }
+               service := model.Service{}
+               err = bson.Unmarshal(doc, &service)
+               if err != nil {
+                       return
+               }
+               resource.Value = service
+               resource.Key = docID.ID.Hex()
+               return
+       }
+       return NewMongoCacher(options, cache, serviceUnmarshal)
+}
+
+func (s *serviceStore) Name() string {
+       return service
+}
+
+func (s *serviceStore) Size() int {
+       return s.docCache.Size()
+}
+
+func (s *serviceStore) Get(key string) interface{} {
+       return s.docCache.Get(key)
+}
+
+func (s *serviceStore) ForEach(iter func(k string, v interface{}) (next bool)) 
{
+       s.docCache.ForEach(iter)
+}
+
+func (s *serviceStore) GetValue(index string) []interface{} {
+       var docs []string
+       docs = s.indexCache.Get(index)
+       res := make([]interface{}, 0, len(docs))
+       for _, v := range docs {
+               res = append(res, s.docCache.Get(v))
+       }
+       return res
+}
+
+func (s *serviceStore) Dirty() bool {
+       return s.dirty
+}
+
+func (s *serviceStore) MarkDirty() {
+       s.dirty = true
+}
+
+func (s *serviceStore) Clear() {
+       s.dirty = false
+       s.docCache.store.Flush()
+}
+
+func (s *serviceStore) ProcessUpdate(event MongoEvent) {
+       service, ok := event.Value.(model.Service)
+       if !ok {
+               return
+       }
+       s.docCache.Put(event.DocumentID, event.Value)
+       s.indexCache.Put(genServiceID(service), event.DocumentID)
+       s.indexCache.Put(getServiceInfo(service), event.DocumentID)
+       return
+}
+
+func (s *serviceStore) ProcessDelete(event MongoEvent) {
+       service, ok := s.docCache.Get(event.DocumentID).(model.Service)
+       if !ok {
+               return
+       }
+       s.docCache.DeleteDoc(event.DocumentID)
+       s.indexCache.Delete(genServiceID(service), event.DocumentID)
+       s.indexCache.Delete(getServiceInfo(service), event.DocumentID)
+}
+
+func (s *serviceStore) isValueNotUpdated(value interface{}, newValue 
interface{}) bool {
+       return false
+}
+
+func genServiceID(svc model.Service) string {
+       return svc.Service.ServiceId
+}
+
+func getServiceInfo(svc model.Service) string {
+       return strings.Join([]string{svc.Domain, svc.Project, 
svc.Service.AppId, svc.Service.ServiceName, svc.Service.Version}, "/")
+}
diff --git a/datasource/mongo/sd/servicec_test.go 
b/datasource/mongo/sd/servicec_test.go
new file mode 100644
index 0000000..f705ddc
--- /dev/null
+++ b/datasource/mongo/sd/servicec_test.go
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package sd
+
+import (
+       "testing"
+
+       
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+       "github.com/go-chassis/cari/discovery"
+       "github.com/stretchr/testify/assert"
+)
+
+var serviceCache *MongoCacher
+
+var svc1 = model.Service{
+       Domain:  "default",
+       Project: "default",
+       Tags:    nil,
+       Service: &discovery.MicroService{
+               ServiceId:   "123456789",
+               AppId:       "appid1",
+               ServiceName: "svc1",
+               Version:     "1.0",
+       },
+}
+
+var svc2 = model.Service{
+       Domain:  "default",
+       Project: "default",
+       Tags:    nil,
+       Service: &discovery.MicroService{
+               ServiceId:   "987654321",
+               AppId:       "appid1",
+               ServiceName: "svc1",
+               Version:     "1.0",
+       },
+}
+
+func init() {
+       serviceCache = newServiceStore()
+}
+
+func TestServiceCacheBasicFunc(t *testing.T) {
+       t.Run("init serviceCache,should pass", func(t *testing.T) {
+               serviceCache := newServiceStore()
+               assert.NotNil(t, serviceCache)
+               assert.Equal(t, service, serviceCache.cache.Name())
+       })
+       event1 := MongoEvent{
+               DocumentID: "id1",
+               Value:      svc1,
+       }
+       event2 := MongoEvent{
+               DocumentID: "id2",
+               Value:      svc2,
+       }
+       t.Run("update&&delete serviceCache, should pass", func(t *testing.T) {
+               serviceCache.cache.ProcessUpdate(event1)
+               assert.Equal(t, serviceCache.cache.Size(), 1)
+               assert.Nil(t, serviceCache.cache.Get("id_not_exist"))
+               assert.Equal(t, svc1.Service.ServiceName, 
serviceCache.cache.Get("id1").(model.Service).Service.ServiceName)
+               assert.Len(t, 
serviceCache.cache.GetValue("default/default/appid1/svc1/1.0"), 1)
+               serviceCache.cache.ProcessUpdate(event2)
+               assert.Equal(t, serviceCache.cache.Size(), 2)
+               assert.Len(t, 
serviceCache.cache.GetValue("default/default/appid1/svc1/1.0"), 2)
+               assert.Len(t, serviceCache.cache.GetValue("987654321"), 1)
+               assert.Len(t, serviceCache.cache.GetValue("123456789"), 1)
+               serviceCache.cache.ProcessDelete(event1)
+               assert.Nil(t, serviceCache.cache.Get("id1"))
+               assert.Len(t, 
serviceCache.cache.GetValue("default/default/appid1/svc1/1.0"), 1)
+               serviceCache.cache.ProcessDelete(event2)
+               assert.Len(t, 
serviceCache.cache.GetValue("default/default/appid1/svc1/1.0"), 0)
+               assert.Nil(t, serviceCache.cache.Get("id2"))
+               assert.Len(t, serviceCache.cache.GetValue("987654321"), 0)
+               assert.Len(t, serviceCache.cache.GetValue("123456789"), 0)
+       })
+}
diff --git a/datasource/mongo/sd/types.go b/datasource/mongo/sd/types.go
index fe4b21e..a244910 100644
--- a/datasource/mongo/sd/types.go
+++ b/datasource/mongo/sd/types.go
@@ -21,6 +21,7 @@ import (
        "github.com/go-chassis/cari/discovery"
        "go.mongodb.org/mongo-driver/bson"
        "go.mongodb.org/mongo-driver/bson/primitive"
+       "sync"
 
        "github.com/apache/servicecomb-service-center/datasource/sdcommon"
 )
@@ -28,20 +29,26 @@ import (
 const (
        service  = "service"
        instance = "instance"
+       rule     = "rule"
+       dep      = "dependency"
 )
 
-var (
-       Types []string
-)
+type cacherRegisterInitiallizer func() (cacher *MongoCacher)
+
+var cacherRegisterMutex sync.Mutex
+var CacherRegister = make(map[string]cacherRegisterInitiallizer)
 
-func RegisterType(name string) {
-       Types = append(Types, name)
+func RegisterCacher(t string, f cacherRegisterInitiallizer) {
+       cacherRegisterMutex.Lock()
+       defer cacherRegisterMutex.Unlock()
+       if _, exist := CacherRegister[t]; exist {
+               return
+       }
+       CacherRegister[t] = f
 }
 
 type MongoEvent struct {
        DocumentID string
-       ResourceID string
-       Index      string
        Value      interface{}
        Type       discovery.EventType
 }
@@ -56,18 +63,14 @@ type MongoEventHandler interface {
 func NewMongoEventByResource(resource *sdcommon.Resource, action 
discovery.EventType) MongoEvent {
        return MongoEvent{
                Type:       action,
-               Index:      resource.Index,
                Value:      resource.Value,
-               ResourceID: resource.Key,
-               DocumentID: resource.DocumentID,
+               DocumentID: resource.Key,
        }
 }
 
-func NewMongoEvent(id string, documentID string, index string, action 
discovery.EventType, v interface{}) MongoEvent {
+func NewMongoEvent(documentID string, action discovery.EventType, v 
interface{}) MongoEvent {
        event := MongoEvent{}
-       event.ResourceID = id
        event.DocumentID = documentID
-       event.Index = index
        event.Type = action
        event.Value = v
        return event
diff --git a/datasource/mongo/sd/typestore.go b/datasource/mongo/sd/typestore.go
index 806e6b2..5997d46 100644
--- a/datasource/mongo/sd/typestore.go
+++ b/datasource/mongo/sd/typestore.go
@@ -33,7 +33,6 @@ var store = &TypeStore{}
 
 func init() {
        store.Initialize()
-       registerInnerTypes()
 }
 
 type TypeStore struct {
@@ -48,11 +47,6 @@ func (s *TypeStore) Initialize() {
        s.goroutine = gopool.New(context.Background())
 }
 
-func registerInnerTypes() {
-       RegisterType(service)
-       RegisterType(instance)
-}
-
 func (s *TypeStore) Run() {
        s.goroutine.Do(s.store)
        s.goroutine.Do(s.autoClearCache)
@@ -60,7 +54,7 @@ func (s *TypeStore) Run() {
 
 func (s *TypeStore) store(ctx context.Context) {
        // new all types
-       for _, t := range Types {
+       for t, _ := range CacherRegister {
                select {
                case <-ctx.Done():
                        return
@@ -84,7 +78,7 @@ func (s *TypeStore) autoClearCache(ctx context.Context) {
                case <-ctx.Done():
                        return
                case <-time.After(ttl):
-                       for _, t := range Types {
+                       for t, _ := range CacherRegister {
                                cache := s.getOrCreateCache(t).Cache()
                                cache.MarkDirty()
                        }
@@ -98,15 +92,18 @@ func (s *TypeStore) getOrCreateCache(t string) *MongoCacher 
{
        if ok {
                return cache.(*MongoCacher)
        }
-
-       options := DefaultOptions().SetTable(t)
-
-       cache = NewMongoCache(t, options)
-       cacher := NewMongoCacher(options, cache.(*MongoCache))
+       f, ok := CacherRegister[t]
+       if !ok {
+               log.Fatal(fmt.Sprintf("unexpected type store "+t), nil)
+       }
+       cacher := f()
        cacher.Run()
 
        s.caches.Put(t, cacher)
        return cacher
+
+       s.caches.Put(t, cacher)
+       return cacher
 }
 
 func (s *TypeStore) Stop() {
@@ -129,6 +126,8 @@ func (s *TypeStore) Ready() <-chan struct{} {
 func (s *TypeStore) TypeCacher(id string) *MongoCacher { return 
s.getOrCreateCache(id) }
 func (s *TypeStore) Service() *MongoCacher             { return 
s.TypeCacher(service) }
 func (s *TypeStore) Instance() *MongoCacher            { return 
s.TypeCacher(instance) }
+func (s *TypeStore) Rule() *MongoCacher                { return 
s.TypeCacher(rule) }
+func (s *TypeStore) Dep() *MongoCacher                 { return 
s.TypeCacher(dep) }
 
 func Store() *TypeStore {
        return store
diff --git a/datasource/sdcommon/types.go b/datasource/sdcommon/types.go
index f8db7b6..120f547 100644
--- a/datasource/sdcommon/types.go
+++ b/datasource/sdcommon/types.go
@@ -57,8 +57,6 @@ type ListWatchResp struct {
 type Resource struct {
        // Key in etcd is prefix, in mongo is resourceId
        Key string
-       // DocumentID is only for mongo
-       DocumentID string
 
        // Index is the index of the resource
        Index string

Reply via email to