This is an automated email from the ASF dual-hosted git repository. tianxiaoliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new 0c9258e [SCB-2094] implement mongo instance/dependence/engine interface (#759) 0c9258e is described below commit 0c9258e3761ba1838948c9a48bd3cfd12d8e5798 Author: xzccfzy <50447852+xzcc...@users.noreply.github.com> AuthorDate: Wed Dec 2 08:56:21 2020 +0800 [SCB-2094] implement mongo instance/dependence/engine interface (#759) --- datasource/dependency_util.go | 47 + datasource/mongo/account.go | 11 +- datasource/mongo/client/common.go | 1 + datasource/mongo/client/mongo.go | 12 +- datasource/mongo/common.go | 34 + datasource/mongo/database.go | 61 +- datasource/mongo/dep.go | 340 ++++++- datasource/mongo/dep_test.go | 351 +++++++ datasource/mongo/engine.go | 213 +++- datasource/mongo/heartbeat/common.go | 25 + .../mongo/heartbeat/heartbeatchecker/heartbeat.go | 6 +- .../heartbeat/heartbeatchecker/heartbeat_test.go | 10 +- .../heartbeat/heartbeatchecker/heartbeatchecker.go | 5 +- .../heartbeatchecker/heartbeatchecker_test.go | 2 +- datasource/mongo/heartbeat/manager.go | 4 +- datasource/mongo/mongo.go | 4 +- datasource/mongo/ms.go | 1025 ++++++++++++++++++-- datasource/mongo/ms_test.go | 626 +++++++++--- datasource/mongo/{engine.go => util.go} | 35 +- 19 files changed, 2534 insertions(+), 278 deletions(-) diff --git a/datasource/dependency_util.go b/datasource/dependency_util.go new file mode 100644 index 0000000..c907f04 --- /dev/null +++ b/datasource/dependency_util.go @@ -0,0 +1,47 @@ +package datasource + +import ( + "github.com/apache/servicecomb-service-center/datasource/etcd/path" + "github.com/apache/servicecomb-service-center/pkg/log" + pb "github.com/go-chassis/cari/discovery" +) + +func ParamsChecker(consumerInfo *pb.MicroServiceKey, providersInfo []*pb.MicroServiceKey) *pb.CreateDependenciesResponse { + flag := make(map[string]bool, len(providersInfo)) + for _, providerInfo := range providersInfo { + //存在带*的情况,后面的数据就不校验了 + if providerInfo.ServiceName == "*" { + break + } + if 
len(providerInfo.AppId) == 0 { + providerInfo.AppId = consumerInfo.AppId + } + + version := providerInfo.Version + if len(version) == 0 { + return BadParamsResponse("Required provider version") + } + + providerInfo.Version = "" + if _, ok := flag[toString(providerInfo)]; ok { + return BadParamsResponse("Invalid request body for provider info.Duplicate provider or (serviceName and appId is same).") + } + flag[toString(providerInfo)] = true + providerInfo.Version = version + } + return nil +} + +func BadParamsResponse(detailErr string) *pb.CreateDependenciesResponse { + log.Errorf(nil, "request params is invalid. %s", detailErr) + if len(detailErr) == 0 { + detailErr = "Request params is invalid." + } + return &pb.CreateDependenciesResponse{ + Response: pb.CreateResponse(pb.ErrInvalidParams, detailErr), + } +} + +func toString(in *pb.MicroServiceKey) string { + return path.GenerateProviderDependencyRuleKey(in.Tenant, in) +} diff --git a/datasource/mongo/account.go b/datasource/mongo/account.go index feba2af..ff57904 100644 --- a/datasource/mongo/account.go +++ b/datasource/mongo/account.go @@ -19,7 +19,6 @@ package mongo import ( "context" - "errors" "github.com/apache/servicecomb-service-center/datasource" "github.com/apache/servicecomb-service-center/datasource/mongo/client" @@ -34,7 +33,7 @@ import ( func (ds *DataSource) CreateAccount(ctx context.Context, a *rbacframe.Account) error { exist, err := ds.AccountExist(ctx, a.Name) if err != nil { - log.Errorf(err, "can not save account info") + log.Error("can not save account info", err) return err } if exist { @@ -42,7 +41,7 @@ func (ds *DataSource) CreateAccount(ctx context.Context, a *rbacframe.Account) e } hash, err := bcrypt.GenerateFromPassword([]byte(a.Password), 14) if err != nil { - log.Errorf(err, "pwd hash failed") + log.Error("pwd hash failed", err) return err } a.Password = stringutil.Bytes2str(hash) @@ -86,7 +85,7 @@ func (ds *DataSource) GetAccount(ctx context.Context, key string) (*rbacframe.Ac var 
account rbacframe.Account err = result.Decode(&account) if err != nil { - log.Errorf(err, "Decode account failed: ") + log.Error("Decode account failed: ", err) return nil, err } return &account, nil @@ -106,7 +105,7 @@ func (ds *DataSource) ListAccount(ctx context.Context, key string) ([]*rbacframe var account rbacframe.Account err = cursor.Decode(&account) if err != nil { - log.Errorf(err, "Decode account failed: ") + log.Error("Decode account failed: ", err) break } accounts = append(accounts, &account) @@ -142,7 +141,7 @@ func (ds *DataSource) UpdateAccount(ctx context.Context, key string, account *rb return err } if result.ModifiedCount == 0 { - return errors.New("UpdateAccount: no data to update") + return ErrUpdateNodata } return nil } diff --git a/datasource/mongo/client/common.go b/datasource/mongo/client/common.go index a9b7f7d..e71706a 100644 --- a/datasource/mongo/client/common.go +++ b/datasource/mongo/client/common.go @@ -21,4 +21,5 @@ import ( var ( ErrCollectionsNil = errors.New("collection is nil") + ErrOpenDbFailed = errors.New("open db failed") ) diff --git a/datasource/mongo/client/mongo.go b/datasource/mongo/client/mongo.go index eabdb7d..4be189a 100644 --- a/datasource/mongo/client/mongo.go +++ b/datasource/mongo/client/mongo.go @@ -17,7 +17,7 @@ package client import ( "context" - "errors" + "fmt" "time" "github.com/apache/servicecomb-service-center/pkg/gopool" @@ -55,7 +55,7 @@ func GetMongoClient() *MongoClient { func NewMongoClient(config storage.Options) { inst := &MongoClient{} if err := inst.Initialize(config); err != nil { - log.Errorf(err, "failed to init mongodb") + log.Error("failed to init mongodb", err) inst.err <- err } mc = inst @@ -86,7 +86,7 @@ func (mc *MongoClient) Ready() <-chan struct{} { func (mc *MongoClient) Close() { if mc.client != nil { if err := mc.client.Disconnect(context.TODO()); err != nil { - log.Errorf(err, "[close mongo client] failed disconnect the mongo client") + log.Error("[close mongo client] failed 
disconnect the mongo client", err) } } } @@ -107,7 +107,7 @@ func (mc *MongoClient) HealthCheck(ctx context.Context) { if err == nil { break } - log.Errorf(err, "retry to connect to mongodb %s after %s", mc.dbconfig.URI, MongoCheckDelay) + log.Error(fmt.Sprintf("retry to connect to mongodb %s after %s", mc.dbconfig.URI, MongoCheckDelay), err) select { case <-ctx.Done(): mc.Close() @@ -124,13 +124,13 @@ func (mc *MongoClient) newClient(ctx context.Context) (err error) { mc.client, err = mongo.Connect(ctx, clientOptions) if err != nil { if derr := mc.client.Disconnect(ctx); derr != nil { - log.Errorf(derr, "[init mongo client] failed to disconnect mongo client ") + log.Error("[init mongo client] failed to disconnect mongo clients", derr) } return } mc.db = mc.client.Database(MongoDB) if mc.db == nil { - return errors.New("open db failed") + return ErrOpenDbFailed } return nil } diff --git a/datasource/mongo/common.go b/datasource/mongo/common.go new file mode 100644 index 0000000..8cab32c --- /dev/null +++ b/datasource/mongo/common.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except request compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to request writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package mongo + +import "errors" + +var ( + ErrInvalidConsumer = errors.New("Invalid consumer") + ErrUpdateNodata = errors.New("UpdateAccount: no data to update") + ErrServiceFileLost = errors.New("service center service file lost") + ErrInvalidDomainProject = errors.New("invalid domainProject") + ErrNotAllowDeleteSC = errors.New("not allow to delete service center") + ErrDeleteSchemaFailed = errors.New("delete schema failed") + ErrInvalidParamBatchGetInstancesRequest = errors.New("invalid param BatchGetInstancesRequest") +) + +func NewError(errInfo string, errMsg string) error { + return errors.New(errInfo + errMsg) +} diff --git a/datasource/mongo/database.go b/datasource/mongo/database.go index 0dc96f3..3bfdf25 100644 --- a/datasource/mongo/database.go +++ b/datasource/mongo/database.go @@ -32,8 +32,6 @@ const ( AccountTokenExpirationTime = "tokenexpirationtime" AccountCurrentPassword = "currentpassword" AccountStatus = "status" - InstanceID = "instanceinfo.instanceid" - ServiceID = "instanceinfo.serviceid" RefreshTime = "refreshtime" ) @@ -43,36 +41,41 @@ const ( CollectionSchema = "schema" CollectionRule = "rule" CollectionInstance = "instance" + CollectionDep = "dependency" ) const ( + DepsQueueUUID = "0" ErrorDuplicateKey = 11000 ) const ( - Domain = "domain" - Project = "project" - ServiceTag = "tags" - SchemaID = "schemaid" - RuleServiceID = "serviceid" - RuleRuleID = "ruleinfo.ruleid" - SchemaServiceID = "serviceid" - ServiceServiceID = "serviceinfo.serviceid" - ServiceProperty = "serviceinfo.properties" - ServiceModTime = "serviceinfo.modtimestamp" - ServiceEnv = "serviceinfo.environment" - ServiceAppID = "serviceinfo.appid" - ServiceServiceName = "serviceinfo.servicename" - ServiceAlias = "serviceinfo.alias" - ServiceVersion = "serviceinfo.version" - ServiceSchemas = "serviceinfo.schemas" - RuleAttribute = "ruleinfo.attribute" - RulePattern = "ruleinfo.pattern" - RuleModTime = "ruleinfo.modtimestamp" - RuleDescription = "ruleinfo.description" - 
RuleRuletype = "ruleinfo.ruletype" - SchemaInfo = "schemainfo" - SchemaSummary = "schemasummary" + ColumnDomain = "domain" + ColumnProject = "project" + ColumnTag = "tags" + ColumnSchemaID = "schemaid" + ColumnServiceID = "serviceid" + ColumnRuleID = "ruleid" + ColumnServiceInfo = "serviceinfo" + ColumnProperty = "properties" + ColumnModTime = "modtimestamp" + ColumnEnv = "environment" + ColumnAppID = "appid" + ColumnServiceName = "servicename" + ColumnAlias = "alias" + ColumnVersion = "version" + ColumnSchemas = "schemas" + ColumnAttribute = "attribute" + ColumnPattern = "pattern" + ColumnDescription = "description" + ColumnRuleType = "ruletype" + ColumnSchemaInfo = "schemainfo" + ColumnSchemaSummary = "schemasummary" + ColumnConsumer = "consumer" + ColumnDependencyInfo = "dependencyinfo" + ColumnRuleInfo = "ruleinfo" + ColumnInstanceInfo = "instanceinfo" + ColumnInstanceID = "instanceid" ) type Service struct { @@ -104,3 +107,11 @@ type Instance struct { RefreshTime time.Time InstanceInfo *pb.MicroServiceInstance } + +type Dependency struct { + Domain string + Project string + ConsumerID string + UUID string + DependencyInfo *pb.ConsumerDependency +} diff --git a/datasource/mongo/dep.go b/datasource/mongo/dep.go index e3322a2..6a1eef2 100644 --- a/datasource/mongo/dep.go +++ b/datasource/mongo/dep.go @@ -21,20 +21,354 @@ import ( "context" pb "github.com/go-chassis/cari/discovery" + "go.mongodb.org/mongo-driver/bson" + + "fmt" + "strings" + + "github.com/apache/servicecomb-service-center/datasource" + "github.com/apache/servicecomb-service-center/datasource/mongo/client" + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/util" ) func (ds *DataSource) SearchProviderDependency(ctx context.Context, request *pb.GetDependenciesRequest) (*pb.GetProDependenciesResponse, error) { - return &pb.GetProDependenciesResponse{}, nil + providerServiceID := request.ServiceId + filter := GeneratorServiceFilter(ctx, 
providerServiceID) + provider, err := GetService(ctx, filter) + if err != nil { + log.Error("GetProviderDependencies failed, provider is "+providerServiceID, err) + return nil, err + } + if provider == nil { + log.Error(fmt.Sprintf("GetProviderDependencies failed for provider %s", providerServiceID), err) + return &pb.GetProDependenciesResponse{ + Response: pb.CreateResponse(pb.ErrServiceNotExists, "Provider does not exist"), + }, nil + } + + services, err := GetDependencyProviders(ctx, provider.ServiceInfo, request) + if err != nil { + log.Error(fmt.Sprintf("GetProviderDependencies failed, provider is %s/%s/%s/%s", + provider.ServiceInfo.Environment, provider.ServiceInfo.AppId, provider.ServiceInfo.ServiceName, provider.ServiceInfo.Version), err) + return &pb.GetProDependenciesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + + return &pb.GetProDependenciesResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, "Get all consumers successful."), + Consumers: services, + }, nil } func (ds *DataSource) SearchConsumerDependency(ctx context.Context, request *pb.GetDependenciesRequest) (*pb.GetConDependenciesResponse, error) { - return &pb.GetConDependenciesResponse{}, nil + consumerID := request.ServiceId + + filter := GeneratorServiceFilter(ctx, consumerID) + consumer, err := GetService(ctx, filter) + if err != nil { + log.Error(fmt.Sprintf("GetConsumerDependencies failed, consumer is %s", consumerID), err) + return nil, err + } + if consumer == nil { + log.Error(fmt.Sprintf("GetConsumerDependencies failed for consumer %s does not exist", consumerID), err) + return &pb.GetConDependenciesResponse{ + Response: pb.CreateResponse(pb.ErrServiceNotExists, "Consumer does not exist"), + }, nil + } + + services, err := GetDependencyProviders(ctx, consumer.ServiceInfo, request) + if err != nil { + log.Error(fmt.Sprintf("GetConsumerDependencies failed, consumer is %s/%s/%s/%s", + consumer.ServiceInfo.Environment, consumer.ServiceInfo.AppId, 
consumer.ServiceInfo.ServiceName, consumer.ServiceInfo.Version), err) + return &pb.GetConDependenciesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + + return &pb.GetConDependenciesResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, "Get all providers successfully."), + Providers: services, + }, nil } func (ds *DataSource) AddOrUpdateDependencies(ctx context.Context, dependencyInfos []*pb.ConsumerDependency, override bool) (*pb.Response, error) { - return pb.CreateResponse(pb.ResponseSuccess, ""), nil + domainProject := util.ParseDomainProject(ctx) + for _, dependencyInfo := range dependencyInfos { + consumerFlag := util.StringJoin([]string{ + dependencyInfo.Consumer.Environment, + dependencyInfo.Consumer.AppId, + dependencyInfo.Consumer.ServiceName, + dependencyInfo.Consumer.Version}, "/") + consumerInfo := pb.DependenciesToKeys([]*pb.MicroServiceKey{dependencyInfo.Consumer}, domainProject)[0] + providersInfo := pb.DependenciesToKeys(dependencyInfo.Providers, domainProject) + + rsp := datasource.ParamsChecker(consumerInfo, providersInfo) + if rsp != nil { + log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t consumer is %s %s", + override, consumerFlag, rsp.Response.GetMessage()), nil) + return rsp.Response, nil + } + + consumerID, err := GetServiceID(ctx, consumerInfo) + if err != nil { + log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t, get consumer %s id failed", + override, consumerFlag), err) + return pb.CreateResponse(pb.ErrInternal, err.Error()), err + } + if len(consumerID) == 0 { + log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t consumer %s does not exist", + override, consumerFlag), err) + return pb.CreateResponse(pb.ErrServiceNotExists, fmt.Sprintf("Consumer %s does not exist.", consumerFlag)), nil + } + + dependencyInfo.Override = override + id := DepsQueueUUID + if !override { + id = util.GenerateUUID() + } + + domain := 
util.ParseDomain(ctx) + project := util.ParseProject(ctx) + data := &Dependency{ + Domain: domain, + Project: project, + ConsumerID: consumerID, + UUID: id, + DependencyInfo: dependencyInfo, + } + insertRes, err := client.GetMongoClient().Insert(ctx, CollectionDep, data) + if err != nil { + log.Error("failed to insert dep to mongodb", err) + return pb.CreateResponse(pb.ErrInternal, err.Error()), err + } + log.Error(fmt.Sprintf("insert dep to mongodb success %s", insertRes.InsertedID), err) + } + return pb.CreateResponse(pb.ResponseSuccess, "Create dependency successfully."), nil } func (ds *DataSource) DeleteDependency() { panic("implement me") } + +func GetDependencyProviders(ctx context.Context, consumer *pb.MicroService, request *pb.GetDependenciesRequest) ([]*pb.MicroService, error) { + keys, err := GetProviderKeys(ctx, consumer) + if err != nil { + return nil, err + } + + services := make([]*pb.MicroService, 0, len(keys)) + + for _, key := range keys { + domainProject := util.ParseDomainProject(ctx) + if request.SameDomain && key.Tenant != domainProject { + continue + } + + providerIDs, err := ParseDependencyRule(ctx, key) + if err != nil { + return nil, err + } + + if key.ServiceName == "*" { + services = services[:0] + } + + for _, providerID := range providerIDs { + filter := GeneratorServiceFilter(ctx, providerID) + provider, err := GetService(ctx, filter) + if err != nil { + log.Warn(fmt.Sprintf("get provider[%s/%s/%s/%s] failed", + key.Environment, key.AppId, key.ServiceName, key.Version)) + continue + } + if provider == nil { + log.Warn(fmt.Sprintf("provider[%s/%s/%s/%s] does not exist", + key.Environment, key.AppId, key.ServiceName, key.Version)) + continue + } + if request.NoSelf && providerID == consumer.ServiceId { + continue + } + services = append(services, provider.ServiceInfo) + } + + if key.ServiceName == "*" { + break + } + } + + return services, nil +} + +func GetProviderKeys(ctx context.Context, consumer *pb.MicroService) 
([]*pb.MicroServiceKey, error) { + if consumer == nil { + return nil, ErrInvalidConsumer + } + domainProject := util.ParseDomainProject(ctx) + consumerMicroServiceKey := &pb.MicroServiceKey{ + Tenant: domainProject, + Environment: consumer.Environment, + AppId: consumer.AppId, + ServiceName: consumer.ServiceName, + Alias: consumer.Alias, + Version: consumer.Version, + } + + filter := GenerateConsumerDependencyRuleKey(ctx, consumerMicroServiceKey) + + findRes, err := client.GetMongoClient().Find(ctx, CollectionDep, filter) + if err != nil { + return nil, err + } + var services []*pb.MicroServiceKey + for findRes.Next(ctx) { + var tempMongoDep Dependency + err := findRes.Decode(&tempMongoDep) + if err != nil { + return nil, err + } + providers := tempMongoDep.DependencyInfo.Providers + services = append(services, providers...) + } + return services, nil +} + +func GenerateConsumerDependencyRuleKey(ctx context.Context, in *pb.MicroServiceKey) bson.M { + + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + if in == nil { + return bson.M{ + ColumnDomain: domain, + ColumnProject: project, + } + } + if in.ServiceName == "*" { + return bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnDependencyInfo, ColumnConsumer, ColumnEnv}): in.Environment, + } + } + return bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnDependencyInfo, ColumnConsumer, ColumnEnv}): in.Environment, + StringBuilder([]string{ColumnDependencyInfo, ColumnConsumer, ColumnAppID}): in.AppId, + StringBuilder([]string{ColumnDependencyInfo, ColumnConsumer, ColumnVersion}): in.Version, + } +} + +func ParseDependencyRule(ctx context.Context, dependencyRule *pb.MicroServiceKey) (serviceIDs []string, err error) { + switch { + case dependencyRule.ServiceName == "*": + splited := strings.Split(dependencyRule.Tenant, "/") + filter := bson.M{ + ColumnDomain: splited[0], + ColumnProject: splited[1], + 
StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): dependencyRule.Environment} + findRes, err := client.GetMongoClient().Find(ctx, CollectionService, filter) + if err != nil { + return nil, err + } + for findRes.Next(ctx) { + var service Service + err = findRes.Decode(&service) + if err != nil { + return nil, err + } + serviceIDs = append(serviceIDs, service.ServiceInfo.ServiceId) + } + default: + serviceIDs, err = FindServiceIds(ctx, dependencyRule) + } + return +} + +func FindServiceIds(ctx context.Context, key *pb.MicroServiceKey) ([]string, error) { + versionRule := key.Version + splited := strings.Split(key.Tenant, "/") + if len(versionRule) == 0 { + return nil, nil + } + rangeIdx := strings.Index(versionRule, "-") + switch { + case versionRule == "latest": + filter := bson.M{ + ColumnDomain: splited[0], + ColumnProject: splited[1]} + return GetFilterVersionService(ctx, filter) + case versionRule[len(versionRule)-1:] == "+": + start := versionRule[:len(versionRule)-1] + filter := bson.M{ + ColumnDomain: splited[0], + ColumnProject: splited[1], + StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): bson.M{"$gte": start}} + return GetFilterVersionService(ctx, filter) + case rangeIdx > 0: + start := versionRule[:rangeIdx] + end := versionRule[rangeIdx+1:] + filter := bson.M{ + ColumnDomain: splited[0], + ColumnProject: splited[1], + StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): bson.M{"$gts": start, "$lt": end}} + return GetFilterVersionService(ctx, filter) + default: + filter := bson.M{ + ColumnDomain: splited[0], + ColumnProject: splited[1], + StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): key.Environment, + StringBuilder([]string{ColumnServiceInfo, ColumnAppID}): key.AppId, + StringBuilder([]string{ColumnServiceInfo, ColumnServiceName}): key.ServiceName, + StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): key.Version} + return GetFilterVersionService(ctx, filter) + } +} + +func GetFilterVersionService(ctx 
context.Context, m bson.M) (serviceIDs []string, err error) { + findRes, err := client.GetMongoClient().Find(ctx, CollectionService, m) + if err != nil { + return nil, err + } + for findRes.Next(ctx) { + var service Service + err = findRes.Decode(&service) + if err != nil { + return nil, err + } + serviceIDs = append(serviceIDs, service.ServiceInfo.ServiceId) + } + return +} + +func GetServiceID(ctx context.Context, key *pb.MicroServiceKey) (serviceID string, err error) { + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + filter := bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): key.Environment, + StringBuilder([]string{ColumnServiceInfo, ColumnAppID}): key.AppId, + StringBuilder([]string{ColumnServiceInfo, ColumnServiceName}): key.ServiceName, + StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): key.Version} + + findRes, err := client.GetMongoClient().Find(ctx, CollectionService, filter) + if err != nil { + return "", nil + } + var service []*Service + for findRes.Next(ctx) { + var temp *Service + err := findRes.Decode(&temp) + if err != nil { + return "", nil + } + service = append(service, temp) + } + if service == nil { + return "", nil + } + return service[0].ServiceInfo.ServiceId, nil +} diff --git a/datasource/mongo/dep_test.go b/datasource/mongo/dep_test.go new file mode 100644 index 0000000..77bdc06 --- /dev/null +++ b/datasource/mongo/dep_test.go @@ -0,0 +1,351 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package mongo_test + +import ( + "testing" + + "github.com/apache/servicecomb-service-center/datasource" + pb "github.com/go-chassis/cari/discovery" + "github.com/stretchr/testify/assert" +) + +func TestDep_Creat(t *testing.T) { + + var ( + consumerId1 string + consumerId3 string + ) + + t.Run("creat service, when request is valid, should not pass", func(t *testing.T) { + respCreateService, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "dep1", + AppId: "create_dep_group", + ServiceName: "create_dep_consumer", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + consumerId1 = respCreateService.ServiceId + + respCreateService, err = datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "dep2", + AppId: "create_dep_group", + ServiceName: "create_dep_consumer_all", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + consumerId3 = respCreateService.ServiceId + + respCreateService, err = datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "dep3", + Environment: pb.ENV_PROD, + AppId: "create_dep_group", + ServiceName: "create_dep_consumer", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + 
assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + + respCreateService, err = datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "dep4", + AppId: "create_dep_group", + ServiceName: "create_dep_provider", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + + respCreateService, err = datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "dep5", + AppId: "create_dep_group", + ServiceName: "create_dep_provider", + Version: "1.0.1", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + + respCreateService, err = datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "dep6", + Environment: pb.ENV_PROD, + AppId: "create_dep_group", + ServiceName: "create_dep_provider", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + }) + + t.Run("create dep, when request is valid, should be passed", func(t *testing.T) { + + respCreateDependency, err := datasource.Instance().AddOrUpdateDependencies(getContext(), []*pb.ConsumerDependency{ + { + Consumer: &pb.MicroServiceKey{ + ServiceName: "create_dep_consumer", + AppId: "create_dep_group", + Version: "1.0.0", + }, + Providers: []*pb.MicroServiceKey{ + { + AppId: "create_dep_group", + ServiceName: "create_dep_provider", + Version: "latest", + }, + }, + }, + }, false) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateDependency.GetCode()) + + respPro, err := datasource.Instance().SearchConsumerDependency(getContext(), 
&pb.GetDependenciesRequest{ + ServiceId: consumerId1, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respPro.Response.GetCode()) + assert.NotEqual(t, "1.0.1", respPro.Providers[0].Version) + + respCreateDependency, err = datasource.Instance().AddOrUpdateDependencies(getContext(), []*pb.ConsumerDependency{ + { + Consumer: &pb.MicroServiceKey{ + ServiceName: "create_dep_consumer", + AppId: "create_dep_group", + Version: "1.0.0", + }, + Providers: []*pb.MicroServiceKey{ + { + AppId: "create_dep_group", + ServiceName: "create_dep_provider", + Version: "1.0.0+", + }, + }, + }, + }, false) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateDependency.GetCode()) + + respPro, err = datasource.Instance().SearchConsumerDependency(getContext(), &pb.GetDependenciesRequest{ + ServiceId: consumerId1, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respPro.Response.GetCode()) + assert.NotEqual(t, "1.0.1", respPro.Providers[0].Version) + + respCreateDependency, err = datasource.Instance().AddOrUpdateDependencies(getContext(), []*pb.ConsumerDependency{ + { + Consumer: &pb.MicroServiceKey{ + ServiceName: "create_dep_consumer", + AppId: "create_dep_group", + Version: "1.0.0", + }, + Providers: []*pb.MicroServiceKey{ + { + AppId: "create_dep_group", + ServiceName: "create_dep_provider", + Version: "1.0.0+", + }, + }, + }, + }, false) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateDependency.GetCode()) + + respPro, err = datasource.Instance().SearchConsumerDependency(getContext(), &pb.GetDependenciesRequest{ + ServiceId: consumerId1, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respPro.Response.GetCode()) + assert.NotEqual(t, 2, len(respPro.Providers)) + + respCreateDependency, err = datasource.Instance().AddOrUpdateDependencies(getContext(), []*pb.ConsumerDependency{ + { + Consumer: &pb.MicroServiceKey{ + ServiceName: "create_dep_consumer_all", + AppId: "create_dep_group", + 
Version: "1.0.0", + }, + Providers: []*pb.MicroServiceKey{ + { + ServiceName: "*", + }, + }, + }, + }, false) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateDependency.GetCode()) + + respPro, err = datasource.Instance().SearchConsumerDependency(getContext(), &pb.GetDependenciesRequest{ + ServiceId: consumerId3, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respPro.Response.GetCode()) + assert.NotEqual(t, 0, len(respPro.Providers)) + + respCreateDependency, err = datasource.Instance().AddOrUpdateDependencies(getContext(), []*pb.ConsumerDependency{ + { + Consumer: &pb.MicroServiceKey{ + ServiceName: "create_dep_consumer_all", + AppId: "create_dep_group", + Version: "1.0.0", + }, + Providers: nil, + }, + }, false) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateDependency.GetCode()) + + respCreateDependency, err = datasource.Instance().AddOrUpdateDependencies(getContext(), []*pb.ConsumerDependency{ + { + Consumer: &pb.MicroServiceKey{ + ServiceName: "create_dep_consumer", + AppId: "create_dep_group", + Version: "1.0.0", + }, + Providers: []*pb.MicroServiceKey{ + { + AppId: "create_dep_group", + ServiceName: "create_dep_provider", + Version: "1.0.0", + }, + { + ServiceName: "*", + }, + }, + }, + }, false) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateDependency.GetCode()) + + respCreateDependency, err = datasource.Instance().AddOrUpdateDependencies(getContext(), []*pb.ConsumerDependency{ + { + Consumer: &pb.MicroServiceKey{ + ServiceName: "create_dep_consumer", + AppId: "create_dep_group", + Version: "1.0.0", + }, + Providers: []*pb.MicroServiceKey{ + { + AppId: "create_dep_group", + ServiceName: "create_dep_provider", + Version: "1.0.0-1.0.1", + }, + }, + }, + }, false) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateDependency.GetCode()) + + respPro, err = datasource.Instance().SearchConsumerDependency(getContext(), &pb.GetDependenciesRequest{ 
+ ServiceId: consumerId1, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respPro.Response.GetCode()) + assert.Equal(t, "1.0.0", respPro.Providers[0].Version) + }) +} + +func TestDep_Get(t *testing.T) { + + var ( + consumerId1 string + providerId1 string + ) + + t.Run("create service, when request is valid, should be passed", func(t *testing.T) { + respCreateService, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "dep7", + AppId: "get_dep_group", + ServiceName: "get_dep_consumer", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + consumerId1 = respCreateService.ServiceId + + respCreateService, err = datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "dep8", + AppId: "get_dep_group", + ServiceName: "get_dep_provider", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + providerId1 = respCreateService.ServiceId + + respCreateService, err = datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "dep9", + AppId: "get_dep_group", + ServiceName: "get_dep_provider", + Version: "2.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + }) + + t.Run("execute 'search dep' operation, when request is valid,should be passed", func(t *testing.T) { + respPro, err := datasource.Instance().SearchProviderDependency(getContext(), &pb.GetDependenciesRequest{ + ServiceId: providerId1, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respPro.Response.GetCode()) + + respCon, err := 
datasource.Instance().SearchConsumerDependency(getContext(), &pb.GetDependenciesRequest{ + ServiceId: consumerId1, + }) + + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCon.Response.GetCode()) + + }) +} diff --git a/datasource/mongo/engine.go b/datasource/mongo/engine.go index 246ce78..e7b799a 100644 --- a/datasource/mongo/engine.go +++ b/datasource/mongo/engine.go @@ -22,17 +22,114 @@ import ( "time" "github.com/apache/servicecomb-service-center/pkg/cluster" + + "fmt" + "strconv" + "strings" + + "github.com/apache/servicecomb-service-center/datasource" + "github.com/apache/servicecomb-service-center/datasource/etcd/path" + "github.com/apache/servicecomb-service-center/datasource/mongo/client" + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/util" + "github.com/apache/servicecomb-service-center/server/core" + "github.com/apache/servicecomb-service-center/server/metrics" + pb "github.com/go-chassis/cari/discovery" + "go.mongodb.org/mongo-driver/bson" ) func (ds *DataSource) SelfRegister(ctx context.Context) error { - return nil + err := ds.registryService(ctx) + if err != nil { + return err + } + + // 实例信息 + err = ds.registryInstance(ctx) + + // wait heartbeat + ds.autoSelfHeartBeat() + + metrics.ReportScInstance() + return err } func (ds *DataSource) SelfUnregister(ctx context.Context) error { + if len(core.Instance.InstanceId) == 0 { + return nil + } + + ctx = core.AddDefaultContextValue(ctx) + respI, err := datasource.Instance().UnregisterInstance(ctx, core.UnregisterInstanceRequest()) + if err != nil { + log.Error("unregister failed", err) + return err + } + if respI.Response.GetCode() != pb.ResponseSuccess { + err = fmt.Errorf("unregister service center instance[%s/%s] failed, %s", + core.Instance.ServiceId, core.Instance.InstanceId, respI.Response.GetMessage()) + log.Error(err.Error(), nil) + return err + } + log.Warn(fmt.Sprintf("unregister service center instance[%s/%s]", + 
core.Service.ServiceId, core.Instance.InstanceId)) return nil } // OPS func (ds *DataSource) ClearNoInstanceServices(ctx context.Context, ttl time.Duration) error { + services, err := GetAllServicesAcrossDomainProject(ctx) + if err != nil { + return err + } + if len(services) == 0 { + log.Info("no service found, no need to clear") + return nil + } + + timeLimit := time.Now().Add(0 - ttl) + log.Info(fmt.Sprintf("clear no-instance services created before %s", timeLimit)) + timeLimitStamp := strconv.FormatInt(timeLimit.Unix(), 10) + + for domainProject, svcList := range services { + if len(svcList) == 0 { + continue + } + ctx, err := ctxFromDomainProject(ctx, domainProject) + if err != nil { + log.Error("get domain project context failed", err) + continue + } + for _, svc := range svcList { + if svc == nil { + continue + } + ok, err := shouldClear(ctx, timeLimitStamp, svc) + if err != nil { + log.Error("check service clear necessity failed", err) + continue + } + if !ok { + continue + } + svcCtxStr := "domainProject: " + domainProject + ", " + + "env: " + svc.Environment + ", " + + "service: " + util.StringJoin([]string{svc.AppId, svc.ServiceName, svc.Version}, path.SPLIT) + delSvcReq := &pb.DeleteServiceRequest{ + ServiceId: svc.ServiceId, + Force: true, //force delete + } + delSvcResp, err := datasource.Instance().UnregisterService(ctx, delSvcReq) + if err != nil { + log.Error(fmt.Sprintf("clear service failed, %s", svcCtxStr), err) + continue + } + if delSvcResp.Response.GetCode() != pb.ResponseSuccess { + log.Error(fmt.Sprintf("clear service failed %s %s", delSvcResp.Response.GetMessage(), svcCtxStr), err) + continue + } + log.Warn(fmt.Sprintf("clear service success, %s", svcCtxStr)) + } + } return nil } @@ -43,3 +140,117 @@ func (ds *DataSource) UpgradeVersion(ctx context.Context) error { func (ds *DataSource) GetClusters(ctx context.Context) (cluster.Clusters, error) { return nil, nil } + +func (ds *DataSource) registryService(pCtx context.Context) error { + ctx 
:= core.AddDefaultContextValue(pCtx) + respE, err := datasource.Instance().ExistService(ctx, core.GetExistenceRequest()) + if err != nil { + log.Error("query service center existence failed", err) + return err + } + if respE.Response.GetCode() == pb.ResponseSuccess { + log.Warn(fmt.Sprintf("service center service[%s] already registered", respE.ServiceId)) + respG, err := datasource.Instance().GetService(ctx, core.GetServiceRequest(respE.ServiceId)) + if respG.Response.GetCode() != pb.ResponseSuccess { + log.Error(fmt.Sprintf("query service center service[%s] info failed", respE.ServiceId), err) + return ErrServiceFileLost + } + core.Service = respG.Service + return nil + } + + respS, err := datasource.Instance().RegisterService(ctx, core.CreateServiceRequest()) + if err != nil { + log.Error("register service center failed", err) + return err + } + core.Service.ServiceId = respS.ServiceId + log.Info(fmt.Sprintf("register service center service[%s]", respS.ServiceId)) + return nil +} + +func (ds *DataSource) registryInstance(pCtx context.Context) error { + core.Instance.InstanceId = "" + core.Instance.ServiceId = core.Service.ServiceId + + ctx := core.AddDefaultContextValue(pCtx) + + respI, err := datasource.Instance().RegisterInstance(ctx, core.RegisterInstanceRequest()) + if err != nil { + log.Error("register failed", err) + return err + } + if respI.Response.GetCode() != pb.ResponseSuccess { + err = fmt.Errorf("register service center[%s] instance failed, %s", + core.Instance.ServiceId, respI.Response.GetMessage()) + log.Error(err.Error(), nil) + return err + } + core.Instance.InstanceId = respI.InstanceId + log.Info(fmt.Sprintf("register service center instance[%s/%s], endpoints is %s", + core.Service.ServiceId, respI.InstanceId, core.Instance.Endpoints)) + return nil +} + +func (ds *DataSource) autoSelfHeartBeat() { + //todo +} + +func GetAllServicesAcrossDomainProject(ctx context.Context) (map[string][]*pb.MicroService, error) { + domain := 
util.ParseDomain(ctx) + project := util.ParseProject(ctx) + + filter := bson.M{"domain": domain, "project": project} + + findRes, err := client.GetMongoClient().Find(ctx, CollectionService, filter) + if err != nil { + return nil, err + } + + services := make(map[string][]*pb.MicroService) + + for findRes.Next(ctx) { + var mongoService Service + err := findRes.Decode(&mongoService) + if err != nil { + return nil, err + } + domainProject := mongoService.Domain + "/" + mongoService.Project + services[domainProject] = append(services[domainProject], mongoService.ServiceInfo) + } + return services, nil +} + +func ctxFromDomainProject(pCtx context.Context, domainProject string) (ctx context.Context, err error) { + splitIndex := strings.Index(domainProject, path.SPLIT) + if splitIndex == -1 { + return nil, NewError("invalid domainProject: ", domainProject) + } + domain := domainProject[:splitIndex] + project := domainProject[splitIndex+1:] + return util.SetDomainProject(pCtx, domain, project), nil +} + +func shouldClear(ctx context.Context, timeLimitStamp string, svc *pb.MicroService) (bool, error) { + if svc.Timestamp > timeLimitStamp { + return false, nil + } + + getInstsReq := &pb.GetInstancesRequest{ + ConsumerServiceId: svc.ServiceId, + ProviderServiceId: svc.ServiceId, + } + + getInstsResp, err := datasource.Instance().GetInstances(ctx, getInstsReq) + if err != nil { + return false, err + } + if getInstsResp.Response.GetCode() != pb.ResponseSuccess { + return false, NewError("get instance failed: ", getInstsResp.Response.GetMessage()) + } + //ignore a service if it has instances + if len(getInstsResp.Instances) > 0 { + return false, nil + } + return true, nil +} diff --git a/datasource/mongo/heartbeat/common.go b/datasource/mongo/heartbeat/common.go new file mode 100644 index 0000000..9d5cb31 --- /dev/null +++ b/datasource/mongo/heartbeat/common.go @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package heartbeat + +import "errors" + +var ( + ErrPluginNameNil = errors.New("plugin implement name is nil") + ErrPluginNotSupport = errors.New("plugin implement not supported [#{opts.PluginImplName}]") +) diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go b/datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go index f87d08c..57e50f0 100644 --- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go +++ b/datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go @@ -29,15 +29,15 @@ import ( func updateInstanceRefreshTime(ctx context.Context, serviceID string, instanceID string) error { filter := bson.M{ - mongo.InstanceID: instanceID, - mongo.ServiceID: serviceID, + mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnInstanceID}): instanceID, + mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnServiceID}): serviceID, } update := bson.M{ "$set": bson.M{mongo.RefreshTime: time.Now()}, } result, err := client.GetMongoClient().FindOneAndUpdate(ctx, mongo.CollectionInstance, filter, update) if err != nil { - log.Errorf(err, "failed to update refresh time of instance: ") + log.Error("failed to update refresh time of instance: ", err) return err } if result.Err() != nil { diff --git
a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go b/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go index 5ef1bc9..f77cd8e 100644 --- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go +++ b/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go @@ -19,12 +19,12 @@ package heartbeatchecker import ( "context" - "fmt" "testing" "time" "github.com/apache/servicecomb-service-center/datasource/mongo" "github.com/apache/servicecomb-service-center/datasource/mongo/client" + "github.com/apache/servicecomb-service-center/pkg/log" pb "github.com/go-chassis/cari/discovery" "github.com/go-chassis/go-chassis/v2/storage" "github.com/stretchr/testify/assert" @@ -41,7 +41,7 @@ func init() { func TestUpdateInstanceRefreshTime(t *testing.T) { t.Run("update instance refresh time: if the instance does not exist,the update should fail", func(t *testing.T) { err := updateInstanceRefreshTime(context.Background(), "not-exist", "not-exist") - fmt.Println(err) + log.Error("", err) assert.NotNil(t, err) }) @@ -58,8 +58,8 @@ func TestUpdateInstanceRefreshTime(t *testing.T) { err = updateInstanceRefreshTime(context.Background(), instance1.InstanceInfo.ServiceId, instance1.InstanceInfo.InstanceId) assert.Equal(t, nil, err) filter := bson.M{ - mongo.InstanceID: instance1.InstanceInfo.InstanceId, - mongo.ServiceID: instance1.InstanceInfo.ServiceId, + mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnInstanceID}): instance1.InstanceInfo.InstanceId, + mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnServiceID}): instance1.InstanceInfo.ServiceId, } result, err := client.GetMongoClient().FindOne(context.Background(), mongo.CollectionInstance, filter) assert.Nil(t, err) @@ -68,7 +68,7 @@ func TestUpdateInstanceRefreshTime(t *testing.T) { assert.Nil(t, err) assert.NotEqual(t, instance1.RefreshTime, ins.RefreshTime) filter = bson.M{ - mongo.InstanceID: instance1.InstanceInfo.InstanceId, + 
mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnInstanceID}): instance1.InstanceInfo.InstanceId, } _, err = client.GetMongoClient().Delete(context.Background(), mongo.CollectionInstance, filter) assert.Nil(t, err) diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go b/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go index 822ed82..fad3ec0 100644 --- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go +++ b/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go @@ -24,6 +24,8 @@ import ( "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" pb "github.com/go-chassis/cari/discovery" + + "fmt" ) func init() { @@ -41,8 +43,7 @@ func (h *HeartBeatChecker) Heartbeat(ctx context.Context, request *pb.HeartbeatR remoteIP := util.GetIPFromContext(ctx) err := updateInstanceRefreshTime(ctx, request.ServiceId, request.InstanceId) if err != nil { - log.Errorf(err, "heartbeat failed, instance[%s]. operator %s", - request.InstanceId, remoteIP) + log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. 
operator %s", request.InstanceId, remoteIP), err) resp := &pb.HeartbeatResponse{ Response: pb.CreateResponseWithSCErr(pb.NewError(pb.ErrInstanceNotExists, err.Error())), } diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go b/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go index a519240..ffe82b5 100644 --- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go +++ b/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go @@ -58,7 +58,7 @@ func TestHeartbeat(t *testing.T) { assert.Nil(t, err) assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode()) filter := bson.M{ - mongo.InstanceID: instance1.InstanceInfo.InstanceId, + mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnInstanceID}): instance1.InstanceInfo.InstanceId, } _, err = client.GetMongoClient().Delete(context.Background(), mongo.CollectionInstance, filter) assert.Nil(t, err) diff --git a/datasource/mongo/heartbeat/manager.go b/datasource/mongo/heartbeat/manager.go index 4a1e46f..279e72a 100644 --- a/datasource/mongo/heartbeat/manager.go +++ b/datasource/mongo/heartbeat/manager.go @@ -48,11 +48,11 @@ func Init(opts Options) error { func New(opts Options) (HealthCheck, error) { if opts.PluginImplName == "" { - return nil, fmt.Errorf("plugin implement name is nil") + return nil, ErrPluginNameNil } f, ok := plugins[opts.PluginImplName] if !ok { - return nil, fmt.Errorf("plugin implement not supported [#{opts.PluginImplName}]") + return nil, ErrPluginNotSupport } return f(opts) } diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go index 7d93769..6e96711 100644 --- a/datasource/mongo/mongo.go +++ b/datasource/mongo/mongo.go @@ -39,7 +39,7 @@ type DataSource struct { func NewDataSource(opts datasource.Options) (datasource.DataSource, error) { // TODO: construct a reasonable DataSource instance - log.Warnf("dependency data source enable etcd mode") + log.Warn("dependency data source enable etcd mode") 
inst := &DataSource{ SchemaEditable: opts.SchemaEditable, @@ -71,7 +71,7 @@ func (ds *DataSource) initPlugins() error { kind := config.GetString("registry.heartbeat.kind", "heartbeatchecker", config.WithStandby("heartbeat_plugin")) err := heartbeat.Init(heartbeat.Options{PluginImplName: heartbeat.ImplName(kind)}) if err != nil { - log.Fatalf(err, "heartbeat init failed") + log.Fatal("heartbeat init failed", err) return err } return nil diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go index f1befa3..63204aa 100644 --- a/datasource/mongo/ms.go +++ b/datasource/mongo/ms.go @@ -19,7 +19,6 @@ package mongo import ( "context" - "errors" "fmt" "strconv" "time" @@ -27,6 +26,7 @@ import ( "github.com/apache/servicecomb-service-center/datasource" "github.com/apache/servicecomb-service-center/datasource/mongo/client" "github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat" + "github.com/apache/servicecomb-service-center/pkg/gopool" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" apt "github.com/apache/servicecomb-service-center/server/core" @@ -86,7 +86,8 @@ func (ds *DataSource) RegisterService(ctx context.Context, request *pb.CreateSer } remoteIP := util.GetIPFromContext(ctx) - log.Infof("create micro-service[%s][%s] successfully,operator: %s", service.ServiceId, insertRes.InsertedID, remoteIP) + log.Info(fmt.Sprintf("create micro-service[%s][%s] successfully,operator: %s", + service.ServiceId, insertRes.InsertedID, remoteIP)) return &pb.CreateServiceResponse{ Response: pb.CreateResponse(pb.ResponseSuccess, "Register service successfully"), @@ -100,7 +101,7 @@ func (ds *DataSource) GetServices(ctx context.Context, request *pb.GetServicesRe domain := util.ParseDomain(ctx) project := util.ParseProject(ctx) - filter := bson.M{Domain: domain, Project: project} + filter := bson.M{ColumnDomain: domain, ColumnProject: project} services, err := GetServices(ctx, filter) if err != nil { @@ 
-119,7 +120,10 @@ func (ds *DataSource) GetApplications(ctx context.Context, request *pb.GetAppsRe domain := util.ParseDomain(ctx) project := util.ParseProject(ctx) - filter := bson.M{Domain: domain, Project: project, ServiceEnv: request.Environment} + filter := bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): request.Environment} services, err := GetServices(ctx, filter) if err != nil { @@ -155,7 +159,7 @@ func (ds *DataSource) GetService(ctx context.Context, request *pb.GetServiceRequ *pb.GetServiceResponse, error) { svc, err := GetService(ctx, GeneratorServiceFilter(ctx, request.ServiceId)) if err != nil { - log.Errorf(err, "failed to get single service [%s] from mongo", request.ServiceId) + log.Error(fmt.Sprintf("failed to get single service %s from mongo", request.ServiceId), err) return &pb.GetServiceResponse{ Response: pb.CreateResponse(pb.ErrInternal, "get service data from mongodb failed."), }, err @@ -281,24 +285,23 @@ func DelServicePri(ctx context.Context, serviceID string, force bool) (*pb.Respo } if serviceID == apt.Service.ServiceId { - err := errors.New("not allow to delete service center") - log.Errorf(err, "%s micro-service[%s] failed, operator: %s", title, serviceID, remoteIP) - return pb.CreateResponse(pb.ErrInvalidParams, err.Error()), nil + log.Error(fmt.Sprintf("%s micro-service %s failed, operator: %s", title, serviceID, remoteIP), ErrNotAllowDeleteSC) + return pb.CreateResponse(pb.ErrInvalidParams, ErrNotAllowDeleteSC.Error()), nil } microservice, err := GetService(ctx, GeneratorServiceFilter(ctx, serviceID)) if err != nil { - log.Errorf(err, "%s micro-service[%s] failed, get service file failed, operator: %s", - title, serviceID, remoteIP) + log.Error(fmt.Sprintf("%s micro-service %s failed, get service file failed, operator: %s", + title, serviceID, remoteIP), err) return pb.CreateResponse(pb.ErrInternal, err.Error()), err } if microservice == nil { - log.Errorf(err, "%s 
micro-service[%s] failed, service does not exist, operator: %s", - title, serviceID, remoteIP) + log.Error(fmt.Sprintf("%s micro-service %s failed, service does not exist, operator: %s", + title, serviceID, remoteIP), err) return pb.CreateResponse(pb.ErrServiceNotExists, "Service does not exist."), nil } // 强制删除,则与该服务相关的信息删除,非强制删除: 如果作为该被依赖(作为provider,提供服务,且不是只存在自依赖)或者存在实例,则不能删除 if !force { - log.Infof("force delete,should del instance...") + log.Info("force delete,should del instance...") //todo wait for dep interface } filter := GeneratorServiceFilter(ctx, serviceID) @@ -329,9 +332,13 @@ func (ds *DataSource) UpdateService(ctx context.Context, request *pb.UpdateServi }, nil } - err = UpdateService(ctx, GeneratorServiceFilter(ctx, request.ServiceId), bson.M{"$set": bson.M{ServiceModTime: strconv.FormatInt(time.Now().Unix(), 10), ServiceProperty: request.Properties}}) + updateData := bson.M{ + "$set": bson.M{ + StringBuilder([]string{ColumnServiceInfo, ColumnModTime}): strconv.FormatInt(time.Now().Unix(), 10), + StringBuilder([]string{ColumnServiceInfo, ColumnProperty}): request.Properties}} + err = UpdateService(ctx, GeneratorServiceFilter(ctx, request.ServiceId), updateData) if err != nil { - log.Errorf(err, "update service [%s] properties failed, update mongo failed", request.ServiceId) + log.Error(fmt.Sprintf("update service %s properties failed, update mongo failed", request.ServiceId), err) return &pb.UpdateServicePropsResponse{ Response: pb.CreateResponse(pb.ErrUnavailableBackend, "Update doc in mongo failed."), }, nil @@ -362,8 +369,7 @@ func (ds *DataSource) GetServiceDetail(ctx context.Context, request *pb.GetServi svc := mgSvc.ServiceInfo versions, err := GetServicesVersions(ctx, bson.M{}) if err != nil { - log.Errorf(err, "get service[%s/%s/%s] all versions failed", - svc.Environment, svc.AppId, svc.ServiceName) + log.Error(fmt.Sprintf("get service %s %s %s all versions failed", svc.Environment, svc.AppId, svc.ServiceName), err) return 
&pb.GetServiceDetailResponse{ Response: pb.CreateResponse(pb.ErrInternal, err.Error()), }, err @@ -403,7 +409,7 @@ func (ds *DataSource) GetServicesInfo(ctx context.Context, request *pb.GetServic //todo add get statistics info services, err := GetMongoServices(ctx, bson.M{}) if err != nil { - log.Errorf(err, "get all services by domain failed") + log.Error("get all services by domain failed", err) return &pb.GetServicesInfoResponse{ Response: pb.CreateResponse(pb.ErrInternal, err.Error()), }, err @@ -443,7 +449,7 @@ func (ds *DataSource) GetServicesInfo(ctx context.Context, request *pb.GetServic func (ds *DataSource) AddTags(ctx context.Context, request *pb.AddServiceTagsRequest) (*pb.AddServiceTagsResponse, error) { service, err := GetService(ctx, GeneratorServiceFilter(ctx, request.ServiceId)) if err != nil { - log.Errorf(err, "failed to add tags for service [%s] for get service failed,", request.ServiceId) + log.Error(fmt.Sprintf("failed to add tags for service %s for get service failed", request.ServiceId), err) return &pb.AddServiceTagsResponse{ Response: pb.CreateResponse(pb.ErrInternal, "Failed to check service exist"), }, nil @@ -460,9 +466,9 @@ func (ds *DataSource) AddTags(ctx context.Context, request *pb.AddServiceTagsReq } tags[key] = value } - err = UpdateService(ctx, GeneratorServiceFilter(ctx, request.ServiceId), bson.M{"$set": bson.M{ServiceTag: tags}}) + err = UpdateService(ctx, GeneratorServiceFilter(ctx, request.ServiceId), bson.M{"$set": bson.M{ColumnTag: tags}}) if err != nil { - log.Errorf(err, "update service [%s] tags failed.", request.ServiceId) + log.Error(fmt.Sprintf("update service %s tags failed.", request.ServiceId), err) return &pb.AddServiceTagsResponse{ Response: pb.CreateResponse(pb.ErrInternal, err.Error()), }, nil @@ -475,7 +481,7 @@ func (ds *DataSource) AddTags(ctx context.Context, request *pb.AddServiceTagsReq func (ds *DataSource) GetTags(ctx context.Context, request *pb.GetServiceTagsRequest) (*pb.GetServiceTagsResponse, 
error) { svc, err := GetService(ctx, GeneratorServiceFilter(ctx, request.ServiceId)) if err != nil { - log.Errorf(err, "failed to get service [%s] tags", request.ServiceId) + log.Error(fmt.Sprintf("failed to get service %s tags", request.ServiceId), err) return &pb.GetServiceTagsResponse{ Response: pb.CreateResponse(pb.ErrInternal, err.Error()), }, nil @@ -494,7 +500,7 @@ func (ds *DataSource) GetTags(ctx context.Context, request *pb.GetServiceTagsReq func (ds *DataSource) UpdateTag(ctx context.Context, request *pb.UpdateServiceTagRequest) (*pb.UpdateServiceTagResponse, error) { svc, err := GetService(ctx, GeneratorServiceFilter(ctx, request.ServiceId)) if err != nil { - log.Errorf(err, "failed to get service [%s] tags", request.ServiceId) + log.Error(fmt.Sprintf("failed to get %s tags", request.ServiceId), err) return &pb.UpdateServiceTagResponse{ Response: pb.CreateResponse(pb.ErrInternal, err.Error()), }, nil @@ -518,9 +524,9 @@ func (ds *DataSource) UpdateTag(ctx context.Context, request *pb.UpdateServiceTa } newTags[request.Key] = request.Value - err = UpdateService(ctx, GeneratorServiceFilter(ctx, request.ServiceId), bson.M{"$set": bson.M{ServiceTag: newTags}}) + err = UpdateService(ctx, GeneratorServiceFilter(ctx, request.ServiceId), bson.M{"$set": bson.M{ColumnTag: newTags}}) if err != nil { - log.Errorf(err, "update service [%s] tags failed.", request.ServiceId) + log.Error(fmt.Sprintf("update service %s tags failed", request.ServiceId), err) return &pb.UpdateServiceTagResponse{ Response: pb.CreateResponse(pb.ErrInternal, err.Error()), }, nil @@ -533,7 +539,7 @@ func (ds *DataSource) UpdateTag(ctx context.Context, request *pb.UpdateServiceTa func (ds *DataSource) DeleteTags(ctx context.Context, request *pb.DeleteServiceTagsRequest) (*pb.DeleteServiceTagsResponse, error) { svc, err := GetService(ctx, GeneratorServiceFilter(ctx, request.ServiceId)) if err != nil { - log.Errorf(err, "failed to get service [%s] tags", request.ServiceId) + 
log.Error(fmt.Sprintf("failed to get service %s tags", request.ServiceId), err) return &pb.DeleteServiceTagsResponse{ Response: pb.CreateResponse(pb.ErrInternal, err.Error()), }, nil @@ -558,9 +564,9 @@ func (ds *DataSource) DeleteTags(ctx context.Context, request *pb.DeleteServiceT delete(newTags, key) } } - err = UpdateService(ctx, GeneratorServiceFilter(ctx, request.ServiceId), bson.M{"$set": bson.M{ServiceTag: newTags}}) + err = UpdateService(ctx, GeneratorServiceFilter(ctx, request.ServiceId), bson.M{"$set": bson.M{ColumnTag: newTags}}) if err != nil { - log.Errorf(err, "delete service [%s] tags failed.", request.ServiceId) + log.Error(fmt.Sprintf("delete service %s tags failed", request.ServiceId), err) return &pb.DeleteServiceTagsResponse{ Response: pb.CreateResponse(pb.ErrInternal, err.Error()), }, nil @@ -698,7 +704,7 @@ func (ds *DataSource) ModifySchema(ctx context.Context, request *pb.ModifySchema defer session.EndSession(ctx) err = ds.modifySchema(ctx, request.ServiceId, &schema) if err != nil { - log.Errorf(err, "modify schema[%s/%s] failed, operator: %s", serviceID, schemaID, remoteIP) + log.Error(fmt.Sprintf("modify schema %s %s failed, operator %s", serviceID, schemaID, remoteIP), err) errAbort := session.AbortTransaction(ctx) if errAbort != nil { return &pb.ModifySchemaResponse{ @@ -715,7 +721,7 @@ func (ds *DataSource) ModifySchema(ctx context.Context, request *pb.ModifySchema Response: pb.CreateResponse(pb.ErrInternal, "Txn ModifySchema CommitTransaction failed."), }, err } - log.Infof("modify schema[%s/%s] successfully, operator: %s", serviceID, schemaID, remoteIP) + log.Info(fmt.Sprintf("modify schema[%s/%s] successfully, operator: %s", serviceID, schemaID, remoteIP)) return &pb.ModifySchemaResponse{ Response: pb.CreateResponse(pb.ResponseSuccess, "modify schema info success."), }, nil @@ -791,13 +797,13 @@ func (ds *DataSource) modifySchema(ctx context.Context, serviceID string, schema } if schema != nil { if len(schema.Summary) == 0 { - 
log.Errorf(err, "modify schema[%s/%s] failed, get schema summary failed, operator: %s", - serviceID, schema.SchemaId, remoteIP) + log.Error(fmt.Sprintf("modify schema %s %s failed, get schema summary failed, operator: %s", + serviceID, schema.SchemaId, remoteIP), err) return pb.NewError(pb.ErrUnavailableBackend, err.Error()) } if len(respSchema.SchemaSummary) != 0 { - log.Errorf(err, "%s mode, schema[%s/%s] already exist, can not be changed, operator: %s", - pb.ENV_PROD, serviceID, schema.SchemaId, remoteIP) + log.Error(fmt.Sprintf("mode, schema %s %s already exist, can not be changed, operator: %s", + serviceID, schema.SchemaId, remoteIP), err) return pb.NewError(pb.ErrModifySchemaNotAllow, "schema already exist, can not be changed request "+pb.ENV_PROD) } } @@ -812,12 +818,14 @@ func (ds *DataSource) modifySchema(ctx context.Context, serviceID string, schema } } if len(newSchemas) != len(microservice.Schemas) { - err := UpdateService(ctx, GeneratorServiceFilter(ctx, serviceID), bson.M{"$set": bson.M{ServiceSchemas: newSchemas}}) + + updateData := bson.M{StringBuilder([]string{ColumnServiceInfo, ColumnSchemas}): newSchemas} + err := UpdateService(ctx, GeneratorServiceFilter(ctx, serviceID), bson.M{"$set": updateData}) if err != nil { return pb.NewError(pb.ErrInternal, err.Error()) } } - newSchema := bson.M{"$set": bson.M{SchemaInfo: schema.Schema, SchemaSummary: schema.Summary}} + newSchema := bson.M{"$set": bson.M{ColumnSchemaInfo: schema.Schema, ColumnSchemaSummary: schema.Summary}} err = UpdateSchema(ctx, GeneratorSchemaFilter(ctx, serviceID, schema.SchemaId), newSchema, options.FindOneAndUpdate().SetUpsert(true)) if err != nil { return pb.NewError(pb.ErrInternal, err.Error()) @@ -830,8 +838,7 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *pb.MicroServic serviceID := service.ServiceId schemasFromDatabase, err := GetSchemas(ctx, GeneratorServiceFilter(ctx, serviceID)) if err != nil { - log.Errorf(nil, "modify service[%s] schemas failed, get 
schemas failed, operator: %s", - serviceID, remoteIP) + log.Error(fmt.Sprintf("modify service %s schemas failed, get schemas failed, operator: %s", serviceID, remoteIP), err) return pb.NewError(pb.ErrUnavailableBackend, err.Error()) } needUpdateSchemas, needAddSchemas, needDeleteSchemas, nonExistSchemaIds := @@ -839,16 +846,17 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *pb.MicroServic if !ds.isSchemaEditable(service) { if len(service.Schemas) == 0 { //todo add quota check - err := UpdateService(ctx, GeneratorServiceFilter(ctx, serviceID), bson.M{"$set": bson.M{ServiceSchemas: nonExistSchemaIds}}) + updateData := bson.M{StringBuilder([]string{ColumnServiceInfo, ColumnSchemas}): nonExistSchemaIds} + err := UpdateService(ctx, GeneratorServiceFilter(ctx, serviceID), bson.M{"$set": updateData}) if err != nil { - log.Errorf(err, "modify service[%s] schemas failed, update service.Schemas failed, operator: %s", - serviceID, remoteIP) + log.Error(fmt.Sprintf("modify service %s schemas failed, update service.Schemas failed, operator: %s", + serviceID, remoteIP), err) return pb.NewError(pb.ErrInternal, err.Error()) } } else { if len(nonExistSchemaIds) != 0 { errInfo := fmt.Errorf("non-existent schemaIDs %v", nonExistSchemaIds) - log.Errorf(errInfo, "modify service[%s] schemas failed, operator: %s", serviceID, remoteIP) + log.Error(fmt.Sprintf("modify service %s schemas failed, operator: %s", serviceID, remoteIP), err) return pb.NewError(pb.ErrUndefinedSchemaID, errInfo.Error()) } for _, needUpdateSchema := range needUpdateSchemas { @@ -857,20 +865,20 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *pb.MicroServic return pb.NewError(pb.ErrInternal, err.Error()) } if !exist { - err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx, serviceID, needUpdateSchema.SchemaId), bson.M{"$set": bson.M{SchemaInfo: needUpdateSchema.Schema, SchemaSummary: needUpdateSchema.Summary}}, options.FindOneAndUpdate().SetUpsert(true)) + err := 
UpdateSchema(ctx, GeneratorSchemaFilter(ctx, serviceID, needUpdateSchema.SchemaId), bson.M{"$set": bson.M{ColumnSchemaInfo: needUpdateSchema.Schema, ColumnSchemaSummary: needUpdateSchema.Summary}}, options.FindOneAndUpdate().SetUpsert(true)) if err != nil { return pb.NewError(pb.ErrInternal, err.Error()) } } else { - log.Warnf("schema[%s/%s] and it's summary already exist, skip to update, operator: %s", - serviceID, needUpdateSchema.SchemaId, remoteIP) + log.Warn(fmt.Sprintf("schema[%s/%s] and it's summary already exist, skip to update, operator: %s", + serviceID, needUpdateSchema.SchemaId, remoteIP)) } } } for _, schema := range needAddSchemas { - log.Infof("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP) - err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx, serviceID, schema.SchemaId), bson.M{"$set": bson.M{SchemaInfo: schema.Schema, SchemaSummary: schema.Summary}}, options.FindOneAndUpdate().SetUpsert(true)) + log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP)) + err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx, serviceID, schema.SchemaId), bson.M{"$set": bson.M{ColumnSchemaInfo: schema.Schema, ColumnSchemaSummary: schema.Summary}}, options.FindOneAndUpdate().SetUpsert(true)) if err != nil { return pb.NewError(pb.ErrInternal, err.Error()) } @@ -879,8 +887,8 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *pb.MicroServic var schemaIDs []string for _, schema := range needAddSchemas { - log.Infof("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP) - err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx, serviceID, schema.SchemaId), bson.M{"$set": bson.M{SchemaInfo: schema.Schema, SchemaSummary: schema.Summary}}, options.FindOneAndUpdate().SetUpsert(true)) + log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP)) + err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx, serviceID, schema.SchemaId), 
bson.M{"$set": bson.M{ColumnSchemaInfo: schema.Schema, ColumnSchemaSummary: schema.Summary}}, options.FindOneAndUpdate().SetUpsert(true)) if err != nil { return pb.NewError(pb.ErrInternal, err.Error()) } @@ -888,8 +896,8 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *pb.MicroServic } for _, schema := range needUpdateSchemas { - log.Infof("update schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP) - err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx, serviceID, schema.SchemaId), bson.M{"$set": bson.M{SchemaInfo: schema.Schema, SchemaSummary: schema.Summary}}, options.FindOneAndUpdate().SetUpsert(true)) + log.Info(fmt.Sprintf("update schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP)) + err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx, serviceID, schema.SchemaId), bson.M{"$set": bson.M{ColumnSchemaInfo: schema.Schema, ColumnSchemaSummary: schema.Summary}}, options.FindOneAndUpdate().SetUpsert(true)) if err != nil { return pb.NewError(pb.ErrInternal, err.Error()) } @@ -897,17 +905,17 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *pb.MicroServic } for _, schema := range needDeleteSchemas { - log.Infof("delete non-existent schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP) + log.Info(fmt.Sprintf("delete non-existent schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP)) err = DeleteSchema(ctx, GeneratorSchemaFilter(ctx, serviceID, schema.SchemaId)) if err != nil { return pb.NewError(pb.ErrInternal, err.Error()) } } - err := UpdateService(ctx, GeneratorServiceFilter(ctx, serviceID), bson.M{"$set": bson.M{ServiceSchemas: schemaIDs}}) + updateData := bson.M{StringBuilder([]string{ColumnServiceInfo, ColumnSchemas}): schemaIDs} + err := UpdateService(ctx, GeneratorServiceFilter(ctx, serviceID), bson.M{"$set": updateData}) if err != nil { - log.Errorf(err, "modify service[%s] schemas failed, update service.Schemas failed, operator: %s", - serviceID, remoteIP) + 
log.Error(fmt.Sprintf("modify service %s schemas failed, update service.Schemas failed, operator: %s", serviceID, remoteIP), err) return pb.NewError(pb.ErrInternal, err.Error()) } } @@ -917,7 +925,7 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *pb.MicroServic func (ds *DataSource) AddRule(ctx context.Context, request *pb.AddServiceRulesRequest) (*pb.AddServiceRulesResponse, error) { exist, err := ServiceExistID(ctx, request.ServiceId) if err != nil { - log.Errorf(err, "failed to add rules for service [%s] for get service failed,", request.ServiceId) + log.Error(fmt.Sprintf("failed to add rules for service %s for get service failed", request.ServiceId), err) return &pb.AddServiceRulesResponse{ Response: pb.CreateResponse(pb.ErrInternal, "Failed to check service exist"), }, nil @@ -1013,7 +1021,7 @@ func (ds *DataSource) DeleteRule(ctx context.Context, request *pb.DeleteServiceR *pb.DeleteServiceRulesResponse, error) { exist, err := ServiceExistID(ctx, request.ServiceId) if err != nil { - log.Errorf(err, "failed to add tags for service [%s] for get service failed,", request.ServiceId) + log.Error(fmt.Sprintf("failed to add tags for service %s for get service failed", request.ServiceId), err) return &pb.DeleteServiceRulesResponse{ Response: pb.CreateResponse(pb.ErrInternal, "Failed to check service exist"), }, err @@ -1046,7 +1054,6 @@ func (ds *DataSource) UpdateRule(ctx context.Context, request *pb.UpdateServiceR if err != nil { return &pb.UpdateServiceRuleResponse{ Response: pb.CreateResponse(pb.ErrServiceNotExists, "UpdateRule failed for get service failed."), - //Schemas: nil, }, nil } if !exist { @@ -1077,12 +1084,14 @@ func (ds *DataSource) UpdateRule(ctx context.Context, request *pb.UpdateServiceR }, nil } - newRule := bson.M{"$set": bson.M{RuleRuletype: request.Rule.RuleType, - RulePattern: request.Rule.Pattern, RuleAttribute: request.Rule.Attribute, - RuleDescription: request.Rule.Description, - RuleModTime: 
strconv.FormatInt(time.Now().Unix(), 10)}} + newRule := bson.M{ + StringBuilder([]string{ColumnRuleInfo, ColumnRuleType}): request.Rule.RuleType, + StringBuilder([]string{ColumnRuleInfo, ColumnPattern}): request.Rule.Pattern, + StringBuilder([]string{ColumnRuleInfo, ColumnAttribute}): request.Rule.Attribute, + StringBuilder([]string{ColumnRuleInfo, ColumnDescription}): request.Rule.Description, + StringBuilder([]string{ColumnRuleInfo, ColumnModTime}): strconv.FormatInt(time.Now().Unix(), 10)} - err = UpdateRule(ctx, GeneratorRuleFilter(ctx, request.ServiceId, request.RuleId), newRule) + err = UpdateRule(ctx, GeneratorRuleFilter(ctx, request.ServiceId, request.RuleId), bson.M{"$set": newRule}) if err != nil { return &pb.UpdateServiceRuleResponse{ Response: pb.CreateResponse(pb.ErrInternal, err.Error()), @@ -1188,7 +1197,7 @@ func getServiceDetailUtil(ctx context.Context, mgs *Service, countOnly bool, opt case "rules": rules, err := GetRules(ctx, mgs.ServiceInfo.ServiceId) if err != nil { - log.Errorf(err, "get service[%s]'s all rules failed", mgs.ServiceInfo.ServiceId) + log.Error(fmt.Sprintf("get service %s's all rules failed", mgs.ServiceInfo.ServiceId), err) return nil, err } for _, rule := range rules { @@ -1200,7 +1209,7 @@ func getServiceDetailUtil(ctx context.Context, mgs *Service, countOnly bool, opt case "schemas": schemas, err := GetSchemas(ctx, GeneratorServiceFilter(ctx, mgs.ServiceInfo.ServiceId)) if err != nil { - log.Errorf(err, "get service[%s]'s all schemas failed", mgs.ServiceInfo.ServiceId) + log.Error(fmt.Sprintf("get service %s's all schemas failed", mgs.ServiceInfo.ServiceId), err) return nil, err } serviceDetail.SchemaInfos = schemas @@ -1209,7 +1218,7 @@ func getServiceDetailUtil(ctx context.Context, mgs *Service, countOnly bool, opt case "": continue default: - log.Errorf(nil, "request option[%s] is invalid", opt) + log.Info(fmt.Sprintf("request option %s is invalid", opt)) } } return serviceDetail, nil @@ -1222,7 +1231,7 @@ func 
UpdateService(ctx context.Context, filter interface{}, m bson.M) error { func GetRules(ctx context.Context, serviceID string) ([]*pb.ServiceRule, error) { domain := util.ParseDomain(ctx) project := util.ParseProject(ctx) - filter := bson.M{Domain: domain, Project: project, RuleServiceID: serviceID} + filter := bson.M{ColumnDomain: domain, ColumnProject: project, ColumnServiceID: serviceID} ruleRes, err := client.GetMongoClient().Find(ctx, CollectionRule, filter) if err != nil { @@ -1254,7 +1263,7 @@ func DeleteSchema(ctx context.Context, filter interface{}) error { return err } if !res { - return errors.New("delete schema failed") + return ErrDeleteSchemaFailed } return nil } @@ -1267,39 +1276,61 @@ func GeneratorServiceFilter(ctx context.Context, serviceID string) bson.M { domain := util.ParseDomain(ctx) project := util.ParseProject(ctx) - return bson.M{Domain: domain, Project: project, ServiceServiceID: serviceID} + return bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnServiceInfo, ColumnServiceID}): serviceID} } func GeneratorServiceNameFilter(ctx context.Context, service *pb.MicroServiceKey) bson.M { domain := util.ParseDomain(ctx) project := util.ParseProject(ctx) - return bson.M{Domain: domain, Project: project, ServiceEnv: service.Environment, ServiceAppID: service.AppId, ServiceServiceName: service.ServiceName, ServiceVersion: service.Version} + return bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): service.Environment, + StringBuilder([]string{ColumnServiceInfo, ColumnAppID}): service.AppId, + StringBuilder([]string{ColumnServiceInfo, ColumnServiceName}): service.ServiceName, + StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): service.Version} } func GeneratorServiceAliasFilter(ctx context.Context, service *pb.MicroServiceKey) bson.M { domain := util.ParseDomain(ctx) project := util.ParseProject(ctx) - return bson.M{Domain: domain, Project: 
project, ServiceEnv: service.Environment, ServiceAppID: service.AppId, ServiceAlias: service.Alias, ServiceVersion: service.Version} + return bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): service.Environment, + StringBuilder([]string{ColumnServiceInfo, ColumnAppID}): service.AppId, + StringBuilder([]string{ColumnServiceInfo, ColumnAlias}): service.Alias, + StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): service.Version} } func GeneratorRuleAttFilter(ctx context.Context, serviceID, attribute, pattern string) bson.M { - return bson.M{RuleServiceID: serviceID, RuleAttribute: attribute, RulePattern: pattern} + return bson.M{ + ColumnServiceID: serviceID, + StringBuilder([]string{ColumnRuleInfo, ColumnAttribute}): attribute, + StringBuilder([]string{ColumnRuleInfo, ColumnPattern}): pattern} } func GeneratorSchemaFilter(ctx context.Context, serviceID, schemaID string) bson.M { domain := util.ParseDomain(ctx) project := util.ParseProject(ctx) - return bson.M{Domain: domain, Project: project, SchemaServiceID: serviceID, SchemaID: schemaID} + return bson.M{ColumnDomain: domain, ColumnProject: project, ColumnServiceID: serviceID, ColumnSchemaID: schemaID} } func GeneratorRuleFilter(ctx context.Context, serviceID, ruleID string) bson.M { domain := util.ParseDomain(ctx) project := util.ParseProject(ctx) - return bson.M{Domain: domain, Project: project, RuleServiceID: serviceID, RuleRuleID: ruleID} + return bson.M{ + ColumnDomain: domain, + ColumnProject: project, + ColumnServiceID: serviceID, + StringBuilder([]string{ColumnRuleInfo, ColumnRuleID}): ruleID} } func GetSchemas(ctx context.Context, filter bson.M) ([]*pb.Schema, error) { @@ -1346,56 +1377,876 @@ func SchemaExist(ctx context.Context, serviceID, schemaID string) (bool, error) // Instance management func (ds *DataSource) RegisterInstance(ctx context.Context, request *pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error) { - 
return &pb.RegisterInstanceResponse{}, nil + remoteIP := util.GetIPFromContext(ctx) + instance := request.Instance + + // 允许自定义 id + if len(instance.InstanceId) > 0 { + resp, err := ds.Heartbeat(ctx, &pb.HeartbeatRequest{ + InstanceId: instance.InstanceId, + ServiceId: instance.ServiceId, + }) + if err != nil || resp == nil { + log.Error(fmt.Sprintf("register service %s's instance failed, endpoints %s, host '%s', operator %s", + instance.ServiceId, instance.Endpoints, instance.HostName, remoteIP), err) + return &pb.RegisterInstanceResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, nil + } + switch resp.Response.GetCode() { + case pb.ResponseSuccess: + log.Info(fmt.Sprintf("register instance successful, reuse instance[%s/%s], operator %s", + instance.ServiceId, instance.InstanceId, remoteIP)) + return &pb.RegisterInstanceResponse{ + Response: resp.Response, + InstanceId: instance.InstanceId, + }, nil + case pb.ErrInstanceNotExists: + // register a new one + return registryInstance(ctx, request) + default: + log.Error(fmt.Sprintf("register instance failed, reuse instance %s %s, operator %s", + instance.ServiceId, instance.InstanceId, remoteIP), err) + return &pb.RegisterInstanceResponse{ + Response: resp.Response, + }, err + } + } + + if err := preProcessRegisterInstance(ctx, instance); err != nil { + log.Error(fmt.Sprintf("register service %s instance failed, endpoints %s, host %s operator %s", + instance.ServiceId, instance.Endpoints, instance.HostName, remoteIP), err) + return &pb.RegisterInstanceResponse{ + Response: pb.CreateResponseWithSCErr(err), + }, nil + } + return registryInstance(ctx, request) } // GetInstances returns instances under the current domain func (ds *DataSource) GetInstance(ctx context.Context, request *pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) { - return &pb.GetOneInstanceResponse{}, nil + service := &Service{} + var err error + if len(request.ConsumerServiceId) > 0 { + filter := 
GeneratorServiceFilter(ctx, request.ConsumerServiceId) + service, err = GetService(ctx, filter) + if err != nil { + log.Error(fmt.Sprintf(" get consumer failed, consumer %s find provider instance %s", + request.ConsumerServiceId, request.ProviderInstanceId), err) + return &pb.GetOneInstanceResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + if service == nil { + log.Error(fmt.Sprintf("consumer does not exist, consumer %s find provider instance %s %s", + request.ConsumerServiceId, request.ProviderServiceId, request.ProviderInstanceId), err) + return &pb.GetOneInstanceResponse{ + Response: pb.CreateResponse(pb.ErrServiceNotExists, + fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId)), + }, nil + } + } + + filter := GeneratorServiceFilter(ctx, request.ProviderServiceId) + provider, err := GetService(ctx, filter) + if err != nil { + log.Error(fmt.Sprintf("get provider failed, consumer %s find provider instance %s %s", + request.ConsumerServiceId, request.ProviderServiceId, request.ProviderInstanceId), err) + return &pb.GetOneInstanceResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + if provider == nil { + log.Error(fmt.Sprintf("provider does not exist, consumer %s find provider instance %s %s", + request.ConsumerServiceId, request.ProviderServiceId, request.ProviderInstanceId), err) + return &pb.GetOneInstanceResponse{ + Response: pb.CreateResponse(pb.ErrServiceNotExists, + fmt.Sprintf("Provider[%s] does not exist.", request.ProviderServiceId)), + }, nil + } + + findFlag := func() string { + return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s][%s/%s/%s/%s] instance[%s]", + request.ConsumerServiceId, service.ServiceInfo.Environment, service.ServiceInfo.AppId, service.ServiceInfo.ServiceName, service.ServiceInfo.Version, + provider.ServiceInfo.ServiceId, provider.ServiceInfo.Environment, provider.ServiceInfo.AppId, provider.ServiceInfo.ServiceName, 
provider.ServiceInfo.Version, + request.ProviderInstanceId) + } + + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + filter = bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): request.ProviderServiceId} + findOneRes, err := client.GetMongoClient().FindOne(ctx, CollectionInstance, filter) + if err != nil { + mes := fmt.Errorf("%s failed, provider instance does not exist", findFlag()) + log.Error("FindInstances.GetWithProviderID failed", err) + return &pb.GetOneInstanceResponse{ + Response: pb.CreateResponse(pb.ErrInstanceNotExists, mes.Error()), + }, nil + } + var instance Instance + err = findOneRes.Decode(&instance) + if err != nil { + log.Error(fmt.Sprintf("FindInstances.GetWithProviderID failed %s failed", findFlag()), err) + return &pb.GetOneInstanceResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + + return &pb.GetOneInstanceResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, "Get instance successfully."), + Instance: instance.InstanceInfo, + }, nil } func (ds *DataSource) GetInstances(ctx context.Context, request *pb.GetInstancesRequest) (*pb.GetInstancesResponse, error) { - return &pb.GetInstancesResponse{}, nil + service := &Service{} + var err error + + if len(request.ConsumerServiceId) > 0 { + filter := GeneratorServiceFilter(ctx, request.ConsumerServiceId) + service, err = GetService(ctx, filter) + if err != nil { + log.Error(fmt.Sprintf("get consumer failed, consumer %s find provider %sinstances", + request.ConsumerServiceId, request.ProviderServiceId), err) + return &pb.GetInstancesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + if service == nil { + log.Error(fmt.Sprintf("consumer does not exist, consumer %s find provider %s instances", + request.ConsumerServiceId, request.ProviderServiceId), err) + return &pb.GetInstancesResponse{ + Response: 
pb.CreateResponse(pb.ErrServiceNotExists, + fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId)), + }, nil + } + } + + filter := GeneratorServiceFilter(ctx, request.ProviderServiceId) + provider, err := GetService(ctx, filter) + if err != nil { + log.Error(fmt.Sprintf("get provider failed, consumer %s find provider instances %s", + request.ConsumerServiceId, request.ProviderServiceId), err) + return &pb.GetInstancesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + if provider == nil { + log.Error(fmt.Sprintf("provider does not exist, consumer %s find provider %s instances", + request.ConsumerServiceId, request.ProviderServiceId), err) + return &pb.GetInstancesResponse{ + Response: pb.CreateResponse(pb.ErrServiceNotExists, + fmt.Sprintf("Provider[%s] does not exist.", request.ProviderServiceId)), + }, nil + } + + findFlag := fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s][%s/%s/%s/%s] instances", + request.ConsumerServiceId, service.ServiceInfo.Environment, service.ServiceInfo.AppId, service.ServiceInfo.ServiceName, service.ServiceInfo.Version, + provider.ServiceInfo.ServiceId, provider.ServiceInfo.Environment, provider.ServiceInfo.AppId, provider.ServiceInfo.ServiceName, provider.ServiceInfo.Version) + + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + filter = bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): request.ProviderServiceId} + resp, err := client.GetMongoClient().Find(ctx, CollectionInstance, filter) + if err != nil { + log.Error(fmt.Sprintf("FindInstancesCache.Get failed %s failed", findFlag), err) + return &pb.GetInstancesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + if resp == nil { + mes := fmt.Errorf("%s failed, provider does not exist", findFlag) + log.Error("FindInstancesCache.Get failed", mes) + return &pb.GetInstancesResponse{ + Response: 
pb.CreateResponse(pb.ErrServiceNotExists, mes.Error()), + }, nil + } + + var instances []*pb.MicroServiceInstance + for resp.Next(ctx) { + var instance Instance + err := resp.Decode(&instance) + if err != nil { + log.Error(fmt.Sprintf("FindInstances.GetWithProviderID failed %s failed", findFlag), err) + return &pb.GetInstancesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + instances = append(instances, instance.InstanceInfo) + } + + return &pb.GetInstancesResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, "Query service instances successfully."), + Instances: instances, + }, nil } // GetProviderInstances returns instances under the specified domain func (ds *DataSource) GetProviderInstances(ctx context.Context, request *pb.GetProviderInstancesRequest) (instances []*pb.MicroServiceInstance, rev string, err error) { - return nil, "", nil + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + filter := bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): request.ProviderServiceId} + + findRes, err := client.GetMongoClient().Find(ctx, CollectionInstance, filter) + if err != nil { + return + } + + for findRes.Next(ctx) { + var mongoInstance Instance + err := findRes.Decode(&mongoInstance) + if err == nil { + instances = append(instances, mongoInstance.InstanceInfo) + } + } + + return instances, "", nil } func (ds *DataSource) GetAllInstances(ctx context.Context, request *pb.GetAllInstancesRequest) (*pb.GetAllInstancesResponse, error) { - return &pb.GetAllInstancesResponse{}, nil + + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + + filter := bson.M{ColumnDomain: domain, ColumnProject: project} + + findRes, err := client.GetMongoClient().Find(ctx, CollectionInstance, filter) + if err != nil { + return nil, err + } + resp := &pb.GetAllInstancesResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, "Get all instances 
successfully"), + } + + for findRes.Next(ctx) { + var instance Instance + err := findRes.Decode(&instance) + if err != nil { + return &pb.GetAllInstancesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + resp.Instances = append(resp.Instances, instance.InstanceInfo) + } + + return resp, nil } func (ds *DataSource) BatchGetProviderInstances(ctx context.Context, request *pb.BatchGetInstancesRequest) (instances []*pb.MicroServiceInstance, rev string, err error) { - return nil, "", nil + if request == nil || len(request.ServiceIds) == 0 { + return nil, "", ErrInvalidParamBatchGetInstancesRequest + } + + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + + for _, providerServiceID := range request.ServiceIds { + filter := bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): providerServiceID} + findRes, err := client.GetMongoClient().Find(ctx, CollectionInstance, filter) + if err != nil { + return instances, "", nil + } + + for findRes.Next(ctx) { + var mongoInstance Instance + err := findRes.Decode(&mongoInstance) + if err == nil { + instances = append(instances, mongoInstance.InstanceInfo) + } + } + } + + return instances, "", nil } // FindInstances returns instances under the specified domain func (ds *DataSource) FindInstances(ctx context.Context, request *pb.FindInstancesRequest) (*pb.FindInstancesResponse, error) { - return &pb.FindInstancesResponse{}, nil + provider := &pb.MicroServiceKey{ + Tenant: util.ParseTargetDomainProject(ctx), + Environment: request.Environment, + AppId: request.AppId, + ServiceName: request.ServiceName, + Alias: request.ServiceName, + Version: request.VersionRule, + } + + return ds.findInstance(ctx, request, provider) } func (ds *DataSource) UpdateInstanceStatus(ctx context.Context, request *pb.UpdateInstanceStatusRequest) (*pb.UpdateInstanceStatusResponse, error) { - return &pb.UpdateInstanceStatusResponse{}, nil + 
updateStatusFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId, request.Status}, "/") + + // todo finish get instance + instance, err := GetInstance(ctx, request.ServiceId, request.InstanceId) + if err != nil { + log.Error(fmt.Sprintf("update instance %s status failed", updateStatusFlag), err) + return &pb.UpdateInstanceStatusResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + if instance == nil { + log.Error(fmt.Sprintf("update instance %s status failed, instance does not exist", updateStatusFlag), err) + return &pb.UpdateInstanceStatusResponse{ + Response: pb.CreateResponse(pb.ErrInstanceNotExists, "Service instance does not exist."), + }, nil + } + + copyInstanceRef := *instance + copyInstanceRef.InstanceInfo.Status = request.Status + + if err := UpdateInstanceS(ctx, copyInstanceRef.InstanceInfo); err != nil { + log.Error(fmt.Sprintf("update instance %s status failed", updateStatusFlag), err) + resp := &pb.UpdateInstanceStatusResponse{ + Response: pb.CreateResponseWithSCErr(err), + } + if err.InternalError() { + return resp, err + } + return resp, nil + } + + log.Infof("update instance[%s] status successfully", updateStatusFlag) + return &pb.UpdateInstanceStatusResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, "Update service instance status successfully."), + }, nil } func (ds *DataSource) UpdateInstanceProperties(ctx context.Context, request *pb.UpdateInstancePropsRequest) (*pb.UpdateInstancePropsResponse, error) { - return &pb.UpdateInstancePropsResponse{}, nil + instanceFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId}, "/") + + instance, err := GetInstance(ctx, request.ServiceId, request.InstanceId) + if err != nil { + log.Error(fmt.Sprintf("update instance %s properties failed", instanceFlag), err) + return &pb.UpdateInstancePropsResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + if instance == nil { + log.Error(fmt.Sprintf("update 
instance %s properties failed, instance does not exist", instanceFlag), err) + return &pb.UpdateInstancePropsResponse{ + Response: pb.CreateResponse(pb.ErrInstanceNotExists, "Service instance does not exist."), + }, nil + } + + copyInstanceRef := *instance + copyInstanceRef.InstanceInfo.Properties = request.Properties + + // todo finish update instance + if err := UpdateInstanceP(ctx, copyInstanceRef.InstanceInfo); err != nil { + log.Error(fmt.Sprintf("update instance %s properties failed", instanceFlag), err) + resp := &pb.UpdateInstancePropsResponse{ + Response: pb.CreateResponseWithSCErr(err), + } + if err.InternalError() { + return resp, err + } + return resp, nil + } + + log.Infof("update instance[%s] properties successfully", instanceFlag) + return &pb.UpdateInstancePropsResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, "Update service instance properties successfully."), + }, nil } func (ds *DataSource) UnregisterInstance(ctx context.Context, request *pb.UnregisterInstanceRequest) (*pb.UnregisterInstanceResponse, error) { - return &pb.UnregisterInstanceResponse{}, nil + remoteIP := util.GetIPFromContext(ctx) + serviceID := request.ServiceId + instanceID := request.InstanceId + + instanceFlag := util.StringJoin([]string{serviceID, instanceID}, "/") + + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + filter := bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): serviceID, + StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}): instanceID} + _, err := client.GetMongoClient().Delete(ctx, CollectionInstance, filter) + if err != nil { + log.Error(fmt.Sprintf("unregister instance failed, instance %s, operator %s revoke instance failed", instanceFlag, remoteIP), err) + return &pb.UnregisterInstanceResponse{ + Response: pb.CreateResponse(pb.ErrInternal, "delete instance failed"), + }, err + } + + log.Infof("unregister instance[%s], operator %s", 
instanceFlag, remoteIP) + return &pb.UnregisterInstanceResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, "Unregister service instance successfully."), + }, nil } func (ds *DataSource) Heartbeat(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) { - return heartbeat.Instance().Heartbeat(ctx, request) + remoteIP := util.GetIPFromContext(ctx) + instanceFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId}, "/") + err := KeepAliveLease(ctx, request) + if err != nil { + log.Error(fmt.Sprintf("heartbeat failed, instance %s operator %s", instanceFlag, remoteIP), err) + resp := &pb.HeartbeatResponse{ + Response: pb.CreateResponseWithSCErr(err), + } + if err.InternalError() { + return resp, err + } + return resp, nil + } + return &pb.HeartbeatResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, + "Update service instance heartbeat successfully."), + }, nil } func (ds *DataSource) HeartbeatSet(ctx context.Context, request *pb.HeartbeatSetRequest) (*pb.HeartbeatSetResponse, error) { - return &pb.HeartbeatSetResponse{}, nil + domainProject := util.ParseDomainProject(ctx) + + heartBeatCount := len(request.Instances) + existFlag := make(map[string]bool, heartBeatCount) + instancesHbRst := make(chan *pb.InstanceHbRst, heartBeatCount) + noMultiCounter := 0 + + for _, heartbeatElement := range request.Instances { + if _, ok := existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId]; ok { + log.Warnf("instance[%s/%s] is duplicate request heartbeat set", + heartbeatElement.ServiceId, heartbeatElement.InstanceId) + continue + } else { + existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId] = true + noMultiCounter++ + } + gopool.Go(getHeartbeatFunc(ctx, domainProject, instancesHbRst, heartbeatElement)) + } + + count := 0 + successFlag := false + failFlag := false + instanceHbRstArr := make([]*pb.InstanceHbRst, 0, heartBeatCount) + + for hbRst := range instancesHbRst { + count++ + if 
len(hbRst.ErrMessage) != 0 { + failFlag = true + } else { + successFlag = true + } + instanceHbRstArr = append(instanceHbRstArr, hbRst) + if count == noMultiCounter { + close(instancesHbRst) + } + } + + if !failFlag && successFlag { + log.Infof("batch update heartbeats[%d] successfully", count) + return &pb.HeartbeatSetResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, "Heartbeat set successfully."), + Instances: instanceHbRstArr, + }, nil + } + + log.Info(fmt.Sprintf("batch update heartbeats failed %v", request.Instances)) + return &pb.HeartbeatSetResponse{ + Response: pb.CreateResponse(pb.ErrInstanceNotExists, "Heartbeat set failed."), + Instances: instanceHbRstArr, + }, nil } func (ds *DataSource) BatchFind(ctx context.Context, request *pb.BatchFindInstancesRequest) (*pb.BatchFindInstancesResponse, error) { - return &pb.BatchFindInstancesResponse{}, nil + response := &pb.BatchFindInstancesResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, "Batch query service instances successfully."), + } + + var err error + + response.Services, err = ds.batchFindServices(ctx, request) + if err != nil { + return &pb.BatchFindInstancesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + + response.Instances, err = ds.batchFindInstances(ctx, request) + if err != nil { + return &pb.BatchFindInstancesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + + return response, nil +} + +func registryInstance(ctx context.Context, request *pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error) { + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + remoteIP := util.GetIPFromContext(ctx) + instance := request.Instance + instanceID := instance.InstanceId + data := &Instance{ + Domain: domain, + Project: project, + RefreshTime: time.Now(), + InstanceInfo: instance, + } + + instanceFlag := fmt.Sprintf("endpoints %v, host '%s', serviceID %s", + instance.Endpoints, instance.HostName, 
instance.ServiceId) + + insertRes, err := client.GetMongoClient().Insert(ctx, CollectionInstance, data) + if err != nil { + log.Error(fmt.Sprintf("register instance failed %s instanceID %s operator %s", instanceFlag, instanceID, remoteIP), err) + return &pb.RegisterInstanceResponse{ + Response: pb.CreateResponse(pb.ErrUnavailableBackend, err.Error()), + }, err + } + + log.Infof("register instance %s, instanceID %s, operator %s", + instanceFlag, insertRes.InsertedID, remoteIP) + return &pb.RegisterInstanceResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, "Register service instance successfully."), + InstanceId: instanceID, + }, nil +} + +func (ds *DataSource) findInstance(ctx context.Context, request *pb.FindInstancesRequest, provider *pb.MicroServiceKey) (*pb.FindInstancesResponse, error) { + var err error + domainProject := util.ParseDomainProject(ctx) + service := &Service{ServiceInfo: &pb.MicroService{Environment: request.Environment}} + if len(request.ConsumerServiceId) > 0 { + filter := GeneratorServiceFilter(ctx, request.ConsumerServiceId) + service, err = GetService(ctx, filter) + if err != nil { + log.Error(fmt.Sprintf("get consumer failed, consumer %s find provider %s/%s/%s/%s", + request.ConsumerServiceId, request.Environment, request.AppId, request.ServiceName, request.VersionRule), err) + return &pb.FindInstancesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + if service == nil { + log.Error(fmt.Sprintf("consumer does not exist, consumer %s find provider %s/%s/%s/%s", + request.ConsumerServiceId, request.Environment, request.AppId, request.ServiceName, request.VersionRule), err) + return &pb.FindInstancesResponse{ + Response: pb.CreateResponse(pb.ErrServiceNotExists, + fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId)), + }, nil + } + provider.Environment = service.ServiceInfo.Environment + } + + // provider is not a shared micro-service, + // only allow shared micro-service instances 
found request different domains. + ctx = util.SetTargetDomainProject(ctx, util.ParseDomain(ctx), util.ParseProject(ctx)) + provider.Tenant = util.ParseTargetDomainProject(ctx) + + findFlag := fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s/%s/%s/%s]", + request.ConsumerServiceId, service.ServiceInfo.Environment, service.ServiceInfo.AppId, service.ServiceInfo.ServiceName, service.ServiceInfo.Version, + provider.Environment, provider.AppId, provider.ServiceName, provider.Version) + + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + resp, err := client.GetMongoClient().Find(ctx, CollectionInstance, bson.M{ColumnDomain: domain, ColumnProject: project}) + if err != nil { + log.Error(fmt.Sprintf("FindInstancesCache.Get failed %s failed", findFlag), err) + return &pb.FindInstancesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + if resp == nil { + mes := fmt.Errorf("%s failed, provider does not exist", findFlag) + log.Error("FindInstancesCache.Get failed", mes) + return &pb.FindInstancesResponse{ + Response: pb.CreateResponse(pb.ErrServiceNotExists, mes.Error()), + }, nil + } + + var instances []*pb.MicroServiceInstance + for resp.Next(ctx) { + var instance Instance + err := resp.Decode(&instance) + if err != nil { + log.Error(fmt.Sprintf("FindInstances.GetWithProviderID failed %s failed", findFlag), err) + return &pb.FindInstancesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + instances = append(instances, instance.InstanceInfo) + } + + // add dependency queue + if len(request.ConsumerServiceId) > 0 && + len(instances) > 0 { + provider, err = ds.reshapeProviderKey(ctx, provider, instances[0].ServiceId) + if err != nil { + return nil, err + } + if provider != nil { + err = AddServiceVersionRule(ctx, domainProject, service.ServiceInfo, provider) + } else { + mes := fmt.Errorf("%s failed, provider does not exist", findFlag) + log.Error("AddServiceVersionRule failed", mes) + 
return &pb.FindInstancesResponse{ + Response: pb.CreateResponse(pb.ErrServiceNotExists, mes.Error()), + }, nil + } + if err != nil { + log.Error(fmt.Sprintf("AddServiceVersionRule failed %s failed", findFlag), err) + return &pb.FindInstancesResponse{ + Response: pb.CreateResponse(pb.ErrInternal, err.Error()), + }, err + } + } + + return &pb.FindInstancesResponse{ + Response: pb.CreateResponse(pb.ResponseSuccess, "Query service instances successfully."), + Instances: instances, + }, nil +} + +func (ds *DataSource) reshapeProviderKey(ctx context.Context, provider *pb.MicroServiceKey, providerID string) ( + *pb.MicroServiceKey, error) { + //维护version的规则,service name 可能是别名,所以重新获取 + filter := GeneratorServiceFilter(ctx, providerID) + providerService, err := GetService(ctx, filter) + if providerService == nil { + return nil, err + } + + versionRule := provider.Version + provider = pb.MicroServiceToKey(provider.Tenant, providerService.ServiceInfo) + provider.Version = versionRule + return provider, nil +} + +func AddServiceVersionRule(ctx context.Context, domainProject string, consumer *pb.MicroService, provider *pb.MicroServiceKey) error { + return nil +} + +func GetInstance(ctx context.Context, serviceID string, instanceID string) (*Instance, error) { + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + filter := bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): serviceID, + StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}): instanceID} + findRes, err := client.GetMongoClient().FindOne(ctx, CollectionInstance, filter) + if err != nil { + return nil, err + } + var instance *Instance + if findRes.Err() != nil { + //not get any service,not db err + return nil, nil + } + err = findRes.Decode(&instance) + if err != nil { + return nil, err + } + return instance, nil +} + +func UpdateInstanceS(ctx context.Context, instance *pb.MicroServiceInstance) *pb.Error { + domain := 
util.ParseDomain(ctx) + project := util.ParseProject(ctx) + filter := bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): instance.ServiceId, + StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}): instance.InstanceId} + _, err := client.GetMongoClient().Update(ctx, CollectionInstance, filter, bson.M{"$set": bson.M{"instance.motTimestamp": strconv.FormatInt(time.Now().Unix(), 10), "instance.status": instance.Status}}) + if err != nil { + return pb.NewError(pb.ErrUnavailableBackend, err.Error()) + } + return nil +} + +func UpdateInstanceP(ctx context.Context, instance *pb.MicroServiceInstance) *pb.Error { + domain := util.ParseDomain(ctx) + project := util.ParseProject(ctx) + filter := bson.M{ + ColumnDomain: domain, + ColumnProject: project, + StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): instance.ServiceId, + StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}): instance.InstanceId} + _, err := client.GetMongoClient().Update(ctx, CollectionInstance, filter, bson.M{"$set": bson.M{"instance.motTimestamp": strconv.FormatInt(time.Now().Unix(), 10), "instance.properties": instance.Properties}}) + if err != nil { + return pb.NewError(pb.ErrUnavailableBackend, err.Error()) + } + return nil +} + +func KeepAliveLease(ctx context.Context, request *pb.HeartbeatRequest) *pb.Error { + _, err := heartbeat.Instance().Heartbeat(ctx, request) + if err != nil { + return pb.NewError(pb.ErrInstanceNotExists, err.Error()) + } + return nil +} + +func getHeartbeatFunc(ctx context.Context, domainProject string, instancesHbRst chan<- *pb.InstanceHbRst, element *pb.HeartbeatSetElement) func(context.Context) { + return func(_ context.Context) { + hbRst := &pb.InstanceHbRst{ + ServiceId: element.ServiceId, + InstanceId: element.InstanceId, + ErrMessage: "", + } + + req := &pb.HeartbeatRequest{ + InstanceId: element.InstanceId, + ServiceId: element.ServiceId, + } + + err := 
KeepAliveLease(ctx, req) + if err != nil { + hbRst.ErrMessage = err.Error() + log.Error(fmt.Sprintf("heartbeat set failed %s %s", element.ServiceId, element.InstanceId), err) + } + instancesHbRst <- hbRst + } +} + +func (ds *DataSource) batchFindServices(ctx context.Context, request *pb.BatchFindInstancesRequest) ( + *pb.BatchFindResult, error) { + if len(request.Services) == 0 { + return nil, nil + } + cloneCtx := util.CloneContext(ctx) + + services := &pb.BatchFindResult{} + failedResult := make(map[int32]*pb.FindFailedResult) + for index, key := range request.Services { + findCtx := util.SetContext(cloneCtx, util.CtxRequestRevision, key.Rev) + resp, err := ds.FindInstances(findCtx, &pb.FindInstancesRequest{ + ConsumerServiceId: request.ConsumerServiceId, + AppId: key.Service.AppId, + ServiceName: key.Service.ServiceName, + VersionRule: key.Service.Version, + Environment: key.Service.Environment, + }) + if err != nil { + return nil, err + } + failed, ok := failedResult[resp.Response.GetCode()] + AppendFindResponse(findCtx, int64(index), resp.Response, resp.Instances, + &services.Updated, &services.NotModified, &failed) + if !ok && failed != nil { + failedResult[resp.Response.GetCode()] = failed + } + } + for _, result := range failedResult { + services.Failed = append(services.Failed, result) + } + return services, nil +} + +func (ds *DataSource) batchFindInstances(ctx context.Context, request *pb.BatchFindInstancesRequest) (*pb.BatchFindResult, error) { + if len(request.Instances) == 0 { + return nil, nil + } + cloneCtx := util.CloneContext(ctx) + // can not find the shared provider instances + cloneCtx = util.SetTargetDomainProject(cloneCtx, util.ParseDomain(ctx), util.ParseProject(ctx)) + + instances := &pb.BatchFindResult{} + failedResult := make(map[int32]*pb.FindFailedResult) + for index, key := range request.Instances { + getCtx := util.SetContext(cloneCtx, util.CtxRequestRevision, key.Rev) + resp, err := ds.GetInstance(getCtx, &pb.GetOneInstanceRequest{ + 
ConsumerServiceId: request.ConsumerServiceId, + ProviderServiceId: key.Instance.ServiceId, + ProviderInstanceId: key.Instance.InstanceId, + }) + if err != nil { + return nil, err + } + failed, ok := failedResult[resp.Response.GetCode()] + AppendFindResponse(getCtx, int64(index), resp.Response, []*pb.MicroServiceInstance{resp.Instance}, + &instances.Updated, &instances.NotModified, &failed) + if !ok && failed != nil { + failedResult[resp.Response.GetCode()] = failed + } + } + for _, result := range failedResult { + instances.Failed = append(instances.Failed, result) + } + return instances, nil +} + +func AppendFindResponse(ctx context.Context, index int64, resp *pb.Response, instances []*pb.MicroServiceInstance, + updatedResult *[]*pb.FindResult, notModifiedResult *[]int64, failedResult **pb.FindFailedResult) { + if code := resp.GetCode(); code != pb.ResponseSuccess { + if *failedResult == nil { + *failedResult = &pb.FindFailedResult{ + Error: pb.NewError(code, resp.GetMessage()), + } + } + (*failedResult).Indexes = append((*failedResult).Indexes, index) + return + } + iv, _ := ctx.Value(util.CtxRequestRevision).(string) + ov, _ := ctx.Value(util.CtxResponseRevision).(string) + if len(iv) > 0 && iv == ov { + *notModifiedResult = append(*notModifiedResult, index) + return + } + *updatedResult = append(*updatedResult, &pb.FindResult{ + Index: index, + Instances: instances, + Rev: ov, + }) +} + +func preProcessRegisterInstance(ctx context.Context, instance *pb.MicroServiceInstance) *pb.Error { + if len(instance.Status) == 0 { + instance.Status = pb.MSI_UP + } + + if len(instance.InstanceId) == 0 { + instance.InstanceId = uuid.Generator().GetInstanceID(ctx) + } + + instance.Timestamp = strconv.FormatInt(time.Now().Unix(), 10) + instance.ModTimestamp = instance.Timestamp + + // 这里应该根据租约计时 + renewalInterval := apt.RegistryDefaultLeaseRenewalinterval + retryTimes := apt.RegistryDefaultLeaseRetrytimes + if instance.HealthCheck == nil { + instance.HealthCheck = 
&pb.HealthCheck{ + Mode: pb.CHECK_BY_HEARTBEAT, + Interval: renewalInterval, + Times: retryTimes, + } + } else { + // Health check对象仅用于呈现服务健康检查逻辑,如果CHECK_BY_PLATFORM类型,表明由sidecar代发心跳,实例120s超时 + switch instance.HealthCheck.Mode { + case pb.CHECK_BY_HEARTBEAT: + d := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1) + if d <= 0 { + return pb.NewError(pb.ErrInvalidParams, "Invalid 'healthCheck' settings in request body.") + } + case pb.CHECK_BY_PLATFORM: + // 默认120s + instance.HealthCheck.Interval = renewalInterval + instance.HealthCheck.Times = retryTimes + } + } + + filter := GeneratorServiceFilter(ctx, instance.ServiceId) + microservice, err := GetService(ctx, filter) + if microservice == nil || err != nil { + return pb.NewError(pb.ErrServiceNotExists, "Invalid 'serviceID' in request body.") + } + instance.Version = microservice.ServiceInfo.Version + return nil } diff --git a/datasource/mongo/ms_test.go b/datasource/mongo/ms_test.go index 8a2fb9c..5e4254f 100644 --- a/datasource/mongo/ms_test.go +++ b/datasource/mongo/ms_test.go @@ -24,11 +24,13 @@ import ( "time" "github.com/apache/servicecomb-service-center/datasource" + "github.com/apache/servicecomb-service-center/datasource/mongo" "github.com/apache/servicecomb-service-center/datasource/mongo/client" "github.com/apache/servicecomb-service-center/server/plugin/quota" pb "github.com/go-chassis/cari/discovery" "github.com/go-chassis/go-chassis/v2/storage" "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/bson" ) func init() { @@ -185,34 +187,6 @@ func TestGetService(t *testing.T) { } -// need mongodb cluster -//func TestServiceDelete(t *testing.T) { -// t.Run("delete service by mongo, should pass", func(t *testing.T) { -// request := &pb.CreateServiceRequest{ -// Service: &pb.MicroService{ -// ServiceId: "ms-service-delete-new-id", -// ServiceName: "ms-service-delete", -// AppId: "default", -// Version: "1.0.4", -// Level: "BACK", -// Properties: make(map[string]string), -// }, 
-// } -// -// resp, err := datasource.Instance().RegisterService(getContext(), request) -// assert.NoError(t, err) -// assert.Equal(t, resp.Response.GetCode(), pb.ResponseSuccess) -// -// res, err := datasource.Instance().UnregisterService(getContext(), &pb.DeleteServiceRequest{ -// ServiceId: "ms-service-delete-new-id", -// Force: false, -// }) -// fmt.Println(res.Response.Message) -// assert.NoError(t, err) -// assert.Equal(t, pb.ResponseSuccess, res.Response.GetCode()) -// }) -//} - func TestUpdateService(t *testing.T) { t.Run("update service by mongo, should pass", func(t *testing.T) { request := &pb.CreateServiceRequest{ @@ -247,7 +221,7 @@ func TestUpdateService(t *testing.T) { func TestTagsAdd(t *testing.T) { // create service - t.Run("create service", func(t *testing.T) { + t.Run("create service, the request is valid, should pass", func(t *testing.T) { svc1 := &pb.MicroService{ ServiceId: "service_tag_id", AppId: "create_tag_group_ms", @@ -264,7 +238,7 @@ func TestTagsAdd(t *testing.T) { }) // - t.Run("the request is valid", func(t *testing.T) { + t.Run("create service, the request is valid, should pass", func(t *testing.T) { defaultQuota := quota.DefaultTagQuota tags := make(map[string]string, defaultQuota) for i := 0; i < defaultQuota; i++ { @@ -282,7 +256,7 @@ func TestTagsAdd(t *testing.T) { } func TestTagsGet(t *testing.T) { - t.Run("create service and add tags", func(t *testing.T) { + t.Run("create service and add tags, the request is valid, should pass", func(t *testing.T) { svc := &pb.MicroService{ ServiceId: "get_tag_group_ms_id", AppId: "get_tag_group_ms", @@ -318,7 +292,7 @@ func TestTagsGet(t *testing.T) { } func TestTagUpdate(t *testing.T) { - t.Run("add service and add tags", func(t *testing.T) { + t.Run("add service and add tags, the request is valid, should pass", func(t *testing.T) { svc := &pb.MicroService{ ServiceId: "update_tag_group_ms_id", AppId: "update_tag_group_ms", @@ -344,7 +318,7 @@ func TestTagUpdate(t *testing.T) { 
assert.Equal(t, pb.ResponseSuccess, respAddTags.Response.GetCode()) }) - t.Run("the request is valid", func(t *testing.T) { + t.Run("the request is valid, should pass", func(t *testing.T) { resp, err := datasource.Instance().UpdateTag(getContext(), &pb.UpdateServiceTagRequest{ ServiceId: "update_tag_group_ms_id", Key: "a", @@ -356,7 +330,7 @@ func TestTagUpdate(t *testing.T) { } func TestTagsDelete(t *testing.T) { - t.Run("create service and add tags", func(t *testing.T) { + t.Run("create service and add tags, the request is valid, should pass", func(t *testing.T) { resp, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ Service: &pb.MicroService{ ServiceId: "delete_tag_group_ms_id", @@ -380,7 +354,7 @@ func TestTagsDelete(t *testing.T) { assert.NoError(t, err) assert.Equal(t, pb.ResponseSuccess, respAddTages.Response.GetCode()) }) - t.Run("the request is valid", func(t *testing.T) { + t.Run("the request is valid, should pass", func(t *testing.T) { resp, err := datasource.Instance().DeleteTags(getContext(), &pb.DeleteServiceTagsRequest{ ServiceId: "delete_tag_group_ms_id", Keys: []string{"b"}, @@ -398,7 +372,7 @@ func TestTagsDelete(t *testing.T) { } func TestRuleAdd(t *testing.T) { - t.Run("register service and datasource.Instance()", func(t *testing.T) { + t.Run("register service, the request is valid, should pass", func(t *testing.T) { respCreateService, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ Service: &pb.MicroService{ ServiceId: "create_rule_group_ms_id", @@ -412,7 +386,7 @@ func TestRuleAdd(t *testing.T) { assert.NoError(t, err) assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) }) - t.Run("request is valid", func(t *testing.T) { + t.Run("register service, the request is valid, should pass", func(t *testing.T) { respAddRule, err := datasource.Instance().AddRule(getContext(), &pb.AddServiceRulesRequest{ ServiceId: "create_rule_group_ms_id", Rules: 
[]*pb.AddOrUpdateServiceRule{ @@ -429,7 +403,7 @@ func TestRuleAdd(t *testing.T) { ruleId := respAddRule.RuleIds[0] assert.NotEqual(t, "", ruleId) }) - t.Run("request rule is already exist", func(t *testing.T) { + t.Run("request rule is already exist, should pass", func(t *testing.T) { respAddRule, err := datasource.Instance().AddRule(getContext(), &pb.AddServiceRulesRequest{ ServiceId: "create_rule_group_ms_id", Rules: []*pb.AddOrUpdateServiceRule{ @@ -448,7 +422,7 @@ func TestRuleAdd(t *testing.T) { } func TestRuleGet(t *testing.T) { - t.Run("register service and rules", func(t *testing.T) { + t.Run("register service and rules, the request is valid, should pass", func(t *testing.T) { respCreateService, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ Service: &pb.MicroService{ ServiceId: "get_rule_group_ms_id", @@ -478,7 +452,7 @@ func TestRuleGet(t *testing.T) { ruleId := respAddRule.RuleIds[0] assert.NotEqual(t, "", ruleId) }) - t.Run("get when request is valid", func(t *testing.T) { + t.Run("get rule, when request is valid, should pass", func(t *testing.T) { respGetRule, err := datasource.Instance().GetRules(getContext(), &pb.GetServiceRulesRequest{ ServiceId: "get_rule_group_ms_id", }) @@ -490,7 +464,7 @@ func TestRuleGet(t *testing.T) { func TestRuleDelete(t *testing.T) { var ruleId string - t.Run("register service and rules", func(t *testing.T) { + t.Run("register service and rules, when request is valid, should pass", func(t *testing.T) { respCreateService, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ Service: &pb.MicroService{ ServiceId: "delete_rule_group_ms_id", @@ -519,7 +493,7 @@ func TestRuleDelete(t *testing.T) { assert.Equal(t, pb.ResponseSuccess, respAddRule.Response.GetCode()) ruleId = respAddRule.RuleIds[0] }) - t.Run("delete when request is valid", func(t *testing.T) { + t.Run("delete rule, when request is valid, should pass", func(t *testing.T) { resp, err := 
datasource.Instance().DeleteRule(getContext(), &pb.DeleteServiceRulesRequest{ ServiceId: "delete_rule_group_ms_id", RuleIds: []string{ruleId}, @@ -538,7 +512,7 @@ func TestRuleDelete(t *testing.T) { func TestRuleUpdate(t *testing.T) { var ruleId string - t.Run("create service and rules", func(t *testing.T) { + t.Run("create service and rules, when request is valid, should pass", func(t *testing.T) { respCreateService, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ Service: &pb.MicroService{ ServiceId: "update_rule_group_ms_id", @@ -568,7 +542,7 @@ func TestRuleUpdate(t *testing.T) { assert.NotEqual(t, "", respAddRule.RuleIds[0]) ruleId = respAddRule.RuleIds[0] }) - t.Run("update when request is valid", func(t *testing.T) { + t.Run("update rule, when request is valid, should pass", func(t *testing.T) { resp, err := datasource.Instance().UpdateRule(getContext(), &pb.UpdateServiceRuleRequest{ ServiceId: "update_rule_group_ms_id", RuleId: ruleId, @@ -584,70 +558,500 @@ func TestRuleUpdate(t *testing.T) { }) } -// 需要多集群mongo支持 -//func TestSchema(t *testing.T) { -// t.Run("create a schema in production env", func(t *testing.T) { -// respCreateService, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ -// Service: &pb.MicroService{ -// ServiceId: "create_schema_prod_service_ms_id1", -// AppId: "create_schema_prod_service_ms", -// ServiceName: "create_schema_service_service_ms", -// Version: "1.0.0", -// Level: "FRONT", -// Status: pb.MS_UP, -// Environment: pb.ENV_PROD, -// }, -// }) -// assert.NoError(t, err) -// assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) -// -// respCreateService, err = datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ -// Service: &pb.MicroService{ -// ServiceId: "create_schema_prod_service_ms_id2", -// AppId: "create_schema_prod_service_ms", -// ServiceName: "create_schema_service_service_ms", -// Version: "1.0.1", -// 
Level: "FRONT", -// Schemas: []string{ -// "first_schemaId_service_ms", -// "second_schemaId_service_ms", -// }, -// Status: pb.MS_UP, -// Environment: pb.ENV_PROD, -// }, -// }) -// assert.NoError(t, err) -// assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) -// -// respModifySchema, err := datasource.Instance().ModifySchema(getContext(), &pb.ModifySchemaRequest{ -// ServiceId: "create_schema_prod_service_ms_id1", -// SchemaId: "first_schemaId_service_ms", -// Schema: "first_schema_service_ms", -// }) -// assert.NoError(t, err) -// assert.Equal(t, pb.ResponseSuccess, respModifySchema.Response.GetCode()) -// -// respModifySchema, err = datasource.Instance().ModifySchema(getContext(), &pb.ModifySchemaRequest{ -// ServiceId: "create_schema_prod_service_ms_id1", -// SchemaId: "first_schemaId_service_ms", -// Schema: "first_schema_change_service_ms", -// Summary: "first0summary1change_service_ms", -// }) -// assert.NoError(t, err) -// assert.Equal(t, pb.ResponseSuccess, respModifySchema.Response.GetCode()) -// existRes, err := datasource.Instance().ExistSchema(getContext(), &pb.GetExistenceRequest{ -// ServiceId: "create_schema_prod_service_ms_id1", -// SchemaId: "first_schemaId_service_ms", -// }) -// assert.NoError(t, err) -// assert.Equal(t, pb.ResponseSuccess, existRes.Response.GetCode()) -// assert.Equal(t, "first0summary1change_service_ms", existRes.Summary) -// -// resSchemas, err := datasource.Instance().GetAllSchemas(getContext(), &pb.GetAllSchemaRequest{ -// ServiceId: "create_schema_prod_service_ms_id1", -// WithSchema: true, -// }) -// assert.NoError(t, err) -// assert.Equal(t, 1, len(resSchemas.Schemas)) -// }) -//} +func TestInstance_Creat(t *testing.T) { + var serviceId string + + t.Run("create service, when request is valid, should pass", func(t *testing.T) { + insertRes, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "service1", + ServiceName: 
"create_instance_service_ms", + AppId: "create_instance_ms", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, insertRes.Response.GetCode()) + serviceId = insertRes.ServiceId + }) + + t.Run("register instance, when request is valid, should pass", func(t *testing.T) { + respCreateInst, err := datasource.Instance().RegisterInstance(getContext(), &pb.RegisterInstanceRequest{ + Instance: &pb.MicroServiceInstance{ + ServiceId: serviceId, + Endpoints: []string{ + "createInstance_ms:127.0.0.1:8080", + }, + HostName: "UT-HOST", + Status: pb.MSI_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateInst.Response.GetCode()) + assert.NotEqual(t, "ins_instance", respCreateInst.InstanceId) + + respCreateInst, err = datasource.Instance().RegisterInstance(getContext(), &pb.RegisterInstanceRequest{ + Instance: &pb.MicroServiceInstance{ + InstanceId: "instance2", + ServiceId: serviceId, + Endpoints: []string{ + "createInstance_ms:127.0.0.1:8080", + }, + HostName: "UT-HOST", + Status: pb.MSI_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateInst.Response.GetCode()) + assert.Equal(t, "instance2", respCreateInst.InstanceId) + }) + + t.Run("update the same instance, should pass", func(t *testing.T) { + resp, err := datasource.Instance().RegisterInstance(getContext(), &pb.RegisterInstanceRequest{ + Instance: &pb.MicroServiceInstance{ + ServiceId: serviceId, + InstanceId: "instance3", + Endpoints: []string{ + "sameInstance:127.0.0.1:8080", + }, + HostName: "UT-HOST", + Status: pb.MSI_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode()) + + resp, err = datasource.Instance().RegisterInstance(getContext(), &pb.RegisterInstanceRequest{ + Instance: &pb.MicroServiceInstance{ + ServiceId: serviceId, + InstanceId: "instance4", + Endpoints: []string{ + "sameInstance:127.0.0.1:8080", + }, + HostName: 
"UT-HOST", + Status: pb.MSI_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode()) + assert.Equal(t, "instance4", resp.InstanceId) + }) + + t.Run("delete test data", func(t *testing.T) { + _, err := client.GetMongoClient().Delete(getContext(), mongo.CollectionService, bson.M{"domain": "default", "project": "default"}) + assert.NoError(t, err) + + _, err = client.GetMongoClient().Delete(getContext(), mongo.CollectionInstance, bson.M{"domain": "default", "project": "default"}) + assert.NoError(t, err) + }) +} + +func TestInstance_update(t *testing.T) { + + var ( + serviceId string + instanceId string + ) + + t.Run("register service and instance, when request is valid, should pass", func(t *testing.T) { + respCreateService, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "service1", + ServiceName: "update_instance_service_ms", + AppId: "update_instance_service_ms", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + serviceId = respCreateService.ServiceId + + respCreateInstance, err := datasource.Instance().RegisterInstance(getContext(), &pb.RegisterInstanceRequest{ + Instance: &pb.MicroServiceInstance{ + ServiceId: serviceId, + InstanceId: "instance1", + Endpoints: []string{ + "updateInstance:127.0.0.1:8080", + }, + HostName: "UT-HOST-MS", + Status: pb.MSI_UP, + Properties: map[string]string{"nodeIP": "test"}, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateInstance.Response.GetCode()) + instanceId = respCreateInstance.InstanceId + }) + + t.Run("update instance status, should pass", func(t *testing.T) { + respUpdateStatus, err := datasource.Instance().UpdateInstanceStatus(getContext(), &pb.UpdateInstanceStatusRequest{ + ServiceId: serviceId, + InstanceId: instanceId, + Status: pb.MSI_DOWN, + 
}) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respUpdateStatus.Response.GetCode()) + + respUpdateStatus, err = datasource.Instance().UpdateInstanceStatus(getContext(), &pb.UpdateInstanceStatusRequest{ + ServiceId: serviceId, + InstanceId: instanceId, + Status: pb.MSI_OUTOFSERVICE, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respUpdateStatus.Response.GetCode()) + + respUpdateStatus, err = datasource.Instance().UpdateInstanceStatus(getContext(), &pb.UpdateInstanceStatusRequest{ + ServiceId: serviceId, + InstanceId: instanceId, + Status: pb.MSI_STARTING, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respUpdateStatus.Response.GetCode()) + + respUpdateStatus, err = datasource.Instance().UpdateInstanceStatus(getContext(), &pb.UpdateInstanceStatusRequest{ + ServiceId: serviceId, + InstanceId: instanceId, + Status: pb.MSI_TESTING, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respUpdateStatus.Response.GetCode()) + + respUpdateStatus, err = datasource.Instance().UpdateInstanceStatus(getContext(), &pb.UpdateInstanceStatusRequest{ + ServiceId: serviceId, + InstanceId: instanceId, + Status: pb.MSI_UP, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respUpdateStatus.Response.GetCode()) + + respUpdateStatus, err = datasource.Instance().UpdateInstanceStatus(getContext(), &pb.UpdateInstanceStatusRequest{ + ServiceId: serviceId, + InstanceId: "notexistins", + Status: pb.MSI_STARTING, + }) + assert.NoError(t, err) + assert.NotEqual(t, pb.ResponseSuccess, respUpdateStatus.Response.GetCode()) + }) + + t.Run("update instance properties, should pass", func(t *testing.T) { + respUpdateProperties, err := datasource.Instance().UpdateInstanceProperties(getContext(), + &pb.UpdateInstancePropsRequest{ + ServiceId: serviceId, + InstanceId: instanceId, + Properties: map[string]string{ + "test": "test", + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, 
respUpdateProperties.Response.GetCode()) + + size := 1000 + properties := make(map[string]string, size) + for i := 0; i < size; i++ { + s := strconv.Itoa(i) + strings.Repeat("x", 253) + properties[s] = s + } + respUpdateProperties, err = datasource.Instance().UpdateInstanceProperties(getContext(), + &pb.UpdateInstancePropsRequest{ + ServiceId: serviceId, + InstanceId: instanceId, + Properties: properties, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respUpdateProperties.Response.GetCode()) + + respUpdateProperties, err = datasource.Instance().UpdateInstanceProperties(getContext(), + &pb.UpdateInstancePropsRequest{ + ServiceId: serviceId, + InstanceId: "not_exist_ins", + Properties: map[string]string{ + "test": "test", + }, + }) + assert.NoError(t, err) + assert.NotEqual(t, pb.ResponseSuccess, respUpdateProperties.Response.GetCode()) + + respUpdateProperties, err = datasource.Instance().UpdateInstanceProperties(getContext(), + &pb.UpdateInstancePropsRequest{ + ServiceId: serviceId, + InstanceId: instanceId, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respUpdateProperties.Response.GetCode()) + + respUpdateProperties, err = datasource.Instance().UpdateInstanceProperties(getContext(), + &pb.UpdateInstancePropsRequest{ + ServiceId: "not_exist_service", + InstanceId: instanceId, + Properties: map[string]string{ + "test": "test", + }, + }) + assert.NoError(t, err) + assert.NotEqual(t, pb.ResponseSuccess, respUpdateProperties.Response.GetCode()) + }) + + t.Run("delete test data", func(t *testing.T) { + _, err := client.GetMongoClient().Delete(getContext(), mongo.CollectionService, bson.M{"domain": "default", "project": "default"}) + assert.NoError(t, err) + + _, err = client.GetMongoClient().Delete(getContext(), mongo.CollectionInstance, bson.M{"domain": "default", "project": "default"}) + assert.NoError(t, err) + }) +} + +func TestInstance_Query(t *testing.T) { + + var ( + serviceId1 string + instanceId1 string + ) + + 
t.Run("register services and instance for testInstance_query, when request is invalid, should pass", func(t *testing.T) { + insertServiceRes, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "service1", + AppId: "query_instance_ms", + ServiceName: "query_instance_service_ms", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, insertServiceRes.Response.GetCode()) + serviceId1 = insertServiceRes.ServiceId + + insertInstanceRes, err := datasource.Instance().RegisterInstance(getContext(), &pb.RegisterInstanceRequest{ + Instance: &pb.MicroServiceInstance{ + InstanceId: "instance1", + ServiceId: serviceId1, + HostName: "UT-HOST-MS", + Endpoints: []string{ + "find:127.0.0.1:8080", + }, + Status: pb.MSI_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, insertInstanceRes.Response.GetCode()) + instanceId1 = insertInstanceRes.InstanceId + }) + + t.Run("query instance, when request is invalid, should pass", func(t *testing.T) { + findRes, err := datasource.Instance().FindInstances(getContext(), &pb.FindInstancesRequest{ + ConsumerServiceId: serviceId1, + AppId: "query_instance_ms", + ServiceName: "query_instance_service_ms", + VersionRule: "latest", + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, findRes.Response.GetCode()) + assert.Equal(t, instanceId1, findRes.Instances[0].InstanceId) + }) + + t.Run("batch query instance, when request is invalid, should pass", func(t *testing.T) { + respFind, err := datasource.Instance().BatchFind(getContext(), &pb.BatchFindInstancesRequest{ + ConsumerServiceId: serviceId1, + Services: []*pb.FindService{ + { + Service: &pb.MicroServiceKey{ + AppId: "query_instance_ms", + ServiceName: "query_instance_service_ms", + Version: "latest", + }, + }, + { + Service: &pb.MicroServiceKey{ + AppId: "query_instance_ms", + ServiceName: 
"query_instance_service_ms", + Version: "1.0.0+", + }, + }, + { + Service: &pb.MicroServiceKey{ + AppId: "query_instance_ms", + ServiceName: "query_instance_service_ms", + Version: "0.0.0", + }, + }, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respFind.Response.GetCode()) + assert.Equal(t, int64(0), respFind.Services.Updated[0].Index) + + }) + + t.Run("delete test data", func(t *testing.T) { + _, err := client.GetMongoClient().Delete(getContext(), mongo.CollectionService, bson.M{"domain": "default", "project": "default"}) + assert.NoError(t, err) + + _, err = client.GetMongoClient().Delete(getContext(), mongo.CollectionInstance, bson.M{"domain": "default", "project": "default"}) + assert.NoError(t, err) + }) +} + +func TestInstance_GetOne(t *testing.T) { + + var ( + serviceId1 string + serviceId2 string + serviceId3 string + instanceId2 string + ) + + t.Run("register service and instances, when request is invalid, should pass", func(t *testing.T) { + respCreateService, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "service1", + AppId: "get_instance_ms", + ServiceName: "get_instance_service_ms", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + serviceId1 = respCreateService.ServiceId + + respCreateService, err = datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "service2", + AppId: "get_instance_ms", + ServiceName: "get_instance_service_ms", + Version: "1.0.5", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + serviceId2 = respCreateService.ServiceId + + respCreateInstance, err := datasource.Instance().RegisterInstance(getContext(), &pb.RegisterInstanceRequest{ + 
Instance: &pb.MicroServiceInstance{ + InstanceId: "instance1", + ServiceId: serviceId2, + HostName: "UT-HOST-MS", + Endpoints: []string{ + "get:127.0.0.2:8080", + }, + Status: pb.MSI_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + instanceId2 = respCreateInstance.InstanceId + + respCreateService, err = datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "service3", + AppId: "get_instance_cross_ms", + ServiceName: "get_instance_service_ms", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + serviceId3 = respCreateService.ServiceId + }) + + t.Run("get between diff apps, when request is invalid, should pass", func(t *testing.T) { + resp, err := datasource.Instance().GetInstance(getContext(), &pb.GetOneInstanceRequest{ + ConsumerServiceId: serviceId3, + ProviderServiceId: serviceId2, + ProviderInstanceId: instanceId2, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode()) + }) + + t.Run("get instances, when request is invalid, should pass", func(t *testing.T) { + resp, err := datasource.Instance().GetInstances(getContext(), &pb.GetInstancesRequest{ + ConsumerServiceId: "not-exist-service-ms", + ProviderServiceId: serviceId2, + }) + assert.NoError(t, err) + assert.NotEqual(t, pb.ResponseSuccess, resp.Response.GetCode()) + resp, err = datasource.Instance().GetInstances(getContext(), &pb.GetInstancesRequest{ + ConsumerServiceId: serviceId1, + ProviderServiceId: serviceId2, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode()) + }) + + t.Run("delete test data", func(t *testing.T) { + _, err := client.GetMongoClient().Delete(getContext(), mongo.CollectionService, bson.M{"domain": "default", "project": "default"}) + assert.NoError(t, err) + + _, 
err = client.GetMongoClient().Delete(getContext(), mongo.CollectionInstance, bson.M{"domain": "default", "project": "default"}) + assert.NoError(t, err) + }) +} + +func TestInstance_Unregister(t *testing.T) { + var ( + serviceId string + instanceId string + ) + + t.Run("register service and instances, when request is invalid, should pass", func(t *testing.T) { + respCreateService, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + ServiceId: "service1", + AppId: "unregister_instance_ms", + ServiceName: "unregister_instance_service_ms", + Version: "1.0.5", + Level: "FRONT", + Status: pb.MS_UP, + }, + Tags: map[string]string{ + "test": "test", + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateService.Response.GetCode()) + serviceId = respCreateService.ServiceId + + respCreateInstance, err := datasource.Instance().RegisterInstance(getContext(), &pb.RegisterInstanceRequest{ + Instance: &pb.MicroServiceInstance{ + InstanceId: "instance1", + ServiceId: serviceId, + HostName: "UT-HOST-MS", + Endpoints: []string{ + "unregister:127.0.0.2:8080", + }, + Status: pb.MSI_UP, + }, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, respCreateInstance.Response.GetCode()) + instanceId = respCreateInstance.InstanceId + }) + + t.Run("unregister instance, when request is invalid, should pass", func(t *testing.T) { + resp, err := datasource.Instance().UnregisterInstance(getContext(), &pb.UnregisterInstanceRequest{ + ServiceId: serviceId, + InstanceId: instanceId, + }) + assert.NoError(t, err) + assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode()) + }) + + t.Run("delete test data", func(t *testing.T) { + _, err := client.GetMongoClient().Delete(getContext(), mongo.CollectionService, bson.M{"domain": "default", "project": "default"}) + assert.NoError(t, err) + + _, err = client.GetMongoClient().Delete(getContext(), mongo.CollectionInstance, bson.M{"domain": "default", 
"project": "default"}) + assert.NoError(t, err) + }) +} diff --git a/datasource/mongo/engine.go b/datasource/mongo/util.go similarity index 60% copy from datasource/mongo/engine.go copy to datasource/mongo/util.go index 246ce78..8de33ae 100644 --- a/datasource/mongo/engine.go +++ b/datasource/mongo/util.go @@ -17,29 +17,16 @@ package mongo -import ( - "context" - "time" +import "strings" - "github.com/apache/servicecomb-service-center/pkg/cluster" -) - -func (ds *DataSource) SelfRegister(ctx context.Context) error { - return nil -} -func (ds *DataSource) SelfUnregister(ctx context.Context) error { - return nil -} - -// OPS -func (ds *DataSource) ClearNoInstanceServices(ctx context.Context, ttl time.Duration) error { - return nil -} - -func (ds *DataSource) UpgradeVersion(ctx context.Context) error { - return nil -} - -func (ds *DataSource) GetClusters(ctx context.Context) (cluster.Clusters, error) { - return nil, nil +func StringBuilder(data []string) string { + var str strings.Builder + for index, value := range data { + if index == 0 { + str.WriteString(value) + } else { + str.WriteString("." + value) + } + } + return str.String() }