This is an automated email from the ASF dual-hosted git repository. asifdxtreme pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
commit 6afc216839e28680776a539407cfb30cf83ecd5d Author: tian <xiaoliang.t...@gmail.com> AuthorDate: Tue May 7 19:25:30 2019 +0800 add kv data access api --- deployments/docker/docker-compose.yaml | 19 +++ go.mod | 16 ++ pkg/model/kv.go | 49 ++++++ pkg/model/kv_test.go | 45 +++++ server/config/config.go | 44 +++++ server/config/config_test.go | 51 ++++++ server/config/struct.go | 29 ++++ server/kv/errors.go | 35 ++++ server/kv/kv.go | 62 +++++++ server/kv/kv_test.go | 192 +++++++++++++++++++++ server/kv/model_suite_test.go | 44 +++++ server/kv/mongodb.go | 304 +++++++++++++++++++++++++++++++++ server/kv/options.go | 49 ++++++ 13 files changed, 939 insertions(+) diff --git a/deployments/docker/docker-compose.yaml b/deployments/docker/docker-compose.yaml new file mode 100644 index 0000000..2c87590 --- /dev/null +++ b/deployments/docker/docker-compose.yaml @@ -0,0 +1,19 @@ +version: '3.1' +services: + mongo: + image: mongo + restart: always + ports: + - 27017:27017 + environment: + MONGO_INITDB_ROOT_USERNAME: kie + MONGO_INITDB_ROOT_PASSWORD: 123 + + mongo-express: + image: mongo-express + restart: always + ports: + - 8081:8081 + environment: + ME_CONFIG_MONGODB_ADMINUSERNAME: kie + ME_CONFIG_MONGODB_ADMINPASSWORD: 123 \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..da3d3eb --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module github.com/apache/servicecomb-kie + +require ( + github.com/go-chassis/go-archaius v0.14.0 + github.com/go-chassis/go-chassis v1.4.0 // indirect + github.com/go-chassis/paas-lager v1.0.2-0.20190328010332-cf506050ddb2 + github.com/go-mesh/openlogging v1.0.1-0.20181205082104-3d418c478b2d + github.com/onsi/ginkgo v1.8.0 + github.com/onsi/gomega v1.5.0 + github.com/stretchr/testify v1.2.2 + github.com/urfave/cli v1.20.0 // indirect + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect + github.com/xdg/stringprep v1.0.0 // indirect + go.mongodb.org/mongo-driver v1.0.0 + gopkg.in/yaml.v2 v2.2.1 +) diff --git a/pkg/model/kv.go b/pkg/model/kv.go new file mode 100644 index 0000000..1c31fc0 --- /dev/null +++ b/pkg/model/kv.go @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package model + +import ( + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type Labels map[string]string + +//func (m Labels) ToString() string { +// sb := strings.Builder{} +// for k, v := range m { +// sb.WriteString(k + "=" + v + ",") +// } +// return sb.String() +//} + +type KV struct { + ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"` + Key string `json:"key"` + Value string `json:"value"` + ValueType string `json:"valueType"` //ini,json,text,yaml,properties + Domain string `json:"domain"` //tenant info + Labels map[string]string `json:"labels,omitempty"` //key has labels + Checker string `json:"check,omitempty"` //python script + Revision int `json:"revision"` +} +type KVHistory struct { + KID string `json:"id,omitempty" bson:"kvID"` + Value string `json:"value"` + Checker string `json:"check,omitempty"` //python script + Revision int `json:"revision"` +} diff --git a/pkg/model/kv_test.go b/pkg/model/kv_test.go new file mode 100644 index 0000000..cbb05c0 --- /dev/null +++ b/pkg/model/kv_test.go @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package model_test + +import ( + "encoding/json" + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestKV_UnmarshalJSON(t *testing.T) { + kv := &model.KV{ + Value: "test", + Labels: map[string]string{ + "test": "env", + }, + } + b, _ := json.Marshal(kv) + t.Log(string(b)) + + var kv2 model.KV + err := json.Unmarshal([]byte(` + {"value": "1","labels":{"test":"env"}} + `), &kv2) + assert.NoError(t, err) + assert.Equal(t, "env", kv2.Labels["test"]) + assert.Equal(t, "1", kv2.Value) + +} diff --git a/server/config/config.go b/server/config/config.go new file mode 100644 index 0000000..7e03731 --- /dev/null +++ b/server/config/config.go @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "github.com/go-chassis/go-archaius" + "github.com/go-chassis/go-archaius/sources/file-source" + "gopkg.in/yaml.v2" + "path/filepath" +) + +var configurations *Config + +func Init(file string) error { + if err := archaius.AddFile(file, archaius.WithFileHandler(filesource.UseFileNameAsKeyContentAsValue)); err != nil { + return err + } + _, filename := filepath.Split(file) + content := archaius.GetString(filename, "") + configurations = &Config{} + if err := yaml.Unmarshal([]byte(content), configurations); err != nil { + return err + } + return nil +} + +func GetDB() DB { + return configurations.DB +} diff --git a/server/config/config_test.go b/server/config/config_test.go new file mode 100644 index 0000000..75ca3ae --- /dev/null +++ b/server/config/config_test.go @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config_test + +import ( + "github.com/apache/servicecomb-kie/server/config" + "github.com/go-chassis/go-archaius" + "github.com/stretchr/testify/assert" + "io" + "os" + "testing" +) + +func TestInit(t *testing.T) { + err := archaius.Init() + assert.NoError(t, err) + b := []byte(` +db: + uri: mongodb://admin:123@127.0.0.1:27017/kie + type: mongodb + poolSize: 10 + ssl: false + sslCA: + sslCert: + +`) + defer os.Remove("test.yaml") + f1, err := os.Create("test.yaml") + assert.NoError(t, err) + _, err = io.WriteString(f1, string(b)) + assert.NoError(t, err) + err = config.Init("test.yaml") + assert.NoError(t, err) + assert.Equal(t, 10, config.GetDB().PoolSize) + assert.Equal(t, "mongodb://admin:123@127.0.0.1:27017/kie", config.GetDB().URI) +} diff --git a/server/config/struct.go b/server/config/struct.go new file mode 100644 index 0000000..cbfb644 --- /dev/null +++ b/server/config/struct.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +type Config struct { + DB DB `yaml:"db"` +} +type DB struct { + URI string `yaml:"uri"` + PoolSize int `yaml:"poolSize"` + SSL bool `yaml:"ssl"` + CABundle []string `yaml:"sslCA"` + Cert string `yaml:"sslCert"` +} diff --git a/server/kv/errors.go b/server/kv/errors.go new file mode 100644 index 0000000..958a015 --- /dev/null +++ b/server/kv/errors.go @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kv + +import ( + "errors" + "fmt" + + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/go-mesh/openlogging" +) + +//ErrAction will wrap raw error to biz error and return +//it record audit log for mongodb operation failure like find, insert, update, deletion +func ErrAction(action, key string, labels model.Labels, domain string, err error) error { + msg := fmt.Sprintf("can not [%s] [%s] in [%s] with [%s],err: %s", action, key, domain, labels, err.Error()) + openlogging.Error(msg) + return errors.New(msg) + +} diff --git a/server/kv/kv.go b/server/kv/kv.go new file mode 100644 index 0000000..9513e60 --- /dev/null +++ b/server/kv/kv.go @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kv + +import ( + "crypto/tls" + "errors" + "time" + "github.com/apache/servicecomb-kie/server/config" + "github.com/apache/servicecomb-kie/pkg/model" +) + +var ErrMissingDomain = errors.New("domain info missing, illegal access") +var ErrNotExists = errors.New("key with labels does not exits") +var ErrTooMany = errors.New("key with labels should be only one") +var ErrKeyMustNotEmpty = errors.New("must supply key if you want to get exact one result") + +type Service interface { + CreateOrUpdate(kv *model.KV) (*model.KV, error) + //do not use primitive.ObjectID as return to decouple with mongodb, we can afford perf lost + Exist(key, domain string, labels model.Labels) (string, error) + DeleteByID(id string) error + Delete(key, domain string, labels model.Labels) error + Find(domain string, options ...CallOption) ([]*model.KV, error) + AddHistory(kv *model.KV) error + //RollBack(kv *KV, version string) error +} + +type Options struct { + URI string + PoolSize int + SSL bool + TLS *tls.Config + Timeout time.Duration +} + +func NewKVService() (Service, error) { + opts := Options{ + URI: config.GetDB().URI, + PoolSize: config.GetDB().PoolSize, + SSL: config.GetDB().SSL, + } + if opts.SSL { + + } + return NewMongoService(opts) +} diff --git a/server/kv/kv_test.go b/server/kv/kv_test.go new file mode 100644 index 0000000..4bdd043 --- /dev/null +++ b/server/kv/kv_test.go @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kv_test + +import ( + . "github.com/apache/servicecomb-kie/pkg/model" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/server/kv" +) + +var _ = Describe("Kv mongodb service", func() { + var s kv.Service + var err error + Describe("connecting db", func() { + s, err = kv.NewMongoService(kv.Options{ + URI: "mongodb://kie:123@127.0.0.1:27017", + }) + It("should not return err", func() { + Expect(err).Should(BeNil()) + }) + }) + + Describe("put kv timeout", func() { + Context("with labels app and service", func() { + kv, err := s.CreateOrUpdate(&model.KV{ + Key: "timeout", + Value: "2s", + Domain: "default", + Labels: map[string]string{ + "app": "mall", + "service": "cart", + }, + }) + It("should not return err", func() { + Expect(err).Should(BeNil()) + }) + It("should has revision", func() { + Expect(kv.Revision).ShouldNot(BeZero()) + }) + It("should has ID", func() { + Expect(kv.ID.Hex()).ShouldNot(BeEmpty()) + }) + + }) + Context("with labels app, service and version", func() { + kv, err := s.CreateOrUpdate(&KV{ + Key: "timeout", + Value: "2s", + Domain: "default", + Labels: map[string]string{ + "app": "mall", + "service": "cart", + "version": "1.0.0", + }, + }) + oid, err := s.Exist("timeout", "default", map[string]string{ + "app": "mall", + "service": "cart", + "version": "1.0.0", + }) + It("should not return err", func() { + Expect(err).Should(BeNil()) + }) + It("should has revision", func() { + Expect(kv.Revision).ShouldNot(BeZero()) + }) + It("should has ID", func() { + Expect(kv.ID.Hex()).ShouldNot(BeEmpty()) + }) + It("should exist", func() { + Expect(oid).ShouldNot(BeEmpty()) + }) + }) + Context("with labels app,and update value", func() { + beforeKV, err := s.CreateOrUpdate(&KV{ + Key: "timeout", + Value: "1s", + Domain: "default", + Labels: map[string]string{ + "app": "mall", + }, + }) + It("should not return err", func() { + Expect(err).Should(BeNil()) + }) + kvs1, err := s.Find("default", kv.WithKey("timeout"), kv.WithLabels(map[string]string{ + "app": "mall", + }), kv.WithExactLabels()) + It("should be 1s", func() { + Expect(kvs1[0].Value).Should(Equal(beforeKV.Value)) + }) + afterKV, err := s.CreateOrUpdate(&KV{ + Key: "timeout", + Value: "3s", + Domain: "default", + Labels: map[string]string{ + "app": "mall", + }, + }) + It("should has same id", func() { + Expect(afterKV.ID.Hex()).Should(Equal(beforeKV.ID.Hex())) + }) + oid, err := s.Exist("timeout", "default", map[string]string{ + "app": "mall", + }) + It("should exists", func() { + Expect(oid).Should(Equal(beforeKV.ID.Hex())) + }) + kvs, err := s.Find("default", kv.WithKey("timeout"), kv.WithLabels(map[string]string{ + "app": "mall", + }), kv.WithExactLabels()) + It("should be 3s", func() { + Expect(kvs[0].Value).Should(Equal(afterKV.Value)) + }) + }) + }) + + Describe("greedy find by kv and labels", func() { + Context("with labels app ", func() { + kvs, err := s.Find("default", kv.WithKey("timeout"), kv.WithLabels(map[string]string{ + "app": "mall", + })) + It("should not return err", func() { + Expect(err).Should(BeNil()) + }) + It("should has 3 records", func() { + Expect(len(kvs)).Should(Equal(3)) + }) + + }) + }) + Describe("exact find by kv and labels", func() { + Context("with labels app ", func() { + kvs, err := s.Find("default", kv.WithKey("timeout"), kv.WithLabels(map[string]string{ + "app": "mall", + }), kv.WithExactLabels()) + It("should not return err", func() { + Expect(err).Should(BeNil()) + }) + It("should has 1 records", func() { + Expect(len(kvs)).Should(Equal(1)) + }) + + }) + }) + Describe("exact find by labels", func() { + Context("with labels app ", func() { + kvs, err := s.Find("default", kv.WithLabels(map[string]string{ + "app": "mall", + }), kv.WithExactLabels()) + It("should not return err", func() { + Expect(err).Should(BeNil()) + }) + It("should has 1 records", func() { + Expect(len(kvs)).Should(Equal(1)) + }) + + }) + }) + Describe("greedy find by labels", func() { + Context("with labels app ans service ", func() { + kvs, err := s.Find("default", kv.WithLabels(map[string]string{ + "app": "mall", + "service": "cart", + })) + It("should not return err", func() { + Expect(err).Should(BeNil()) + }) + It("should has 2 records", func() { + Expect(len(kvs)).Should(Equal(2)) + }) + + }) + }) +}) diff --git a/server/kv/model_suite_test.go b/server/kv/model_suite_test.go new file mode 100644 index 0000000..965802e --- /dev/null +++ b/server/kv/model_suite_test.go @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kv_test + +import ( + "testing" + + "github.com/go-chassis/paas-lager" + "github.com/go-mesh/openlogging" + . "github.com/onsi/ginkgo" + "github.com/onsi/ginkgo/reporters" + . "github.com/onsi/gomega" +) + +func TestModel(t *testing.T) { + RegisterFailHandler(Fail) + junitReporter := reporters.NewJUnitReporter("junit.xml") + RunSpecsWithDefaultAndCustomReporters(t, "Model Suite", []Reporter{junitReporter}) +} + +var _ = BeforeSuite(func() { + log.Init(log.Config{ + Writers: []string{"stdout"}, + LoggerLevel: "DEBUG", + }) + + logger := log.NewLogger("ut") + openlogging.SetLogger(logger) +}) diff --git a/server/kv/mongodb.go b/server/kv/mongodb.go new file mode 100644 index 0000000..37664ae --- /dev/null +++ b/server/kv/mongodb.go @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kv + +import ( + "context" + "fmt" + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/go-mesh/openlogging" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "time" +) + +var client *mongo.Client + +const ( + DB = "kie" + CollectionKV = "kv" + CollectionRevision = "revision" + DefaultTimeout = 5 * time.Second + DefaultValueType = "text" +) + +type MongodbService struct { + c *mongo.Client + timeout time.Duration +} + +func (s *MongodbService) CreateOrUpdate(kv *model.KV) (*model.KV, error) { + if kv.Domain == "" { + return nil, ErrMissingDomain + } + ctx, _ := context.WithTimeout(context.Background(), DefaultTimeout) + collection := s.c.Database(DB).Collection(CollectionKV) + oid, err := s.Exist(kv.Key, kv.Domain, kv.Labels) + if err != nil { + if err != ErrNotExists { + return nil, err + } + } + if oid != "" { + hex, err := primitive.ObjectIDFromHex(oid) + if err != nil { + openlogging.Error(fmt.Sprintf("convert %s ,err:%s", oid, err)) + return nil, err + } + kv.ID = hex + if err := s.update(ctx, collection, kv); err != nil { + return nil, err + } + return kv, nil + } + if kv.ValueType == "" { + kv.ValueType = DefaultValueType + } + //set 1 to revision for insertion + kv.Revision = 1 + res, err := collection.InsertOne(ctx, kv) + if err != nil { + return nil, err + } + objectID, _ := res.InsertedID.(primitive.ObjectID) + kv.ID = objectID + if err := s.AddHistory(kv); err != nil { + openlogging.Warn( + fmt.Sprintf("can not update version for [%s] [%s] in [%s]", + kv.Key, kv.Labels, kv.Domain)) + } + openlogging.Debug(fmt.Sprintf("create %s with labels %s value [%s]", kv.Key, kv.Labels, kv.Value)) + return kv, nil +} + +//update get latest revision from history +//and increase revision +//and update and them add new history +func (s *MongodbService) update(ctx context.Context, collection *mongo.Collection, kv *model.KV) error { + h, err := s.getLatest(kv.ID) + if err != nil { + openlogging.Error(fmt.Sprintf("get latest [%s][%s] in [%s],err: %s", + kv.Key, kv.Labels, kv.Domain, err.Error())) + return err + } + if h != nil { + kv.Revision = h.Revision + 1 + } + ur, err := collection.UpdateOne(ctx, bson.M{"_id": kv.ID}, bson.D{ + {"$set", bson.D{ + {"value", kv.Value}, + {"revision", kv.Revision}, + {"checker", kv.Checker}, + }}, + }) + if err != nil { + return err + } + openlogging.Debug( + fmt.Sprintf("update %s with labels %s value [%s] %d ", + kv.Key, kv.Labels, kv.Value, ur.ModifiedCount)) + if err := s.AddHistory(kv); err != nil { + openlogging.Warn( + fmt.Sprintf("can not update version for [%s] [%s] in [%s]", + kv.Key, kv.Labels, kv.Domain)) + } + openlogging.Debug( + fmt.Sprintf("add history %s with labels %s value [%s] %d ", + kv.Key, kv.Labels, kv.Value, ur.ModifiedCount)) + return nil + +} +func (s *MongodbService) Exist(key, domain string, labels model.Labels) (string, error) { + kvs, err := s.Find(domain, WithExactLabels(), WithLabels(labels), WithKey(key)) + if err != nil { + return "", err + } + if len(kvs) != 1 { + return "", ErrTooMany + } + + return kvs[0].ID.Hex(), nil + +} + +//Find get kvs by key, labels +//because labels has a a lot of combination, +//you can use WithExactLabels to return only one kv which's labels exactly match the criteria +func (s *MongodbService) Find(domain string, options ...CallOption) ([]*model.KV, error) { + opts := CallOptions{} + for _, o := range options { + o(&opts) + } + if domain == "" { + return nil, ErrMissingDomain + } + collection := s.c.Database(DB).Collection(CollectionKV) + ctx, _ := context.WithTimeout(context.Background(), DefaultTimeout) + filter := bson.M{"domain": domain} + if opts.Key != "" { + filter["key"] = opts.Key + } + for k, v := range opts.Labels { + filter["labels."+k] = v + } + + cur, err := collection.Find(ctx, filter) + if err != nil { + if err.Error() == context.DeadlineExceeded.Error() { + return nil, ErrAction("find", opts.Key, opts.Labels, domain, fmt.Errorf("can not reach mongodb in %s", s.timeout)) + } + return nil, err + } + defer cur.Close(ctx) + if cur.Err() != nil { + return nil, err + } + if opts.ExactLabels { + openlogging.Debug(fmt.Sprintf("find one [%s] with lables [%s] in [%s]", opts.Key, opts.Labels, domain)) + curKV := &model.KV{} //reuse this pointer to reduce GC, only clear label + //check label length to get the exact match + for cur.Next(ctx) { //although complexity is O(n), but there won't be so much labels for one key + curKV.Labels = nil + err := cur.Decode(curKV) + if err != nil { + openlogging.Error("decode error: " + err.Error()) + return nil, err + } + if len(curKV.Labels) == len(opts.Labels) { + openlogging.Debug("hit") + return []*model.KV{curKV}, nil + } + + } + return nil, ErrNotExists + } else { + kvs := make([]*model.KV, 0) + for cur.Next(ctx) { + curKV := &model.KV{} + if err := cur.Decode(curKV); err != nil { + openlogging.Error("decode to KVs error: " + err.Error()) + return nil, err + } + kvs = append(kvs, curKV) + + } + if len(kvs) == 0 { + return nil, ErrNotExists + } + return kvs, nil + } + +} +func (s *MongodbService) DeleteByID(id string) error { + collection := s.c.Database(DB).Collection(CollectionKV) + hex, err := primitive.ObjectIDFromHex(id) + if err != nil { + openlogging.Error(fmt.Sprintf("convert %s ,err:%s", id, err)) + return err + } + ctx, _ := context.WithTimeout(context.Background(), DefaultTimeout) + dr, err := collection.DeleteOne(ctx, bson.M{"_id": hex}) + if err != nil { + openlogging.Error(fmt.Sprintf("delete [%s] failed: %s", hex, err)) + } + if dr.DeletedCount != 1 { + openlogging.Warn(fmt.Sprintf("delete [%s], but it is not exist", hex)) + } + return nil +} + +func (s *MongodbService) Delete(key, domain string, labels model.Labels) error { + return nil +} +func (s *MongodbService) AddHistory(kv *model.KV) error { + collection := s.c.Database(DB).Collection(CollectionRevision) + ctx, _ := context.WithTimeout(context.Background(), DefaultTimeout) + h := &model.KVHistory{ + KID: kv.ID.Hex(), + Value: kv.Value, + Revision: kv.Revision, + Checker: kv.Checker, + } + _, err := collection.InsertOne(ctx, h) + if err != nil { + openlogging.Error(err.Error()) + return err + } + return nil +} +func (s *MongodbService) getLatest(id primitive.ObjectID) (*model.KVHistory, error) { + collection := s.c.Database(DB).Collection(CollectionRevision) + ctx, _ := context.WithTimeout(context.Background(), DefaultTimeout) + + filter := bson.M{"kvID": id.Hex()} + + cur, err := collection.Find(ctx, filter, + options.Find().SetSort(map[string]interface{}{ + "revision": -1, + }), options.Find().SetLimit(1)) + if err != nil { + return nil, err + } + h := &model.KVHistory{} + var exist bool + for cur.Next(ctx) { + if err := cur.Decode(h); err != nil { + openlogging.Error("decode to KVs error: " + err.Error()) + return nil, err + } + exist = true + break + } + if !exist { + return nil, nil + } + return h, nil +} +func NewMongoService(opts Options) (Service, error) { + if opts.Timeout == 0 { + opts.Timeout = DefaultTimeout + } + c, err := getClient(opts) + if err != nil { + return nil, err + } + m := &MongodbService{ + c: c, + timeout: opts.Timeout, + } + return m, nil +} +func getClient(opts Options) (*mongo.Client, error) { + if client == nil { + var err error + client, err = mongo.NewClient(options.Client().ApplyURI(opts.URI)) + if err != nil { + return nil, err + } + openlogging.Info("connecting to " + opts.URI) + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + err = client.Connect(ctx) + if err != nil { + return nil, err + } + openlogging.Info("connected to " + opts.URI) + } + return client, nil +} diff --git a/server/kv/options.go b/server/kv/options.go new file mode 100644 index 0000000..aabe1ec --- /dev/null +++ b/server/kv/options.go @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kv + +import "github.com/apache/servicecomb-kie/pkg/model" + +type CallOptions struct { + ExactLabels bool + Key string + Labels model.Labels +} + +type CallOption func(*CallOptions) + +//WithExactLabels tell model service to return only one kv matches the labels +func WithExactLabels() CallOption { + return func(o *CallOptions) { + o.ExactLabels = true + } +} + +//WithKey find by key +func WithKey(key string) CallOption { + return func(o *CallOptions) { + o.Key = key + } +} + +//WithLabels find kv by labels +func WithLabels(labels model.Labels) CallOption { + return func(o *CallOptions) { + o.Labels = labels + } +}