This is an automated email from the ASF dual-hosted git repository.

robotljw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new dcfc2ae  [feat] add schema sync func and ut when db mode is etcd
     new f1d9781  Merge pull request #1209 from robotLJW/master
dcfc2ae is described below

commit dcfc2ae871e9525795c964e0101ca20f7b739b6e
Author: robotljw <[email protected]>
AuthorDate: Fri Jan 7 19:32:01 2022 +0800

    [feat] add schema sync func and ut when db mode is etcd
---
 datasource/etcd/ms.go          |  21 ++-
 datasource/etcd/schema.go      |  94 ++++++++++++--
 datasource/etcd/schema_test.go | 281 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 378 insertions(+), 18 deletions(-)

diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go
index ce5ecf4..220030c 100644
--- a/datasource/etcd/ms.go
+++ b/datasource/etcd/ms.go
@@ -1544,13 +1544,24 @@ func (ds *MetadataManager) DeleteSchema(ctx 
context.Context, request *pb.DeleteS
                return nil, schema.ErrSchemaNotFound
        }
        epSummaryKey := path.GenerateServiceSchemaSummaryKey(domainProject, 
request.ServiceId, request.SchemaId)
-       resp, errDo := etcdadpt.TxnWithCmp(ctx,
-               etcdadpt.Ops(
-                       etcdadpt.OpDel(etcdadpt.WithStrKey(epSummaryKey)),
-                       etcdadpt.OpDel(etcdadpt.WithStrKey(key)),
-               ),
+       opts := 
[]etcdadpt.OpOptions{etcdadpt.OpDel(etcdadpt.WithStrKey(epSummaryKey)), 
etcdadpt.OpDel(etcdadpt.WithStrKey(key))}
+       schemaKeyOpt, err := esync.GenDeleteOpts(ctx, datasource.ResourceKV, 
key, key)
+       if err != nil {
+               log.Error("fail to create delete opts", err)
+               return nil, err
+       }
+       opts = append(opts, schemaKeyOpt...)
+       schemaSummaryKeyOpt, err := esync.GenDeleteOpts(ctx, 
datasource.ResourceKV, epSummaryKey, epSummaryKey)
+       if err != nil {
+               log.Error("fail to create delete opts", err)
+               return nil, err
+       }
+       opts = append(opts, schemaSummaryKeyOpt...)
+
+       resp, errDo := etcdadpt.TxnWithCmp(ctx, opts,
                
etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, 
request.ServiceId), 0)),
                nil)
