This is an automated email from the ASF dual-hosted git repository. asifdxtreme pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
commit f6f5b0af01bf460bebba782f2b6739fb66e12782 Author: tian <xiaoliang.t...@gmail.com> AuthorDate: Fri May 31 11:57:33 2019 +0800 redesign database to make service faster and more ease of use --- client/client.go | 6 +- client/client_suite_test.go | 2 + client/client_test.go | 2 +- client/options.go | 6 +- go.mod | 4 +- pkg/model/kv.go | 22 +- pkg/model/kv_test.go | 4 +- pkg/model/mongodb_doc.go | 47 ++++ proxy.sh | 3 + server/dao/errors.go | 7 +- server/dao/{mongodb.go => kie_api.go} | 254 ++++++++++----------- server/dao/kv.go | 73 ++++-- server/dao/kv_test.go | 59 ++--- server/dao/label.go | 69 ++++++ server/dao/label_history.go | 111 +++++++++ server/dao/mongodb_operator.go | 95 ++++++++ server/dao/options.go | 22 +- server/resource/v1/common.go | 4 +- .../resource/v1/history_resource.go | 20 +- server/resource/v1/kv_resource.go | 23 +- 20 files changed, 589 insertions(+), 244 deletions(-) diff --git a/client/client.go b/client/client.go index cb78d7c..3bcf161 100644 --- a/client/client.go +++ b/client/client.go @@ -74,7 +74,7 @@ func New(config Config) (*Client, error) { } //GetValue get value of a key -func (c *Client) Get(ctx context.Context, key string, opts ...GetOption) ([]*model.KV, error) { +func (c *Client) Get(ctx context.Context, key string, opts ...GetOption) ([]*model.KVDoc, error) { options := GetOptions{} for _, o := range opts { o(&options) @@ -84,7 +84,7 @@ func (c *Client) Get(ctx context.Context, key string, opts ...GetOption) ([]*mod if options.MatchMode != "" { h.Set(common.HeaderMatch, options.MatchMode) } - resp, err := c.c.HTTPDo("GET", url, h, nil) + resp, err := c.c.HTTPDoWithContext(ctx, "GET", url, h, nil) if err != nil { return nil, err } @@ -96,7 +96,7 @@ func (c *Client) Get(ctx context.Context, key string, opts ...GetOption) ([]*mod return nil, fmt.Errorf("get %s failed,http status [%s], body [%s]", key, resp.Status, b) } - kvs := make([]*model.KV, 0) + kvs := make([]*model.KVDoc, 0) err = json.Unmarshal(b, kvs) if err != nil { openlogging.Error("unmarshal kv failed:" + err.Error()) diff --git a/client/client_suite_test.go b/client/client_suite_test.go index 27a4f23..f699279 100644 --- a/client/client_suite_test.go +++ b/client/client_suite_test.go @@ -25,6 +25,7 @@ import ( . "github.com/onsi/ginkgo" "github.com/onsi/ginkgo/reporters" . "github.com/onsi/gomega" + "os" ) func TestClient(t *testing.T) { @@ -41,4 +42,5 @@ var _ = BeforeSuite(func() { logger := log.NewLogger("ut") openlogging.SetLogger(logger) + os.Setenv("HTTP_DEBUG","1") }) diff --git a/client/client_test.go b/client/client_test.go index 3e02b02..937b6c1 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -53,7 +53,7 @@ var _ = Describe("Client", func() { }) Context("by key and labels", func() { - _, err := c1.Get(context.TODO(), "app.properties", WithLables(map[string]string{ + _, err := c1.Get(context.TODO(), "app.properties", WithLabels(map[string]string{ "app": "mall", })) It("should be 404 error", func() { diff --git a/client/options.go b/client/options.go index 351b476..374dcfc 100644 --- a/client/options.go +++ b/client/options.go @@ -25,11 +25,15 @@ type GetOptions struct { MatchMode string } -func WithLables(l map[string]string) GetOption { +//WithLabels query kv by labels +func WithLabels(l map[string]string) GetOption { return func(options *GetOptions) { options.Labels = l } } + +//WithMatchMode has 2 modes +//exact and greedy func WithMatchMode(m string) GetOption { return func(options *GetOptions) { options.MatchMode = m diff --git a/go.mod b/go.mod index dd87b23..d5efedc 100644 --- a/go.mod +++ b/go.mod @@ -2,9 +2,9 @@ module github.com/apache/servicecomb-kie require ( github.com/emicklei/go-restful v2.8.0+incompatible - github.com/go-chassis/foundation v0.0.0-20190203091418-304855ea28bf + github.com/go-chassis/foundation v0.0.0-20190516083152-b8b2476b6db7 github.com/go-chassis/go-archaius v0.16.0 - github.com/go-chassis/go-chassis v1.4.0 + github.com/go-chassis/go-chassis v1.4.1 github.com/go-chassis/paas-lager v1.0.2-0.20190328010332-cf506050ddb2 github.com/go-mesh/openlogging v1.0.1-0.20181205082104-3d418c478b2d github.com/onsi/ginkgo v1.8.0 diff --git a/pkg/model/kv.go b/pkg/model/kv.go index 042cd20..be9830e 100644 --- a/pkg/model/kv.go +++ b/pkg/model/kv.go @@ -17,25 +17,9 @@ package model -import ( - "go.mongodb.org/mongo-driver/bson/primitive" -) -type Labels map[string]string -type KV struct { - ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"` - Key string `json:"key"` - Value string `json:"value"` - ValueType string `json:"valueType"` //ini,json,text,yaml,properties - Domain string `json:"domain"` //tenant info - Labels map[string]string `json:"labels,omitempty"` //key has labels - Checker string `json:"check,omitempty"` //python script - Revision int `json:"revision"` -} -type KVHistory struct { - KID string `json:"id,omitempty" bson:"kvID"` - Value string `json:"value"` - Checker string `json:"check,omitempty"` //python script - Revision int `json:"revision"` + + +type KVResponse struct { } diff --git a/pkg/model/kv_test.go b/pkg/model/kv_test.go index cbb05c0..feb1ce3 100644 --- a/pkg/model/kv_test.go +++ b/pkg/model/kv_test.go @@ -25,7 +25,7 @@ import ( ) func TestKV_UnmarshalJSON(t *testing.T) { - kv := &model.KV{ + kv := &model.KVDoc{ Value: "test", Labels: map[string]string{ "test": "env", @@ -34,7 +34,7 @@ func TestKV_UnmarshalJSON(t *testing.T) { b, _ := json.Marshal(kv) t.Log(string(b)) - var kv2 model.KV + var kv2 model.KVDoc err := json.Unmarshal([]byte(` {"value": "1","labels":{"test":"env"}} `), &kv2) diff --git a/pkg/model/mongodb_doc.go b/pkg/model/mongodb_doc.go new file mode 100644 index 0000000..a298ca2 --- /dev/null +++ b/pkg/model/mongodb_doc.go @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package model + +import "go.mongodb.org/mongo-driver/bson/primitive" + +type LabelDoc struct { + ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + Revision int `json:"revision"` + Domain string `json:"domain,omitempty"` //tenant info +} +type KVDoc struct { + ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"` + LabelID string `json:"label_id,omitempty" bson:"label_id,omitempty"` + Key string `json:"key"` + Value string `json:"value,omitempty"` + ValueType string `json:"value_type,omitempty" bson:"value_type,omitempty"` //ini,json,text,yaml,properties + Checker string `json:"check,omitempty"` //python script + + Labels map[string]string `json:"labels,omitempty"` //redundant + Domain string `json:"domain,omitempty"` //redundant + Revision int `json:"revision,omitempty" bson:"-"` +} +type LabelRevisionDoc struct { + ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"` + LabelID string `json:"label_id,omitempty" bson:"label_id,omitempty"` + Labels map[string]string `json:"labels,omitempty"` //redundant + Domain string `json:"domain,omitempty"` //redundant + KVs []*KVDoc `json:"data,omitempty"` // save states of this revision + Revision int `json:"revision"` +} diff --git a/proxy.sh b/proxy.sh new file mode 100755 index 0000000..da4430b --- /dev/null +++ b/proxy.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +export GOPROXY=https://goproxy.io diff --git a/server/dao/errors.go b/server/dao/errors.go index 31074f8..349ca6d 100644 --- a/server/dao/errors.go +++ b/server/dao/errors.go @@ -21,14 +21,13 @@ import ( "errors" "fmt" - "github.com/apache/servicecomb-kie/pkg/model" "github.com/go-mesh/openlogging" ) //ErrAction will wrap raw error to biz error and return -//it record audit log for mongodb operation failure like find, insert, update, deletion -func ErrAction(action, key string, labels model.Labels, domain string, err error) error { - msg := fmt.Sprintf("can not [%s] [%s] in [%s] with [%s],err: %s", action, key, domain, labels, err.Error()) +//it record audit log for mongodb operation failure like find, insert, updateKey, deletion +func ErrAction(action, filter interface{}, err error) error { + msg := fmt.Sprintf("can not [%s] [%v],err: %s", action, filter, err.Error()) openlogging.Error(msg) return errors.New(msg) diff --git a/server/dao/mongodb.go b/server/dao/kie_api.go similarity index 55% rename from server/dao/mongodb.go rename to server/dao/kie_api.go index 9fc6ad0..c5ae9bd 100644 --- a/server/dao/mongodb.go +++ b/server/dao/kie_api.go @@ -32,11 +32,12 @@ import ( var client *mongo.Client const ( - DB = "kie" - CollectionKV = "kv" - CollectionRevision = "revision" - DefaultTimeout = 5 * time.Second - DefaultValueType = "text" + DB = "kie" + CollectionLabel = "label" + CollectionKV = "kv" + CollectionLabelRevision = "label_revision" + DefaultTimeout = 5 * time.Second + DefaultValueType = "text" ) type MongodbService struct { @@ -44,104 +45,146 @@ type MongodbService struct { timeout time.Duration } -func (s *MongodbService) CreateOrUpdate(kv *model.KV) (*model.KV, error) { - if kv.Domain == "" { +func (s *MongodbService) CreateOrUpdate(ctx context.Context, domain string, kv *model.KVDoc) (*model.KVDoc, error) { + if domain == "" { return nil, ErrMissingDomain } - ctx, _ := context.WithTimeout(context.Background(), DefaultTimeout) - collection := s.c.Database(DB).Collection(CollectionKV) - oid, err := s.Exist(kv.Key, kv.Domain, kv.Labels) + ctx, _ = context.WithTimeout(ctx, DefaultTimeout) + //check labels exits or not + labelID, err := s.LabelsExist(ctx, domain, kv.Labels) + var l *model.LabelDoc if err != nil { - if err != ErrNotExists { - return nil, err - } - } - if oid != "" { - hex, err := primitive.ObjectIDFromHex(oid) - if err != nil { - openlogging.Error(fmt.Sprintf("convert %s ,err:%s", oid, err)) - return nil, err - } - kv.ID = hex - if err := s.update(ctx, collection, kv); err != nil { + if err == ErrLabelNotExists { + l, err = s.createLabel(ctx, domain, kv.Labels) + if err != nil { + return nil, err + } + labelID = l.ID + } else { return nil, err } - return kv, nil + } + kv.LabelID = labelID.Hex() + kv.Domain = domain if kv.ValueType == "" { kv.ValueType = DefaultValueType } - //set 1 to revision for insertion - kv.Revision = 1 - res, err := collection.InsertOne(ctx, kv) + keyID, err := s.KVExist(ctx, domain, kv.Key, WithLabelID(kv.LabelID)) if err != nil { + if err == ErrKeyNotExists { + kv, err := s.createKey(ctx, kv) + if err != nil { + return nil, err + } + return kv, nil + } return nil, err } - objectID, _ := res.InsertedID.(primitive.ObjectID) - kv.ID = objectID - if err := s.AddHistory(kv); err != nil { - openlogging.Warn( - fmt.Sprintf("can not update version for [%s] [%s] in [%s]", - kv.Key, kv.Labels, kv.Domain)) + kv.ID = keyID + revision, err := s.updateKey(ctx, kv) + if err != nil { + return nil, err } - openlogging.Debug(fmt.Sprintf("create %s with labels %s value [%s]", kv.Key, kv.Labels, kv.Value)) + kv.Revision = revision return kv, nil + } -//update get latest revision from history -//and increase revision -//and update and them add new history -func (s *MongodbService) update(ctx context.Context, collection *mongo.Collection, kv *model.KV) error { - h, err := s.getLatest(kv.ID) - if err != nil { - openlogging.Error(fmt.Sprintf("get latest [%s][%s] in [%s],err: %s", - kv.Key, kv.Labels, kv.Domain, err.Error())) - return err +//FindLabels find label doc by labels +//if map is empty. will return default labels doc which has no labels +func (s *MongodbService) FindLabels(ctx context.Context, domain string, labels map[string]string) (*model.LabelDoc, error) { + collection := s.c.Database(DB).Collection(CollectionLabel) + ctx, _ = context.WithTimeout(context.Background(), DefaultTimeout) + filter := bson.M{"domain": domain} + for k, v := range labels { + filter["labels."+k] = v } - if h != nil { - kv.Revision = h.Revision + 1 + if len(labels) == 0 { + filter["labels"] = "default" //allow key without labels } - ur, err := collection.UpdateOne(ctx, bson.M{"_id": kv.ID}, bson.D{ - {"$set", bson.D{ - {"value", kv.Value}, - {"revision", kv.Revision}, - {"checker", kv.Checker}, - }}, - }) + cur, err := collection.Find(ctx, filter) if err != nil { - return err + if err.Error() == context.DeadlineExceeded.Error() { + return nil, ErrAction("find label", filter, fmt.Errorf("can not reach mongodb in %s", s.timeout)) + } + return nil, err } - openlogging.Debug( - fmt.Sprintf("update %s with labels %s value [%s] %d ", - kv.Key, kv.Labels, kv.Value, ur.ModifiedCount)) - if err := s.AddHistory(kv); err != nil { - openlogging.Warn( - fmt.Sprintf("can not update version for [%s] [%s] in [%s]", - kv.Key, kv.Labels, kv.Domain)) - } - openlogging.Debug( - fmt.Sprintf("add history %s with labels %s value [%s] %d ", - kv.Key, kv.Labels, kv.Value, ur.ModifiedCount)) - return nil + defer cur.Close(ctx) + if cur.Err() != nil { + return nil, err + } + openlogging.Debug(fmt.Sprintf("find lables [%s] in [%s]", labels, domain)) + curLabel := &model.LabelDoc{} //reuse this pointer to reduce GC, only clear label + //check label length to get the exact match + for cur.Next(ctx) { //although complexity is O(n), but there won't be so much labels + curLabel.Labels = nil + err := cur.Decode(curLabel) + if err != nil { + openlogging.Error("decode error: " + err.Error()) + return nil, err + } + if len(curLabel.Labels) == len(labels) { + openlogging.Debug("hit exact labels") + curLabel.Labels = nil //exact match don't need to return labels + return curLabel, nil + } + } + return nil, ErrLabelNotExists } -func (s *MongodbService) Exist(key, domain string, labels model.Labels) (string, error) { - kvs, err := s.Find(domain, WithExactLabels(), WithLabels(labels), WithKey(key)) + +func (s *MongodbService) findKeys(ctx context.Context, filter bson.M, withoutLabel bool) ([]*model.KVDoc, error) { + collection := s.c.Database(DB).Collection(CollectionKV) + cur, err := collection.Find(ctx, filter) if err != nil { - return "", err + if err.Error() == context.DeadlineExceeded.Error() { + return nil, ErrAction("find", filter, fmt.Errorf("can not reach mongodb in %s", s.timeout)) + } + return nil, err + } + defer cur.Close(ctx) + if cur.Err() != nil { + return nil, err + } + kvs := make([]*model.KVDoc, 0) + curKV := &model.KVDoc{} //reduce GC,but need to clear labels + for cur.Next(ctx) { + curKV.Labels = nil + if err := cur.Decode(curKV); err != nil { + openlogging.Error("decode to KVs error: " + err.Error()) + return nil, err + } + if withoutLabel { + curKV.Labels = nil + } + kvs = append(kvs, curKV) + } - if len(kvs) != 1 { - return "", ErrTooMany + if len(kvs) == 0 { + return nil, ErrKeyNotExists } + return kvs, nil +} - return kvs[0].ID.Hex(), nil +//FindKVByLabelID get kvs by key and label id +//key can be empty, then it will return all key values +//if key is given, will return 0-1 key value +func (s *MongodbService) FindKVByLabelID(ctx context.Context, domain, labelID, key string) ([]*model.KVDoc, error) { + ctx, _ = context.WithTimeout(context.Background(), DefaultTimeout) + filter := bson.M{"label_id": labelID, "domain": domain} + if key != "" { + return s.findOneKey(ctx, filter, key) + } else { + return s.findKeys(ctx, filter, true) + } } -//Find get kvs by key, labels +//FindKV get kvs by key, labels //because labels has a a lot of combination, //you can use WithExactLabels to return only one kv which's labels exactly match the criteria -func (s *MongodbService) Find(domain string, options ...FindOption) ([]*model.KV, error) { +func (s *MongodbService) FindKV(ctx context.Context, domain string, options ...FindOption) ([]*model.KVDoc, error) { opts := FindOptions{} for _, o := range options { o(&opts) @@ -150,7 +193,7 @@ func (s *MongodbService) Find(domain string, options ...FindOption) ([]*model.KV return nil, ErrMissingDomain } collection := s.c.Database(DB).Collection(CollectionKV) - ctx, _ := context.WithTimeout(context.Background(), DefaultTimeout) + ctx, _ = context.WithTimeout(ctx, DefaultTimeout) filter := bson.M{"domain": domain} if opts.Key != "" { filter["key"] = opts.Key @@ -162,7 +205,7 @@ func (s *MongodbService) Find(domain string, options ...FindOption) ([]*model.KV cur, err := collection.Find(ctx, filter) if err != nil { if err.Error() == context.DeadlineExceeded.Error() { - return nil, ErrAction("find", opts.Key, opts.Labels, domain, fmt.Errorf("can not reach mongodb in %s", s.timeout)) + return nil, ErrAction("find", filter, fmt.Errorf("can not reach mongodb in %s", s.timeout)) } return nil, err } @@ -172,7 +215,7 @@ func (s *MongodbService) Find(domain string, options ...FindOption) ([]*model.KV } if opts.ExactLabels { openlogging.Debug(fmt.Sprintf("find one [%s] with lables [%s] in [%s]", opts.Key, opts.Labels, domain)) - curKV := &model.KV{} //reuse this pointer to reduce GC, only clear label + curKV := &model.KVDoc{} //reuse this pointer to reduce GC, only clear label //check label length to get the exact match for cur.Next(ctx) { //although complexity is O(n), but there won't be so much labels for one key curKV.Labels = nil @@ -182,16 +225,17 @@ func (s *MongodbService) Find(domain string, options ...FindOption) ([]*model.KV return nil, err } if len(curKV.Labels) == len(opts.Labels) { - openlogging.Debug("hit") - return []*model.KV{curKV}, nil + openlogging.Debug("hit exact labels") + curKV.Labels = nil //exact match don't need to return labels + return []*model.KVDoc{curKV}, nil } } - return nil, ErrNotExists + return nil, ErrKeyNotExists } else { - kvs := make([]*model.KV, 0) + kvs := make([]*model.KVDoc, 0) for cur.Next(ctx) { - curKV := &model.KV{} + curKV := &model.KVDoc{} if err := cur.Decode(curKV); err != nil { openlogging.Error("decode to KVs error: " + err.Error()) return nil, err @@ -200,7 +244,7 @@ func (s *MongodbService) Find(domain string, options ...FindOption) ([]*model.KV } if len(kvs) == 0 { - return nil, ErrNotExists + return nil, ErrKeyNotExists } return kvs, nil } @@ -253,7 +297,7 @@ func (s *MongodbService) Delete(ids []string, domain string) error { dr, err := collection.DeleteMany(ctx, filter) //check error and delete number if err != nil { - openlogging.Error(fmt.Sprintf("delete [%s] failed : [%s]", filter, err)) + openlogging.Error(fmt.Sprintf("delete [%v] failed : [%s]", filter, err)) return err } if dr.DeletedCount != int64(len(oid)) { @@ -263,51 +307,7 @@ func (s *MongodbService) Delete(ids []string, domain string) error { } return nil } -func (s *MongodbService) AddHistory(kv *model.KV) error { - collection := s.c.Database(DB).Collection(CollectionRevision) - ctx, _ := context.WithTimeout(context.Background(), DefaultTimeout) - h := &model.KVHistory{ - KID: kv.ID.Hex(), - Value: kv.Value, - Revision: kv.Revision, - Checker: kv.Checker, - } - _, err := collection.InsertOne(ctx, h) - if err != nil { - openlogging.Error(err.Error()) - return err - } - return nil -} -func (s *MongodbService) getLatest(id primitive.ObjectID) (*model.KVHistory, error) { - collection := s.c.Database(DB).Collection(CollectionRevision) - ctx, _ := context.WithTimeout(context.Background(), DefaultTimeout) - - filter := bson.M{"kvID": id.Hex()} - - cur, err := collection.Find(ctx, filter, - options.Find().SetSort(map[string]interface{}{ - "revision": -1, - }), options.Find().SetLimit(1)) - if err != nil { - return nil, err - } - h := &model.KVHistory{} - var exist bool - for cur.Next(ctx) { - if err := cur.Decode(h); err != nil { - openlogging.Error("decode to KVs error: " + err.Error()) - return nil, err - } - exist = true - break - } - if !exist { - return nil, nil - } - return h, nil -} -func NewMongoService(opts Options) (KV, error) { +func NewMongoService(opts Options) (*MongodbService, error) { if opts.Timeout == 0 { opts.Timeout = DefaultTimeout } diff --git a/server/dao/kv.go b/server/dao/kv.go index 9cf40fe..1f7eccf 100644 --- a/server/dao/kv.go +++ b/server/dao/kv.go @@ -19,27 +19,26 @@ package dao import ( + "context" "crypto/tls" "errors" "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/server/config" + "github.com/go-mesh/openlogging" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" "time" ) -var ErrMissingDomain = errors.New("domain info missing, illegal access") -var ErrNotExists = errors.New("key with labels does not exits") -var ErrTooMany = errors.New("key with labels should be only one") -var ErrKeyMustNotEmpty = errors.New("must supply key if you want to get exact one result") - -type KV interface { - CreateOrUpdate(kv *model.KV) (*model.KV, error) - //do not use primitive.ObjectID as return to decouple with mongodb, we can afford perf lost - Exist(key, domain string, labels model.Labels) (string, error) - Delete(ids []string, domain string) error - Find(domain string, options ...FindOption) ([]*model.KV, error) - AddHistory(kv *model.KV) error - //RollBack(kv *KV, version string) error -} +var ( + ErrMissingDomain = errors.New("domain info missing, illegal access") + ErrKeyNotExists = errors.New("key with labels does not exits") + ErrLabelNotExists = errors.New("labels does not exits") + ErrTooMany = errors.New("key with labels should be only one") + ErrKeyMustNotEmpty = errors.New("must supply key if you want to get exact one result") + ErrRevisionNotExist = errors.New("label revision not exist") +) type Options struct { URI string @@ -49,7 +48,7 @@ type Options struct { Timeout time.Duration } -func NewKVService() (KV, error) { +func NewKVService() (*MongodbService, error) { opts := Options{ URI: config.GetDB().URI, PoolSize: config.GetDB().PoolSize, @@ -60,3 +59,47 @@ func NewKVService() (KV, error) { } return NewMongoService(opts) } +func (s *MongodbService) findOneKey(ctx context.Context, filter bson.M, key string) ([]*model.KVDoc, error) { + collection := s.c.Database(DB).Collection(CollectionKV) + filter["key"] = key + sr := collection.FindOne(ctx, filter) + if sr.Err() != nil { + return nil, sr.Err() + } + curKV := &model.KVDoc{} + err := sr.Decode(curKV) + if err != nil { + if err == mongo.ErrNoDocuments { + return nil, ErrKeyNotExists + } + openlogging.Error("decode error: " + err.Error()) + return nil, err + } + return []*model.KVDoc{curKV}, nil +} + +//KVExist supports you query by label map or labels id +func (s *MongodbService) KVExist(ctx context.Context, domain, key string, options ...FindOption) (primitive.ObjectID, error) { + opts := FindOptions{} + for _, o := range options { + o(&opts) + } + if opts.LabelID != "" { + kvs, err := s.FindKVByLabelID(ctx, domain, opts.LabelID, key) + if err != nil { + return primitive.NilObjectID, err + } + return kvs[0].ID, nil + } else { + kvs, err := s.FindKV(ctx, domain, WithExactLabels(), WithLabels(opts.Labels), WithKey(key)) + if err != nil { + return primitive.NilObjectID, err + } + if len(kvs) != 1 { + return primitive.NilObjectID, ErrTooMany + } + + return kvs[0].ID, nil + } + +} diff --git a/server/dao/kv_test.go b/server/dao/kv_test.go index 81bd106..7e2a5eb 100644 --- a/server/dao/kv_test.go +++ b/server/dao/kv_test.go @@ -18,15 +18,15 @@ package dao_test import ( + "context" "github.com/apache/servicecomb-kie/pkg/model" - . "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/server/dao" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) var _ = Describe("Kv mongodb service", func() { - var s dao.KV + var s *dao.MongodbService var err error Describe("connecting db", func() { s, err = dao.NewMongoService(dao.Options{ @@ -39,10 +39,9 @@ var _ = Describe("Kv mongodb service", func() { Describe("put kv timeout", func() { Context("with labels app and service", func() { - kv, err := s.CreateOrUpdate(&model.KV{ - Key: "timeout", - Value: "2s", - Domain: "default", + kv, err := s.CreateOrUpdate(context.TODO(), "default", &model.KVDoc{ + Key: "timeout", + Value: "2s", Labels: map[string]string{ "app": "mall", "service": "cart", @@ -51,36 +50,29 @@ var _ = Describe("Kv mongodb service", func() { It("should not return err", func() { Expect(err).Should(BeNil()) }) - It("should has revision", func() { - Expect(kv.Revision).ShouldNot(BeZero()) - }) It("should has ID", func() { Expect(kv.ID.Hex()).ShouldNot(BeEmpty()) }) }) Context("with labels app, service and version", func() { - kv, err := s.CreateOrUpdate(&KV{ - Key: "timeout", - Value: "2s", - Domain: "default", + kv, err := s.CreateOrUpdate(context.TODO(), "default", &model.KVDoc{ + Key: "timeout", + Value: "2s", Labels: map[string]string{ "app": "mall", "service": "cart", "version": "1.0.0", }, }) - oid, err := s.Exist("timeout", "default", map[string]string{ + oid, err := s.KVExist(context.TODO(), "default", "timeout", dao.WithLabels(map[string]string{ "app": "mall", "service": "cart", "version": "1.0.0", - }) + })) It("should not return err", func() { Expect(err).Should(BeNil()) }) - It("should has revision", func() { - Expect(kv.Revision).ShouldNot(BeZero()) - }) It("should has ID", func() { Expect(kv.ID.Hex()).ShouldNot(BeEmpty()) }) @@ -89,7 +81,7 @@ var _ = Describe("Kv mongodb service", func() { }) }) Context("with labels app,and update value", func() { - beforeKV, err := s.CreateOrUpdate(&KV{ + beforeKV, err := s.CreateOrUpdate(context.Background(), "default", &model.KVDoc{ Key: "timeout", Value: "1s", Domain: "default", @@ -100,13 +92,13 @@ var _ = Describe("Kv mongodb service", func() { It("should not return err", func() { Expect(err).Should(BeNil()) }) - kvs1, err := s.Find("default", dao.WithKey("timeout"), dao.WithLabels(map[string]string{ + kvs1, err := s.FindKV(context.Background(), "default", dao.WithKey("timeout"), dao.WithLabels(map[string]string{ "app": "mall", }), dao.WithExactLabels()) It("should be 1s", func() { Expect(kvs1[0].Value).Should(Equal(beforeKV.Value)) }) - afterKV, err := s.CreateOrUpdate(&KV{ + afterKV, err := s.CreateOrUpdate(context.Background(), "default", &model.KVDoc{ Key: "timeout", Value: "3s", Domain: "default", @@ -117,13 +109,13 @@ var _ = Describe("Kv mongodb service", func() { It("should has same id", func() { Expect(afterKV.ID.Hex()).Should(Equal(beforeKV.ID.Hex())) }) - oid, err := s.Exist("timeout", "default", map[string]string{ + oid, err := s.KVExist(context.Background(), "default", "timeout", dao.WithLabels(map[string]string{ "app": "mall", - }) + })) It("should exists", func() { - Expect(oid).Should(Equal(beforeKV.ID.Hex())) + Expect(oid.Hex()).Should(Equal(beforeKV.ID.Hex())) }) - kvs, err := s.Find("default", dao.WithKey("timeout"), dao.WithLabels(map[string]string{ + kvs, err := s.FindKV(context.Background(), "default", dao.WithKey("timeout"), dao.WithLabels(map[string]string{ "app": "mall", }), dao.WithExactLabels()) It("should be 3s", func() { @@ -134,7 +126,7 @@ var _ = Describe("Kv mongodb service", func() { Describe("greedy find by kv and labels", func() { Context("with labels app ", func() { - kvs, err := s.Find("default", dao.WithKey("timeout"), dao.WithLabels(map[string]string{ + kvs, err := s.FindKV(context.Background(), "default", dao.WithKey("timeout"), dao.WithLabels(map[string]string{ "app": "mall", })) It("should not return err", func() { @@ -148,7 +140,7 @@ var _ = Describe("Kv mongodb service", func() { }) Describe("exact find by kv and labels", func() { Context("with labels app ", func() { - kvs, err := s.Find("default", dao.WithKey("timeout"), dao.WithLabels(map[string]string{ + kvs, err := s.FindKV(context.Background(), "default", dao.WithKey("timeout"), dao.WithLabels(map[string]string{ "app": "mall", }), dao.WithExactLabels()) It("should not return err", func() { @@ -162,7 +154,7 @@ var _ = Describe("Kv mongodb service", func() { }) Describe("exact find by labels", func() { Context("with labels app ", func() { - kvs, err := s.Find("default", dao.WithLabels(map[string]string{ + kvs, err := s.FindKV(context.Background(), "default", dao.WithLabels(map[string]string{ "app": "mall", }), dao.WithExactLabels()) It("should not return err", func() { @@ -176,7 +168,7 @@ var _ = Describe("Kv mongodb service", func() { }) Describe("greedy find by labels", func() { Context("with labels app ans service ", func() { - kvs, err := s.Find("default", dao.WithLabels(map[string]string{ + kvs, err := s.FindKV(context.Background(), "default", dao.WithLabels(map[string]string{ "app": "mall", "service": "cart", })) @@ -192,10 +184,9 @@ var _ = Describe("Kv mongodb service", func() { Describe("delete key", func() { Context("delete key by id,seperated by ',' ", func() { - kv1, err := s.CreateOrUpdate(&model.KV{ - Key: "timeout", - Value: "20s", - Domain: "default", + kv1, err := s.CreateOrUpdate(context.Background(), "default", &model.KVDoc{ + Key: "timeout", + Value: "20s", Labels: map[string]string{ "env": "test", }, @@ -204,7 +195,7 @@ var _ = Describe("Kv mongodb service", func() { Expect(err).Should(BeNil()) }) - kv2, err := s.CreateOrUpdate(&model.KV{ + kv2, err := s.CreateOrUpdate(context.Background(), "default", &model.KVDoc{ Key: "times", Value: "3", Domain: "default", diff --git a/server/dao/label.go b/server/dao/label.go new file mode 100644 index 0000000..3fddd2f --- /dev/null +++ b/server/dao/label.go @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dao + +import ( + "context" + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/go-mesh/openlogging" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" +) + +func (s *MongodbService) createLabel(ctx context.Context, domain string, labels map[string]string) (*model.LabelDoc, error) { + l := &model.LabelDoc{ + Domain: domain, + Labels: labels, + } + collection := s.c.Database(DB).Collection(CollectionLabel) + res, err := collection.InsertOne(ctx, l) + if err != nil { + return nil, err + } + objectID, _ := res.InsertedID.(primitive.ObjectID) + l.ID = objectID + return l, nil +} +func (s *MongodbService) findOneLabels(ctx context.Context, filter bson.M) (*model.LabelDoc, error) { + collection := s.c.Database(DB).Collection(CollectionLabel) + ctx, _ = context.WithTimeout(context.Background(), DefaultTimeout) + sr := collection.FindOne(ctx, filter) + if sr.Err() != nil { + return nil, sr.Err() + } + l := &model.LabelDoc{} + err := sr.Decode(l) + if err != nil { + if err == mongo.ErrNoDocuments { + return nil, ErrLabelNotExists + } + openlogging.Error("decode error: " + err.Error()) + return nil, err + } + return l, nil +} +func (s *MongodbService) LabelsExist(ctx context.Context, domain string, labels map[string]string) (primitive.ObjectID, error) { + l, err := s.FindLabels(ctx, domain, labels) + if err != nil { + return primitive.NilObjectID, err + } + + return l.ID, nil + +} diff --git a/server/dao/label_history.go b/server/dao/label_history.go new file mode 100644 index 0000000..fb8164f --- /dev/null +++ b/server/dao/label_history.go @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dao + +import ( + "context" + + "fmt" + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/go-mesh/openlogging" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func (s *MongodbService) getLatestLabel(ctx context.Context, labelID string) (*model.LabelRevisionDoc, error) { + collection := s.c.Database(DB).Collection(CollectionLabelRevision) + ctx, _ = context.WithTimeout(ctx, DefaultTimeout) + + filter := bson.M{"label_id": labelID} + + cur, err := collection.Find(ctx, filter, + options.Find().SetSort(map[string]interface{}{ + "revision": -1, + }), options.Find().SetLimit(1)) + if err != nil { + return nil, err + } + h := &model.LabelRevisionDoc{} + var exist bool + for cur.Next(ctx) { + if err := cur.Decode(h); err != nil { + openlogging.Error("decode to KVs error: " + err.Error()) + return nil, err + } + exist = true + break + } + if !exist { + return nil, ErrRevisionNotExist + } + return h, nil +} +func (s *MongodbService) AddHistory(ctx context.Context, labelID string, labels map[string]string, domain string) (int, error) { + r, err := s.getLatestLabel(ctx, labelID) + if err != nil { + if err == ErrRevisionNotExist { + openlogging.Warn(fmt.Sprintf("label revision not exists, create first label revision")) + r = &model.LabelRevisionDoc{ + LabelID: labelID, + Labels: labels, + Domain: domain, + Revision: 0, + } + } else { + openlogging.Error(fmt.Sprintf("get latest [%s] in [%s],err: %s", + labelID, domain, err.Error())) + return 0, err + } + + } + r.Revision = r.Revision + 1 + + kvs, err := s.findKeys(ctx, bson.M{"label_id": labelID}, true) + if err != nil { + return 0, err + } + //save current kv states + r.KVs = kvs + //clear prev id + r.ID = primitive.NilObjectID + collection := s.c.Database(DB).Collection(CollectionLabelRevision) + _, err = collection.InsertOne(ctx, r) + if err != nil { + openlogging.Error(err.Error()) + return 0, err + } + + hex, err := primitive.ObjectIDFromHex(labelID) + if err != nil { + openlogging.Error(fmt.Sprintf("convert %s,err:%s", labelID, err)) + return 0, err + } + labelCollection := s.c.Database(DB).Collection(CollectionLabel) + _, err = labelCollection.UpdateOne(ctx, bson.M{"_id": hex}, bson.D{ + {"$set", bson.D{ + {"revision", r.Revision}, + }}, + }) + if err != nil { + return 0, err + } + openlogging.Debug(fmt.Sprintf("update revision to %d", r.Revision)) + + return r.Revision, nil +} diff --git a/server/dao/mongodb_operator.go b/server/dao/mongodb_operator.go new file mode 100644 index 0000000..ce672d1 --- /dev/null +++ b/server/dao/mongodb_operator.go @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dao + +import ( + "context" + + "fmt" + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/go-mesh/openlogging" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +//createKey get latest revision from history +//and increase revision of label +//and insert key +func (s *MongodbService) createKey(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) { + r, err := s.getLatestLabel(ctx, kv.LabelID) + if err != nil { + if err != ErrRevisionNotExist { + openlogging.Error(fmt.Sprintf("get latest [%s][%s] in [%s],err: %s", + kv.Key, kv.Labels, kv.Domain, err.Error())) + return nil, err + } + //the first time labels is created, at this time, labels has no revision yet + //after first key created, labels got revision 1 + r = &model.LabelRevisionDoc{Revision: 0} + } + if r != nil { + r.Revision = r.Revision + 1 + } + collection := s.c.Database(DB).Collection(CollectionKV) + res, err := collection.InsertOne(ctx, kv) + if err != nil { + return nil, err + } + objectID, _ := res.InsertedID.(primitive.ObjectID) + kv.ID = objectID + revision, err := s.AddHistory(ctx, kv.LabelID, kv.Labels, kv.Domain) + if err != nil { + openlogging.Warn( + fmt.Sprintf("can not updateKey version for [%s] [%s] in [%s]", + kv.Key, kv.Labels, kv.Domain)) + } + openlogging.Debug(fmt.Sprintf("create %s with labels %s value [%s]", kv.Key, kv.Labels, kv.Value)) + kv.Revision = revision + return kv, nil + +} + +//updateKey get latest revision from history +//and increase revision of label +//and updateKey and them add new revision +func (s *MongodbService) updateKey(ctx context.Context, kv *model.KVDoc) (int, error) { + collection := s.c.Database(DB).Collection(CollectionKV) + ur, err := collection.UpdateOne(ctx, bson.M{"key": kv.Key, "label_id": kv.LabelID}, bson.D{ + {"$set", bson.D{ + {"value", kv.Value}, + {"checker", kv.Checker}, + }}, + }) + if err != nil { + return 0, err + } + openlogging.Debug( + fmt.Sprintf("updateKey %s with labels %s value [%s] %d ", + kv.Key, kv.Labels, kv.Value, ur.ModifiedCount)) + revision, err := s.AddHistory(ctx, kv.LabelID, kv.Labels, kv.Domain) + if err != nil { + openlogging.Warn( + fmt.Sprintf("can not label revision for [%s] [%s] in [%s],err: %s", + kv.Key, kv.Labels, kv.Domain, err)) + } + openlogging.Debug( + fmt.Sprintf("add history %s with labels %s value [%s] %d ", + kv.Key, kv.Labels, kv.Value, ur.ModifiedCount)) + return revision, nil + +} diff --git a/server/dao/options.go b/server/dao/options.go index 2116b67..e1af384 100644 --- a/server/dao/options.go +++ b/server/dao/options.go @@ -17,12 +17,12 @@ package dao -import "github.com/apache/servicecomb-kie/pkg/model" - type FindOptions struct { ExactLabels bool Key string - Labels model.Labels + Labels map[string]string + LabelID string + ClearLabel bool } type FindOption func(*FindOptions) @@ -42,8 +42,22 @@ func WithKey(key string) FindOption { } //WithLabels find kv by labels -func WithLabels(labels model.Labels) FindOption { +func WithLabels(labels map[string]string) FindOption { return func(o *FindOptions) { o.Labels = labels } } + +//WithLabels find kv by labelID +func WithLabelID(label string) FindOption { + return func(o *FindOptions) { + o.LabelID = label + } +} + +//WithOutLabelField will clear all labels attributes in kv doc +func WithOutLabelField() FindOption { + return func(o *FindOptions) { + o.ClearLabel = true + } +} diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go index c9fb5bb..2293b72 100644 --- a/server/resource/v1/common.go +++ b/server/resource/v1/common.go @@ -54,11 +54,11 @@ func WriteErrResponse(context *restful.Context, status int, msg string) { context.Write(b) } -func ErrLog(action string, kv *model.KV, err error) { +func ErrLog(action string, kv *model.KVDoc, err error) { openlogging.Error(fmt.Sprintf("[%s] [%v] err:%s", action, kv, err.Error())) } -func InfoLog(action string, kv *model.KV) { +func InfoLog(action string, kv *model.KVDoc) { openlogging.Info( fmt.Sprintf("[%s] [%s:%s] in [%s] success", action, kv.Key, kv.Value, kv.Domain)) } diff --git a/client/options.go b/server/resource/v1/history_resource.go similarity index 70% copy from client/options.go copy to server/resource/v1/history_resource.go index 351b476..3952a19 100644 --- a/client/options.go +++ b/server/resource/v1/history_resource.go @@ -15,23 +15,7 @@ * limitations under the License. */ -package client +package v1 - - -type GetOption func(*GetOptions) -type GetOptions struct { - Labels map[string]string - MatchMode string -} - -func WithLables(l map[string]string) GetOption { - return func(options *GetOptions) { - options.Labels = l - } -} -func WithMatchMode(m string) GetOption { - return func(options *GetOptions) { - options.MatchMode = m - } +type HistoryResource struct { } diff --git a/server/resource/v1/kv_resource.go b/server/resource/v1/kv_resource.go index 0ecd74c..db2d2b6 100644 --- a/server/resource/v1/kv_resource.go +++ b/server/resource/v1/kv_resource.go @@ -37,7 +37,7 @@ type KVResource struct { func (r *KVResource) Put(context *restful.Context) { var err error key := context.ReadPathParameter("key") - kv := new(model.KV) + kv := new(model.KVDoc) decoder := json.NewDecoder(context.ReadRequest().Body) if err = decoder.Decode(kv); err != nil { WriteErrResponse(context, http.StatusInternalServerError, err.Error()) @@ -48,13 +48,12 @@ func (r *KVResource) Put(context *restful.Context) { WriteErrResponse(context, http.StatusInternalServerError, MsgDomainMustNotBeEmpty) } kv.Key = key - kv.Domain = domain.(string) s, err := dao.NewKVService() if err != nil { WriteErrResponse(context, http.StatusInternalServerError, err.Error()) return } - kv, err = s.CreateOrUpdate(kv) + kv, err = s.CreateOrUpdate(context.Ctx, domain.(string), kv) if err != nil { ErrLog("put", kv, err) WriteErrResponse(context, http.StatusInternalServerError, err.Error()) @@ -92,18 +91,18 @@ func (r *KVResource) FindWithKey(context *restful.Context) { return } policy := ReadMatchPolicy(context) - var kvs []*model.KV + var kvs []*model.KVDoc switch policy { case common.MatchGreedy: - kvs, err = s.Find(domain.(string), dao.WithKey(key), dao.WithLabels(labels)) + kvs, err = s.FindKV(context.Ctx, domain.(string), dao.WithKey(key), dao.WithLabels(labels)) case common.MatchExact: - kvs, err = s.Find(domain.(string), dao.WithKey(key), dao.WithLabels(labels), + kvs, err = s.FindKV(context.Ctx, domain.(string), dao.WithKey(key), dao.WithLabels(labels), dao.WithExactLabels()) default: WriteErrResponse(context, http.StatusBadRequest, MsgIllegalFindPolicy) return } - if err == dao.ErrNotExists { + if err == dao.ErrKeyNotExists { WriteErrResponse(context, http.StatusNotFound, err.Error()) return } @@ -139,18 +138,18 @@ func (r *KVResource) FindByLabels(context *restful.Context) { return } policy := ReadMatchPolicy(context) - var kvs []*model.KV + var kvs []*model.KVDoc switch policy { case common.MatchGreedy: - kvs, err = s.Find(domain.(string), dao.WithLabels(labels)) + kvs, err = s.FindKV(context.Ctx, domain.(string), dao.WithLabels(labels)) case common.MatchExact: - kvs, err = s.Find(domain.(string), dao.WithLabels(labels), + kvs, err = s.FindKV(context.Ctx, domain.(string), dao.WithLabels(labels), dao.WithExactLabels()) default: WriteErrResponse(context, http.StatusBadRequest, MsgIllegalFindPolicy) return } - if err == dao.ErrNotExists { + if err == dao.ErrKeyNotExists { WriteErrResponse(context, http.StatusNotFound, err.Error()) return } @@ -280,7 +279,7 @@ func (r *KVResource) URLPatterns() []restful.Route { Method: http.MethodDelete, Path: "/v1/kv/{ids}", ResourceFuncName: "Delete", - FuncDesc: "delete key by id,seperated by ','", + FuncDesc: "delete key by id,separated by ','", Parameters: []*restful.Parameters{ { DataType: "string",