This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c98c5d  #1228 replica instance failed, resend again (#1264)
2c98c5d is described below

commit 2c98c5dc0de85c496482a0f887b7d60bb3ae5df6
Author: aseTo2016 <[email protected]>
AuthorDate: Tue Feb 15 18:02:19 2022 +0800

    #1228 replica instance failed, resend again (#1264)
    
    Co-authored-by: aseTo2016 <tys201193111>
---
 syncer/service/event/manager.go                      | 20 +++++++++++++++-----
 syncer/service/event/manager_test.go                 |  4 ++++
 syncer/service/replicator/replicator_test.go         |  4 ++++
 syncer/service/replicator/resource/heartbeat.go      |  1 +
 syncer/service/replicator/resource/heartbeat_test.go |  7 +++++++
 syncer/service/replicator/resource/instance.go       |  4 ++++
 syncer/service/replicator/resource/instance_test.go  |  7 +++++++
 syncer/service/replicator/resource/resource.go       |  5 +++++
 8 files changed, 47 insertions(+), 5 deletions(-)

diff --git a/syncer/service/event/manager.go b/syncer/service/event/manager.go
index dc8776c..11148e1 100644
--- a/syncer/service/event/manager.go
+++ b/syncer/service/event/manager.go
@@ -153,10 +153,6 @@ func (e *eventManager) resultHandle(ctx context.Context) {
                        if !ok {
                                continue
                        }
-                       if res.Error != nil {
-                               log.Error("result is error ", res.Error)
-                               continue
-                       }
 
                        id := res.ID
                        et, ok := e.cache.LoadAndDelete(id)
@@ -165,12 +161,26 @@ func (e *eventManager) resultHandle(ctx context.Context) {
                                continue
                        }
 
-                       r, result := resource.New(et.(*Event).Event)
+                       event := et.(*Event).Event
+                       r, result := resource.New(event)
                        if result != nil {
                                log.Warn(fmt.Sprintf("new resource failed, %s", 
result.Message))
                                continue
                        }
 
+                       if res.Error != nil {
+                               log.Error(fmt.Sprintf("result is error %s", 
event.Flag()), res.Error)
+                               if r.CanDrop() {
+                                       log.Warn(fmt.Sprintf("drop event %s", 
event.Flag()))
+                                       continue
+                               }
+                               e.Send(&Event{
+                                       Event: event,
+                               })
+
+                               continue
+                       }
+
                        toSendEvent, err := r.FailHandle(ctx, res.Data.Code)
                        if err != nil {
                                continue
diff --git a/syncer/service/event/manager_test.go 
b/syncer/service/event/manager_test.go
index 77e76e1..5230bde 100644
--- a/syncer/service/event/manager_test.go
+++ b/syncer/service/event/manager_test.go
@@ -115,6 +115,10 @@ func (f forkResources) FailHandle(_ context.Context, _ 
int32) (*v1sync.Event, er
        return nil, nil
 }
 
+func (f forkResources) CanDrop() bool {
+       return true
+}
+
 func TestNewManager(t *testing.T) {
        nm := NewManager(ManagerInternal(defaultInternal), 
Replicator(new(mockReplicator)))
        assert.NotNil(t, nm)
diff --git a/syncer/service/replicator/replicator_test.go 
b/syncer/service/replicator/replicator_test.go
index 38b05e5..d8e4d6d 100644
--- a/syncer/service/replicator/replicator_test.go
+++ b/syncer/service/replicator/replicator_test.go
@@ -85,6 +85,10 @@ func (f mockResources) FailHandle(_ context.Context, _ 
int32) (*v1sync.Event, er
        return nil, nil
 }
 
+func (f mockResources) CanDrop() bool {
+       return true
+}
+
 func Test_pageEvents(t *testing.T) {
        t.Run("no page case", func(t *testing.T) {
                source := &v1sync.EventList{
diff --git a/syncer/service/replicator/resource/heartbeat.go 
b/syncer/service/replicator/resource/heartbeat.go
index 0214739..0f085e8 100644
--- a/syncer/service/replicator/resource/heartbeat.go
+++ b/syncer/service/replicator/resource/heartbeat.go
@@ -32,6 +32,7 @@ type heartbeat struct {
        input *pb.HeartbeatRequest
 
        manager metadataManager
+       defaultFailHandler
 }
 
 func (h *heartbeat) loadInput() error {
diff --git a/syncer/service/replicator/resource/heartbeat_test.go 
b/syncer/service/replicator/resource/heartbeat_test.go
index 9ec2df9..4a34e3d 100644
--- a/syncer/service/replicator/resource/heartbeat_test.go
+++ b/syncer/service/replicator/resource/heartbeat_test.go
@@ -66,3 +66,10 @@ func TestHeartbeat(t *testing.T) {
        _, err = a.FailHandle(ctx, Success)
        assert.Nil(t, err)
 }
+
+func TestFailHandlerHeartbeat(t *testing.T) {
+       h := new(heartbeat)
+       assert.True(t, h.CanDrop())
+       _, err := h.FailHandle(context.TODO(), NonImplement)
+       assert.Nil(t, err)
+}
diff --git a/syncer/service/replicator/resource/instance.go 
b/syncer/service/replicator/resource/instance.go
index 430b207..10f6aba 100644
--- a/syncer/service/replicator/resource/instance.go
+++ b/syncer/service/replicator/resource/instance.go
@@ -168,6 +168,10 @@ func (i *instance) NeedOperate(ctx context.Context) 
*Result {
        return c.needOperate(ctx)
 }
 
+func (i *instance) CanDrop() bool {
+       return false
+}
+
 func (i *instance) Operate(ctx context.Context) *Result {
        return newOperator(i).operate(ctx, i.event.Action)
 }
diff --git a/syncer/service/replicator/resource/instance_test.go 
b/syncer/service/replicator/resource/instance_test.go
index cce3c78..e66318c 100644
--- a/syncer/service/replicator/resource/instance_test.go
+++ b/syncer/service/replicator/resource/instance_test.go
@@ -157,3 +157,10 @@ func TestNewInstance(t *testing.T) {
        i := NewInstance(nil)
        assert.NotNil(t, i)
 }
+
+func TestFailHandlerInstance(t *testing.T) {
+       h := new(instance)
+       assert.False(t, h.CanDrop())
+       _, err := h.FailHandle(context.TODO(), NonImplement)
+       assert.Nil(t, err)
+}
diff --git a/syncer/service/replicator/resource/resource.go 
b/syncer/service/replicator/resource/resource.go
index 5087b05..5e18219 100644
--- a/syncer/service/replicator/resource/resource.go
+++ b/syncer/service/replicator/resource/resource.go
@@ -157,6 +157,7 @@ func NonImplementResult() *Result {
 
 type FailHandler interface {
        FailHandle(context.Context, int32) (*v1sync.Event, error)
+       CanDrop() bool
 }
 
 type defaultFailHandler struct {
@@ -166,6 +167,10 @@ func (d *defaultFailHandler) FailHandle(context.Context, 
int32) (*v1sync.Event,
        return nil, nil
 }
 
+func (d *defaultFailHandler) CanDrop() bool {
+       return true
+}
+
 type OperateHandler interface {
        LoadCurrentResource(context.Context) *Result
        NeedOperate(context.Context) *Result

Reply via email to