This is an automated email from the ASF dual-hosted git repository. littlecui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
The following commit(s) were added to refs/heads/master by this push: new 222aff0 Revert "[fix] fix inconsistency bug between cache layer and etcd. (#287)" (#298) (#299) 222aff0 is described below commit 222aff07d8db065b5b9872c9265bdfd6629c0a4d Author: little-cui <sure_0...@qq.com> AuthorDate: Mon Sep 18 19:22:26 2023 +0800 Revert "[fix] fix inconsistency bug between cache layer and etcd. (#287)" (#298) (#299) This reverts commit e005d80afeab09163289feac644dfd6a2f12fa41. Co-authored-by: tornado-ssy <64736788+tornado-...@users.noreply.github.com> Co-authored-by: songshiyuan 00649746 <songshiyu...@huawei.com> --- examples/dev/kie-conf.yaml | 8 +- server/config/struct.go | 11 +- server/datasource/etcd/kv/kv_cache.go | 152 +++++++--------- server/datasource/etcd/kv/kv_cache_test.go | 270 +++-------------------------- 4 files changed, 95 insertions(+), 346 deletions(-) diff --git a/examples/dev/kie-conf.yaml b/examples/dev/kie-conf.yaml index d4aabb1..11b9106 100644 --- a/examples/dev/kie-conf.yaml +++ b/examples/dev/kie-conf.yaml @@ -18,10 +18,4 @@ db: # rsaPublicKeyFile: ./examples/dev/public.key sync: # turn on the synchronization switch related operations will be written to the task in the db - enabled: false -#cache: -# labels: -# - environment -# - service -# - app -# - version + enabled: false \ No newline at end of file diff --git a/server/config/struct.go b/server/config/struct.go index 44669de..83e91f0 100644 --- a/server/config/struct.go +++ b/server/config/struct.go @@ -19,10 +19,9 @@ package config // Config is yaml file struct type Config struct { - DB DB `yaml:"db"` - RBAC RBAC `yaml:"rbac"` - Sync Sync `yaml:"sync"` - Cache Cache `yaml:"cache"` + DB DB `yaml:"db"` + RBAC RBAC `yaml:"rbac"` + Sync Sync `yaml:"sync"` //config from cli ConfigFile string NodeName string @@ -60,7 +59,3 @@ type RBAC struct { type Sync struct { Enabled bool `yaml:"enabled"` } - -type Cache struct { - Labels []string `yaml:"labels"` -} diff --git a/server/datasource/etcd/kv/kv_cache.go b/server/datasource/etcd/kv/kv_cache.go index 2930a99..5776bf7 100644 --- a/server/datasource/etcd/kv/kv_cache.go +++ b/server/datasource/etcd/kv/kv_cache.go @@ -11,7 +11,6 @@ import ( "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/pkg/stringutil" - "github.com/apache/servicecomb-kie/server/config" "github.com/apache/servicecomb-kie/server/datasource" "github.com/apache/servicecomb-kie/server/datasource/etcd/key" "github.com/go-chassis/foundation/backoff" @@ -38,27 +37,21 @@ const ( type IDSet map[string]struct{} -type LabelsSet map[string]struct{} - -type CacheSearchReq struct { - Domain string - Project string - Opts *datasource.FindOptions - Regex *regexp.Regexp +type Cache struct { + timeOut time.Duration + client etcdadpt.Client + revision int64 + kvIDCache sync.Map + kvDocCache *goCache.Cache } func NewKvCache() *Cache { kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval) - labelsSet := LabelsSet{} - for _, label := range config.Configurations.Cache.Labels { - labelsSet[label] = struct{}{} - } return &Cache{ timeOut: etcdWatchTimeout, client: etcdadpt.Instance(), revision: 0, kvDocCache: kvDocCache, - labelsSet: labelsSet, } } @@ -66,13 +59,11 @@ func Enabled() bool { return kvCache != nil } -type Cache struct { - timeOut time.Duration - client etcdadpt.Client - revision int64 - kvIDCache sync.Map - kvDocCache *goCache.Cache - labelsSet LabelsSet +type CacheSearchReq struct { + Domain string + Project string + Opts *datasource.FindOptions + Regex *regexp.Regexp } func (kc *Cache) Refresh(ctx context.Context) { @@ -139,7 +130,7 @@ func (kc *Cache) list(ctx context.Context) (*etcdadpt.Response, error) { return rsp, nil } -func (kc *Cache) watchCallBack(_ string, rsp *etcdadpt.Response) error { +func (kc *Cache) watchCallBack(message string, rsp *etcdadpt.Response) error { if rsp == nil || len(rsp.Kvs) == 0 { return fmt.Errorf("unknown event") } @@ -163,9 +154,6 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) { openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err)) continue } - if !kc.isInLabelsSet(kvDoc.Labels) { - continue - } kc.StoreKvDoc(kvDoc.ID, kvDoc) cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels) m, ok := kc.LoadKvIDSet(cacheKey) @@ -232,6 +220,46 @@ func (kc *Cache) DeleteKvDoc(kvID string) { kc.kvDocCache.Delete(kvID) } +func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) { + if !req.Opts.ExactLabels { + return nil, false, nil + } + + openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts)) + result := &model.KVResponse{ + Data: []*model.KVDoc{}, + } + cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels) + kvIds, ok := kvCache.LoadKvIDSet(cacheKey) + if !ok { + kvCache.StoreKvIDSet(cacheKey, IDSet{}) + return result, true, nil + } + + var docs []*model.KVDoc + + var kvIdsLeft []string + for kvID := range kvIds { + if doc, ok := kvCache.LoadKvDoc(kvID); ok { + docs = append(docs, doc) + continue + } + kvIdsLeft = append(kvIdsLeft, kvID) + } + + tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) + docs = append(docs, tpData...) + + for _, doc := range docs { + if isMatch(req, doc) { + datasource.ClearPart(doc) + result.Data = append(result.Data, doc) + } + } + result.Total = len(result.Data) + return result, true, nil +} + func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc { if len(kvIdsLeft) == 0 { return nil @@ -266,6 +294,19 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe return docs } +func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool { + if doc == nil { + return false + } + if req.Opts.Status != "" && doc.Status != req.Opts.Status { + return false + } + if req.Regex != nil && !req.Regex.MatchString(doc.Key) { + return false + } + return true +} + func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) { kvDoc := &model.KVDoc{} err := json.Unmarshal(kv.Value, kvDoc) @@ -285,66 +326,3 @@ func (kc *Cache) GetCacheKey(domain, project string, labels map[string]string) s }, "/") return inputKey } - -func (kc *Cache) isInLabelsSet(Labels map[string]string) bool { - for label := range Labels { - if _, ok := kc.labelsSet[label]; !ok { - return false - } - } - return true -} - -func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) { - result := &model.KVResponse{ - Data: []*model.KVDoc{}, - } - if !req.Opts.ExactLabels || !kvCache.isInLabelsSet(req.Opts.Labels) { - return result, false, nil - } - - openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts)) - cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels) - - kvIds, ok := kvCache.LoadKvIDSet(cacheKey) - if !ok { - kvCache.StoreKvIDSet(cacheKey, IDSet{}) - return result, true, nil - } - - var docs []*model.KVDoc - - var kvIdsLeft []string - for kvID := range kvIds { - if doc, ok := kvCache.LoadKvDoc(kvID); ok { - docs = append(docs, doc) - continue - } - kvIdsLeft = append(kvIdsLeft, kvID) - } - - tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) - docs = append(docs, tpData...) - - for _, doc := range docs { - if isMatch(req, doc) { - datasource.ClearPart(doc) - result.Data = append(result.Data, doc) - } - } - result.Total = len(result.Data) - return result, true, nil -} - -func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool { - if doc == nil { - return false - } - if req.Opts.Status != "" && doc.Status != req.Opts.Status { - return false - } - if req.Regex != nil && !req.Regex.MatchString(doc.Key) { - return false - } - return true -} diff --git a/server/datasource/etcd/kv/kv_cache_test.go b/server/datasource/etcd/kv/kv_cache_test.go index c128b5a..2286699 100644 --- a/server/datasource/etcd/kv/kv_cache_test.go +++ b/server/datasource/etcd/kv/kv_cache_test.go @@ -1,67 +1,46 @@ package kv import ( - "fmt" - "reflect" "testing" - "time" - "github.com/apache/servicecomb-kie/pkg/model" - "github.com/apache/servicecomb-kie/server/config" "github.com/little-cui/etcdadpt" - goCache "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/api/v3/mvccpb" ) -func init() { - config.Configurations.Cache.Labels = []string{"environment"} +type args struct { + rsp *etcdadpt.Response } func TestCachePut(t *testing.T) { - type args struct { - rsp *etcdadpt.Response - } tests := []struct { name string args args want int }{ - { - name: "put 0 kvDoc, cache should store 0 kvDoc", - args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}}, - want: 0, + {"put 0 kvDoc, cache should store 0 kvDoc", + args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}}, + 0, }, - { - name: "put 1 kvDoc, cache should store 1 kvDoc", - args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ + {"put 1 kvDoc, cache should store 1 kvDoc", + args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}, }}}, - want: 1, + 1, }, - { - name: "put 2 kvDocs with different kvIds, cache should store 2 kvDocs", - args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ + {"put 2 kvDocs with different kvIds, cache should store 2 kvDocs", + args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}, {Value: []byte(`{"id":"2", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)}, }}}, - want: 2, + 2, }, - { - name: "put 2 kvDocs with same kvId, cache should store 1 kvDocs", - args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ + {"put 2 kvDocs with same kvId, cache should store 1 kvDocs", + args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}, {Value: []byte(`{"id":"1", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)}, }}}, - want: 1, - }, - { - name: "put 2 kvDoc, but labels are not cached, cache should store 0 kvDoc", - args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ - {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"env":"testing"}}`)}, - {Value: []byte(`{"id":"1", "key":"withToys", "value":"yes", "labels":{"env":"testing"}}`)}, - }}}, - want: 0, + 1, }, } for _, tt := range tests { @@ -75,40 +54,33 @@ func TestCachePut(t *testing.T) { } func TestCacheDelete(t *testing.T) { - type args struct { - rsp *etcdadpt.Response - } tests := []struct { name string args args want int }{ - { - name: "first put 2 kvDocs, then delete 0 kvDoc, cache should store 2 kvDocs", - args: args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{}}}, - want: 2, + {"first put 2 kvDocs, then delete 0 kvDoc, cache should store 2 kvDocs", + args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{}}}, + 2, }, - { - name: "first put 2 kvDocs, then delete kvId=1, cache should store 1 kvDocs", - args: args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{ + {"first put 2 kvDocs, then delete kvId=1, cache should store 1 kvDocs", + args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}, }}}, - want: 1, + 1, }, - { - name: "first put 2 kvDocs, then delete kvId=1 and kvId=2, cache should store 0 kvDocs", - args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ + {"first put 2 kvDocs, then delete kvId=1 and kvId=2, cache should store 0 kvDocs", + args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}, {Value: []byte(`{"id":"2", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)}, }}}, - want: 0, + 0, }, - { - name: "first put 2 kvDocs, then delete non-exist kvId=0, cache should store 2 kvDocs", - args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ + {"first put 2 kvDocs, then delete non-exist kvId=0, cache should store 2 kvDocs", + args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ {Value: []byte(`{"id":"0", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}, }}}, - want: 2, + 2, }, } for _, tt := range tests { @@ -124,193 +96,3 @@ func TestCacheDelete(t *testing.T) { }) } } - -func TestWatchCallBack(t *testing.T) { - type args struct { - rsp []*etcdadpt.Response - } - type want struct { - kvNum int - err error - } - tests := []struct { - name string - args args - want want - }{ - { - name: "receive 2 messages without kvs, expected: error is not nil, cache should store 0 kvDoc", - args: args{ - rsp: []*etcdadpt.Response{ - { - Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}, - }, - - { - Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{}, - }, - }, - }, - want: want{ - kvNum: 0, - err: fmt.Errorf("unknown event"), - }, - }, - { - name: "receive 1 put message, put 0 kvDoc, expected: error is not nil, cache should store 0 kvDoc", - args: args{ - rsp: []*etcdadpt.Response{ - { - Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}, - }, - }, - }, - want: want{ - kvNum: 0, - err: fmt.Errorf("unknown event"), - }, - }, - { - name: "receive 1 delete message, delete 0 kvDoc, expected: error is not nil, cache should store 0 kvDoc", - args: args{ - rsp: []*etcdadpt.Response{{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}}, - }, - want: want{ - kvNum: 0, - err: fmt.Errorf("unknown event"), - }, - }, - { - name: "receive put message, put 1 kvDocs, expected: error is nil, cache should store 1 kvDocs", - args: args{ - rsp: []*etcdadpt.Response{ - { - Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}}}, - }, - }, - want: want{ - kvNum: 1, - err: nil, - }, - }, - { - name: "receive 1 put message, 1 delete message, first put 1 kvDoc, then delete it, expected: error is nil, cache should store 0 kvDoc", - args: args{ - rsp: []*etcdadpt.Response{ - { - Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}}, - }, - { - Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}}, - }, - }, - }, - want: want{ - kvNum: 0, - err: nil, - }, - }, - { - name: "receive put message put 1 kvDoc, but labels are not cached, cache should store 0 kvDoc", - args: args{ - []*etcdadpt.Response{ - { - Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"env":"testing"}}`)}}, - }, - }, - }, - want: want{ - kvNum: 0, - err: nil, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - kc := NewKvCache() - for _, rsp := range tt.args.rsp { - err := kc.watchCallBack("", rsp) - assert.Equal(t, tt.want.err, err) - } - num := kc.kvDocCache.ItemCount() - assert.Equal(t, tt.want.kvNum, num) - }) - } -} - -func TestStoreAndLoadKvDoc(t *testing.T) { - type want struct { - kvDoc *model.KVDoc - exist bool - } - type args struct { - kvID string - kvDoc *model.KVDoc - expireTime time.Duration - waitTimeAfterStore time.Duration - } - tests := []struct { - name string - args args - want want - }{ - { - name: "store 1 kv and the expire time is 1 seconds, then load the kv with no wait time, expect: load kv successfully", - args: args{ - kvID: "", - kvDoc: &model.KVDoc{ - ID: "1", - Key: "withFood", - Value: "yes", - Labels: map[string]string{ - "environment": "testing", - }, - }, - expireTime: 1 * time.Second, - waitTimeAfterStore: 0, - }, - want: want{ - kvDoc: &model.KVDoc{ - ID: "1", - Key: "withFood", - Value: "yes", - Labels: map[string]string{ - "environment": "testing", - }, - }, - exist: true, - }, - }, - { - name: "store 1 kv and the expire time is 1 seconds, after waiting 2 seconds, then load the kv, expect: unable to load the kv", - args: args{ - kvID: "", - kvDoc: &model.KVDoc{ - ID: "1", - Key: "withFood", - Value: "yes", - Labels: map[string]string{ - "environment": "testing", - }, - }, - expireTime: 1 * time.Second, - waitTimeAfterStore: 2 * time.Second, - }, - want: want{ - kvDoc: nil, - exist: false, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - kc := NewKvCache() - kc.kvDocCache = goCache.New(tt.args.expireTime, tt.args.expireTime) - kc.StoreKvDoc(tt.args.kvID, tt.args.kvDoc) - time.Sleep(tt.args.waitTimeAfterStore) - doc, exist := kc.LoadKvDoc(tt.args.kvID) - assert.Equal(t, tt.want.exist, exist) - reflect.DeepEqual(tt.want.kvDoc, doc) - }) - } -}