[ https://issues.apache.org/jira/browse/SCB-993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694761#comment-16694761 ]
ASF GitHub Bot commented on SCB-993: ------------------------------------ little-cui closed pull request #493: SCB-993 search sc cluster when key mismatch with aggregator cache URL: https://github.com/apache/servicecomb-service-center/pull/493 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 331666ea..bfd337ef 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -26,12 +26,12 @@ import ( type level1 struct { } -func (l *level1) Name(ctx context.Context) string { +func (l *level1) Name(ctx context.Context, _ *Node) string { return ctx.Value("key1").(string) } func (l *level1) Init(ctx context.Context, parent *Node) (node *Node, err error) { - p := l.Name(ctx) + p := l.Name(ctx, parent) if p == "err" { return nil, fmt.Errorf("wrong logic") } @@ -51,7 +51,7 @@ type level2 struct { changed string } -func (l *level2) Name(ctx context.Context) string { +func (l *level2) Name(ctx context.Context, _ *Node) string { return ctx.Value("key2").(string) } @@ -60,7 +60,7 @@ func (l *level2) Init(ctx context.Context, parent *Node) (node *Node, err error) return } - p := l.Name(ctx) + p := l.Name(ctx, parent) if p == "err" { return nil, fmt.Errorf("wrong logic") } diff --git a/pkg/cache/filter.go b/pkg/cache/filter.go index 51c64481..9652fcf9 100644 --- a/pkg/cache/filter.go +++ b/pkg/cache/filter.go @@ -19,6 +19,6 @@ package cache import "golang.org/x/net/context" type Filter interface { - Name(ctx context.Context) string + Name(ctx context.Context, parent *Node) string Init(ctx context.Context, parent *Node) (*Node, error) } diff --git a/pkg/cache/tree.go b/pkg/cache/tree.go index 9c89df08..d9adcf57 100644 --- a/pkg/cache/tree.go +++ b/pkg/cache/tree.go @@ -77,7 +77,7 @@ func (t *Tree) Remove(ctx context.Context) { return } - t.roots.Delete(t.filters[0].Name(ctx)) + t.roots.Delete(t.filters[0].Name(ctx, nil)) } func (t *Tree) getOrCreateRoot(ctx context.Context) (node *Node, err error) { @@ -86,7 +86,7 @@ func (t *Tree) getOrCreateRoot(ctx context.Context) (node *Node, err error) { } filter := t.filters[0] - name := filter.Name(ctx) + name := filter.Name(ctx, nil) item, err := t.roots.Fetch(name, t.Config.TTL(), func() (interface{}, error) { node, err := t.getOrCreateNode(ctx, 0, nil) if err != nil { @@ -108,7 +108,7 @@ func (t *Tree) getOrCreateRoot(ctx context.Context) (node *Node, err error) { func (t *Tree) getOrCreateNode(ctx context.Context, idx int, parent *Node) (node *Node, err error) { filter := t.filters[idx] - name := t.nodeFullName(filter.Name(ctx), parent) + name := t.nodeFullName(filter.Name(ctx, parent), parent) if parent == nil { // new a temp node diff --git a/pkg/client/sc/apis.go b/pkg/client/sc/apis.go index e0103af1..3fb8f1a5 100644 --- a/pkg/client/sc/apis.go +++ b/pkg/client/sc/apis.go @@ -25,17 +25,22 @@ import ( scerr "github.com/apache/servicecomb-service-center/server/error" "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" "github.com/apache/servicecomb-service-center/version" + "golang.org/x/net/context" "io/ioutil" "net/http" ) const ( - apiVersionURL = "/version" - apiDumpURL = "/v4/default/admin/dump" - apiClustersURL = "/v4/default/admin/clusters" - apiHealthURL = "/v4/default/registry/health" - apiSchemasURL = "/v4/%s/registry/microservices/%s/schemas" - apiSchemaURL = "/v4/%s/registry/microservices/%s/schemas/%s" + apiVersionURL = "/version" + apiDumpURL = "/v4/default/admin/dump" + apiClustersURL = "/v4/default/admin/clusters" + apiHealthURL = "/v4/default/registry/health" + apiSchemasURL = "/v4/%s/registry/microservices/%s/schemas" + apiSchemaURL = "/v4/%s/registry/microservices/%s/schemas/%s" + apiInstancesURL = "/v4/%s/registry/microservices/%s/instances" + apiInstanceURL = "/v4/%s/registry/microservices/%s/instances/%s" + + QueryGlobal = "global" ) func (c *SCClient) toError(body []byte) *scerr.Error { @@ -47,8 +52,18 @@ func (c *SCClient) toError(body []byte) *scerr.Error { return message } -func (c *SCClient) GetScVersion() (*version.VersionSet, *scerr.Error) { - resp, err := c.RestDo(http.MethodGet, apiVersionURL, c.CommonHeaders(), nil) +func (c *SCClient) parseQuery(ctx context.Context) (q string) { + switch { + case ctx.Value(QueryGlobal) == "1": + q += "global=true" + default: + q += "global=false" + } + return +} + +func (c *SCClient) GetScVersion(ctx context.Context) (*version.VersionSet, *scerr.Error) { + resp, err := c.RestDoWithContext(ctx, http.MethodGet, apiVersionURL, c.CommonHeaders(ctx), nil) if err != nil { return nil, scerr.NewError(scerr.ErrInternal, err.Error()) } @@ -66,18 +81,17 @@ func (c *SCClient) GetScVersion() (*version.VersionSet, *scerr.Error) { v := &version.VersionSet{} err = json.Unmarshal(body, v) if err != nil { - fmt.Println(string(body)) return nil, scerr.NewError(scerr.ErrInternal, err.Error()) } return v, nil } -func (c *SCClient) GetScCache() (*model.Cache, *scerr.Error) { - headers := c.CommonHeaders() +func (c *SCClient) GetScCache(ctx context.Context) (*model.Cache, *scerr.Error) { + headers := c.CommonHeaders(ctx) // only default domain has admin permission headers.Set("X-Domain-Name", "default") - resp, err := c.RestDo(http.MethodGet, apiDumpURL, headers, nil) + resp, err := c.RestDoWithContext(ctx, http.MethodGet, apiDumpURL, headers, nil) if err != nil { return nil, scerr.NewError(scerr.ErrInternal, err.Error()) } @@ -95,19 +109,18 @@ func (c *SCClient) GetScCache() (*model.Cache, *scerr.Error) { dump := &model.DumpResponse{} err = json.Unmarshal(body, dump) if err != nil { - fmt.Println(string(body)) return nil, scerr.NewError(scerr.ErrInternal, err.Error()) } return dump.Cache, nil } -func (c *SCClient) GetSchemasByServiceId(domainProject, serviceId string) ([]*pb.Schema, *scerr.Error) { +func (c *SCClient) GetSchemasByServiceId(ctx context.Context, domainProject, serviceId string) ([]*pb.Schema, *scerr.Error) { domain, project := core.FromDomainProject(domainProject) - headers := c.CommonHeaders() + headers := c.CommonHeaders(ctx) headers.Set("X-Domain-Name", domain) - resp, err := c.RestDo(http.MethodGet, - fmt.Sprintf(apiSchemasURL, project, serviceId)+"?withSchema=1", + resp, err := c.RestDoWithContext(ctx, http.MethodGet, + fmt.Sprintf(apiSchemasURL, project, serviceId)+"?withSchema=1&"+c.parseQuery(ctx), headers, nil) if err != nil { return nil, scerr.NewError(scerr.ErrInternal, err.Error()) @@ -126,19 +139,18 @@ func (c *SCClient) GetSchemasByServiceId(domainProject, serviceId string) ([]*pb schemas := &pb.GetAllSchemaResponse{} err = json.Unmarshal(body, schemas) if err != nil { - fmt.Println(util.BytesToStringWithNoCopy(body)) return nil, scerr.NewError(scerr.ErrInternal, err.Error()) } return schemas.Schemas, nil } -func (c *SCClient) GetSchemaBySchemaId(domainProject, serviceId, schemaId string) (*pb.Schema, *scerr.Error) { +func (c *SCClient) GetSchemaBySchemaId(ctx context.Context, domainProject, serviceId, schemaId string) (*pb.Schema, *scerr.Error) { domain, project := core.FromDomainProject(domainProject) - headers := c.CommonHeaders() + headers := c.CommonHeaders(ctx) headers.Set("X-Domain-Name", domain) - resp, err := c.RestDo(http.MethodGet, - fmt.Sprintf(apiSchemaURL, project, serviceId, schemaId), + resp, err := c.RestDoWithContext(ctx, http.MethodGet, + fmt.Sprintf(apiSchemaURL, project, serviceId, schemaId)+"?"+c.parseQuery(ctx), headers, nil) if err != nil { return nil, scerr.NewError(scerr.ErrInternal, err.Error()) @@ -157,7 +169,6 @@ func (c *SCClient) GetSchemaBySchemaId(domainProject, serviceId, schemaId string schema := &pb.GetSchemaResponse{} err = json.Unmarshal(body, schema) if err != nil { - fmt.Println(util.BytesToStringWithNoCopy(body)) return nil, scerr.NewError(scerr.ErrInternal, err.Error()) } @@ -168,11 +179,11 @@ func (c *SCClient) GetSchemaBySchemaId(domainProject, serviceId, schemaId string }, nil } -func (c *SCClient) GetClusters() (registry.Clusters, *scerr.Error) { - headers := c.CommonHeaders() +func (c *SCClient) GetClusters(ctx context.Context) (registry.Clusters, *scerr.Error) { + headers := c.CommonHeaders(ctx) // only default domain has admin permission headers.Set("X-Domain-Name", "default") - resp, err := c.RestDo(http.MethodGet, apiClustersURL, headers, nil) + resp, err := c.RestDoWithContext(ctx, http.MethodGet, apiClustersURL, headers, nil) if err != nil { return nil, scerr.NewError(scerr.ErrInternal, err.Error()) } @@ -190,18 +201,17 @@ func (c *SCClient) GetClusters() (registry.Clusters, *scerr.Error) { clusters := &model.ClustersResponse{} err = json.Unmarshal(body, clusters) if err != nil { - fmt.Println(string(body)) return nil, scerr.NewError(scerr.ErrInternal, err.Error()) } return clusters.Clusters, nil } -func (c *SCClient) HealthCheck() *scerr.Error { - headers := c.CommonHeaders() +func (c *SCClient) HealthCheck(ctx context.Context) *scerr.Error { + headers := c.CommonHeaders(ctx) // only default domain has admin permission headers.Set("X-Domain-Name", "default") - resp, err := c.RestDo(http.MethodGet, apiHealthURL, headers, nil) + resp, err := c.RestDoWithContext(ctx, http.MethodGet, apiHealthURL, headers, nil) if err != nil { return scerr.NewError(scerr.ErrUnavailableBackend, err.Error()) } @@ -217,3 +227,65 @@ func (c *SCClient) HealthCheck() *scerr.Error { } return nil } + +func (c *SCClient) GetInstancesByServiceId(ctx context.Context, domainProject, providerId, consumerId string) ([]*pb.MicroServiceInstance, *scerr.Error) { + domain, project := core.FromDomainProject(domainProject) + headers := c.CommonHeaders(ctx) + headers.Set("X-Domain-Name", domain) + headers.Set("X-ConsumerId", consumerId) + resp, err := c.RestDoWithContext(ctx, http.MethodGet, + fmt.Sprintf(apiInstancesURL, project, providerId)+"?"+c.parseQuery(ctx), + headers, nil) + if err != nil { + return nil, scerr.NewError(scerr.ErrInternal, err.Error()) + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, scerr.NewError(scerr.ErrInternal, err.Error()) + } + + if resp.StatusCode != http.StatusOK { + return nil, c.toError(body) + } + + instancesResp := &pb.GetInstancesResponse{} + err = json.Unmarshal(body, instancesResp) + if err != nil { + return nil, scerr.NewError(scerr.ErrInternal, err.Error()) + } + + return instancesResp.Instances, nil +} + +func (c *SCClient) GetInstanceByInstanceId(ctx context.Context, domainProject, providerId, instanceId, consumerId string) (*pb.MicroServiceInstance, *scerr.Error) { + domain, project := core.FromDomainProject(domainProject) + headers := c.CommonHeaders(ctx) + headers.Set("X-Domain-Name", domain) + headers.Set("X-ConsumerId", consumerId) + resp, err := c.RestDoWithContext(ctx, http.MethodGet, + fmt.Sprintf(apiInstanceURL, project, providerId, instanceId)+"?"+c.parseQuery(ctx), + headers, nil) + if err != nil { + return nil, scerr.NewError(scerr.ErrInternal, err.Error()) + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, scerr.NewError(scerr.ErrInternal, err.Error()) + } + + if resp.StatusCode != http.StatusOK { + return nil, c.toError(body) + } + + instanceResp := &pb.GetOneInstanceResponse{} + err = json.Unmarshal(body, instanceResp) + if err != nil { + return nil, scerr.NewError(scerr.ErrInternal, err.Error()) + } + + return instanceResp.Instance, nil +} diff --git a/pkg/client/sc/client.go b/pkg/client/sc/client.go index 18ee9db2..570d5a2c 100644 --- a/pkg/client/sc/client.go +++ b/pkg/client/sc/client.go @@ -16,6 +16,7 @@ package sc import ( + "golang.org/x/net/context" "net/http" ) @@ -32,8 +33,9 @@ type SCClient struct { Cfg Config } -func (c *SCClient) CommonHeaders() http.Header { +func (c *SCClient) CommonHeaders(ctx context.Context) http.Header { var headers = make(http.Header) + // TODO overwrote by context values if len(c.Cfg.Token) > 0 { headers.Set("X-Auth-Token", c.Cfg.Token) } diff --git a/pkg/client/sc/client_lb.go b/pkg/client/sc/client_lb.go index d66b15ac..914b9020 100644 --- a/pkg/client/sc/client_lb.go +++ b/pkg/client/sc/client_lb.go @@ -19,6 +19,7 @@ import ( "github.com/apache/servicecomb-service-center/pkg/lb" "github.com/apache/servicecomb-service-center/pkg/rest" "github.com/apache/servicecomb-service-center/pkg/util" + "golang.org/x/net/context" "net/http" ) @@ -44,9 +45,9 @@ func (c *LBClient) Next() string { return c.LB.Next() } -func (c *LBClient) RestDo(method string, api string, headers http.Header, body []byte) (resp *http.Response, err error) { +func (c *LBClient) RestDoWithContext(ctx context.Context, method string, api string, headers http.Header, body []byte) (resp *http.Response, err error) { for i := 0; i < c.Retries; i++ { - resp, err = c.HttpDo(method, c.Next()+api, headers, body) + resp, err = c.HttpDoWithContext(ctx, method, c.Next()+api, headers, body) if err != nil { util.GetBackoff().Delay(i) continue diff --git a/pkg/rest/client.go b/pkg/rest/client.go index 495cf869..4a2d2f61 100644 --- a/pkg/rest/client.go +++ b/pkg/rest/client.go @@ -22,6 +22,7 @@ import ( "fmt" "github.com/apache/servicecomb-service-center/pkg/tlsutil" "github.com/apache/servicecomb-service-center/pkg/util" + "golang.org/x/net/context" "net/http" "net/url" "os" @@ -64,7 +65,7 @@ type URLClient struct { Cfg URLClientOption } -func (client *URLClient) HttpDo(method string, rawURL string, headers http.Header, body []byte) (resp *http.Response, err error) { +func (client *URLClient) HttpDoWithContext(ctx context.Context, method string, rawURL string, headers http.Header, body []byte) (resp *http.Response, err error) { if strings.HasPrefix(rawURL, "https") { if transport, ok := client.Client.Transport.(*http.Transport); ok { transport.TLSClientConfig = client.TLS @@ -93,6 +94,7 @@ func (client *URLClient) HttpDo(method string, rawURL string, headers http.Heade if err != nil { return nil, errors.New(fmt.Sprintf("create request failed: %s", err.Error())) } + req = req.WithContext(ctx) req.Header = headers resp, err = client.Client.Do(req) @@ -122,6 +124,10 @@ func (client *URLClient) HttpDo(method string, rawURL string, headers http.Heade return resp, nil } +func (client *URLClient) HttpDo(method string, rawURL string, headers http.Header, body []byte) (resp *http.Response, err error) { + return client.HttpDoWithContext(context.Background(), method, rawURL, headers, body) +} + func setOptionDefaultValue(o *URLClientOption) URLClientOption { if o == nil { return defaultURLClientOption diff --git a/pkg/util/util.go b/pkg/util/util.go index c89f9d5c..d824edfe 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -147,3 +147,11 @@ func ResetTimer(timer *time.Timer, d time.Duration) { } timer.Reset(d) } + +func StringTRUE(s string) bool { + s = strings.ToLower(strings.TrimSpace(s)) + if s == "1" || s == "true" { + return true + } + return false +} diff --git a/scctl/pkg/plugin/diagnose/diagnose.go b/scctl/pkg/plugin/diagnose/diagnose.go index 7d501263..b3e28339 100644 --- a/scctl/pkg/plugin/diagnose/diagnose.go +++ b/scctl/pkg/plugin/diagnose/diagnose.go @@ -65,7 +65,7 @@ func DiagnoseCommandFunc(_ *cobra.Command, args []string) { } // query sc - cache, scErr := scClient.GetScCache() + cache, scErr := scClient.GetScCache(context.Background()) if scErr != nil { cmd.StopAndExit(cmd.ExitError, scErr) } diff --git a/scctl/pkg/plugin/get/cluster/cluster_cmd.go b/scctl/pkg/plugin/get/cluster/cluster_cmd.go index c3d2327a..012c7ba8 100644 --- a/scctl/pkg/plugin/get/cluster/cluster_cmd.go +++ b/scctl/pkg/plugin/get/cluster/cluster_cmd.go @@ -21,6 +21,7 @@ import ( "github.com/apache/servicecomb-service-center/scctl/pkg/plugin/get" "github.com/apache/servicecomb-service-center/scctl/pkg/writer" "github.com/spf13/cobra" + "golang.org/x/net/context" ) func init() { @@ -43,7 +44,7 @@ func ClusterCommandFunc(_ *cobra.Command, args []string) { if err != nil { cmd.StopAndExit(cmd.ExitError, err) } - clusters, scErr := scClient.GetClusters() + clusters, scErr := scClient.GetClusters(context.Background()) if scErr != nil { cmd.StopAndExit(cmd.ExitError, scErr) } diff --git a/scctl/pkg/plugin/get/instance/instance_cmd.go b/scctl/pkg/plugin/get/instance/instance_cmd.go index e698ada8..4161e8c2 100644 --- a/scctl/pkg/plugin/get/instance/instance_cmd.go +++ b/scctl/pkg/plugin/get/instance/instance_cmd.go @@ -24,6 +24,7 @@ import ( admin "github.com/apache/servicecomb-service-center/server/admin/model" "github.com/apache/servicecomb-service-center/server/core" "github.com/spf13/cobra" + "golang.org/x/net/context" "strings" ) @@ -47,7 +48,7 @@ func InstanceCommandFunc(_ *cobra.Command, args []string) { if err != nil { cmd.StopAndExit(cmd.ExitError, err) } - cache, scErr := scClient.GetScCache() + cache, scErr := scClient.GetScCache(context.Background()) if scErr != nil { cmd.StopAndExit(cmd.ExitError, scErr) } diff --git a/scctl/pkg/plugin/get/schema/schema_cmd.go b/scctl/pkg/plugin/get/schema/schema_cmd.go index 97f9c7b8..412dc814 100644 --- a/scctl/pkg/plugin/get/schema/schema_cmd.go +++ b/scctl/pkg/plugin/get/schema/schema_cmd.go @@ -25,6 +25,7 @@ import ( adminModel "github.com/apache/servicecomb-service-center/server/admin/model" "github.com/apache/servicecomb-service-center/server/core" "github.com/spf13/cobra" + "golang.org/x/net/context" "io" "io/ioutil" "os" @@ -80,7 +81,7 @@ func SchemaCommandFunc(_ *cobra.Command, args []string) { if err != nil { cmd.StopAndExit(cmd.ExitError, err) } - cache, scErr := scClient.GetScCache() + cache, scErr := scClient.GetScCache(context.Background()) if scErr != nil { cmd.StopAndExit(cmd.ExitError, scErr) } @@ -109,7 +110,7 @@ func SchemaCommandFunc(_ *cobra.Command, args []string) { continue } - schemas, err := scClient.GetSchemasByServiceId(domainProject, ms.Value.ServiceId) + schemas, err := scClient.GetSchemasByServiceId(context.Background(), domainProject, ms.Value.ServiceId) if err != nil { cmd.StopAndExit(cmd.ExitError, err) } diff --git a/scctl/pkg/plugin/get/service/service_cmd.go b/scctl/pkg/plugin/get/service/service_cmd.go index 70df59b2..3c63f41c 100644 --- a/scctl/pkg/plugin/get/service/service_cmd.go +++ b/scctl/pkg/plugin/get/service/service_cmd.go @@ -24,6 +24,7 @@ import ( "github.com/apache/servicecomb-service-center/scctl/pkg/writer" "github.com/apache/servicecomb-service-center/server/core" "github.com/spf13/cobra" + "golang.org/x/net/context" "strings" ) @@ -48,7 +49,7 @@ func ServiceCommandFunc(_ *cobra.Command, args []string) { if err != nil { cmd.StopAndExit(cmd.ExitError, err) } - cache, scErr := scClient.GetScCache() + cache, scErr := scClient.GetScCache(context.Background()) if scErr != nil { cmd.StopAndExit(cmd.ExitError, scErr) } diff --git a/scctl/pkg/plugin/health/cmd.go b/scctl/pkg/plugin/health/cmd.go index c9326056..c9e5c313 100644 --- a/scctl/pkg/plugin/health/cmd.go +++ b/scctl/pkg/plugin/health/cmd.go @@ -20,6 +20,7 @@ import ( "github.com/apache/servicecomb-service-center/scctl/pkg/cmd" scerr "github.com/apache/servicecomb-service-center/server/error" "github.com/spf13/cobra" + "golang.org/x/net/context" ) const ( @@ -48,7 +49,7 @@ func HealthCommandFunc(_ *cobra.Command, args []string) { if err != nil { cmd.StopAndExit(ExistInternal, err) } - scErr := scClient.HealthCheck() + scErr := scClient.HealthCheck(context.Background()) if scErr != nil { switch scErr.Code { case scerr.ErrUnavailableBackend: diff --git a/scctl/pkg/plugin/version/cmd.go b/scctl/pkg/plugin/version/cmd.go index cba91727..877d23b2 100644 --- a/scctl/pkg/plugin/version/cmd.go +++ b/scctl/pkg/plugin/version/cmd.go @@ -21,6 +21,7 @@ import ( "github.com/apache/servicecomb-service-center/scctl/pkg/cmd" "github.com/apache/servicecomb-service-center/scctl/pkg/version" "github.com/spf13/cobra" + "golang.org/x/net/context" ) var ( @@ -51,7 +52,7 @@ func VersionCommandFunc(_ *cobra.Command, _ []string) { if err != nil { return } - v, err := scClient.GetScVersion() + v, err := scClient.GetScVersion(context.Background()) if err != nil { return } diff --git a/server/handler/cache/cache.go b/server/handler/cache/cache.go index 4bb6fd5d..adc3e664 100644 --- a/server/handler/cache/cache.go +++ b/server/handler/cache/cache.go @@ -19,6 +19,7 @@ package cache import ( "github.com/apache/servicecomb-service-center/pkg/chain" "github.com/apache/servicecomb-service-center/pkg/rest" + "github.com/apache/servicecomb-service-center/pkg/util" serviceUtil "github.com/apache/servicecomb-service-center/server/service/util" "net/http" ) @@ -32,17 +33,18 @@ func (l *CacheResponse) Handle(i *chain.Invocation) { r := i.Context().Value(rest.CTX_REQUEST).(*http.Request) query := r.URL.Query() - if r.Method != http.MethodGet { - i.WithContext(serviceUtil.CTX_REGISTRYONLY, "1") + global := util.StringTRUE(query.Get(serviceUtil.CTX_GLOBAL)) + if global && r.Method == http.MethodGet { + i.WithContext(serviceUtil.CTX_GLOBAL, "1") } - noCache := query.Get(serviceUtil.CTX_NOCACHE) == "1" + noCache := util.StringTRUE(query.Get(serviceUtil.CTX_NOCACHE)) if noCache { i.WithContext(serviceUtil.CTX_NOCACHE, "1") return } - cacheOnly := query.Get(serviceUtil.CTX_CACHEONLY) == "1" + cacheOnly := util.StringTRUE(query.Get(serviceUtil.CTX_CACHEONLY)) if cacheOnly { i.WithContext(serviceUtil.CTX_CACHEONLY, "1") return diff --git a/server/plugin/pkg/discovery/aggregate/indexer.go b/server/plugin/pkg/discovery/aggregate/indexer.go index 912c9977..b524a692 100644 --- a/server/plugin/pkg/discovery/aggregate/indexer.go +++ b/server/plugin/pkg/discovery/aggregate/indexer.go @@ -17,7 +17,6 @@ package aggregate import ( "github.com/apache/servicecomb-service-center/pkg/util" - "github.com/apache/servicecomb-service-center/server/core/backend" "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery" "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" "golang.org/x/net/context" @@ -42,9 +41,9 @@ func (i *AdaptorsIndexer) Search(ctx context.Context, opts ...registry.PluginOpO if _, ok := exists[key]; !ok { exists[key] = struct{}{} response.Kvs = append(response.Kvs, kv) - response.Count += 1 } } + response.Count += resp.Count } return &response, nil } @@ -54,31 +53,48 @@ func NewAdaptorsIndexer(as []discovery.Adaptor) *AdaptorsIndexer { } type AggregatorIndexer struct { - Indexer discovery.Indexer - Registry discovery.Indexer + *discovery.CacheIndexer + AdaptorsIndexer discovery.Indexer + LocalIndexer discovery.Indexer } -func (i *AggregatorIndexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (*discovery.Response, error) { +func (i *AggregatorIndexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (resp *discovery.Response, err error) { + op := registry.OpGet(opts...) + + if op.NoCache() || !op.Global { + return i.search(ctx, opts...) + } + + resp, err = i.CacheIndexer.Search(ctx, opts...) + if err != nil { + return + } + + if resp.Count > 0 || op.CacheOnly() { + return resp, nil + } + + return i.search(ctx, opts...) +} + +func (i *AggregatorIndexer) search(ctx context.Context, opts ...registry.PluginOpOption) (*discovery.Response, error) { op := registry.OptionsToOp(opts...) - if op.RegistryOnly { - return i.Registry.Search(ctx, opts...) + if !op.Global { + return i.LocalIndexer.Search(ctx, opts...) } - return i.Indexer.Search(ctx, opts...) + return i.AdaptorsIndexer.Search(ctx, opts...) } func NewAggregatorIndexer(as *Aggregator) *AggregatorIndexer { - ai := &AggregatorIndexer{} - switch as.Type { - case backend.SCHEMA: - // schema does not been cached - ai.Indexer = NewAdaptorsIndexer(as.Adaptors) - default: - ai.Indexer = discovery.NewCacheIndexer(as.Cache()) + indexer := NewAdaptorsIndexer(as.Adaptors) + ai := &AggregatorIndexer{ + CacheIndexer: discovery.NewCacheIndexer(as.Cache()), + AdaptorsIndexer: indexer, + LocalIndexer: indexer, } - ai.Registry = ai.Indexer if registryIndex >= 0 { - ai.Registry = as.Adaptors[registryIndex] + ai.LocalIndexer = as.Adaptors[registryIndex] } return ai } diff --git a/server/plugin/pkg/discovery/cacher.go b/server/plugin/pkg/discovery/cacher.go index 9a9f8841..086d928e 100644 --- a/server/plugin/pkg/discovery/cacher.go +++ b/server/plugin/pkg/discovery/cacher.go @@ -40,7 +40,7 @@ func (c *CommonCacher) Notify(action proto.EventType, key string, kv *KeyValue) default: c.cache.Put(key, kv) } - c.OnEvent(KvEvent{Type: action, KV: kv}) + c.OnEvent(KvEvent{Type: action, KV: kv, Revision: kv.ModRevision}) } func (c *CommonCacher) OnEvent(evt KvEvent) { diff --git a/server/plugin/pkg/discovery/etcd/indexer_cache.go b/server/plugin/pkg/discovery/etcd/indexer_cache.go index 35e7ac6c..09b371ad 100644 --- a/server/plugin/pkg/discovery/etcd/indexer_cache.go +++ b/server/plugin/pkg/discovery/etcd/indexer_cache.go @@ -33,10 +33,7 @@ func (i *CacheIndexer) Search(ctx context.Context, opts ...registry.PluginOpOpti op := registry.OpGet(opts...) key := util.BytesToStringWithNoCopy(op.Key) - if i.Cache == nil || - op.Mode == registry.MODE_NO_CACHE || - op.Revision > 0 || - (op.Offset >= 0 && op.Limit > 0) { + if op.NoCache() { return i.EtcdIndexer.Search(ctx, opts...) } @@ -49,7 +46,7 @@ func (i *CacheIndexer) Search(ctx context.Context, opts ...registry.PluginOpOpti return nil, err } - if resp.Count > 0 || op.Mode == registry.MODE_CACHE { + if resp.Count > 0 || op.CacheOnly() { return resp, nil } diff --git a/server/plugin/pkg/discovery/etcd/indexer_etcd.go b/server/plugin/pkg/discovery/etcd/indexer_etcd.go index 49e6d03a..a38e9baf 100644 --- a/server/plugin/pkg/discovery/etcd/indexer_etcd.go +++ b/server/plugin/pkg/discovery/etcd/indexer_etcd.go @@ -69,7 +69,7 @@ func (i *EtcdIndexer) Search(ctx context.Context, opts ...registry.PluginOpOptio kvs := make([]*discovery.KeyValue, 0, len(resp.Kvs)) for _, src := range resp.Kvs { - kv := new(discovery.KeyValue) + kv := discovery.NewKeyValue() if err = FromEtcdKeyValue(kv, src, p); err != nil { continue } diff --git a/server/plugin/pkg/discovery/servicecenter/adaptor.go b/server/plugin/pkg/discovery/servicecenter/adaptor.go index 04a87196..77e00868 100644 --- a/server/plugin/pkg/discovery/servicecenter/adaptor.go +++ b/server/plugin/pkg/discovery/servicecenter/adaptor.go @@ -48,13 +48,14 @@ func (se *ServiceCenterAdaptor) Ready() <-chan struct{} { func NewServiceCenterAdaptor(t discovery.Type, cfg *discovery.Config) *ServiceCenterAdaptor { if t == backend.SCHEMA { return &ServiceCenterAdaptor{ - Indexer: GetOrCreateClusterIndexer(), + Indexer: NewClusterIndexer(t, discovery.NullCache), Cacher: discovery.NullCacher, } - } - cache := discovery.NewKvCache(t.String(), cfg) - return &ServiceCenterAdaptor{ - Indexer: discovery.NewCacheIndexer(cache), - Cacher: BuildCacher(t, cfg, cache), + } else { + cache := discovery.NewKvCache(t.String(), cfg) + return &ServiceCenterAdaptor{ + Indexer: NewClusterIndexer(t, cache), + Cacher: BuildCacher(t, cfg, cache), + } } } diff --git a/server/plugin/pkg/discovery/servicecenter/aggregate.go b/server/plugin/pkg/discovery/servicecenter/aggregate.go index 1cb097b0..bafa2963 100644 --- a/server/plugin/pkg/discovery/servicecenter/aggregate.go +++ b/server/plugin/pkg/discovery/servicecenter/aggregate.go @@ -19,17 +19,25 @@ import ( "crypto/tls" "github.com/apache/servicecomb-service-center/pkg/client/sc" "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/util" "github.com/apache/servicecomb-service-center/server/admin/model" - pb "github.com/apache/servicecomb-service-center/server/core/proto" + "github.com/apache/servicecomb-service-center/server/core" scerr "github.com/apache/servicecomb-service-center/server/error" mgr "github.com/apache/servicecomb-service-center/server/plugin" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery" "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" + "golang.org/x/net/context" "strings" + "sync" ) -type SCClientAggregate []*sc.SCClient +var ( + scClient *SCClientAggregate + clientOnce sync.Once + clientTLS *tls.Config +) -var clientTLS *tls.Config +type SCClientAggregate []*sc.SCClient func getClientTLS() (*tls.Config, error) { if clientTLS != nil { @@ -40,11 +48,11 @@ func getClientTLS() (*tls.Config, error) { return clientTLS, err } -func (c *SCClientAggregate) GetScCache() (*model.Cache, map[string]error) { +func (c *SCClientAggregate) GetScCache(ctx context.Context) (*model.Cache, map[string]error) { var caches *model.Cache errs := make(map[string]error) for _, client := range *c { - cache, err := client.GetScCache() + cache, err := client.GetScCache(ctx) if err != nil { errs[client.Cfg.Name] = err continue @@ -53,72 +61,148 @@ func (c *SCClientAggregate) GetScCache() (*model.Cache, map[string]error) { if caches == nil { caches = &model.Cache{} } - caches.Microservices = append(caches.Microservices, cache.Microservices...) - caches.Indexes = append(caches.Indexes, cache.Indexes...) - caches.Aliases = append(caches.Aliases, cache.Aliases...) - caches.Tags = append(caches.Tags, cache.Tags...) - caches.Rules = append(caches.Rules, cache.Rules...) - caches.RuleIndexes = append(caches.RuleIndexes, cache.RuleIndexes...) - caches.DependencyRules = append(caches.DependencyRules, cache.DependencyRules...) - caches.Summaries = append(caches.Summaries, cache.Summaries...) - caches.Instances = append(caches.Instances, cache.Instances...) + c.cacheAppend(client.Cfg.Name, &caches.Microservices, &cache.Microservices) + c.cacheAppend(client.Cfg.Name, &caches.Indexes, &cache.Indexes) + c.cacheAppend(client.Cfg.Name, &caches.Aliases, &cache.Aliases) + c.cacheAppend(client.Cfg.Name, &caches.Tags, &cache.Tags) + c.cacheAppend(client.Cfg.Name, &caches.Rules, &cache.Rules) + c.cacheAppend(client.Cfg.Name, &caches.RuleIndexes, &cache.RuleIndexes) + c.cacheAppend(client.Cfg.Name, &caches.DependencyRules, &cache.DependencyRules) + c.cacheAppend(client.Cfg.Name, &caches.Summaries, &cache.Summaries) + c.cacheAppend(client.Cfg.Name, &caches.Instances, &cache.Instances) } return caches, errs } -func (c *SCClientAggregate) GetSchemasByServiceId(domainProject, serviceId string) ([]*pb.Schema, *scerr.Error) { - var schemas []*pb.Schema +func (c *SCClientAggregate) cacheAppend(name string, setter model.Setter, getter model.Getter) { + getter.ForEach(func(_ int, v *model.KV) bool { + v.ClusterName = name + setter.SetValue(v) + return true + }) +} + +func (c *SCClientAggregate) GetSchemasByServiceId(ctx context.Context, domainProject, serviceId string) (*discovery.Response, *scerr.Error) { + var response discovery.Response for _, client := range *c { - ss, err := client.GetSchemasByServiceId(domainProject, serviceId) + schemas, err := client.GetSchemasByServiceId(ctx, domainProject, serviceId) if err != nil && err.InternalError() { log.Errorf(err, "get schema by serviceId[%s/%s] failed", domainProject, serviceId) continue } - schemas = append(schemas, ss...) + if schemas == nil { + continue + } + response.Count = int64(len(schemas)) + for _, schema := range schemas { + response.Kvs = append(response.Kvs, &discovery.KeyValue{ + Key: []byte(core.GenerateServiceSchemaKey(domainProject, serviceId, schema.SchemaId)), + Value: util.StringToBytesWithNoCopy(schema.Schema), + ModRevision: 0, + ClusterName: client.Cfg.Name, + }) + } + return &response, nil } - - return schemas, nil + return &response, nil } -func (c *SCClientAggregate) GetSchemaBySchemaId(domainProject, serviceId, schemaId string) (schema *pb.Schema, err *scerr.Error) { +func (c *SCClientAggregate) GetSchemaBySchemaId(ctx context.Context, domainProject, serviceId, schemaId string) (*discovery.Response, *scerr.Error) { + var response discovery.Response for _, client := range *c { - schema, err = client.GetSchemaBySchemaId(domainProject, serviceId, schemaId) + schema, err := client.GetSchemaBySchemaId(ctx, domainProject, serviceId, schemaId) if err != nil && err.InternalError() { log.Errorf(err, "get schema by serviceId[%s/%s] failed", domainProject, serviceId) continue } - if schema != nil { - return + if schema == nil { + continue } + response.Count = 1 + response.Kvs = append(response.Kvs, &discovery.KeyValue{ + Key: []byte(core.GenerateServiceSchemaKey(domainProject, serviceId, schema.SchemaId)), + Value: util.StringToBytesWithNoCopy(schema.Schema), + ModRevision: 0, + ClusterName: client.Cfg.Name, + }) + return &response, nil } + return &response, nil +} - return +func (c *SCClientAggregate) GetInstancesByServiceId(ctx context.Context, domainProject, providerId, consumerId string) (*discovery.Response, *scerr.Error) { + var response discovery.Response + for _, client := range *c { + insts, err := client.GetInstancesByServiceId(ctx, domainProject, providerId, consumerId) + if err != nil && err.InternalError() { + log.Errorf(err, "consumer[%s] get provider[%s/%s] instances failed", consumerId, domainProject, providerId) + continue + } + if insts == nil { + continue + } + response.Count = int64(len(insts)) + for _, instance := range insts { + response.Kvs = append(response.Kvs, &discovery.KeyValue{ + Key: []byte(core.GenerateInstanceKey(domainProject, providerId, instance.InstanceId)), + Value: instance, + ModRevision: 0, + ClusterName: client.Cfg.Name, + }) + } + } + return &response, nil } -func NewSCClientAggregate() *SCClientAggregate { - c := &SCClientAggregate{} - clusters := registry.Configuration().Clusters - for name, endpoints := range clusters { - if len(name) == 0 || name == registry.Configuration().ClusterName { +func (c *SCClientAggregate) GetInstanceByInstanceId(ctx context.Context, domainProject, providerId, instanceId, consumerId string) (*discovery.Response, *scerr.Error) { + var response discovery.Response + for _, client := range *c { + instance, err := client.GetInstanceByInstanceId(ctx, domainProject, providerId, instanceId, consumerId) + if err != nil && err.InternalError() { + log.Errorf(err, "consumer[%s] get provider[%s/%s] instances failed", consumerId, domainProject, providerId) continue } - client, err := sc.NewSCClient(sc.Config{Name: name, Endpoints: endpoints}) - if err != nil { - log.Errorf(err, "new service center[%s]%v client failed", name, endpoints) + if instance == nil { continue } - client.Timeout = registry.Configuration().RequestTimeOut - // TLS - if strings.Index(endpoints[0], "https") >= 0 { - client.TLS, err = getClientTLS() + response.Count = 1 + response.Kvs = append(response.Kvs, &discovery.KeyValue{ + Key: []byte(core.GenerateInstanceKey(domainProject, providerId, instance.InstanceId)), + Value: instance, + ModRevision: 0, + ClusterName: client.Cfg.Name, + }) + return &response, nil + } + return &response, nil +} + +func GetOrCreateSCClient() *SCClientAggregate { + clientOnce.Do(func() { + scClient = &SCClientAggregate{} + clusters := registry.Configuration().Clusters + for name, endpoints := range clusters { + if len(name) == 0 || name == registry.Configuration().ClusterName { + continue + } + client, err := sc.NewSCClient(sc.Config{Name: name, Endpoints: endpoints}) if err != nil { - log.Errorf(err, "get service center[%s]%v tls config failed", name, endpoints) + log.Errorf(err, "new service center[%s]%v client failed", name, endpoints) continue } - } + client.Timeout = registry.Configuration().RequestTimeOut + // TLS + if strings.Index(endpoints[0], "https") >= 0 { + client.TLS, err = getClientTLS() + if err != nil { + log.Errorf(err, "get service center[%s]%v tls config failed", name, endpoints) + continue + } + } - *c = append(*c, client) - log.Infof("new service center[%s]%v client", name, endpoints) - } - return c + *scClient = append(*scClient, client) + log.Infof("new service center[%s]%v client", name, endpoints) + } + }) + return scClient } diff --git a/server/plugin/pkg/discovery/servicecenter/aggregate_test.go b/server/plugin/pkg/discovery/servicecenter/aggregate_test.go index e906c2bf..3cf6c5c3 100644 --- a/server/plugin/pkg/discovery/servicecenter/aggregate_test.go +++ b/server/plugin/pkg/discovery/servicecenter/aggregate_test.go @@ -23,7 +23,7 @@ import ( func TestNewSCClientAggregate(t *testing.T) { registry.Configuration().ClusterAddresses = "sc-1=127.0.0.1:2379,127.0.0.2:2379" registry.Configuration().InitClusters() - c := NewSCClientAggregate() + c := GetOrCreateSCClient() if len(*c) == 0 { t.Fatalf("TestNewSCClientAggregate failed") } diff --git a/server/plugin/pkg/discovery/servicecenter/cacher.go b/server/plugin/pkg/discovery/servicecenter/cacher.go index 06347007..73faae66 100644 --- a/server/plugin/pkg/discovery/servicecenter/cacher.go +++ b/server/plugin/pkg/discovery/servicecenter/cacher.go @@ -24,7 +24,7 @@ type ServiceCenterCacher struct { } func (c *ServiceCenterCacher) Ready() <-chan struct{} { - return GetOrCreateClusterIndexer().Ready() + return closedCh } func NewServiceCenterCacher(cfg *discovery.Config, cache discovery.Cache) *ServiceCenterCacher { @@ -35,6 +35,6 @@ func NewServiceCenterCacher(cfg *discovery.Config, cache discovery.Cache) *Servi func BuildCacher(t discovery.Type, cfg *discovery.Config, cache discovery.Cache) discovery.Cacher { cr := NewServiceCenterCacher(cfg, cache) - GetOrCreateClusterIndexer().AddCacher(t, cr) + GetOrCreateSyncer().AddCacher(t, cr) return cr } diff --git a/server/plugin/pkg/discovery/servicecenter/indexer.go b/server/plugin/pkg/discovery/servicecenter/indexer.go index 746cfa34..5ac47687 100644 --- a/server/plugin/pkg/discovery/servicecenter/indexer.go +++ b/server/plugin/pkg/discovery/servicecenter/indexer.go @@ -16,262 +16,97 @@ package servicecenter import ( - "fmt" - "github.com/apache/servicecomb-service-center/pkg/gopool" + "github.com/apache/servicecomb-service-center/pkg/client/sc" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" - "github.com/apache/servicecomb-service-center/server/admin/model" "github.com/apache/servicecomb-service-center/server/core" "github.com/apache/servicecomb-service-center/server/core/backend" - pb "github.com/apache/servicecomb-service-center/server/core/proto" + scerr "github.com/apache/servicecomb-service-center/server/error" "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery" "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" "golang.org/x/net/context" - "strings" - "sync" - "time" -) - -var ( - cluster *ClusterIndexer - clusterOnce sync.Once ) type ClusterIndexer struct { + *discovery.CacheIndexer Client *SCClientAggregate - - cachers map[discovery.Type]*ServiceCenterCacher -} - -func (c *ClusterIndexer) Initialize() { - c.cachers = make(map[discovery.Type]*ServiceCenterCacher) - c.Client = NewSCClientAggregate() + Type discovery.Type } -func (c *ClusterIndexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (r *discovery.Response, err error) { +func (i *ClusterIndexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (resp *discovery.Response, err error) { op := registry.OpGet(opts...) - key := util.BytesToStringWithNoCopy(op.Key) - switch { - case strings.Index(key, core.GetServiceSchemaRootKey("")) == 0: - domainProject, serviceId, schemaId := core.GetInfoFromSchemaKV(op.Key) - var schemas []*pb.Schema - if op.Prefix && len(schemaId) == 0 { - schemas, err = c.Client.GetSchemasByServiceId(domainProject, serviceId) - if err != nil { - return nil, err - } - } else { - schema, err := c.Client.GetSchemaBySchemaId(domainProject, serviceId, schemaId) - if err != nil { - return nil, err - } - schemas = append(schemas, schema) - } - var response discovery.Response - response.Count = int64(len(schemas)) - if op.CountOnly { - return &response, nil - } - for _, schema := range schemas { - response.Kvs = append(response.Kvs, &discovery.KeyValue{ - Key: util.StringToBytesWithNoCopy( - core.GenerateServiceSchemaKey(domainProject, serviceId, schema.SchemaId)), - Value: util.StringToBytesWithNoCopy(schema.Schema), - }) - } - return &response, nil - default: - return nil, fmt.Errorf("no implement") - } -} -func (c *ClusterIndexer) Sync(ctx context.Context) error { - cache, errs := c.Client.GetScCache() - if cache == nil && len(errs) > 0 { - err := fmt.Errorf("%v", errs) - log.Errorf(err, "sync failed") - return err + if op.NoCache() { + return i.search(ctx, opts...) } - // microservice - serviceCacher, ok := c.cachers[backend.SERVICE] - if ok { - c.check(serviceCacher, &cache.Microservices, errs) + resp, err = i.CacheIndexer.Search(ctx, opts...) + if err != nil { + return } - aliasCacher, ok := c.cachers[backend.SERVICE_ALIAS] - if ok { - c.checkWithConflictHandleFunc(aliasCacher, &cache.Aliases, errs, c.logConflictFunc) - } - indexCacher, ok := c.cachers[backend.SERVICE_INDEX] - if ok { - c.checkWithConflictHandleFunc(indexCacher, &cache.Indexes, errs, c.logConflictFunc) - } - // instance - instCacher, ok := c.cachers[backend.INSTANCE] - if ok { - c.check(instCacher, &cache.Instances, errs) - } - // microservice meta - tagCacher, ok := c.cachers[backend.SERVICE_TAG] - if ok { - c.check(tagCacher, &cache.Tags, errs) - } - ruleCacher, ok := c.cachers[backend.RULE] - if ok { - c.check(ruleCacher, &cache.Rules, errs) - } - ruleIndexCacher, ok := c.cachers[backend.RULE_INDEX] - if ok { - c.check(ruleIndexCacher, &cache.RuleIndexes, errs) - } - depRuleCacher, ok := c.cachers[backend.DEPENDENCY_RULE] - if ok { - c.check(depRuleCacher, &cache.DependencyRules, errs) + + if resp.Count > 0 || op.CacheOnly() { + return resp, nil } - return nil -} -func (c *ClusterIndexer) check(local *ServiceCenterCacher, remote model.Getter, skipClusters map[string]error) { - c.checkWithConflictHandleFunc(local, remote, skipClusters, c.skipHandleFunc) + return i.search(ctx, opts...) } -func (c *ClusterIndexer) checkWithConflictHandleFunc(local *ServiceCenterCacher, remote model.Getter, skipClusters map[string]error, - conflictHandleFunc func(origin *model.KV, conflict model.Getter, index int)) { - exists := make(map[string]*model.KV) - remote.ForEach(func(i int, v *model.KV) bool { - // because the result of the remote return may contain the same data as - // the local cache of the current SC. So we need to ignore it and - // prevent the aggregation result from increasing. - if v.ClusterName == registry.Configuration().ClusterName { - return true - } - if kv, ok := exists[v.Key]; ok { - conflictHandleFunc(kv, remote, i) - return true - } - exists[v.Key] = v - kv := local.Cache().Get(v.Key) - newKv := &discovery.KeyValue{ - Key: util.StringToBytesWithNoCopy(v.Key), - Value: v.Value, - ModRevision: v.Rev, - ClusterName: v.ClusterName, - } - switch { - case kv == nil: - newKv.Version = 1 - newKv.CreateRevision = v.Rev - local.Notify(pb.EVT_CREATE, v.Key, newKv) - case kv.ModRevision != v.Rev: - // if connect to some cluster failed, then skip to notify changes - // of these clusters to prevent publish the wrong changes events of kvs. - if err, ok := skipClusters[kv.ClusterName]; ok { - log.Errorf(err, "cluster[%s] temporarily unavailable, skip cluster[%s] event %s %s", - kv.ClusterName, v.ClusterName, pb.EVT_UPDATE, v.Key) - break - } - newKv.Version = kv.ModRevision - kv.ModRevision - newKv.CreateRevision = kv.CreateRevision - local.Notify(pb.EVT_UPDATE, v.Key, newKv) - } - return true - }) +func (i *ClusterIndexer) search(ctx context.Context, opts ...registry.PluginOpOption) (r *discovery.Response, err error) { + op := registry.OpGet(opts...) + key := util.BytesToStringWithNoCopy(op.Key) - var deletes []*discovery.KeyValue - local.Cache().ForEach(func(key string, v *discovery.KeyValue) (next bool) { - var exist bool - remote.ForEach(func(_ int, v *model.KV) bool { - if v.ClusterName == registry.Configuration().ClusterName { - return true - } - exist = v.Key == key - return !exist - }) - if !exist { - if err, ok := skipClusters[v.ClusterName]; ok { - log.Errorf(err, "cluster[%s] temporarily unavailable, skip event %s %s", - v.ClusterName, pb.EVT_DELETE, v.Key) - return true - } - deletes = append(deletes, v) - } - return true - }) - for _, v := range deletes { - local.Notify(pb.EVT_DELETE, util.BytesToStringWithNoCopy(v.Key), v) + ctx = context.WithValue(ctx, sc.QueryGlobal, "0") + switch i.Type { + case backend.SCHEMA: + r, err = i.searchSchemas(ctx, op) + case backend.INSTANCE: + r, err = i.searchInstances(ctx, op) + default: + return &discovery.Response{}, nil } + log.Debugf("search '%s' match special options, request sc server, opts: %s", key, op) + return } -func (c *ClusterIndexer) skipHandleFunc(origin *model.KV, conflict model.Getter, index int) { -} - -func (c *ClusterIndexer) logConflictFunc(origin *model.KV, conflict model.Getter, index int) { - switch conflict.(type) { - case *model.MicroserviceIndexSlice: - slice := conflict.(*model.MicroserviceIndexSlice) - kv := (*slice)[index] - if serviceId := origin.Value.(string); kv.Value != serviceId { - key := core.GetInfoFromSvcIndexKV(util.StringToBytesWithNoCopy(kv.Key)) - log.Warnf("conflict! can not merge microservice index[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]", - kv.ClusterName, kv.Value, key.Environment, key.AppId, key.ServiceName, key.Version, - serviceId, origin.ClusterName) - } - case *model.MicroserviceAliasSlice: - slice := conflict.(*model.MicroserviceAliasSlice) - kv := (*slice)[index] - if serviceId := origin.Value.(string); kv.Value != serviceId { - key := core.GetInfoFromSvcAliasKV(util.StringToBytesWithNoCopy(kv.Key)) - log.Warnf("conflict! can not merge microservice alias[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]", - kv.ClusterName, kv.Value, key.Environment, key.AppId, key.ServiceName, key.Version, - serviceId, origin.ClusterName) - } +func (i *ClusterIndexer) searchSchemas(ctx context.Context, op registry.PluginOp) (*discovery.Response, error) { + var ( + resp *discovery.Response + scErr *scerr.Error + ) + domainProject, serviceId, schemaId := core.GetInfoFromSchemaKV(op.Key) + if op.Prefix && len(schemaId) == 0 { + resp, scErr = i.Client.GetSchemasByServiceId(ctx, domainProject, serviceId) + } else { + resp, scErr = i.Client.GetSchemaBySchemaId(ctx, domainProject, serviceId, schemaId) } -} - -func (c *ClusterIndexer) loop(ctx context.Context) { - select { - case <-ctx.Done(): - case <-time.After(minWaitInterval): - c.Sync(ctx) - d := registry.Configuration().AutoSyncInterval - if d == 0 { - return - } - loop: - for { - select { - case <-ctx.Done(): - break loop - case <-time.After(d): - // TODO support watching sc - c.Sync(ctx) - } - } + if scErr != nil { + return nil, scErr } - - log.Debug("service center client is stopped") + return resp, nil } -// unsafe -func (c *ClusterIndexer) AddCacher(t discovery.Type, cacher *ServiceCenterCacher) { - c.cachers[t] = cacher -} - -func (c *ClusterIndexer) Run() { - c.Initialize() - gopool.Go(c.loop) -} - -func (c *ClusterIndexer) Stop() {} - -func (c *ClusterIndexer) Ready() <-chan struct{} { - return closedCh +func (i *ClusterIndexer) searchInstances(ctx context.Context, op registry.PluginOp) (r *discovery.Response, err error) { + var ( + resp *discovery.Response + scErr *scerr.Error + ) + serviceId, instanceId, domainProject := core.GetInfoFromInstKV(op.Key) + if op.Prefix && len(instanceId) == 0 { + resp, scErr = i.Client.GetInstancesByServiceId(ctx, domainProject, serviceId, "") + } else { + resp, scErr = i.Client.GetInstanceByInstanceId(ctx, domainProject, serviceId, instanceId, "") + } + if scErr != nil { + return nil, scErr + } + return resp, nil } -func GetOrCreateClusterIndexer() *ClusterIndexer { - clusterOnce.Do(func() { - cluster = &ClusterIndexer{} - cluster.Run() - }) - return cluster +func NewClusterIndexer(t discovery.Type, cache discovery.Cache) *ClusterIndexer { + return &ClusterIndexer{ + CacheIndexer: discovery.NewCacheIndexer(cache), + Client: GetOrCreateSCClient(), + Type: t, + } } diff --git a/server/plugin/pkg/discovery/servicecenter/syncer.go b/server/plugin/pkg/discovery/servicecenter/syncer.go new file mode 100644 index 00000000..c504c28a --- /dev/null +++ b/server/plugin/pkg/discovery/servicecenter/syncer.go @@ -0,0 +1,237 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package servicecenter + +import ( + "fmt" + "github.com/apache/servicecomb-service-center/pkg/gopool" + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/util" + "github.com/apache/servicecomb-service-center/server/admin/model" + "github.com/apache/servicecomb-service-center/server/core" + "github.com/apache/servicecomb-service-center/server/core/backend" + pb "github.com/apache/servicecomb-service-center/server/core/proto" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" + "golang.org/x/net/context" + "sync" + "time" +) + +var ( + syncer *Syncer + syncerOnce sync.Once +) + +type Syncer struct { + Client *SCClientAggregate + + cachers map[discovery.Type]*ServiceCenterCacher +} + +func (c *Syncer) Initialize() { + c.cachers = make(map[discovery.Type]*ServiceCenterCacher) + c.Client = GetOrCreateSCClient() +} + +func (c *Syncer) Sync(ctx context.Context) error { + cache, errs := c.Client.GetScCache(ctx) + if cache == nil && len(errs) > 0 { + err := fmt.Errorf("%v", errs) + log.Errorf(err, "sync failed") + return err + } + + // microservice + serviceCacher, ok := c.cachers[backend.SERVICE] + if ok { + c.check(serviceCacher, &cache.Microservices, errs) + } + indexCacher, ok := c.cachers[backend.SERVICE_INDEX] + if ok { + c.checkWithConflictHandleFunc(indexCacher, &cache.Indexes, errs, c.logConflictFunc) + } + aliasCacher, ok := c.cachers[backend.SERVICE_ALIAS] + if ok { + c.checkWithConflictHandleFunc(aliasCacher, &cache.Aliases, errs, c.logConflictFunc) + } + // microservice meta + tagCacher, ok := c.cachers[backend.SERVICE_TAG] + if ok { + c.check(tagCacher, &cache.Tags, errs) + } + ruleCacher, ok := c.cachers[backend.RULE] + if ok { + c.check(ruleCacher, &cache.Rules, errs) + } + ruleIndexCacher, ok := c.cachers[backend.RULE_INDEX] + if ok { + c.check(ruleIndexCacher, &cache.RuleIndexes, errs) + } + depRuleCacher, ok := c.cachers[backend.DEPENDENCY_RULE] + if ok { + c.check(depRuleCacher, &cache.DependencyRules, errs) + } + schemaSummaryCacher, ok := c.cachers[backend.SCHEMA_SUMMARY] + if ok { + c.check(schemaSummaryCacher, &cache.Summaries, errs) + } + // instance + instCacher, ok := c.cachers[backend.INSTANCE] + if ok { + c.check(instCacher, &cache.Instances, errs) + } + return nil +} + +func (c *Syncer) check(local *ServiceCenterCacher, remote model.Getter, skipClusters map[string]error) { + c.checkWithConflictHandleFunc(local, remote, skipClusters, c.skipHandleFunc) +} + +func (c *Syncer) checkWithConflictHandleFunc(local *ServiceCenterCacher, remote model.Getter, skipClusters map[string]error, + conflictHandleFunc func(origin *model.KV, conflict model.Getter, index int)) { + exists := make(map[string]*model.KV) + remote.ForEach(func(i int, v *model.KV) bool { + // because the result of the remote return may contain the same data as + // the local cache of the current SC. So we need to ignore it and + // prevent the aggregation result from increasing. + if v.ClusterName == registry.Configuration().ClusterName { + return true + } + if kv, ok := exists[v.Key]; ok { + conflictHandleFunc(kv, remote, i) + return true + } + exists[v.Key] = v + old := local.Cache().Get(v.Key) + newKv := &discovery.KeyValue{ + Key: util.StringToBytesWithNoCopy(v.Key), + Value: v.Value, + ModRevision: v.Rev, + ClusterName: v.ClusterName, + } + switch { + case old == nil: + newKv.Version = 1 + newKv.CreateRevision = v.Rev + local.Notify(pb.EVT_CREATE, v.Key, newKv) + case old.ModRevision != v.Rev: + // if connect to some cluster failed, then skip to notify changes + // of these clusters to prevent publish the wrong changes events of kvs. + if err, ok := skipClusters[old.ClusterName]; ok { + log.Errorf(err, "cluster[%s] temporarily unavailable, skip cluster[%s] event %s %s", + old.ClusterName, v.ClusterName, pb.EVT_UPDATE, v.Key) + break + } + newKv.Version = 1 + old.Version + newKv.CreateRevision = old.CreateRevision + local.Notify(pb.EVT_UPDATE, v.Key, newKv) + } + return true + }) + + var deletes []*discovery.KeyValue + local.Cache().ForEach(func(key string, v *discovery.KeyValue) (next bool) { + var exist bool + remote.ForEach(func(_ int, v *model.KV) bool { + if v.ClusterName == registry.Configuration().ClusterName { + return true + } + exist = v.Key == key + return !exist + }) + if !exist { + if err, ok := skipClusters[v.ClusterName]; ok { + log.Errorf(err, "cluster[%s] temporarily unavailable, skip event %s %s", + v.ClusterName, pb.EVT_DELETE, v.Key) + return true + } + deletes = append(deletes, v) + } + return true + }) + for _, v := range deletes { + local.Notify(pb.EVT_DELETE, util.BytesToStringWithNoCopy(v.Key), v) + } +} + +func (c *Syncer) skipHandleFunc(origin *model.KV, conflict model.Getter, index int) { +} + +func (c *Syncer) logConflictFunc(origin *model.KV, conflict model.Getter, index int) { + switch conflict.(type) { + case *model.MicroserviceIndexSlice: + slice := conflict.(*model.MicroserviceIndexSlice) + kv := (*slice)[index] + if serviceId := origin.Value.(string); kv.Value != serviceId { + key := core.GetInfoFromSvcIndexKV(util.StringToBytesWithNoCopy(kv.Key)) + log.Warnf("conflict! can not merge microservice index[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]", + kv.ClusterName, kv.Value, key.Environment, key.AppId, key.ServiceName, key.Version, + serviceId, origin.ClusterName) + } + case *model.MicroserviceAliasSlice: + slice := conflict.(*model.MicroserviceAliasSlice) + kv := (*slice)[index] + if serviceId := origin.Value.(string); kv.Value != serviceId { + key := core.GetInfoFromSvcAliasKV(util.StringToBytesWithNoCopy(kv.Key)) + log.Warnf("conflict! can not merge microservice alias[%s][%s][%s/%s/%s/%s], found one[%s] in cluster[%s]", + kv.ClusterName, kv.Value, key.Environment, key.AppId, key.ServiceName, key.Version, + serviceId, origin.ClusterName) + } + } +} + +func (c *Syncer) loop(ctx context.Context) { + select { + case <-ctx.Done(): + case <-time.After(minWaitInterval): + c.Sync(ctx) + d := registry.Configuration().AutoSyncInterval + if d == 0 { + return + } + loop: + for { + select { + case <-ctx.Done(): + break loop + case <-time.After(d): + // TODO support watching sc + c.Sync(ctx) + } + } + } + + log.Debug("service center clusters syncer is stopped") +} + +// unsafe +func (c *Syncer) AddCacher(t discovery.Type, cacher *ServiceCenterCacher) { + c.cachers[t] = cacher +} + +func (c *Syncer) Run() { + c.Initialize() + gopool.Go(c.loop) +} + +func GetOrCreateSyncer() *Syncer { + syncerOnce.Do(func() { + syncer = &Syncer{} + syncer.Run() + }) + return syncer +} diff --git a/server/plugin/pkg/discovery/servicecenter/indexer_test.go b/server/plugin/pkg/discovery/servicecenter/syncer_test.go similarity index 81% rename from server/plugin/pkg/discovery/servicecenter/indexer_test.go rename to server/plugin/pkg/discovery/servicecenter/syncer_test.go index 6d622172..f465832a 100644 --- a/server/plugin/pkg/discovery/servicecenter/indexer_test.go +++ b/server/plugin/pkg/discovery/servicecenter/syncer_test.go @@ -26,7 +26,7 @@ import ( ) func TestClusterIndexer_Sync(t *testing.T) { - indexer := &ClusterIndexer{} + syncer := &Syncer{} cache := discovery.NewKvCache("test", discovery.Configure()) cfg := discovery.Configure() sccacher := NewServiceCenterCacher(cfg, cache) @@ -36,7 +36,7 @@ func TestClusterIndexer_Sync(t *testing.T) { cfg.WithEventFunc(func(discovery.KvEvent) { t.Fatalf("TestClusterIndexer_Sync failed") }) - indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) { + syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) { t.Fatalf("TestClusterIndexer_Sync failed") }) @@ -49,7 +49,7 @@ func TestClusterIndexer_Sync(t *testing.T) { }) arr = model.MicroserviceIndexSlice{} arr.SetValue(&model.KV{Key: "/a", Value: "a", Rev: 1, ClusterName: "a"}) - indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) { + syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) { t.Fatalf("TestClusterIndexer_Sync failed") }) @@ -62,7 +62,7 @@ func TestClusterIndexer_Sync(t *testing.T) { }) arr = model.MicroserviceIndexSlice{} arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "a"}) - indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) { + syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) { t.Fatalf("TestClusterIndexer_Sync failed %v", kv) }) @@ -70,7 +70,7 @@ func TestClusterIndexer_Sync(t *testing.T) { cfg.WithEventFunc(func(evt discovery.KvEvent) { t.Fatalf("TestClusterIndexer_Sync failed, %v", evt) }) - indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) { + syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) { t.Fatalf("TestClusterIndexer_Sync failed") }) @@ -81,7 +81,7 @@ func TestClusterIndexer_Sync(t *testing.T) { arr = model.MicroserviceIndexSlice{} arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "a"}) arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "b"}) - indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, indexer.logConflictFunc) + syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, syncer.logConflictFunc) // case: conflict and print log func() { @@ -92,7 +92,7 @@ func TestClusterIndexer_Sync(t *testing.T) { arr = model.MicroserviceIndexSlice{} arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "a"}) arr.SetValue(&model.KV{Key: "/a", Value: "ab", Rev: 2, ClusterName: "b"}) - indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, indexer.logConflictFunc) + syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, syncer.logConflictFunc) // '/a' is incorrect key and logConflictFunc will be excepted to panic here t.Fatalf("TestClusterIndexer_Sync failed") }() @@ -103,7 +103,7 @@ func TestClusterIndexer_Sync(t *testing.T) { }) arr = model.MicroserviceIndexSlice{} arr.SetValue(&model.KV{Key: "/a", Value: "ab", Rev: 3, ClusterName: "b"}) - indexer.checkWithConflictHandleFunc(sccacher, &arr, map[string]error{"a": fmt.Errorf("error")}, func(kv *model.KV, _ model.Getter, _ int) { + syncer.checkWithConflictHandleFunc(sccacher, &arr, map[string]error{"a": fmt.Errorf("error")}, func(kv *model.KV, _ model.Getter, _ int) { t.Fatalf("TestClusterIndexer_Sync failed %v", kv) }) @@ -112,7 +112,7 @@ func TestClusterIndexer_Sync(t *testing.T) { t.Fatalf("TestClusterIndexer_Sync failed, %v", evt) }) arr = model.MicroserviceIndexSlice{} - indexer.checkWithConflictHandleFunc(sccacher, &arr, map[string]error{"a": fmt.Errorf("error")}, func(kv *model.KV, _ model.Getter, _ int) { + syncer.checkWithConflictHandleFunc(sccacher, &arr, map[string]error{"a": fmt.Errorf("error")}, func(kv *model.KV, _ model.Getter, _ int) { t.Fatalf("TestClusterIndexer_Sync failed %v", kv) }) @@ -124,7 +124,7 @@ func TestClusterIndexer_Sync(t *testing.T) { } }) arr = model.MicroserviceIndexSlice{} - indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) { + syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) { t.Fatalf("TestClusterIndexer_Sync failed %v", kv) }) @@ -137,7 +137,7 @@ func TestClusterIndexer_Sync(t *testing.T) { }) arr = model.MicroserviceIndexSlice{} arr.SetValue(&model.KV{Key: "/a", Value: "a", Rev: 1, ClusterName: registry.Configuration().ClusterName}) - indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) { + syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) { t.Fatalf("TestClusterIndexer_Sync failed") }) @@ -151,7 +151,7 @@ func TestClusterIndexer_Sync(t *testing.T) { arr = model.MicroserviceIndexSlice{} arr.SetValue(&model.KV{Key: "/a", Value: "x", Rev: 2, ClusterName: registry.Configuration().ClusterName}) arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "a"}) - indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) { + syncer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) { t.Fatalf("TestClusterIndexer_Sync failed %v", kv) }) } diff --git a/server/plugin/pkg/registry/option.go b/server/plugin/pkg/registry/option.go index f81a22c1..b6be5146 100644 --- a/server/plugin/pkg/registry/option.go +++ b/server/plugin/pkg/registry/option.go @@ -37,7 +37,7 @@ type PluginOp struct { WatchCallback WatchCallback Offset int64 Limit int64 - RegistryOnly bool + Global bool } func (op PluginOp) String() string { @@ -88,12 +88,22 @@ func (op PluginOp) FormatUrlParams() string { if op.Limit > 0 { buf.WriteString(fmt.Sprintf("&limit=%d", op.Limit)) } - if op.RegistryOnly { - buf.WriteString("®istryOnly=true") + if op.Global { + buf.WriteString("&global=true") } return buf.String() } +func (op PluginOp) NoCache() bool { + return op.Mode == MODE_NO_CACHE || + op.Revision > 0 || + (op.Offset >= 0 && op.Limit > 0) +} + +func (op PluginOp) CacheOnly() bool { + return op.Mode == MODE_CACHE +} + type Operation func(...PluginOpOption) (op PluginOp) type PluginOpOption func(*PluginOp) @@ -111,7 +121,7 @@ func WithPrevKv() PluginOpOption { return func(op *PluginOp) { op.Pr func WithLease(leaseID int64) PluginOpOption { return func(op *PluginOp) { op.Lease = leaseID } } func WithKeyOnly() PluginOpOption { return func(op *PluginOp) { op.KeyOnly = true } } func WithCountOnly() PluginOpOption { return func(op *PluginOp) { op.CountOnly = true } } -func WithRegistryOnly() PluginOpOption { return func(op *PluginOp) { op.RegistryOnly = true } } +func WithGlobal() PluginOpOption { return func(op *PluginOp) { op.Global = true } } func WithNoneOrder() PluginOpOption { return func(op *PluginOp) { op.SortOrder = SORT_NONE } } func WithAscendOrder() PluginOpOption { return func(op *PluginOp) { op.SortOrder = SORT_ASCEND } } func WithDescendOrder() PluginOpOption { return func(op *PluginOp) { op.SortOrder = SORT_DESCEND } } diff --git a/server/service/cache/common.go b/server/service/cache/common.go index b17da58e..b25c92d8 100644 --- a/server/service/cache/common.go +++ b/server/service/cache/common.go @@ -17,10 +17,11 @@ package cache const ( - CTX_FIND_CONSUMER = "consumer" - CTX_FIND_PROVIDER = "provider" - CTX_FIND_TAGS = "tags" + CTX_FIND_CONSUMER = "_consumer" + CTX_FIND_PROVIDER = "_provider" + CTX_FIND_TAGS = "_tags" + CTX_FIND_REQUEST_REV = "_rev" - CACHE_FIND = "find" - CACHE_DEP = "dep" + CACHE_FIND = "_find" + CACHE_DEP = "_dep" ) diff --git a/server/service/cache/filter_consumer.go b/server/service/cache/filter_consumer.go index 387c9986..3691cb52 100644 --- a/server/service/cache/filter_consumer.go +++ b/server/service/cache/filter_consumer.go @@ -24,7 +24,7 @@ import ( type ConsumerFilter struct { } -func (f *ConsumerFilter) Name(ctx context.Context) string { +func (f *ConsumerFilter) Name(ctx context.Context, _ *cache.Node) string { return ctx.Value(CTX_FIND_CONSUMER).(string) } diff --git a/server/service/cache/filter_instances.go b/server/service/cache/filter_instances.go index c981ab0a..0f267508 100644 --- a/server/service/cache/filter_instances.go +++ b/server/service/cache/filter_instances.go @@ -26,24 +26,49 @@ import ( "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" serviceUtil "github.com/apache/servicecomb-service-center/server/service/util" "golang.org/x/net/context" + "sort" ) +var clustersIndex = make(map[string]int) + +func init() { + var clusters []string + for name := range registry.Configuration().Clusters { + clusters = append(clusters, name) + } + sort.Strings(clusters) + for i, name := range clusters { + clustersIndex[name] = i + } +} + type InstancesFilter struct { } -func (f *InstancesFilter) Name(ctx context.Context) string { +func (f *InstancesFilter) Name(ctx context.Context, _ *cache.Node) string { return "" } func (f *InstancesFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) { - provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey) pCopy := *parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem) + pCopy.Instances, pCopy.Rev, err = f.FindInstances(ctx, pCopy.ServiceIds) + if err != nil { + return + } + + pCopy.InitBrokenQueue() + node = cache.NewNode() + node.Cache.Set(CACHE_FIND, &pCopy) + return +} +func (f *InstancesFilter) FindInstances(ctx context.Context, serviceIds []string) (instances []*pb.MicroServiceInstance, rev string, err error) { + provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey) var ( - instances []*pb.MicroServiceInstance - rev int64 + maxRevs = make([]int64, len(clustersIndex)) + counts = make([]int64, len(clustersIndex)) ) - for _, providerServiceId := range pCopy.ServiceIds { + for _, providerServiceId := range serviceIds { key := apt.GenerateInstanceKey(provider.Tenant, providerServiceId, "") opts := append(serviceUtil.FromContext(ctx), registry.WithStrKey(key), registry.WithPrefix()) resp, err := backend.Store().Instance().Search(ctx, opts...) @@ -52,21 +77,20 @@ func (f *InstancesFilter) Init(ctx context.Context, parent *cache.Node) (node *c findFlag := fmt.Sprintf("consumer '%s' find provider %s/%s/%s", consumer.ServiceId, provider.AppId, provider.ServiceName, provider.Version) log.Errorf(err, "Instance().Search failed, %s", findFlag) - return nil, err + return nil, "", err } for _, kv := range resp.Kvs { - if kv.ModRevision > rev { - rev = kv.ModRevision + if i, ok := clustersIndex[kv.ClusterName]; ok { + if kv.ModRevision > maxRevs[i] { + maxRevs[i] = kv.ModRevision + } + counts[i]++ } instances = append(instances, kv.Value.(*pb.MicroServiceInstance)) } - } - pCopy.Instances = instances - pCopy.Rev = serviceUtil.FormatRevision(rev, int64(len(instances))) + } - node = cache.NewNode() - node.Cache.Set(CACHE_FIND, &pCopy) - return + return instances, serviceUtil.FormatRevision(maxRevs, counts), nil } diff --git a/server/service/cache/filter_permission.go b/server/service/cache/filter_permission.go index 2a806a55..f2726b18 100644 --- a/server/service/cache/filter_permission.go +++ b/server/service/cache/filter_permission.go @@ -28,14 +28,14 @@ import ( type AccessibleFilter struct { } -func (f *AccessibleFilter) Name(ctx context.Context) string { +func (f *AccessibleFilter) Name(ctx context.Context, _ *cache.Node) string { consumer := ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService) return consumer.ServiceId } func (f *AccessibleFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) { var ids []string - consumerId := f.Name(ctx) + consumerId := ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService).ServiceId pCopy := *parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem) for _, providerServiceId := range pCopy.ServiceIds { if err := serviceUtil.Accessible(ctx, consumerId, providerServiceId); err != nil { diff --git a/server/service/cache/filter_rev.go b/server/service/cache/filter_rev.go new file mode 100644 index 00000000..d3eb5a47 --- /dev/null +++ b/server/service/cache/filter_rev.go @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cache + +import ( + "github.com/apache/servicecomb-service-center/pkg/cache" + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/util" + serviceUtil "github.com/apache/servicecomb-service-center/server/service/util" + "golang.org/x/net/context" +) + +type RevisionFilter struct { + InstancesFilter +} + +func (f *RevisionFilter) Name(ctx context.Context, parent *cache.Node) string { + item := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem) + requestRev := ctx.Value(CTX_FIND_REQUEST_REV).(string) + if len(requestRev) == 0 || requestRev == item.Rev { + return "" + } + return requestRev +} + +func (f *RevisionFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) { + item := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem) + requestRev := ctx.Value(CTX_FIND_REQUEST_REV).(string) + if len(requestRev) == 0 || requestRev == item.Rev { + node = cache.NewNode() + node.Cache.Set(CACHE_FIND, item) + return + } + + if item.BrokenWait() { + node = cache.NewNode() + node.Cache.Set(CACHE_FIND, item) + return + } + + cloneCtx := util.CloneContext(ctx) + cloneCtx = util.SetContext(cloneCtx, serviceUtil.CTX_NOCACHE, "1") + + insts, _, err := f.FindInstances(cloneCtx, item.ServiceIds) + if err != nil { + item.InitBrokenQueue() + return nil, err + } + + log.Warnf("the cache of finding instances api is broken, req[%s]!=cache[%s]", + requestRev, item.Rev) + item.Instances = insts + item.Broken() + + node = cache.NewNode() + node.Cache.Set(CACHE_FIND, item) + return +} diff --git a/server/service/cache/filter_service.go b/server/service/cache/filter_service.go index 7cb0bba3..08fbc4e3 100644 --- a/server/service/cache/filter_service.go +++ b/server/service/cache/filter_service.go @@ -26,7 +26,7 @@ import ( type ServiceFilter struct { } -func (f *ServiceFilter) Name(ctx context.Context) string { +func (f *ServiceFilter) Name(ctx context.Context, _ *cache.Node) string { provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey) return util.StringJoin([]string{ provider.Tenant, diff --git a/server/service/cache/filter_tags.go b/server/service/cache/filter_tags.go index e5eb9954..41b1981b 100644 --- a/server/service/cache/filter_tags.go +++ b/server/service/cache/filter_tags.go @@ -31,7 +31,7 @@ import ( type TagsFilter struct { } -func (f *TagsFilter) Name(ctx context.Context) string { +func (f *TagsFilter) Name(ctx context.Context, _ *cache.Node) string { tags, _ := ctx.Value(CTX_FIND_TAGS).([]string) sort.Strings(tags) return strings.Join(tags, ",") diff --git a/server/service/cache/filter_version.go b/server/service/cache/filter_version.go index d4fbcab6..0c47f6c1 100644 --- a/server/service/cache/filter_version.go +++ b/server/service/cache/filter_version.go @@ -28,7 +28,7 @@ import ( type VersionRuleFilter struct { } -func (f *VersionRuleFilter) Name(ctx context.Context) string { +func (f *VersionRuleFilter) Name(ctx context.Context, _ *cache.Node) string { provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey) return provider.Version } diff --git a/server/service/cache/instance.go b/server/service/cache/instance.go index a0be59f3..f7e7f302 100644 --- a/server/service/cache/instance.go +++ b/server/service/cache/instance.go @@ -36,7 +36,9 @@ func init() { &VersionRuleFilter{}, &TagsFilter{}, &AccessibleFilter{}, - &InstancesFilter{}) + &InstancesFilter{}, + &RevisionFilter{}, + ) } type VersionRuleCacheItem struct { @@ -44,17 +46,40 @@ type VersionRuleCacheItem struct { ServiceIds []string Instances []*pb.MicroServiceInstance Rev string + + broken bool + queue chan struct{} +} + +func (vi *VersionRuleCacheItem) InitBrokenQueue() { + if vi.queue == nil { + vi.queue = make(chan struct{}, 1) + } + vi.broken = false + vi.queue <- struct{}{} +} + +func (vi *VersionRuleCacheItem) BrokenWait() bool { + <-vi.queue + return vi.broken +} + +func (vi *VersionRuleCacheItem) Broken() { + vi.broken = true + close(vi.queue) } type FindInstancesCache struct { *cache.Tree } -func (f *FindInstancesCache) Get(ctx context.Context, consumer *pb.MicroService, provider *pb.MicroServiceKey, tags []string) (*VersionRuleCacheItem, error) { - cloneCtx := context.WithValue(context.WithValue(context.WithValue(ctx, +func (f *FindInstancesCache) Get(ctx context.Context, consumer *pb.MicroService, provider *pb.MicroServiceKey, + tags []string, rev string) (*VersionRuleCacheItem, error) { + cloneCtx := context.WithValue(context.WithValue(context.WithValue(context.WithValue(ctx, CTX_FIND_CONSUMER, consumer), CTX_FIND_PROVIDER, provider), - CTX_FIND_TAGS, tags) + CTX_FIND_TAGS, tags), + CTX_FIND_REQUEST_REV, rev) node, err := f.Tree.Get(cloneCtx, cache.Options().Temporary(ctx.Value(serviceUtil.CTX_NOCACHE) == "1")) if node == nil { diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go index df0f23f4..6dc50f73 100644 --- a/server/service/event/dependency_event_handler.go +++ b/server/service/event/dependency_event_handler.go @@ -155,7 +155,7 @@ func (h *DependencyEventHandler) Handle() error { } func (h *DependencyEventHandler) dependencyRuleHandle(res interface{}) error { - ctx := context.Background() + ctx := context.WithValue(context.Background(), serviceUtil.CTX_GLOBAL, "1") dependencyEventHandlerRes := res.(*DependencyEventHandlerResource) r := dependencyEventHandlerRes.dep consumerFlag := util.StringJoin([]string{r.Consumer.Environment, r.Consumer.AppId, r.Consumer.ServiceName, r.Consumer.Version}, "/") @@ -204,7 +204,8 @@ func (h *DependencyEventHandler) removeKV(ctx context.Context, kv *discovery.Key func (h *DependencyEventHandler) CleanUp(domainProjects map[string]struct{}) { for domainProject := range domainProjects { - if err := serviceUtil.CleanUpDependencyRules(context.Background(), domainProject); err != nil { + ctx := context.WithValue(context.Background(), serviceUtil.CTX_GLOBAL, "1") + if err := serviceUtil.CleanUpDependencyRules(ctx, domainProject); err != nil { log.Errorf(err, "clean up '%s' dependency rules failed", domainProject) } } diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go index 6f2594d8..d526d740 100644 --- a/server/service/event/instance_event_handler.go +++ b/server/service/event/instance_event_handler.go @@ -66,7 +66,9 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) { } // 查询服务版本信息 - ctx := util.SetContext(context.Background(), serviceUtil.CTX_CACHEONLY, "1") + ctx := context.WithValue(context.WithValue(context.Background(), + serviceUtil.CTX_CACHEONLY, "1"), + serviceUtil.CTX_GLOBAL, "1") ms, err := serviceUtil.GetService(ctx, domainProject, providerId) if ms == nil { log.Errorf(err, "caught [%s] instance[%s/%s] event, get cached provider's file failed", diff --git a/server/service/event/rule_event_handler.go b/server/service/event/rule_event_handler.go index 7ec68649..095dd0e2 100644 --- a/server/service/event/rule_event_handler.go +++ b/server/service/event/rule_event_handler.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/task" - "github.com/apache/servicecomb-service-center/pkg/util" "github.com/apache/servicecomb-service-center/server/core" "github.com/apache/servicecomb-service-center/server/core/backend" pb "github.com/apache/servicecomb-service-center/server/core/proto" @@ -53,7 +52,9 @@ func (apt *RulesChangedTask) Err() error { } func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, providerId string, rev int64) error { - ctx = util.SetContext(ctx, serviceUtil.CTX_CACHEONLY, "1") + ctx = context.WithValue(context.WithValue(ctx, + serviceUtil.CTX_CACHEONLY, "1"), + serviceUtil.CTX_GLOBAL, "1") provider, err := serviceUtil.GetService(ctx, domainProject, providerId) if err != nil { diff --git a/server/service/event/tag_event_handler.go b/server/service/event/tag_event_handler.go index 132a8e5c..fb7ea039 100644 --- a/server/service/event/tag_event_handler.go +++ b/server/service/event/tag_event_handler.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/task" - "github.com/apache/servicecomb-service-center/pkg/util" "github.com/apache/servicecomb-service-center/server/core" "github.com/apache/servicecomb-service-center/server/core/backend" pb "github.com/apache/servicecomb-service-center/server/core/proto" @@ -54,7 +53,9 @@ func (apt *TagsChangedTask) Err() error { } func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumerId string, rev int64) error { - ctx = util.SetContext(ctx, serviceUtil.CTX_CACHEONLY, "1") + ctx = context.WithValue(context.WithValue(ctx, + serviceUtil.CTX_CACHEONLY, "1"), + serviceUtil.CTX_GLOBAL, "1") consumer, err := serviceUtil.GetService(ctx, domainProject, consumerId) if err != nil { diff --git a/server/service/instance.go b/server/service/instance.go index ad12ee38..667d6757 100644 --- a/server/service/instance.go +++ b/server/service/instance.go @@ -566,32 +566,20 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest) // cache var item *cache.VersionRuleCacheItem - noCache, cacheOnly := ctx.Value(serviceUtil.CTX_NOCACHE) == "1", ctx.Value(serviceUtil.CTX_CACHEONLY) == "1" rev, _ := ctx.Value(serviceUtil.CTX_REQUEST_REVISION).(string) - reqRev, _ := serviceUtil.ParseRevision(rev) - cloneCtx := util.CloneContext(ctx) - - for i := 0; i < 2; i++ { - item, err = cache.FindInstances.Get(cloneCtx, service, provider, in.Tags) - if err != nil { - log.Errorf(err, "FindInstancesCache.Get failed, %s failed", findFlag()) - return &pb.FindInstancesResponse{ - Response: pb.CreateResponse(scerr.ErrInternal, err.Error()), - }, err - } - if item == nil { - mes := fmt.Errorf("%s failed, provider does not exist.", findFlag()) - log.Errorf(mes, "FindInstancesCache.Get failed") - return &pb.FindInstancesResponse{ - Response: pb.CreateResponse(scerr.ErrServiceNotExists, mes.Error()), - }, nil - } - - cacheRev, _ := serviceUtil.ParseRevision(item.Rev) - if noCache || cacheOnly || reqRev <= cacheRev { - break - } - cloneCtx = util.SetContext(cloneCtx, serviceUtil.CTX_NOCACHE, "1") + item, err = cache.FindInstances.Get(ctx, service, provider, in.Tags, rev) + if err != nil { + log.Errorf(err, "FindInstancesCache.Get failed, %s failed", findFlag()) + return &pb.FindInstancesResponse{ + Response: pb.CreateResponse(scerr.ErrInternal, err.Error()), + }, err + } + if item == nil { + mes := fmt.Errorf("%s failed, provider does not exist.", findFlag()) + log.Errorf(mes, "FindInstancesCache.Get failed") + return &pb.FindInstancesResponse{ + Response: pb.CreateResponse(scerr.ErrServiceNotExists, mes.Error()), + }, nil } // add dependency queue @@ -618,8 +606,9 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest) instances := item.Instances if rev == item.Rev { - instances = instances[:0] + instances = nil // for gRPC } + // TODO support gRPC output context ctx = util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, item.Rev) return &pb.FindInstancesResponse{ Response: pb.CreateResponse(pb.Response_SUCCESS, "Query service instances successfully."), diff --git a/server/service/instance_test.go b/server/service/instance_test.go index 3e6e3cf8..d40dd63d 100644 --- a/server/service/instance_test.go +++ b/server/service/instance_test.go @@ -1226,12 +1226,10 @@ var _ = Describe("'Instance' service", func() { Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) rev, _ := ctx.Value(serviceUtil.CTX_RESPONSE_REVISION).(string) - reqRev, reqCount := serviceUtil.ParseRevision(rev) - Expect(int64(len(respFind.Instances))).To(Equal(reqCount)) Expect(respFind.Instances[0].InstanceId).To(Equal(instanceId8)) - Expect(reqRev).NotTo(Equal(0)) + Expect(len(rev)).NotTo(Equal(0)) - util.SetContext(ctx, serviceUtil.CTX_REQUEST_REVISION, strconv.FormatInt(reqRev-1, 10)) + util.SetContext(ctx, serviceUtil.CTX_REQUEST_REVISION, "x") respFind, err = instanceResource.Find(ctx, &pb.FindInstancesRequest{ ConsumerServiceId: serviceId8, AppId: "query_instance", @@ -1240,20 +1238,6 @@ var _ = Describe("'Instance' service", func() { }) Expect(err).To(BeNil()) Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(int64(len(respFind.Instances))).To(Equal(reqCount)) - Expect(respFind.Instances[0].InstanceId).To(Equal(instanceId8)) - Expect(ctx.Value(serviceUtil.CTX_RESPONSE_REVISION)).To(Equal(rev)) - - util.SetContext(ctx, serviceUtil.CTX_REQUEST_REVISION, strconv.FormatInt(reqRev+1, 10)) - respFind, err = instanceResource.Find(ctx, &pb.FindInstancesRequest{ - ConsumerServiceId: serviceId8, - AppId: "query_instance", - ServiceName: "query_instance_with_rev", - VersionRule: "1.0.0", - }) - Expect(err).To(BeNil()) - Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS)) - Expect(int64(len(respFind.Instances))).To(Equal(reqCount)) Expect(respFind.Instances[0].InstanceId).To(Equal(instanceId8)) Expect(ctx.Value(serviceUtil.CTX_RESPONSE_REVISION)).To(Equal(rev)) diff --git a/server/service/util/common.go b/server/service/util/common.go index 0b182aa2..88c5737e 100644 --- a/server/service/util/common.go +++ b/server/service/util/common.go @@ -18,9 +18,9 @@ package util const ( HEADER_REV = "X-Resource-Revision" - CTX_REGISTRYONLY = "_registryOnly" - CTX_NOCACHE = "_noCache" - CTX_CACHEONLY = "_cacheOnly" - CTX_REQUEST_REVISION = "_requestRev" - CTX_RESPONSE_REVISION = "_responseRev" + CTX_GLOBAL = "global" + CTX_NOCACHE = "noCache" + CTX_CACHEONLY = "cacheOnly" + CTX_REQUEST_REVISION = "requestRev" + CTX_RESPONSE_REVISION = "responseRev" ) diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go index cae67605..302347f2 100644 --- a/server/service/util/instance_util.go +++ b/server/service/util/instance_util.go @@ -17,6 +17,7 @@ package util import ( + "crypto/sha1" "encoding/json" "fmt" "github.com/apache/servicecomb-service-center/pkg/log" @@ -62,18 +63,11 @@ func GetInstance(ctx context.Context, domainProject string, serviceId string, in return resp.Kvs[0].Value.(*pb.MicroServiceInstance), nil } -func ParseRevision(rev string) (int64, int64) { - arrRev := strings.Split(rev, ".") - reqRev, _ := strconv.ParseInt(arrRev[0], 10, 64) - reqCount := int64(0) - if len(arrRev) > 1 { - reqCount, _ = strconv.ParseInt(arrRev[1], 10, 64) +func FormatRevision(revs, counts []int64) (s string) { + for i, rev := range revs { + s += fmt.Sprintf("%d.%d,", rev, counts[i]) } - return reqRev, reqCount -} - -func FormatRevision(rev, count int64) string { - return fmt.Sprintf("%d.%d", rev, count) + return fmt.Sprintf("%x", sha1.Sum(util.StringToBytesWithNoCopy(s))) } func GetAllInstancesOfOneService(ctx context.Context, domainProject string, serviceId string) ([]*pb.MicroServiceInstance, error) { diff --git a/server/service/util/instance_util_test.go b/server/service/util/instance_util_test.go index 52e1403f..abe5182c 100644 --- a/server/service/util/instance_util_test.go +++ b/server/service/util/instance_util_test.go @@ -25,11 +25,17 @@ import ( ) func TestFormatRevision(t *testing.T) { - if "1.1" != FormatRevision(1, 1) { - t.Fatalf("TestFormatRevision failed") + // null + if x := FormatRevision(nil, nil); "da39a3ee5e6b4b0d3255bfef95601890afd80709" != x { + t.Fatalf("TestFormatRevision failed, %s", x) } - if a, b := ParseRevision("1.1"); a != 1 || b != 1 { - t.Fatalf("TestFormatRevision failed") + // 1.1,11.1, + if x := FormatRevision([]int64{1, 11}, []int64{1, 1}); "87aa7d310290ff4f93248c0aed6870b928edf45a" != x { + t.Fatalf("TestFormatRevision failed, %s", x) + } + // 1.11,1.1, + if x := FormatRevision([]int64{1, 1}, []int64{11, 1}); "24675d196e3dea5be0c774cab281366640fc99ef" != x { + t.Fatalf("TestFormatRevision failed, %s", x) } } diff --git a/server/service/util/util.go b/server/service/util/util.go index 2184b29d..677a5599 100644 --- a/server/service/util/util.go +++ b/server/service/util/util.go @@ -29,8 +29,8 @@ func FromContext(ctx context.Context) []registry.PluginOpOption { case ctx.Value(CTX_CACHEONLY) == "1": opts = append(opts, registry.WithCacheOnly()) } - if ctx.Value(CTX_REGISTRYONLY) == "1" { - opts = append(opts, registry.WithRegistryOnly()) + if ctx.Value(CTX_GLOBAL) == "1" { + opts = append(opts, registry.WithGlobal()) } return opts } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Bug fixes > --------- > > Key: SCB-993 > URL: https://issues.apache.org/jira/browse/SCB-993 > Project: Apache ServiceComb > Issue Type: Bug > Components: Service-Center > Reporter: little-cui > Assignee: little-cui > Priority: Major > Fix For: service-center-1.1.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)