[ https://issues.apache.org/jira/browse/SCB-1053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717122#comment-16717122 ]
ASF GitHub Bot commented on SCB-1053: ------------------------------------- little-cui closed pull request #508: SCB-1053 Add instances request struct URL: https://github.com/apache/servicecomb-service-center/pull/508 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/integration/instances_test.go b/integration/instances_test.go index 86976a5e..2f1dbaab 100644 --- a/integration/instances_test.go +++ b/integration/instances_test.go @@ -364,11 +364,27 @@ var _ = Describe("MicroService Api Test", func() { "version": serviceVersion, }, } + notExistsInstance := map[string]interface{}{ + "instance": map[string]interface{}{ + "serviceId": serviceId, + "instanceId": "notexisted", + }, + } + providerInstance := map[string]interface{}{ + "instance": map[string]interface{}{ + "serviceId": serviceId, + "instanceId": serviceInstanceID, + }, + } findRequest := map[string]interface{}{ "services": []map[string]interface{}{ provider, notExistsService, }, + "instances": []map[string]interface{}{ + providerInstance, + notExistsInstance, + }, } body, _ := json.Marshal(findRequest) bodyBuf := bytes.NewReader(body) @@ -378,8 +394,10 @@ var _ = Describe("MicroService Api Test", func() { resp, _ := scclient.Do(req) respbody, _ := ioutil.ReadAll(resp.Body) Expect(resp.StatusCode).To(Equal(http.StatusOK)) - servicesStruct := map[string][]map[string]interface{}{} - json.Unmarshal(respbody, &servicesStruct) + respStruct := map[string]map[string][]map[string]interface{}{} + json.Unmarshal(respbody, &respStruct) + servicesStruct := respStruct["services"] + instancesStruct := respStruct["instances"] failed := false for _, services := range servicesStruct["failed"] { a := services["indexes"].([]interface{})[0] == 1.0 @@ -393,6 +411,19 @@ var _ = Describe("MicroService Api 
Test", func() { Expect(servicesStruct["updated"][0]["index"]).To(Equal(0.0)) Expect(len(servicesStruct["updated"][0]["instances"].([]interface{}))). ToNot(Equal(0)) + failed = false + for _, instances := range instancesStruct["failed"] { + a := instances["indexes"].([]interface{})[0] == 1.0 + b := instances["error"].(map[string]interface{})["errorCode"] == "400017" + if a && b { + failed = true + break + } + } + Expect(failed).To(Equal(true)) + Expect(instancesStruct["updated"][0]["index"]).To(Equal(0.0)) + Expect(len(instancesStruct["updated"][0]["instances"].([]interface{}))). + ToNot(Equal(0)) }) }) diff --git a/server/core/proto/batch_find.go b/server/core/proto/batch_find.go index 16eb99cc..fc020213 100644 --- a/server/core/proto/batch_find.go +++ b/server/core/proto/batch_find.go @@ -24,6 +24,11 @@ type FindService struct { Rev string `protobuf:"bytes,2,opt,name=rev" json:"rev,omitempty"` } +type FindInstance struct { + Instance *HeartbeatSetElement `protobuf:"bytes,1,opt,name=instance" json:"instance"` + Rev string `protobuf:"bytes,2,opt,name=rev" json:"rev,omitempty"` +} + type FindResult struct { Index int64 `protobuf:"varint,1,opt,name=index" json:"index"` Rev string `protobuf:"bytes,2,opt,name=rev" json:"rev"` @@ -35,14 +40,20 @@ type FindFailedResult struct { Error *scerr.Error `protobuf:"bytes,2,opt,name=error" json:"error"` } +type BatchFindResult struct { + Failed []*FindFailedResult `protobuf:"bytes,1,rep,name=failed" json:"failed,omitempty"` + NotModified []int64 `protobuf:"varint,2,rep,packed,name=notModified" json:"notModified,omitempty"` + Updated []*FindResult `protobuf:"bytes,3,rep,name=updated" json:"updated,omitempty"` +} + type BatchFindInstancesRequest struct { - ConsumerServiceId string `protobuf:"bytes,1,opt,name=consumerServiceId" json:"consumerServiceId,omitempty"` - Services []*FindService `protobuf:"bytes,2,rep,name=services" json:"services"` + ConsumerServiceId string `protobuf:"bytes,1,opt,name=consumerServiceId" 
json:"consumerServiceId,omitempty"` + Services []*FindService `protobuf:"bytes,2,rep,name=services" json:"services,omitempty"` + Instances []*FindInstance `protobuf:"bytes,3,rep,name=instances" json:"instances,omitempty"` } type BatchFindInstancesResponse struct { - Response *Response `protobuf:"bytes,1,opt,name=response" json:"response,omitempty"` - Failed []*FindFailedResult `protobuf:"bytes,2,rep,name=failed" json:"failed,omitempty"` - NotModified []int64 `protobuf:"varint,3,rep,packed,name=notModified" json:"notModified,omitempty"` - Updated []*FindResult `protobuf:"bytes,4,rep,name=updated" json:"updated,omitempty"` + Response *Response `protobuf:"bytes,1,opt,name=response" json:"response,omitempty"` + Services *BatchFindResult `protobuf:"bytes,2,rep,name=services" json:"services,omitempty"` + Instances *BatchFindResult `protobuf:"bytes,3,rep,name=instances" json:"instances,omitempty"` } diff --git a/server/core/swagger/v4.yaml b/server/core/swagger/v4.yaml index 6e2e45b9..79947d12 100644 --- a/server/core/swagger/v4.yaml +++ b/server/core/swagger/v4.yaml @@ -1964,6 +1964,14 @@ definitions: rev: type: string description: 客户端缓存的版本号。 + FindInstance: + type: object + properties: + instance: + $ref: '#/definitions/HeartbeatSetElement' + rev: + type: string + description: 客户端缓存的版本号。 BatchFindRequest: type: object properties: @@ -1971,6 +1979,10 @@ definitions: type: array items: $ref: '#/definitions/FindService' + instances: + type: array + items: + $ref: '#/definitions/FindInstance' FindResult: type: object properties: @@ -1994,7 +2006,7 @@ definitions: description: 与请求数组对应的索引集合。 error: $ref: '#/definitions/Error' - BatchFindResponse: + BatchFindResult: type: object properties: failed: @@ -2010,6 +2022,13 @@ definitions: type: array items: $ref: '#/definitions/FindResult' + BatchFindResponse: + type: object + properties: + services: + $ref: '#/definitions/BatchFindResult' + instances: + $ref: '#/definitions/BatchFindResult' CreateDependenciesRequest: type: object 
properties: diff --git a/server/rest/controller/v4/instance_controller.go b/server/rest/controller/v4/instance_controller.go index 36efd165..5124a59d 100644 --- a/server/rest/controller/v4/instance_controller.go +++ b/server/rest/controller/v4/instance_controller.go @@ -191,9 +191,18 @@ func (this *MicroServiceInstanceService) GetOneInstance(w http.ResponseWriter, r ProviderInstanceId: query.Get(":instanceId"), Tags: ids, } + resp, _ := core.InstanceAPI.GetOneInstance(r.Context(), request) respInternal := resp.Response resp.Response = nil + + iv, _ := r.Context().Value(serviceUtil.CTX_REQUEST_REVISION).(string) + ov, _ := r.Context().Value(serviceUtil.CTX_RESPONSE_REVISION).(string) + w.Header().Set(serviceUtil.HEADER_REV, ov) + if len(iv) > 0 && iv == ov { + w.WriteHeader(http.StatusNotModified) + return + } controller.WriteResponse(w, respInternal, resp) } @@ -212,6 +221,14 @@ func (this *MicroServiceInstanceService) GetInstances(w http.ResponseWriter, r * resp, _ := core.InstanceAPI.GetInstances(r.Context(), request) respInternal := resp.Response resp.Response = nil + + iv, _ := r.Context().Value(serviceUtil.CTX_REQUEST_REVISION).(string) + ov, _ := r.Context().Value(serviceUtil.CTX_RESPONSE_REVISION).(string) + w.Header().Set(serviceUtil.HEADER_REV, ov) + if len(iv) > 0 && iv == ov { + w.WriteHeader(http.StatusNotModified) + return + } controller.WriteResponse(w, respInternal, resp) } diff --git a/server/service/cache/common.go b/server/service/cache/common.go index b25c92d8..d153a9dc 100644 --- a/server/service/cache/common.go +++ b/server/service/cache/common.go @@ -17,10 +17,11 @@ package cache const ( - CTX_FIND_CONSUMER = "_consumer" - CTX_FIND_PROVIDER = "_provider" - CTX_FIND_TAGS = "_tags" - CTX_FIND_REQUEST_REV = "_rev" + CTX_FIND_CONSUMER = "_consumer" + CTX_FIND_PROVIDER = "_provider" + CTX_FIND_PROVIDER_INSTANCE = "_provider_instance" + CTX_FIND_TAGS = "_tags" + CTX_FIND_REQUEST_REV = "_rev" CACHE_FIND = "_find" CACHE_DEP = "_dep" diff --git 
a/server/service/cache/filter_instances.go b/server/service/cache/filter_instances.go index 0f267508..4c0b58ba 100644 --- a/server/service/cache/filter_instances.go +++ b/server/service/cache/filter_instances.go @@ -46,12 +46,17 @@ type InstancesFilter struct { } func (f *InstancesFilter) Name(ctx context.Context, _ *cache.Node) string { + instanceKey, ok := ctx.Value(CTX_FIND_PROVIDER_INSTANCE).(*pb.HeartbeatSetElement) + if ok { + return instanceKey.ServiceId + apt.SPLIT + instanceKey.InstanceId + } return "" } func (f *InstancesFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) { pCopy := *parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem) - pCopy.Instances, pCopy.Rev, err = f.FindInstances(ctx, pCopy.ServiceIds) + + pCopy.Instances, pCopy.Rev, err = f.Find(ctx, parent) if err != nil { return } @@ -62,34 +67,76 @@ func (f *InstancesFilter) Init(ctx context.Context, parent *cache.Node) (node *c return } -func (f *InstancesFilter) FindInstances(ctx context.Context, serviceIds []string) (instances []*pb.MicroServiceInstance, rev string, err error) { +func (f *InstancesFilter) Find(ctx context.Context, parent *cache.Node) ( + instances []*pb.MicroServiceInstance, rev string, err error) { + pCache := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem) provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey) + + instanceKey, ok := ctx.Value(CTX_FIND_PROVIDER_INSTANCE).(*pb.HeartbeatSetElement) + if ok { + if len(pCache.ServiceIds) == 0 { + // can not find by instanceKey.ServiceId after pre-filters init + return + } + instances, rev, err = f.FindInstances(ctx, provider.Tenant, instanceKey) + } else { + instances, rev, err = f.BatchFindInstances(ctx, provider.Tenant, pCache.ServiceIds) + } + if err != nil { + consumer := ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService) + findFlag := fmt.Sprintf("consumer '%s' find provider %s/%s/%s", consumer.ServiceId, + provider.AppId, provider.ServiceName, provider.Version) + log.Errorf(err, 
"Find failed, %s", findFlag) + } + return +} + +func (f *InstancesFilter) findInstances(ctx context.Context, domainProject, serviceId, instanceId string, maxRevs []int64, counts []int64) (instances []*pb.MicroServiceInstance, err error) { + key := apt.GenerateInstanceKey(domainProject, serviceId, instanceId) + opts := append(serviceUtil.FromContext(ctx), registry.WithStrKey(key), registry.WithPrefix()) + resp, err := backend.Store().Instance().Search(ctx, opts...) + if err != nil { + return nil, err + } + if len(resp.Kvs) == 0 { + return + } + + for _, kv := range resp.Kvs { + if i, ok := clustersIndex[kv.ClusterName]; ok { + if kv.ModRevision > maxRevs[i] { + maxRevs[i] = kv.ModRevision + } + counts[i]++ + } + instances = append(instances, kv.Value.(*pb.MicroServiceInstance)) + } + return +} + +func (f *InstancesFilter) FindInstances(ctx context.Context, domainProject string, instanceKey *pb.HeartbeatSetElement) (instances []*pb.MicroServiceInstance, rev string, err error) { + var ( + maxRevs = make([]int64, len(clustersIndex)) + counts = make([]int64, len(clustersIndex)) + ) + instances, err = f.findInstances(ctx, domainProject, instanceKey.ServiceId, instanceKey.InstanceId, maxRevs, counts) + if err != nil { + return + } + return instances, serviceUtil.FormatRevision(maxRevs, counts), nil +} + +func (f *InstancesFilter) BatchFindInstances(ctx context.Context, domainProject string, serviceIds []string) (instances []*pb.MicroServiceInstance, rev string, err error) { var ( maxRevs = make([]int64, len(clustersIndex)) counts = make([]int64, len(clustersIndex)) ) for _, providerServiceId := range serviceIds { - key := apt.GenerateInstanceKey(provider.Tenant, providerServiceId, "") - opts := append(serviceUtil.FromContext(ctx), registry.WithStrKey(key), registry.WithPrefix()) - resp, err := backend.Store().Instance().Search(ctx, opts...) 
+ insts, err := f.findInstances(ctx, domainProject, providerServiceId, "", maxRevs, counts) if err != nil { - consumer := ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService) - findFlag := fmt.Sprintf("consumer '%s' find provider %s/%s/%s", consumer.ServiceId, - provider.AppId, provider.ServiceName, provider.Version) - log.Errorf(err, "Instance().Search failed, %s", findFlag) return nil, "", err } - - for _, kv := range resp.Kvs { - if i, ok := clustersIndex[kv.ClusterName]; ok { - if kv.ModRevision > maxRevs[i] { - maxRevs[i] = kv.ModRevision - } - counts[i]++ - } - instances = append(instances, kv.Value.(*pb.MicroServiceInstance)) - } - + instances = append(instances, insts...) } return instances, serviceUtil.FormatRevision(maxRevs, counts), nil diff --git a/server/service/cache/filter_rev.go b/server/service/cache/filter_rev.go index d3eb5a47..381b0332 100644 --- a/server/service/cache/filter_rev.go +++ b/server/service/cache/filter_rev.go @@ -38,35 +38,34 @@ func (f *RevisionFilter) Name(ctx context.Context, parent *cache.Node) string { } func (f *RevisionFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) { - item := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem) + pCache := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem) requestRev := ctx.Value(CTX_FIND_REQUEST_REV).(string) - if len(requestRev) == 0 || requestRev == item.Rev { + if len(requestRev) == 0 || requestRev == pCache.Rev { node = cache.NewNode() - node.Cache.Set(CACHE_FIND, item) + node.Cache.Set(CACHE_FIND, pCache) return } - if item.BrokenWait() { + if pCache.BrokenWait() { node = cache.NewNode() - node.Cache.Set(CACHE_FIND, item) + node.Cache.Set(CACHE_FIND, pCache) return } cloneCtx := util.CloneContext(ctx) cloneCtx = util.SetContext(cloneCtx, serviceUtil.CTX_NOCACHE, "1") - - insts, _, err := f.FindInstances(cloneCtx, item.ServiceIds) + insts, _, err := f.Find(cloneCtx, parent) if err != nil { - item.InitBrokenQueue() + pCache.InitBrokenQueue() return nil, 
err } - log.Warnf("the cache of finding instances api is broken, req[%s]!=cache[%s]", - requestRev, item.Rev) - item.Instances = insts - item.Broken() + log.Warnf("the cache of finding instances api is broken, req[%s]!=cache[%s][%s]", + requestRev, pCache.Rev, parent.Name) + pCache.Instances = insts + pCache.Broken() node = cache.NewNode() - node.Cache.Set(CACHE_FIND, item) + node.Cache.Set(CACHE_FIND, pCache) return } diff --git a/server/service/cache/filter_version.go b/server/service/cache/filter_version.go index 0c47f6c1..099d2615 100644 --- a/server/service/cache/filter_version.go +++ b/server/service/cache/filter_version.go @@ -34,6 +34,15 @@ func (f *VersionRuleFilter) Name(ctx context.Context, _ *cache.Node) string { } func (f *VersionRuleFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) { + instance, ok := ctx.Value(CTX_FIND_PROVIDER_INSTANCE).(*pb.HeartbeatSetElement) + if ok { + node = cache.NewNode() + node.Cache.Set(CACHE_FIND, &VersionRuleCacheItem{ + ServiceIds: []string{instance.ServiceId}, + }) + return + } + provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey) // 版本规则 ids, exist, err := serviceUtil.FindServiceIds(ctx, provider.Version, provider) @@ -41,7 +50,7 @@ func (f *VersionRuleFilter) Init(ctx context.Context, parent *cache.Node) (node consumer := ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService) findFlag := fmt.Sprintf("consumer '%s' find provider %s/%s/%s", consumer.ServiceId, provider.AppId, provider.ServiceName, provider.Version) - log.Errorf(err, "VersionRuleFilter failed, %s", findFlag) + log.Errorf(err, "FindServiceIds failed, %s", findFlag) return } if !exist { @@ -50,8 +59,7 @@ func (f *VersionRuleFilter) Init(ctx context.Context, parent *cache.Node) (node node = cache.NewNode() node.Cache.Set(CACHE_FIND, &VersionRuleCacheItem{ - VersionRule: provider.Version, - ServiceIds: ids, + ServiceIds: ids, }) return } diff --git a/server/service/cache/instance.go b/server/service/cache/instance.go 
index f7e7f302..b6ec6091 100644 --- a/server/service/cache/instance.go +++ b/server/service/cache/instance.go @@ -42,10 +42,9 @@ func init() { } type VersionRuleCacheItem struct { - VersionRule string - ServiceIds []string - Instances []*pb.MicroServiceInstance - Rev string + ServiceIds []string + Instances []*pb.MicroServiceInstance + Rev string broken bool queue chan struct{} @@ -88,6 +87,12 @@ func (f *FindInstancesCache) Get(ctx context.Context, consumer *pb.MicroService, return node.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem), nil } +func (f *FindInstancesCache) GetWithProviderId(ctx context.Context, consumer *pb.MicroService, provider *pb.MicroServiceKey, + instanceKey *pb.HeartbeatSetElement, tags []string, rev string) (*VersionRuleCacheItem, error) { + cloneCtx := context.WithValue(ctx, CTX_FIND_PROVIDER_INSTANCE, instanceKey) + return f.Get(cloneCtx, consumer, provider, tags, rev) +} + func (f *FindInstancesCache) Remove(provider *pb.MicroServiceKey) { f.Tree.Remove(context.WithValue(context.Background(), CTX_FIND_PROVIDER, provider)) if len(provider.Alias) > 0 { diff --git a/server/service/instance.go b/server/service/instance.go index 53eca314..a630958f 100644 --- a/server/service/instance.go +++ b/server/service/instance.go @@ -394,108 +394,172 @@ func getHeartbeatFunc(ctx context.Context, domainProject string, instancesHbRst } func (s *InstanceService) GetOneInstance(ctx context.Context, in *pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) { - if err := Validate(in); err != nil { + err := Validate(in) + if err != nil { log.Errorf(err, "get instance failed: invalid parameters") return &pb.GetOneInstanceResponse{ Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()), }, nil } - cpFunc := func() string { - return fmt.Sprintf("consumer[%s] get provider instance[%s/%s]", - in.ConsumerServiceId, in.ProviderServiceId, in.ProviderInstanceId) - } + domainProject := util.ParseDomainProject(ctx) - if checkErr := 
s.getInstancePreCheck(ctx, in.ProviderServiceId, in.ConsumerServiceId, in.Tags); checkErr != nil { - log.Errorf(checkErr, "%s failed: pre check failed", cpFunc()) - resp := &pb.GetOneInstanceResponse{ - Response: pb.CreateResponseWithSCErr(checkErr), + service := &pb.MicroService{} + if len(in.ConsumerServiceId) > 0 { + service, err = serviceUtil.GetService(ctx, domainProject, in.ConsumerServiceId) + if err != nil { + log.Errorf(err, "get consumer failed, consumer[%s] find provider instance[%s/%s]", + in.ConsumerServiceId, in.ProviderServiceId, in.ProviderInstanceId) + return &pb.GetOneInstanceResponse{ + Response: pb.CreateResponse(scerr.ErrInternal, err.Error()), + }, err } - if checkErr.InternalError() { - return resp, checkErr + if service == nil { + log.Errorf(nil, "consumer does not exist, consumer[%s] find provider instance[%s/%s]", + in.ConsumerServiceId, in.ProviderServiceId, in.ProviderInstanceId) + return &pb.GetOneInstanceResponse{ + Response: pb.CreateResponse(scerr.ErrServiceNotExists, + fmt.Sprintf("Consumer[%s] does not exist.", in.ConsumerServiceId)), + }, nil } - return resp, nil } - serviceId := in.ProviderServiceId - instanceId := in.ProviderInstanceId - instance, err := serviceUtil.GetInstance(ctx, util.ParseTargetDomainProject(ctx), serviceId, instanceId) + provider, err := serviceUtil.GetService(ctx, domainProject, in.ProviderServiceId) if err != nil { - log.Errorf(err, "%s failed: get instance failed", cpFunc()) + log.Errorf(err, "get provider failed, consumer[%s] find provider instance[%s/%s]", + in.ConsumerServiceId, in.ProviderServiceId, in.ProviderInstanceId) return &pb.GetOneInstanceResponse{ Response: pb.CreateResponse(scerr.ErrInternal, err.Error()), }, err } - if instance == nil { - log.Errorf(nil, "%s failed: instance does not exist", cpFunc()) + if provider == nil { + log.Errorf(nil, "provider does not exist, consumer[%s] find provider instance[%s/%s]", + in.ConsumerServiceId, in.ProviderServiceId, in.ProviderInstanceId) return 
&pb.GetOneInstanceResponse{ - Response: pb.CreateResponse(scerr.ErrInstanceNotExists, "Service instance does not exist."), + Response: pb.CreateResponse(scerr.ErrServiceNotExists, + fmt.Sprintf("Provider[%s] does not exist.", in.ProviderServiceId)), }, nil } + findFlag := func() string { + return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s][%s/%s/%s/%s] instance[%s]", + in.ConsumerServiceId, service.Environment, service.AppId, service.ServiceName, service.Version, + provider.ServiceId, provider.Environment, provider.AppId, provider.ServiceName, provider.Version, + in.ProviderInstanceId) + } + + var item *cache.VersionRuleCacheItem + rev, _ := ctx.Value(serviceUtil.CTX_REQUEST_REVISION).(string) + item, err = cache.FindInstances.GetWithProviderId(ctx, service, pb.MicroServiceToKey(domainProject, provider), + &pb.HeartbeatSetElement{ + ServiceId: in.ProviderServiceId, InstanceId: in.ProviderInstanceId, + }, in.Tags, rev) + if err != nil { + log.Errorf(err, "FindInstances.GetWithProviderId failed, %s failed", findFlag()) + return &pb.GetOneInstanceResponse{ + Response: pb.CreateResponse(scerr.ErrInternal, err.Error()), + }, err + } + if item == nil || len(item.Instances) == 0 { + mes := fmt.Errorf("%s failed, provider instance does not exist.", findFlag()) + log.Errorf(mes, "FindInstances.GetWithProviderId failed") + return &pb.GetOneInstanceResponse{ + Response: pb.CreateResponse(scerr.ErrInstanceNotExists, mes.Error()), + }, nil + } + + instance := item.Instances[0] + if rev == item.Rev { + instance = nil // for gRPC + } + ctx = util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, item.Rev) + return &pb.GetOneInstanceResponse{ Response: pb.CreateResponse(pb.Response_SUCCESS, "Get instance successfully."), Instance: instance, }, nil } -func (s *InstanceService) getInstancePreCheck(ctx context.Context, providerServiceId, consumerServiceId string, tags []string) *scerr.Error { - targetDomainProject := util.ParseTargetDomainProject(ctx) - if 
!serviceUtil.ServiceExist(ctx, targetDomainProject, providerServiceId) { - return scerr.NewError(scerr.ErrServiceNotExists, "Provider serviceId is invalid") +func (s *InstanceService) GetInstances(ctx context.Context, in *pb.GetInstancesRequest) (*pb.GetInstancesResponse, error) { + err := Validate(in) + if err != nil { + log.Errorf(err, "get instances failed: invalid parameters") + return &pb.GetInstancesResponse{ + Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()), + }, nil } - // Tag过滤 - if len(tags) > 0 { - tagsFromETCD, err := serviceUtil.GetTagsUtils(ctx, targetDomainProject, providerServiceId) + domainProject := util.ParseDomainProject(ctx) + + service := &pb.MicroService{} + if len(in.ConsumerServiceId) > 0 { + service, err = serviceUtil.GetService(ctx, domainProject, in.ConsumerServiceId) if err != nil { - return scerr.NewErrorf(scerr.ErrInternal, "An error occurred in query provider tags(%s)", err.Error()) - } - if len(tagsFromETCD) == 0 { - return scerr.NewError(scerr.ErrTagNotExists, "Provider has no tag") + log.Errorf(err, "get consumer failed, consumer[%s] find provider instances", + in.ConsumerServiceId, in.ProviderServiceId) + return &pb.GetInstancesResponse{ + Response: pb.CreateResponse(scerr.ErrInternal, err.Error()), + }, err } - for _, tag := range tags { - if _, ok := tagsFromETCD[tag]; !ok { - return scerr.NewErrorf(scerr.ErrTagNotExists, "Provider tags do not contain '%s'", tag) - } + if service == nil { + log.Errorf(nil, "consumer does not exist, consumer[%s] find provider instances", + in.ConsumerServiceId, in.ProviderServiceId) + return &pb.GetInstancesResponse{ + Response: pb.CreateResponse(scerr.ErrServiceNotExists, + fmt.Sprintf("Consumer[%s] does not exist.", in.ConsumerServiceId)), + }, nil } } - // 黑白名单 - // 跨应用调用 - return serviceUtil.Accessible(ctx, consumerServiceId, providerServiceId) -} -func (s *InstanceService) GetInstances(ctx context.Context, in *pb.GetInstancesRequest) (*pb.GetInstancesResponse, error) { - if 
err := Validate(in); err != nil { - log.Errorf(err, "get instances failed: invalid parameters") + provider, err := serviceUtil.GetService(ctx, domainProject, in.ProviderServiceId) + if err != nil { + log.Errorf(err, "get provider failed, consumer[%s] find provider instances", + in.ConsumerServiceId, in.ProviderServiceId) return &pb.GetInstancesResponse{ - Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()), - }, nil + Response: pb.CreateResponse(scerr.ErrInternal, err.Error()), + }, err } - - cpFunc := func() string { - return fmt.Sprintf("consumer[%s] get provider[%s] instances", + if provider == nil { + log.Errorf(nil, "provider does not exist, consumer[%s] find provider instances", in.ConsumerServiceId, in.ProviderServiceId) + return &pb.GetInstancesResponse{ + Response: pb.CreateResponse(scerr.ErrServiceNotExists, + fmt.Sprintf("Provider[%s] does not exist.", in.ProviderServiceId)), + }, nil } - if checkErr := s.getInstancePreCheck(ctx, in.ProviderServiceId, in.ConsumerServiceId, in.Tags); checkErr != nil { - log.Errorf(checkErr, "%s failed: pre check failed", cpFunc()) - resp := &pb.GetInstancesResponse{ - Response: pb.CreateResponseWithSCErr(checkErr), - } - if checkErr.InternalError() { - return resp, checkErr - } - return resp, nil + findFlag := func() string { + return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s][%s/%s/%s/%s] instances", + in.ConsumerServiceId, service.Environment, service.AppId, service.ServiceName, service.Version, + provider.ServiceId, provider.Environment, provider.AppId, provider.ServiceName, provider.Version) } - instances, err := serviceUtil.GetAllInstancesOfOneService(ctx, util.ParseTargetDomainProject(ctx), in.ProviderServiceId) + var item *cache.VersionRuleCacheItem + rev, _ := ctx.Value(serviceUtil.CTX_REQUEST_REVISION).(string) + item, err = cache.FindInstances.GetWithProviderId(ctx, service, pb.MicroServiceToKey(domainProject, provider), + &pb.HeartbeatSetElement{ + ServiceId: in.ProviderServiceId, + 
}, in.Tags, rev) if err != nil { - log.Errorf(err, "%s failed", cpFunc()) + log.Errorf(err, "FindInstances.GetWithProviderId failed, %s failed", findFlag()) return &pb.GetInstancesResponse{ Response: pb.CreateResponse(scerr.ErrInternal, err.Error()), }, err } + if item == nil || len(item.ServiceIds) == 0 { + mes := fmt.Errorf("%s failed, provider instance does not exist.", findFlag()) + log.Errorf(mes, "FindInstances.GetWithProviderId failed") + return &pb.GetInstancesResponse{ + Response: pb.CreateResponse(scerr.ErrServiceNotExists, mes.Error()), + }, nil + } + + instances := item.Instances + if rev == item.Rev { + instances = nil // for gRPC + } + ctx = util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, item.Rev) + return &pb.GetInstancesResponse{ Response: pb.CreateResponse(pb.Response_SUCCESS, "Query service instances successfully."), Instances: instances, @@ -617,6 +681,14 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest) } func (s *InstanceService) BatchFind(ctx context.Context, in *pb.BatchFindInstancesRequest) (*pb.BatchFindInstancesResponse, error) { + if len(in.Services) == 0 && len(in.Instances) == 0 { + err := errors.New("Required services or instances") + log.Errorf(err, "batch find instance failed: invalid parameters") + return &pb.BatchFindInstancesResponse{ + Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()), + }, nil + } + err := Validate(in) if err != nil { log.Errorf(err, "batch find instance failed: invalid parameters") @@ -628,6 +700,32 @@ func (s *InstanceService) BatchFind(ctx context.Context, in *pb.BatchFindInstanc response := &pb.BatchFindInstancesResponse{ Response: pb.CreateResponse(pb.Response_SUCCESS, "Batch query service instances successfully."), } + + // find services + response.Services, err = s.batchFindServices(ctx, in) + if err != nil { + return &pb.BatchFindInstancesResponse{ + Response: pb.CreateResponse(scerr.ErrInternal, err.Error()), + }, err + } + + // find instance + 
response.Instances, err = s.batchFindInstances(ctx, in) + if err != nil { + return &pb.BatchFindInstancesResponse{ + Response: pb.CreateResponse(scerr.ErrInternal, err.Error()), + }, err + } + + return response, nil +} + +func (s *InstanceService) batchFindServices(ctx context.Context, in *pb.BatchFindInstancesRequest) (*pb.BatchFindResult, error) { + if len(in.Services) == 0 { + return nil, nil + } + + services := &pb.BatchFindResult{} failedResult := make(map[int32]*pb.FindFailedResult) for index, key := range in.Services { cloneCtx := util.SetContext(ctx, serviceUtil.CTX_REQUEST_REVISION, key.Rev) @@ -639,21 +737,49 @@ func (s *InstanceService) BatchFind(ctx context.Context, in *pb.BatchFindInstanc Environment: key.Service.Environment, }) if err != nil { - return &pb.BatchFindInstancesResponse{ - Response: resp.Response, - }, err + return nil, err } failed, ok := failedResult[resp.GetResponse().GetCode()] - serviceUtil.AppendFindResponse(cloneCtx, int64(index), resp, - &response.Updated, &response.NotModified, &failed) + serviceUtil.AppendFindResponse(cloneCtx, int64(index), resp.GetResponse(), resp.GetInstances(), + &services.Updated, &services.NotModified, &failed) if !ok && failed != nil { failedResult[resp.GetResponse().GetCode()] = failed } } for _, result := range failedResult { - response.Failed = append(response.Failed, result) + services.Failed = append(services.Failed, result) } - return response, nil + return services, nil +} + +func (s *InstanceService) batchFindInstances(ctx context.Context, in *pb.BatchFindInstancesRequest) (*pb.BatchFindResult, error) { + if len(in.Instances) == 0 { + return nil, nil + } + + instances := &pb.BatchFindResult{} + failedResult := make(map[int32]*pb.FindFailedResult) + for index, key := range in.Instances { + cloneCtx := util.SetContext(ctx, serviceUtil.CTX_REQUEST_REVISION, key.Rev) + resp, err := s.GetOneInstance(cloneCtx, &pb.GetOneInstanceRequest{ + ConsumerServiceId: in.ConsumerServiceId, + ProviderServiceId: 
key.Instance.ServiceId, + ProviderInstanceId: key.Instance.InstanceId, + }) + if err != nil { + return nil, err + } + failed, ok := failedResult[resp.GetResponse().GetCode()] + serviceUtil.AppendFindResponse(cloneCtx, int64(index), resp.GetResponse(), []*pb.MicroServiceInstance{resp.GetInstance()}, + &instances.Updated, &instances.NotModified, &failed) + if !ok && failed != nil { + failedResult[resp.GetResponse().GetCode()] = failed + } + } + for _, result := range failedResult { + instances.Failed = append(instances.Failed, result) + } + return instances, nil } func (s *InstanceService) reshapeProviderKey(ctx context.Context, provider *pb.MicroServiceKey, providerId string) (*pb.MicroServiceKey, error) { diff --git a/server/service/instance_test.go b/server/service/instance_test.go index 25b654fa..4359c36b 100644 --- a/server/service/instance_test.go +++ b/server/service/instance_test.go @@ -829,11 +829,13 @@ var _ = Describe("'Instance' service", func() { serviceId6 string serviceId7 string serviceId8 string + serviceId9 string instanceId1 string instanceId2 string instanceId4 string instanceId5 string instanceId8 string + instanceId9 string ) It("should be passed", func() { @@ -948,6 +950,19 @@ var _ = Describe("'Instance' service", func() { Expect(respCreate.Response.Code).To(Equal(pb.Response_SUCCESS)) serviceId8 = respCreate.ServiceId + respCreate, err = serviceResource.Create(getContext(), &pb.CreateServiceRequest{ + Service: &pb.MicroService{ + AppId: "query_instance", + ServiceName: "batch_query_instance_with_rev", + Version: "1.0.0", + Level: "FRONT", + Status: pb.MS_UP, + }, + }) + Expect(err).To(BeNil()) + Expect(respCreate.Response.Code).To(Equal(pb.Response_SUCCESS)) + serviceId9 = respCreate.ServiceId + resp, err := instanceResource.Register(getContext(), &pb.RegisterInstanceRequest{ Instance: &pb.MicroServiceInstance{ ServiceId: serviceId1, @@ -1017,6 +1032,20 @@ var _ = Describe("'Instance' service", func() { Expect(err).To(BeNil()) 
Expect(resp.Response.Code).To(Equal(pb.Response_SUCCESS)) instanceId8 = resp.InstanceId + + resp, err = instanceResource.Register(getContext(), &pb.RegisterInstanceRequest{ + Instance: &pb.MicroServiceInstance{ + ServiceId: serviceId9, + HostName: "UT-HOST", + Endpoints: []string{ + "find:127.0.0.9:8080", + }, + Status: pb.MSI_UP, + }, + }) + Expect(err).To(BeNil()) + Expect(resp.Response.Code).To(Equal(pb.Response_SUCCESS)) + instanceId9 = resp.InstanceId }) Context("when query invalid parameters", func() { @@ -1177,12 +1206,14 @@ var _ = Describe("'Instance' service", func() { respFind, err := instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ ConsumerServiceId: serviceId1, Services: nil, + Instances: nil, }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams)) respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ ConsumerServiceId: serviceId1, Services: []*pb.FindService{}, + Instances: []*pb.FindInstance{}, }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams)) @@ -1192,6 +1223,12 @@ var _ = Describe("'Instance' service", func() { }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams)) + respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ + ConsumerServiceId: serviceId1, + Instances: []*pb.FindInstance{{}}, + }) + Expect(err).To(BeNil()) + Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams)) By("invalid appId") respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ @@ -1339,6 +1376,32 @@ var _ = Describe("'Instance' service", func() { Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams)) + By("invalid instance") + respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ + ConsumerServiceId: serviceId1, + Instances: []*pb.FindInstance{ + { + Instance: 
&pb.HeartbeatSetElement{ + ServiceId: "query_instance", + }, + }, + }, + }) + Expect(err).To(BeNil()) + Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams)) + respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ + ConsumerServiceId: serviceId1, + Instances: []*pb.FindInstance{ + { + Instance: &pb.HeartbeatSetElement{ + InstanceId: "query_instance", + }, + }, + }, + }) + Expect(err).To(BeNil()) + Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams)) + By("consumerId is empty") respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ ConsumerServiceId: serviceId1, @@ -1370,8 +1433,23 @@ var _ = Describe("'Instance' service", func() { }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(respFind.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists)) - Expect(respFind.Failed[0].Indexes[0]).To(Equal(int64(0))) + Expect(respFind.Services.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists)) + Expect(respFind.Services.Failed[0].Indexes[0]).To(Equal(int64(0))) + respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ + ConsumerServiceId: serviceId1, + Instances: []*pb.FindInstance{ + { + Instance: &pb.HeartbeatSetElement{ + ServiceId: serviceId1, + InstanceId: "noninstance", + }, + }, + }, + }) + Expect(err).To(BeNil()) + Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) + Expect(respFind.Instances.Failed[0].Error.Code).To(Equal(scerr.ErrInstanceNotExists)) + Expect(respFind.Instances.Failed[0].Indexes[0]).To(Equal(int64(0))) By("provider does not contain 3.0.0+ versions") respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ @@ -1388,9 +1466,9 @@ var _ = Describe("'Instance' service", func() { }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(len(respFind.Updated[0].Instances)).To(Equal(0)) - 
Expect(respFind.Updated[0].Index).To(Equal(int64(0))) - Expect(respFind.Updated[0].Rev).ToNot(Equal("")) + Expect(len(respFind.Services.Updated[0].Instances)).To(Equal(0)) + Expect(respFind.Services.Updated[0].Index).To(Equal(int64(0))) + Expect(respFind.Services.Updated[0].Rev).ToNot(Equal("")) By("consumer does not exist") respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ @@ -1407,8 +1485,8 @@ var _ = Describe("'Instance' service", func() { }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(respFind.Failed[0].Indexes[0]).To(Equal(int64(0))) - Expect(respFind.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists)) + Expect(respFind.Services.Failed[0].Indexes[0]).To(Equal(int64(0))) + Expect(respFind.Services.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists)) }) }) @@ -1626,12 +1704,12 @@ var _ = Describe("'Instance' service", func() { }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(respFind.Updated[0].Index).To(Equal(int64(0))) - Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId2)) - Expect(respFind.Updated[1].Index).To(Equal(int64(1))) - Expect(respFind.Updated[1].Instances[0].InstanceId).To(Equal(instanceId2)) - Expect(respFind.Failed[0].Indexes[0]).To(Equal(int64(2))) - Expect(respFind.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists)) + Expect(respFind.Services.Updated[0].Index).To(Equal(int64(0))) + Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId2)) + Expect(respFind.Services.Updated[1].Index).To(Equal(int64(1))) + Expect(respFind.Services.Updated[1].Instances[0].InstanceId).To(Equal(instanceId2)) + Expect(respFind.Services.Failed[0].Indexes[0]).To(Equal(int64(2))) + Expect(respFind.Services.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists)) By("find with env") respFind, err = instanceResource.BatchFind(getContext(), 
&pb.BatchFindInstancesRequest{ @@ -1648,8 +1726,8 @@ var _ = Describe("'Instance' service", func() { }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(len(respFind.Updated[0].Instances)).To(Equal(1)) - Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId4)) + Expect(len(respFind.Services.Updated[0].Instances)).To(Equal(1)) + Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId4)) respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ Services: []*pb.FindService{ @@ -1665,8 +1743,8 @@ var _ = Describe("'Instance' service", func() { }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(len(respFind.Updated[0].Instances)).To(Equal(1)) - Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId4)) + Expect(len(respFind.Services.Updated[0].Instances)).To(Equal(1)) + Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId4)) By("find with rev") ctx := util.SetContext(getContext(), serviceUtil.CTX_NOCACHE, "") @@ -1680,13 +1758,43 @@ var _ = Describe("'Instance' service", func() { Version: "1.0.0", }, }, + { + Service: &pb.MicroServiceKey{ + AppId: "query_instance", + ServiceName: "batch_query_instance_with_rev", + Version: "1.0.0", + }, + }, + }, + Instances: []*pb.FindInstance{ + { + Instance: &pb.HeartbeatSetElement{ + ServiceId: serviceId9, + InstanceId: instanceId9, + }, + }, + { + Instance: &pb.HeartbeatSetElement{ + ServiceId: serviceId8, + InstanceId: instanceId8, + }, + }, }, }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - rev := respFind.Updated[0].Rev - Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId8)) + rev := respFind.Services.Updated[0].Rev + Expect(respFind.Services.Updated[0].Index).To(Equal(int64(0))) + Expect(respFind.Services.Updated[1].Index).To(Equal(int64(1))) + 
Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId8)) + Expect(respFind.Services.Updated[1].Instances[0].InstanceId).To(Equal(instanceId9)) Expect(len(rev)).NotTo(Equal(0)) + instanceRev := respFind.Instances.Updated[0].Rev + Expect(respFind.Instances.Updated[0].Index).To(Equal(int64(0))) + Expect(respFind.Instances.Updated[1].Index).To(Equal(int64(1))) + Expect(respFind.Instances.Updated[0].Instances[0].InstanceId).To(Equal(instanceId9)) + Expect(respFind.Instances.Updated[1].Instances[0].InstanceId).To(Equal(instanceId8)) + Expect(len(instanceRev)).NotTo(Equal(0)) respFind, err = instanceResource.BatchFind(ctx, &pb.BatchFindInstancesRequest{ ConsumerServiceId: serviceId8, @@ -1700,11 +1808,22 @@ var _ = Describe("'Instance' service", func() { Rev: "x", }, }, + Instances: []*pb.FindInstance{ + { + Instance: &pb.HeartbeatSetElement{ + ServiceId: serviceId9, + InstanceId: instanceId9, + }, + Rev: "x", + }, + }, }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId8)) - Expect(respFind.Updated[0].Rev).To(Equal(rev)) + Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId8)) + Expect(respFind.Services.Updated[0].Rev).To(Equal(rev)) + Expect(respFind.Instances.Updated[0].Instances[0].InstanceId).To(Equal(instanceId9)) + Expect(respFind.Instances.Updated[0].Rev).To(Equal(instanceRev)) respFind, err = instanceResource.BatchFind(ctx, &pb.BatchFindInstancesRequest{ ConsumerServiceId: serviceId8, @@ -1718,10 +1837,20 @@ var _ = Describe("'Instance' service", func() { Rev: rev, }, }, + Instances: []*pb.FindInstance{ + { + Instance: &pb.HeartbeatSetElement{ + ServiceId: serviceId9, + InstanceId: instanceId9, + }, + Rev: instanceRev, + }, + }, }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(respFind.NotModified[0]).To(Equal(int64(0))) + 
Expect(respFind.Services.NotModified[0]).To(Equal(int64(0))) + Expect(respFind.Instances.NotModified[0]).To(Equal(int64(0))) By("find should return 200 even if consumer is diff apps") respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ @@ -1738,7 +1867,7 @@ var _ = Describe("'Instance' service", func() { }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(len(respFind.Updated[0].Instances)).To(Equal(0)) + Expect(len(respFind.Services.Updated[0].Instances)).To(Equal(0)) By("shared service discovery") os.Setenv("CSE_SHARED_SERVICES", "query_instance_shared_provider") @@ -1763,8 +1892,8 @@ var _ = Describe("'Instance' service", func() { }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(len(respFind.Updated[0].Instances)).To(Equal(1)) - Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId5)) + Expect(len(respFind.Services.Updated[0].Instances)).To(Equal(1)) + Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId5)) respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{ ConsumerServiceId: serviceId7, @@ -1780,8 +1909,8 @@ var _ = Describe("'Instance' service", func() { }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(len(respFind.Updated[0].Instances)).To(Equal(1)) - Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId5)) + Expect(len(respFind.Services.Updated[0].Instances)).To(Equal(1)) + Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId5)) core.Service.Environment = pb.ENV_DEV }) @@ -1799,7 +1928,7 @@ var _ = Describe("'Instance' service", func() { Expect(respFind.Response.Code).To(Equal(code)) } - UTFunc(serviceId3, scerr.ErrPermissionDeny) + UTFunc(serviceId3, scerr.ErrServiceNotExists) UTFunc(serviceId1, pb.Response_SUCCESS) @@ -1934,7 +2063,7 @@ var _ = 
Describe("'Instance' service", func() { Tags: []string{"not-exist-tag"}, }) Expect(err).To(BeNil()) - Expect(resp.Response.Code).ToNot(Equal(pb.Response_SUCCESS)) + Expect(resp.Response.Code).To(Equal(scerr.ErrInstanceNotExists)) By("provider tag exist") resp, err = instanceResource.GetOneInstance(getContext(), @@ -1957,7 +2086,7 @@ var _ = Describe("'Instance' service", func() { ProviderInstanceId: instanceId2, }) Expect(err).To(BeNil()) - Expect(resp.Response.Code).ToNot(Equal(pb.Response_SUCCESS)) + Expect(resp.Response.Code).To(Equal(scerr.ErrInstanceNotExists)) respAll, err := instanceResource.GetInstances(getContext(), &pb.GetInstancesRequest{ ConsumerServiceId: serviceId3, diff --git a/server/service/instance_validator.go b/server/service/instance_validator.go index bebb2c31..dde95f90 100644 --- a/server/service/instance_validator.go +++ b/server/service/instance_validator.go @@ -62,9 +62,12 @@ func BatchFindInstanceReqValidator() *validate.Validator { var findServiceValidator validate.Validator findServiceValidator.AddRule("Service", &validate.ValidateRule{Min: 1}) findServiceValidator.AddSub("Service", ExistenceReqValidator()) + var findInstanceValidator validate.Validator + findInstanceValidator.AddRule("Instance", &validate.ValidateRule{Min: 1}) + findInstanceValidator.AddSub("Instance", HeartbeatReqValidator()) v.AddRule("ConsumerServiceId", GetInstanceReqValidator().GetRule("ConsumerServiceId")) - v.AddRule("Services", &validate.ValidateRule{Min: 1}) v.AddSub("Services", &findServiceValidator) + v.AddSub("Instances", &findInstanceValidator) }) } diff --git a/server/service/rule_test.go b/server/service/rule_test.go index 0603d4c8..2a6f9950 100644 --- a/server/service/rule_test.go +++ b/server/service/rule_test.go @@ -681,7 +681,7 @@ var _ = Describe("'Rule' service", func() { ProviderServiceId: providerBlack, }) Expect(err).To(BeNil()) - Expect(resp.Response.Code).To(Equal(scerr.ErrPermissionDeny)) + 
Expect(resp.Response.Code).To(Equal(scerr.ErrServiceNotExists)) By("consumer tag in black list") resp, err = instanceResource.GetInstances(getContext(), &pb.GetInstancesRequest{ @@ -689,7 +689,7 @@ var _ = Describe("'Rule' service", func() { ProviderServiceId: providerBlack, }) Expect(err).To(BeNil()) - Expect(resp.Response.Code).To(Equal(scerr.ErrPermissionDeny)) + Expect(resp.Response.Code).To(Equal(scerr.ErrServiceNotExists)) By("find should return 200 even if consumer permission deny") respFind, err := instanceResource.Find(getContext(), &pb.FindInstancesRequest{ @@ -725,7 +725,7 @@ var _ = Describe("'Rule' service", func() { ProviderServiceId: providerWhite, }) Expect(err).To(BeNil()) - Expect(resp.Response.Code).To(Equal(scerr.ErrPermissionDeny)) + Expect(resp.Response.Code).To(Equal(scerr.ErrServiceNotExists)) By("consumer version in white list") resp, err = instanceResource.GetInstances(getContext(), &pb.GetInstancesRequest{ diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go index 1e0e7916..fea3692a 100644 --- a/server/service/util/instance_util.go +++ b/server/service/util/instance_util.go @@ -273,12 +273,12 @@ func UpdateInstance(ctx context.Context, domainProject string, instance *pb.Micr return nil } -func AppendFindResponse(ctx context.Context, index int64, find *pb.FindInstancesResponse, +func AppendFindResponse(ctx context.Context, index int64, resp *pb.Response, instances []*pb.MicroServiceInstance, updatedResult *[]*pb.FindResult, notModifiedResult *[]int64, failedResult **pb.FindFailedResult) { - if code := find.GetResponse().GetCode(); code != pb.Response_SUCCESS { + if code := resp.GetCode(); code != pb.Response_SUCCESS { if *failedResult == nil { *failedResult = &pb.FindFailedResult{ - Error: scerr.NewError(code, find.GetResponse().GetMessage()), + Error: scerr.NewError(code, resp.GetMessage()), } } (*failedResult).Indexes = append((*failedResult).Indexes, index) @@ -292,7 +292,7 @@ func 
AppendFindResponse(ctx context.Context, index int64, find *pb.FindInstances } *updatedResult = append(*updatedResult, &pb.FindResult{ Index: index, - Instances: find.Instances, + Instances: instances, Rev: ov, }) } diff --git a/server/service/util/instance_util_test.go b/server/service/util/instance_util_test.go index 50087f2e..9e9f72d6 100644 --- a/server/service/util/instance_util_test.go +++ b/server/service/util/instance_util_test.go @@ -125,7 +125,7 @@ func TestAppendFindResponse(t *testing.T) { notModifiedResult []int64 failedResult *pb.FindFailedResult ) - AppendFindResponse(ctx, 1, &find, &updatedResult, &notModifiedResult, &failedResult) + AppendFindResponse(ctx, 1, find.GetResponse(), find.Instances, &updatedResult, &notModifiedResult, &failedResult) if updatedResult == nil || notModifiedResult != nil || failedResult != nil { t.Fatal("TestAppendFindResponse failed") } @@ -135,7 +135,7 @@ func TestAppendFindResponse(t *testing.T) { updatedResult = nil cloneCtx := context.WithValue(ctx, CTX_RESPONSE_REVISION, "1") - AppendFindResponse(cloneCtx, 1, &find, &updatedResult, &notModifiedResult, &failedResult) + AppendFindResponse(cloneCtx, 1, find.GetResponse(), find.Instances, &updatedResult, &notModifiedResult, &failedResult) if updatedResult == nil || notModifiedResult != nil || failedResult != nil { t.Fatal("TestAppendFindResponse failed") } @@ -146,7 +146,7 @@ func TestAppendFindResponse(t *testing.T) { updatedResult = nil cloneCtx = context.WithValue(ctx, CTX_REQUEST_REVISION, "1") cloneCtx = context.WithValue(cloneCtx, CTX_RESPONSE_REVISION, "1") - AppendFindResponse(cloneCtx, 1, &find, &updatedResult, &notModifiedResult, &failedResult) + AppendFindResponse(cloneCtx, 1, find.GetResponse(), find.Instances, &updatedResult, &notModifiedResult, &failedResult) if updatedResult != nil || notModifiedResult == nil || failedResult != nil { t.Fatal("TestAppendFindResponse failed") } @@ -156,7 +156,7 @@ func TestAppendFindResponse(t *testing.T) { notModifiedResult = nil find.Response
= pb.CreateResponse(scerr.ErrInternal, "test") - AppendFindResponse(ctx, 1, &find, &updatedResult, &notModifiedResult, &failedResult) + AppendFindResponse(ctx, 1, find.GetResponse(), find.Instances, &updatedResult, &notModifiedResult, &failedResult) if updatedResult != nil || notModifiedResult != nil || failedResult == nil { t.Fatal("TestAppendFindResponse failed") } @@ -164,7 +164,7 @@ func TestAppendFindResponse(t *testing.T) { t.Fatal("TestAppendFindResponse failed") } find.Response = pb.CreateResponse(scerr.ErrInvalidParams, "test") - AppendFindResponse(ctx, 2, &find, &updatedResult, &notModifiedResult, &failedResult) + AppendFindResponse(ctx, 2, find.GetResponse(), find.Instances, &updatedResult, &notModifiedResult, &failedResult) if updatedResult != nil || notModifiedResult != nil || failedResult == nil { t.Fatal("TestAppendFindResponse failed") } @@ -174,15 +174,15 @@ func TestAppendFindResponse(t *testing.T) { failedResult = nil find.Response = nil - AppendFindResponse(ctx, 1, &find, &updatedResult, &notModifiedResult, &failedResult) - AppendFindResponse(ctx, 2, &find, &updatedResult, &notModifiedResult, &failedResult) + AppendFindResponse(ctx, 1, find.GetResponse(), find.Instances, &updatedResult, &notModifiedResult, &failedResult) + AppendFindResponse(ctx, 2, find.GetResponse(), find.Instances, &updatedResult, &notModifiedResult, &failedResult) cloneCtx = context.WithValue(ctx, CTX_REQUEST_REVISION, "1") cloneCtx = context.WithValue(cloneCtx, CTX_RESPONSE_REVISION, "1") - AppendFindResponse(cloneCtx, 3, &find, &updatedResult, &notModifiedResult, &failedResult) - AppendFindResponse(cloneCtx, 4, &find, &updatedResult, &notModifiedResult, &failedResult) + AppendFindResponse(cloneCtx, 3, find.GetResponse(), find.Instances, &updatedResult, &notModifiedResult, &failedResult) + AppendFindResponse(cloneCtx, 4, find.GetResponse(), find.Instances, &updatedResult, &notModifiedResult, &failedResult) find.Response = pb.CreateResponse(scerr.ErrInternal, "test") - AppendFindResponse(ctx, 5, &find,
&updatedResult, &notModifiedResult, &failedResult) - AppendFindResponse(ctx, 6, &find, &updatedResult, &notModifiedResult, &failedResult) + AppendFindResponse(ctx, 5, find.GetResponse(), find.Instances, &updatedResult, &notModifiedResult, &failedResult) + AppendFindResponse(ctx, 6, find.GetResponse(), find.Instances, &updatedResult, &notModifiedResult, &failedResult) if updatedResult == nil || notModifiedResult == nil || failedResult == nil { t.Fatal("TestAppendFindResponse failed") } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Batch microservices instances discovery API > ------------------------------------------- > > Key: SCB-1053 > URL: https://issues.apache.org/jira/browse/SCB-1053 > Project: Apache ServiceComb > Issue Type: Improvement > Components: Service-Center > Reporter: little-cui > Assignee: little-cui > Priority: Major > Fix For: service-center-1.2.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)