+
        if errDo != nil {
                log.Error(fmt.Sprintf("delete schema[%s/%s] failed, operator: 
%s",
                        request.ServiceId, request.SchemaId, remoteIP), errDo)
diff --git a/datasource/etcd/schema.go b/datasource/etcd/schema.go
index 892cb8b..2f16dcd 100644
--- a/datasource/etcd/schema.go
+++ b/datasource/etcd/schema.go
@@ -22,17 +22,19 @@ import (
        "encoding/json"
        "fmt"
 
+       mapset "github.com/deckarep/golang-set"
+       "github.com/go-chassis/cari/discovery"
+       "github.com/little-cui/etcdadpt"
+       "go.etcd.io/etcd/api/v3/mvccpb"
+
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/datasource/etcd/path"
        "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
-       serviceUtil 
"github.com/apache/servicecomb-service-center/datasource/etcd/util"
+       "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+       eutil 
"github.com/apache/servicecomb-service-center/datasource/etcd/util"
        "github.com/apache/servicecomb-service-center/datasource/schema"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
-       mapset "github.com/deckarep/golang-set"
-       "github.com/go-chassis/cari/discovery"
-       "github.com/little-cui/etcdadpt"
-       "go.etcd.io/etcd/api/v3/mvccpb"
 )
 
 func init() {
@@ -55,7 +57,7 @@ func (dao *SchemaDAO) GetRef(ctx context.Context, refRequest 
*schema.RefRequest)
        schemaID := refRequest.SchemaID
 
        refKey := path.GenerateServiceSchemaRefKey(domainProject, serviceID, 
schemaID)
-       refResp, err := sd.SchemaRef().Search(ctx, 
serviceUtil.ContextOptions(ctx, etcdadpt.WithStrKey(refKey))...)
+       refResp, err := sd.SchemaRef().Search(ctx, eutil.ContextOptions(ctx, 
etcdadpt.WithStrKey(refKey))...)
        if err != nil {
                log.Error(fmt.Sprintf("get service[%s] schema-ref[%s] failed", 
serviceID, schemaID), err)
                return nil, err
@@ -83,7 +85,7 @@ func (dao *SchemaDAO) GetRef(ctx context.Context, refRequest 
*schema.RefRequest)
 func getSummary(ctx context.Context, serviceID string, schemaID string) 
(string, error) {
        domainProject := util.ParseDomainProject(ctx)
        summaryKey := path.GenerateServiceSchemaSummaryKey(domainProject, 
serviceID, schemaID)
-       summaryResp, err := sd.SchemaSummary().Search(ctx, 
serviceUtil.ContextOptions(ctx, etcdadpt.WithStrKey(summaryKey))...)
+       summaryResp, err := sd.SchemaSummary().Search(ctx, 
eutil.ContextOptions(ctx, etcdadpt.WithStrKey(summaryKey))...)
        if err != nil {
                return "", err
        }
@@ -101,7 +103,7 @@ func (dao *SchemaDAO) ListRef(ctx context.Context, 
refRequest *schema.RefRequest
        serviceID := refRequest.ServiceID
 
        refPrefixKey := path.GenerateServiceSchemaRefKey(domainProject, 
serviceID, "")
-       refResp, err := sd.SchemaRef().Search(ctx, 
serviceUtil.ContextOptions(ctx,
+       refResp, err := sd.SchemaRef().Search(ctx, eutil.ContextOptions(ctx,
                etcdadpt.WithStrKey(refPrefixKey), etcdadpt.WithPrefix())...)
        if err != nil {
                log.Error(fmt.Sprintf("get service[%s] schema-refs failed", 
serviceID), err)
@@ -132,7 +134,7 @@ func (dao *SchemaDAO) ListRef(ctx context.Context, 
refRequest *schema.RefRequest
 func getSummaryMap(ctx context.Context, serviceID string) (map[string]string, 
error) {
        domainProject := util.ParseDomainProject(ctx)
        summaryPrefixKey := path.GenerateServiceSchemaSummaryKey(domainProject, 
serviceID, "")
-       summaryResp, err := sd.SchemaSummary().Search(ctx, 
serviceUtil.ContextOptions(ctx,
+       summaryResp, err := sd.SchemaSummary().Search(ctx, 
eutil.ContextOptions(ctx,
                etcdadpt.WithStrKey(summaryPrefixKey), 
etcdadpt.WithPrefix())...)
        if err != nil {
                return nil, err
@@ -156,6 +158,19 @@ func (dao *SchemaDAO) DeleteRef(ctx context.Context, 
refRequest *schema.RefReque
                etcdadpt.OpDel(etcdadpt.WithStrKey(refKey)),
                etcdadpt.OpDel(etcdadpt.WithStrKey(summaryKey)),
        }
+       refOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, refKey, 
refKey)
+       if err != nil {
+               log.Error("fail to create delete opts", err)
+               return err
+       }
+       options = append(options, refOpts...)
+       summaryOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, 
summaryKey, summaryKey)
+       if err != nil {
+               log.Error("fail to create delete opts", err)
+               return err
+       }
+       options = append(options, summaryOpts...)
+
        cmp, err := etcdadpt.TxnWithCmp(ctx, options, 
etcdadpt.If(etcdadpt.ExistKey(refKey)), options)
        if err != nil {
                log.Error(fmt.Sprintf("delete service[%s] schema-ref[%s] 
failed", serviceID, schemaID), err)
@@ -213,6 +228,18 @@ func (dao *SchemaDAO) PutContent(ctx context.Context, 
contentRequest *schema.Put
                etcdadpt.OpPut(etcdadpt.WithStrKey(refKey), 
etcdadpt.WithStrValue(content.Hash)),
                etcdadpt.OpPut(etcdadpt.WithStrKey(summaryKey), 
etcdadpt.WithStrValue(content.Summary)),
        }
+       refOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, 
content.Hash, sync.WithOpts(map[string]string{"key": refKey}))
+       if err != nil {
+               log.Error("fail to create update opts", err)
+               return err
+       }
+       summaryOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, 
content.Summary, sync.WithOpts(map[string]string{"key": summaryKey}))
+       if err != nil {
+               log.Error("fail to create update opts", err)
+               return err
+       }
+       existContentOptions = append(existContentOptions, refOpts...)
+       existContentOptions = append(existContentOptions, summaryOpts...)
 
        // append the schemaID into service.Schemas if schemaID is new
        if !util.SliceHave(service.Schemas, schemaID) {
@@ -230,6 +257,13 @@ func (dao *SchemaDAO) PutContent(ctx context.Context, 
contentRequest *schema.Put
        newContentOptions := append(existContentOptions,
                etcdadpt.OpPut(etcdadpt.WithStrKey(contentKey), 
etcdadpt.WithStrValue(content.Content)),
        )
+       contentOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, 
content.Content, sync.WithOpts(map[string]string{"key": contentKey}))
+       if err != nil {
+               log.Error("fail to create update opts", err)
+               return err
+       }
+       newContentOptions = append(newContentOptions, contentOpts...)
+
        cmp, err := etcdadpt.TxnWithCmp(ctx, newContentOptions, 
etcdadpt.If(etcdadpt.NotExistKey(contentKey)), existContentOptions)
        if err != nil {
                log.Error(fmt.Sprintf("put kv[%s] failed", refKey), err)
@@ -261,7 +295,7 @@ func (dao *SchemaDAO) PutManyContent(ctx context.Context, 
contentRequest *schema
        }
 
        // unsafe!
-       schemaIDs, options := transformSchemaIDsAndOptions(domainProject, 
serviceID, service.Schemas, contentRequest)
+       schemaIDs, options := transformSchemaIDsAndOptions(ctx, domainProject, 
serviceID, service.Schemas, contentRequest)
 
        // should update service.Schemas
        service.Schemas = schemaIDs
@@ -272,10 +306,18 @@ func (dao *SchemaDAO) PutManyContent(ctx context.Context, 
contentRequest *schema
        }
        serviceKey := path.GenerateServiceKey(domainProject, serviceID)
        options = append(options, 
etcdadpt.OpPut(etcdadpt.WithStrKey(serviceKey), etcdadpt.WithValue(body)))
+       // update service task
+       serviceOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, 
body, sync.WithOpts(map[string]string{"key": serviceKey}))
+       if err != nil {
+               log.Error("fail to create update opts", err)
+       }
+       options = append(options, serviceOpts...)
+
        return etcdadpt.Txn(ctx, options)
 }
 
-func transformSchemaIDsAndOptions(domainProject string, serviceID string, 
oldSchemaIDs []string, contentRequest *schema.PutManyContentRequest) ([]string, 
[]etcdadpt.OpOptions) {
+func transformSchemaIDsAndOptions(ctx context.Context, domainProject string, 
serviceID string,
+       oldSchemaIDs []string, contentRequest *schema.PutManyContentRequest) 
([]string, []etcdadpt.OpOptions) {
        pendingDeleteSchemaIDs := mapset.NewSet()
        for _, schemaID := range oldSchemaIDs {
                pendingDeleteSchemaIDs.Add(schemaID)
@@ -293,6 +335,22 @@ func transformSchemaIDsAndOptions(domainProject string, 
serviceID string, oldSch
                        etcdadpt.OpPut(etcdadpt.WithStrKey(contentKey), 
etcdadpt.WithStrValue(content.Content)),
                        etcdadpt.OpPut(etcdadpt.WithStrKey(summaryKey), 
etcdadpt.WithStrValue(content.Summary)),
                )
+               refOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, 
content.Hash, sync.WithOpts(map[string]string{"key": refKey}))
+               if err != nil {
+                       log.Error("fail to create update opts", err)
+               }
+               options = append(options, refOpts...)
+               contentOpts, err := sync.GenUpdateOpts(ctx, 
datasource.ResourceKV, content.Content, sync.WithOpts(map[string]string{"key": 
contentKey}))
+               if err != nil {
+                       log.Error("fail to create update opts", err)
+               }
+               options = append(options, contentOpts...)
+               summaryOpts, err := sync.GenUpdateOpts(ctx, 
datasource.ResourceKV, content.Summary, sync.WithOpts(map[string]string{"key": 
summaryKey}))
+               if err != nil {
+                       log.Error("fail to create update opts", err)
+               }
+               options = append(options, summaryOpts...)
+
                schemaIDs = append(schemaIDs, schemaID)
                pendingDeleteSchemaIDs.Remove(schemaID)
        }
@@ -305,6 +363,16 @@ func transformSchemaIDsAndOptions(domainProject string, 
serviceID string, oldSch
                        etcdadpt.OpDel(etcdadpt.WithStrKey(refKey)),
                        etcdadpt.OpDel(etcdadpt.WithStrKey(summaryKey)),
                )
+               refOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, 
refKey, refKey)
+               if err != nil {
+                       log.Error("fail to create update opts", err)
+               }
+               options = append(options, refOpts...)
+               summaryOpt, err := sync.GenDeleteOpts(ctx, 
datasource.ResourceKV, summaryKey, summaryKey)
+               if err != nil {
+                       log.Error("fail to create update opts", err)
+               }
+               options = append(options, summaryOpt...)
        }
        return schemaIDs, options
 }
@@ -341,7 +409,7 @@ func (dao *SchemaDAO) DeleteContent(ctx context.Context, 
contentRequest *schema.
 func getContentHashMap(ctx context.Context) (map[string]struct{}, error) {
        domainProject := util.ParseDomainProject(ctx)
        refPrefixKey := path.GetServiceSchemaRefRootKey(domainProject) + 
path.SPLIT
-       refResp, err := sd.SchemaRef().Search(ctx, 
serviceUtil.ContextOptions(ctx,
+       refResp, err := sd.SchemaRef().Search(ctx, eutil.ContextOptions(ctx,
                etcdadpt.WithStrKey(refPrefixKey), etcdadpt.WithPrefix())...)
        if err != nil {
                return nil, err
@@ -393,7 +461,7 @@ func filterNoRefContentHashes(ctx context.Context, kvs 
[]*mvccpb.KeyValue) (maps
        }
 
        refPrefixKey := path.GetServiceSchemaRefRootKey("")
-       resp, err := sd.SchemaRef().Search(ctx, serviceUtil.ContextOptions(ctx,
+       resp, err := sd.SchemaRef().Search(ctx, eutil.ContextOptions(ctx,
                etcdadpt.WithStrKey(refPrefixKey), etcdadpt.WithPrefix())...)
        if err != nil {
                return nil, err
diff --git a/datasource/etcd/schema_test.go b/datasource/etcd/schema_test.go
new file mode 100644
index 0000000..efc62e6
--- /dev/null
+++ b/datasource/etcd/schema_test.go
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd_test
+
+import (
+       "context"
+       "testing"
+
+       pb "github.com/go-chassis/cari/discovery"
+       csync "github.com/go-chassis/cari/sync"
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/datasource/schema"
+       "github.com/apache/servicecomb-service-center/eventbase/model"
+       "github.com/apache/servicecomb-service-center/eventbase/service/task"
+       
"github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       _ "github.com/apache/servicecomb-service-center/test"
+)
+
+func schemaContext() context.Context {
+       return util.WithNoCache(util.SetDomainProject(context.Background(), 
"sync-schema", "sync-schema"))
+}
+
+func TestSyncSchema(t *testing.T) {
+
+       datasource.EnableSync = true
+       var serviceID string
+
+       defer schema.Instance().DeleteContent(schemaContext(), 
&schema.ContentRequest{
+               Hash: "hash_1",
+       })
+       defer schema.Instance().DeleteContent(schemaContext(), 
&schema.ContentRequest{
+               Hash: "hash_2",
+       })
+       defer schema.Instance().DeleteContent(schemaContext(), 
&schema.ContentRequest{
+               Hash: "hash_2",
+       })
+
+       t.Run("register a micro service", func(t *testing.T) {
+               t.Run("register a service will create a service task should 
pass", func(t *testing.T) {
+                       resp, err := 
datasource.GetMetadataManager().RegisterService(schemaContext(), 
&pb.CreateServiceRequest{
+                               Service: &pb.MicroService{
+                                       AppId:       "sync_schemas_prod",
+                                       ServiceName: "sync_schemas_service",
+                                       Version:     "1.0.1",
+                                       Level:       "FRONT",
+                                       Status:      pb.MS_UP,
+                                       Environment: pb.ENV_PROD,
+                               },
+                       })
+                       assert.NoError(t, err)
+                       assert.Equal(t, pb.ResponseSuccess, 
resp.Response.GetCode())
+                       serviceID = resp.ServiceId
+                       listTaskReq := model.ListTaskRequest{
+                               Domain:       "sync-schema",
+                               Project:      "sync-schema",
+                               Action:       csync.CreateAction,
+                               ResourceType: datasource.ResourceService,
+                               Status:       csync.PendingStatus,
+                       }
+                       tasks, err := task.List(context.Background(), 
&listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tasks))
+                       err = task.Delete(context.Background(), tasks...)
+                       assert.NoError(t, err)
+               })
+       })
+
+       t.Run("put schema will execute the PutContent func", func(t *testing.T) 
{
+               t.Run("put content with valid request, will create 3 kv 
tasks(hash summary content) should pass", func(t *testing.T) {
+                       err := schema.Instance().PutContent(schemaContext(), 
&schema.PutContentRequest{
+                               ServiceID: serviceID,
+                               SchemaID:  "schemaID_1",
+                               Content: &schema.ContentItem{
+                                       Hash:    "hash_1",
+                                       Summary: "summary_1",
+                                       Content: "1111111111",
+                               },
+                       })
+                       assert.NoError(t, err)
+
+                       ref, err := schema.Instance().GetRef(schemaContext(), 
&schema.RefRequest{
+                               ServiceID: serviceID,
+                               SchemaID:  "schemaID_1",
+                       })
+                       assert.NoError(t, err)
+                       assert.NotNil(t, ref)
+                       assert.Equal(t, "summary_1", ref.Summary)
+                       assert.Equal(t, "hash_1", ref.Hash)
+                       listTaskReq := model.ListTaskRequest{
+                               Domain:       "sync-schema",
+                               Project:      "sync-schema",
+                               Action:       csync.UpdateAction,
+                               ResourceType: datasource.ResourceKV,
+                               Status:       csync.PendingStatus,
+                       }
+                       tasks, err := task.List(context.Background(), 
&listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 3, len(tasks))
+                       err = task.Delete(context.Background(), tasks...)
+                       assert.NoError(t, err)
+               })
+       })
+
+       t.Run("put schemas will execute the PutManyContent func", func(t 
*testing.T) {
+               t.Run("put many content with valid request, will create 7 kv 
update task (2 ref tasks, 2 content tasks, 2 summary tasks"+
+                       " 1 service task), two delete kv task, two 
tombstones(ref and summary) should pass", func(t *testing.T) {
+                       err := 
schema.Instance().PutManyContent(schemaContext(), &schema.PutManyContentRequest{
+                               ServiceID: serviceID,
+                               SchemaIDs: []string{"schemaID_2", "schemaID_3"},
+                               Contents: []*schema.ContentItem{
+                                       {
+                                               Hash:    "hash_2",
+                                               Content: "content_2",
+                                               Summary: "summary_2",
+                                       },
+                                       {
+                                               Hash:    "hash_3",
+                                               Content: "content_3",
+                                               Summary: "summary_3",
+                                       },
+                               },
+                       })
+                       assert.NoError(t, err)
+                       ref, err := schema.Instance().GetRef(schemaContext(), 
&schema.RefRequest{
+                               ServiceID: serviceID,
+                               SchemaID:  "schemaID_2",
+                       })
+                       assert.NoError(t, err)
+                       assert.NotNil(t, ref)
+                       assert.Equal(t, "summary_2", ref.Summary)
+                       assert.Equal(t, "hash_2", ref.Hash)
+                       ref, err = schema.Instance().GetRef(schemaContext(), 
&schema.RefRequest{
+                               ServiceID: serviceID,
+                               SchemaID:  "schemaID_3",
+                       })
+                       assert.NoError(t, err)
+                       assert.NotNil(t, ref)
+                       assert.Equal(t, "summary_3", ref.Summary)
+                       assert.Equal(t, "hash_3", ref.Hash)
+                       listTaskReq := model.ListTaskRequest{
+                               Domain:       "sync-schema",
+                               Project:      "sync-schema",
+                               Action:       csync.UpdateAction,
+                               ResourceType: datasource.ResourceKV,
+                               Status:       csync.PendingStatus,
+                       }
+                       tasks, err := task.List(context.Background(), 
&listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 7, len(tasks))
+                       err = task.Delete(context.Background(), tasks...)
+                       assert.NoError(t, err)
+                       listTaskReq = model.ListTaskRequest{
+                               Domain:       "sync-schema",
+                               Project:      "sync-schema",
+                               Action:       csync.DeleteAction,
+                               ResourceType: datasource.ResourceKV,
+                               Status:       csync.PendingStatus,
+                       }
+                       tasks, err = task.List(context.Background(), 
&listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 2, len(tasks))
+                       err = task.Delete(context.Background(), tasks...)
+                       assert.NoError(t, err)
+                       tombstoneListReq := model.ListTombstoneRequest{
+                               Domain:       "sync-schema",
+                               Project:      "sync-schema",
+                               ResourceType: datasource.ResourceKV,
+                       }
+                       tombstones, err := tombstone.List(context.Background(), 
&tombstoneListReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 2, len(tombstones))
+                       err = tombstone.Delete(context.Background(), 
tombstones...)
+                       assert.NoError(t, err)
+               })
+       })
+
+       t.Run("delete schemas will execute the DeleteRef func and DeleteSchema 
func ", func(t *testing.T) {
+               t.Run("delete schemaID_2 and schemaID_3 will create 4 tasks(2 
from DeleteRef, 2 from DeleteSchema ) "+
+                       "and 4 tombstones (2 from DeleteRef, 2 from 
DeleteSchema) should pass", func(t *testing.T) {
+                       err := schema.Instance().DeleteRef(schemaContext(), 
&schema.RefRequest{
+                               ServiceID: serviceID,
+                               SchemaID:  "schemaID_2",
+                       })
+                       assert.NoError(t, err)
+                       _, err = 
datasource.GetMetadataManager().DeleteSchema(schemaContext(), 
&pb.DeleteSchemaRequest{
+                               ServiceId: serviceID,
+                               SchemaId:  "schemaID_2",
+                       })
+                       assert.Equal(t, schema.ErrSchemaNotFound, err)
+                       err = schema.Instance().DeleteRef(schemaContext(), 
&schema.RefRequest{
+                               ServiceID: serviceID,
+                               SchemaID:  "schemaID_3",
+                       })
+                       assert.NoError(t, err)
+                       _, err = 
datasource.GetMetadataManager().DeleteSchema(schemaContext(), 
&pb.DeleteSchemaRequest{
+                               ServiceId: serviceID,
+                               SchemaId:  "schemaID_3",
+                       })
+                       assert.Equal(t, schema.ErrSchemaNotFound, err)
+                       listTaskReq := model.ListTaskRequest{
+                               Domain:       "sync-schema",
+                               Project:      "sync-schema",
+                               Action:       csync.DeleteAction,
+                               ResourceType: datasource.ResourceKV,
+                               Status:       csync.PendingStatus,
+                       }
+                       tasks, err := task.List(context.Background(), 
&listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 4, len(tasks))
+                       err = task.Delete(context.Background(), tasks...)
+                       assert.NoError(t, err)
+                       tombstoneListReq := model.ListTombstoneRequest{
+                               Domain:       "sync-schema",
+                               Project:      "sync-schema",
+                               ResourceType: datasource.ResourceKV,
+                       }
+                       tombstones, err := tombstone.List(context.Background(), 
&tombstoneListReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 4, len(tombstones))
+                       err = tombstone.Delete(context.Background(), 
tombstones...)
+                       assert.NoError(t, err)
+               })
+       })
+
+       t.Run("unregister micro-service", func(t *testing.T) {
+               t.Run("unregister a micro service will create a task and a 
tombstone should pass", func(t *testing.T) {
+                       resp, err := 
datasource.GetMetadataManager().UnregisterService(schemaContext(), 
&pb.DeleteServiceRequest{
+                               ServiceId: serviceID,
+                               Force:     true,
+                       })
+                       assert.NotNil(t, resp)
+                       assert.NoError(t, err)
+                       assert.Equal(t, pb.ResponseSuccess, 
resp.Response.GetCode())
+                       listTaskReq := model.ListTaskRequest{
+                               Domain:       "sync-schema",
+                               Project:      "sync-schema",
+                               ResourceType: datasource.ResourceService,
+                               Action:       csync.DeleteAction,
+                               Status:       csync.PendingStatus,
+                       }
+                       tasks, err := task.List(schemaContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tasks))
+                       err = task.Delete(context.Background(), tasks...)
+                       assert.NoError(t, err)
+                       tasks, err = task.List(schemaContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 0, len(tasks))
+                       tombstoneListReq := model.ListTombstoneRequest{
+                               Domain:       "sync-schema",
+                               Project:      "sync-schema",
+                               ResourceType: datasource.ResourceService,
+                       }
+                       tombstones, err := tombstone.List(schemaContext(), 
&tombstoneListReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tombstones))
+                       err = tombstone.Delete(schemaContext(), tombstones...)
+                       assert.NoError(t, err)
+               })
+       })
+
+       datasource.EnableSync = false
+}

Reply via email to