This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 2c98c5d #1228 replica instance failed, resend again (#1264)
2c98c5d is described below
commit 2c98c5dc0de85c496482a0f887b7d60bb3ae5df6
Author: aseTo2016 <[email protected]>
AuthorDate: Tue Feb 15 18:02:19 2022 +0800
#1228 replica instance failed, resend again (#1264)
Co-authored-by: aseTo2016 <tys201193111>
---
syncer/service/event/manager.go | 20 +++++++++++++++-----
syncer/service/event/manager_test.go | 4 ++++
syncer/service/replicator/replicator_test.go | 4 ++++
syncer/service/replicator/resource/heartbeat.go | 1 +
syncer/service/replicator/resource/heartbeat_test.go | 7 +++++++
syncer/service/replicator/resource/instance.go | 4 ++++
syncer/service/replicator/resource/instance_test.go | 7 +++++++
syncer/service/replicator/resource/resource.go | 5 +++++
8 files changed, 47 insertions(+), 5 deletions(-)
diff --git a/syncer/service/event/manager.go b/syncer/service/event/manager.go
index dc8776c..11148e1 100644
--- a/syncer/service/event/manager.go
+++ b/syncer/service/event/manager.go
@@ -153,10 +153,6 @@ func (e *eventManager) resultHandle(ctx context.Context) {
if !ok {
continue
}
- if res.Error != nil {
- log.Error("result is error ", res.Error)
- continue
- }
id := res.ID
et, ok := e.cache.LoadAndDelete(id)
@@ -165,12 +161,26 @@ func (e *eventManager) resultHandle(ctx context.Context) {
continue
}
- r, result := resource.New(et.(*Event).Event)
+ event := et.(*Event).Event
+ r, result := resource.New(event)
if result != nil {
log.Warn(fmt.Sprintf("new resource failed, %s", result.Message))
continue
}
+ if res.Error != nil {
+ log.Error(fmt.Sprintf("result is error %s", event.Flag()), res.Error)
+ if r.CanDrop() {
+ log.Warn(fmt.Sprintf("drop event %s", event.Flag()))
+ continue
+ }
+ e.Send(&Event{
+ Event: event,
+ })
+
+ continue
+ }
+
toSendEvent, err := r.FailHandle(ctx, res.Data.Code)
if err != nil {
continue
diff --git a/syncer/service/event/manager_test.go b/syncer/service/event/manager_test.go
index 77e76e1..5230bde 100644
--- a/syncer/service/event/manager_test.go
+++ b/syncer/service/event/manager_test.go
@@ -115,6 +115,10 @@ func (f forkResources) FailHandle(_ context.Context, _ int32) (*v1sync.Event, er
return nil, nil
}
+func (f forkResources) CanDrop() bool {
+ return true
+}
+
func TestNewManager(t *testing.T) {
nm := NewManager(ManagerInternal(defaultInternal),
Replicator(new(mockReplicator)))
assert.NotNil(t, nm)
diff --git a/syncer/service/replicator/replicator_test.go b/syncer/service/replicator/replicator_test.go
index 38b05e5..d8e4d6d 100644
--- a/syncer/service/replicator/replicator_test.go
+++ b/syncer/service/replicator/replicator_test.go
@@ -85,6 +85,10 @@ func (f mockResources) FailHandle(_ context.Context, _ int32) (*v1sync.Event, er
return nil, nil
}
+func (f mockResources) CanDrop() bool {
+ return true
+}
+
func Test_pageEvents(t *testing.T) {
t.Run("no page case", func(t *testing.T) {
source := &v1sync.EventList{
diff --git a/syncer/service/replicator/resource/heartbeat.go b/syncer/service/replicator/resource/heartbeat.go
index 0214739..0f085e8 100644
--- a/syncer/service/replicator/resource/heartbeat.go
+++ b/syncer/service/replicator/resource/heartbeat.go
@@ -32,6 +32,7 @@ type heartbeat struct {
input *pb.HeartbeatRequest
manager metadataManager
+ defaultFailHandler
}
func (h *heartbeat) loadInput() error {
diff --git a/syncer/service/replicator/resource/heartbeat_test.go b/syncer/service/replicator/resource/heartbeat_test.go
index 9ec2df9..4a34e3d 100644
--- a/syncer/service/replicator/resource/heartbeat_test.go
+++ b/syncer/service/replicator/resource/heartbeat_test.go
@@ -66,3 +66,10 @@ func TestHeartbeat(t *testing.T) {
_, err = a.FailHandle(ctx, Success)
assert.Nil(t, err)
}
+
+func TestFailHandlerHeartbeat(t *testing.T) {
+ h := new(heartbeat)
+ assert.True(t, h.CanDrop())
+ _, err := h.FailHandle(context.TODO(), NonImplement)
+ assert.Nil(t, err)
+}
diff --git a/syncer/service/replicator/resource/instance.go b/syncer/service/replicator/resource/instance.go
index 430b207..10f6aba 100644
--- a/syncer/service/replicator/resource/instance.go
+++ b/syncer/service/replicator/resource/instance.go
@@ -168,6 +168,10 @@ func (i *instance) NeedOperate(ctx context.Context) *Result {
return c.needOperate(ctx)
}
+func (i *instance) CanDrop() bool {
+ return false
+}
+
func (i *instance) Operate(ctx context.Context) *Result {
return newOperator(i).operate(ctx, i.event.Action)
}
diff --git a/syncer/service/replicator/resource/instance_test.go b/syncer/service/replicator/resource/instance_test.go
index cce3c78..e66318c 100644
--- a/syncer/service/replicator/resource/instance_test.go
+++ b/syncer/service/replicator/resource/instance_test.go
@@ -157,3 +157,10 @@ func TestNewInstance(t *testing.T) {
i := NewInstance(nil)
assert.NotNil(t, i)
}
+
+func TestFailHandlerInstance(t *testing.T) {
+ h := new(instance)
+ assert.False(t, h.CanDrop())
+ _, err := h.FailHandle(context.TODO(), NonImplement)
+ assert.Nil(t, err)
+}
diff --git a/syncer/service/replicator/resource/resource.go b/syncer/service/replicator/resource/resource.go
index 5087b05..5e18219 100644
--- a/syncer/service/replicator/resource/resource.go
+++ b/syncer/service/replicator/resource/resource.go
@@ -157,6 +157,7 @@ func NonImplementResult() *Result {
type FailHandler interface {
FailHandle(context.Context, int32) (*v1sync.Event, error)
+ CanDrop() bool
}
type defaultFailHandler struct {
@@ -166,6 +167,10 @@ func (d *defaultFailHandler) FailHandle(context.Context, int32) (*v1sync.Event,
return nil, nil
}
+func (d *defaultFailHandler) CanDrop() bool {
+ return true
+}
+
type OperateHandler interface {
LoadCurrentResource(context.Context) *Result
NeedOperate(context.Context) *Result