This is an automated email from the ASF dual-hosted git repository. zfeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/master by this push: new 33fa891e handle Unexpected transaction rollback (#828) 33fa891e is described below commit 33fa891ed760ec2af5def1ea5e9936267a8bffcd Author: 1kasa <134709672+1k...@users.noreply.github.com> AuthorDate: Sat Aug 16 20:12:04 2025 +0800 handle Unexpected transaction rollback (#828) * handle Unexpected transaction rollback * fix ci Errpr * Fix wrong location * Fix sort import * Fix error * Fix comment * Fix comment * add Unit Testing * WIP: prepare for rebase * wrong commit * wrong commit * Remove incorrect go mods * sort import --------- Co-authored-by: FengZhang <zfc...@qq.com> --- pkg/datasource/sql/undo/base/undo.go | 9 +- pkg/datasource/sql/undo/executor/executor.go | 3 +- pkg/datasource/sql/undo/executor/executor_test.go | 258 ++++++++++++++++++++++ pkg/util/errors/code.go | 2 + pkg/util/errors/error_code_test.go | 1 + 5 files changed, 270 insertions(+), 3 deletions(-) diff --git a/pkg/datasource/sql/undo/base/undo.go b/pkg/datasource/sql/undo/base/undo.go index 752597fb..b196eee1 100644 --- a/pkg/datasource/sql/undo/base/undo.go +++ b/pkg/datasource/sql/undo/base/undo.go @@ -23,7 +23,6 @@ import ( "database/sql/driver" "encoding/json" "fmt" - "strconv" "strings" @@ -36,6 +35,7 @@ import ( "seata.apache.org/seata-go/pkg/datasource/sql/undo/factor" "seata.apache.org/seata-go/pkg/datasource/sql/undo/parser" "seata.apache.org/seata-go/pkg/util/collection" + serr "seata.apache.org/seata-go/pkg/util/errors" "seata.apache.org/seata-go/pkg/util/log" ) @@ -354,6 +354,10 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid if err = undoExecutor.ExecuteOn(ctx, dbType, conn); err != nil { log.Errorf("execute on fail, err: %v", err) + if undoErr, ok := err.(*serr.SeataError); ok && undoErr.Code == serr.SQLUndoDirtyError { + log.Errorf("Branch session rollback failed because of dirty undo log, please delete the relevant undolog after manually calibrating the data. xid = %s branchId = %d: %v", xid, branchID, undoErr) + return serr.New(serr.TransactionErrorCodeBranchRollbackFailedUnretriable, "dirty undo log, manual cleanup required", nil) + } return err } } @@ -375,7 +379,8 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid if err = tx.Commit(); err != nil { log.Errorf("[Undo] execute on fail, err: %v", err) - return nil + + return err } return nil } diff --git a/pkg/datasource/sql/undo/executor/executor.go b/pkg/datasource/sql/undo/executor/executor.go index f5b573a6..eafab8e7 100644 --- a/pkg/datasource/sql/undo/executor/executor.go +++ b/pkg/datasource/sql/undo/executor/executor.go @@ -29,6 +29,7 @@ import ( "seata.apache.org/seata-go/pkg/datasource/sql/datasource" "seata.apache.org/seata-go/pkg/datasource/sql/types" "seata.apache.org/seata-go/pkg/datasource/sql/undo" + serr "seata.apache.org/seata-go/pkg/util/errors" "seata.apache.org/seata-go/pkg/util/log" ) @@ -98,7 +99,7 @@ func (b *BaseExecutor) dataValidationAndGoOn(ctx context.Context, conn *sql.Conn newRowJson, _ := json.Marshal(currentImage.Rows) log.Infof("check dirty data failed, old and new data are not equal, "+ "tableName:[%s], oldRows:[%s],newRows:[%s].", afterImage.TableName, oldRowJson, newRowJson) - return false, fmt.Errorf("Has dirty records when undo.") + return false, serr.New(serr.SQLUndoDirtyError, "has dirty records when undo", nil) } } return true, nil diff --git a/pkg/datasource/sql/undo/executor/executor_test.go b/pkg/datasource/sql/undo/executor/executor_test.go new file mode 100644 index 00000000..be1064a1 --- /dev/null +++ b/pkg/datasource/sql/undo/executor/executor_test.go @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package executor + +import ( + "context" + "database/sql" + "encoding/json" + "testing" + + "github.com/agiledragon/gomonkey/v2" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + + "seata.apache.org/seata-go/pkg/datasource/sql/types" + "seata.apache.org/seata-go/pkg/datasource/sql/undo" + serr "seata.apache.org/seata-go/pkg/util/errors" + "seata.apache.org/seata-go/pkg/util/log" +) + +type testableBaseExecutor struct { + BaseExecutor + mockCurrentImage *types.RecordImage +} + +func (t *testableBaseExecutor) queryCurrentRecords(ctx context.Context, conn *sql.Conn) (*types.RecordImage, error) { + return t.mockCurrentImage, nil +} + +func (t *testableBaseExecutor) dataValidationAndGoOn(ctx context.Context, conn *sql.Conn) (bool, error) { + if !undo.UndoConfig.DataValidation { + return true, nil + } + beforeImage := t.sqlUndoLog.BeforeImage + afterImage := t.sqlUndoLog.AfterImage + + equals, err := IsRecordsEquals(beforeImage, afterImage) + if err != nil { + return false, err + } + if equals { + log.Infof("Stop rollback because there is no data change between the before data snapshot and the after data snapshot.") + return false, nil + } + + currentImage, err := t.queryCurrentRecords(ctx, conn) + if err != nil { + return false, err + } + + equals, err = IsRecordsEquals(afterImage, currentImage) + if err != nil { + return false, err + } + if !equals { + equals, err = IsRecordsEquals(beforeImage, currentImage) + if err != nil { + return false, err + } + if equals { + log.Infof("Stop rollback because there is no data change between the before data snapshot and the current data snapshot.") + return false, nil + } else { + oldRowJson, _ := json.Marshal(afterImage.Rows) + newRowJson, _ := json.Marshal(currentImage.Rows) + log.Infof("check dirty data failed, old and new data are not equal, "+ + "tableName:[%s], oldRows:[%s],newRows:[%s].", afterImage.TableName, oldRowJson, newRowJson) + return false, serr.New(serr.SQLUndoDirtyError, "has dirty records when undo", nil) + } + } + return true, nil +} +func TestDataValidationAndGoOn(t *testing.T) { + tests := []struct { + name string + beforeImage *types.RecordImage + afterImage *types.RecordImage + currentImage *types.RecordImage + want bool + wantErr bool + }{ + { + name: "before == after, skip rollback", + beforeImage: &types.RecordImage{ + TableName: "t_user", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{ + {ColumnName: "id", Value: 1}, + {ColumnName: "name", Value: "a"}, + }}, + }, + }, + afterImage: &types.RecordImage{ + TableName: "t_user", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{ + {ColumnName: "id", Value: 1}, + {ColumnName: "name", Value: "a"}, + }}, + }, + }, + want: false, + wantErr: false, + }, + { + name: "after == current, continue rollback", + beforeImage: &types.RecordImage{ + TableName: "t_user", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{ + {ColumnName: "id", Value: 1}, + {ColumnName: "name", Value: "a"}, + }}, + }, + }, + afterImage: &types.RecordImage{ + TableName: "t_user", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{ + {ColumnName: "id", Value: 1}, + {ColumnName: "name", Value: "b"}, + }}, + }, + }, + currentImage: &types.RecordImage{ + TableName: "t_user", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{ + {ColumnName: "id", Value: 1}, + {ColumnName: "name", Value: "b"}, + }}, + }, + }, + want: true, + wantErr: false, + }, + { + name: "current == before, rollback already done", + beforeImage: &types.RecordImage{ + TableName: "t_user", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{ + {ColumnName: "id", Value: 1}, + {ColumnName: "name", Value: "a"}, + }}, + }, + }, + afterImage: &types.RecordImage{ + TableName: "t_user", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{ + {ColumnName: "id", Value: 1}, + {ColumnName: "name", Value: "b"}, + }}, + }, + }, + currentImage: &types.RecordImage{ + TableName: "t_user", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{ + {ColumnName: "id", Value: 1}, + {ColumnName: "name", Value: "a"}, + }}, + }, + }, + want: false, + wantErr: false, + }, + { + name: "dirty data", + beforeImage: &types.RecordImage{ + TableName: "t_user", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{ + {ColumnName: "id", Value: 1}, + {ColumnName: "name", Value: "a"}, + }}, + }, + }, + afterImage: &types.RecordImage{ + TableName: "t_user", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{ + {ColumnName: "id", Value: 1}, + {ColumnName: "name", Value: "b"}, + }}, + }, + }, + currentImage: &types.RecordImage{ + TableName: "t_user", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{ + {ColumnName: "id", Value: 1}, + {ColumnName: "name", Value: "c"}, + }}, + }, + }, + want: false, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // patch UndoConfig + cfgPatch := gomonkey.ApplyGlobalVar(&undo.UndoConfig, undo.Config{DataValidation: true}) + defer cfgPatch.Reset() + + // patch IsRecordsEquals + comparePatch := gomonkey.ApplyFunc(IsRecordsEquals, func(a, b *types.RecordImage) (bool, error) { + aj, _ := json.Marshal(a.Rows) + bj, _ := json.Marshal(b.Rows) + return string(aj) == string(bj), nil + }) + defer comparePatch.Reset() + + executor := &testableBaseExecutor{ + BaseExecutor: BaseExecutor{ + sqlUndoLog: undo.SQLUndoLog{ + BeforeImage: tt.beforeImage, + AfterImage: tt.afterImage, + }, + undoImage: tt.afterImage, + }, + mockCurrentImage: tt.currentImage, + } + + got, err := executor.dataValidationAndGoOn(context.Background(), nil) + + assert.Equal(t, tt.want, got) + if tt.wantErr { + var be *serr.SeataError + if errors.As(err, &be) { + assert.Equal(t, serr.SQLUndoDirtyError, be.Code) + } else { + t.Errorf("expected BusinessError, got: %v", err) + } + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/pkg/util/errors/code.go b/pkg/util/errors/code.go index dfe50205..b930380b 100644 --- a/pkg/util/errors/code.go +++ b/pkg/util/errors/code.go @@ -100,4 +100,6 @@ const ( // FencePhaseError have fence phase but is not illegal value FencePhaseError + + SQLUndoDirtyError ) diff --git a/pkg/util/errors/error_code_test.go b/pkg/util/errors/error_code_test.go index 6653beef..08ed5070 100644 --- a/pkg/util/errors/error_code_test.go +++ b/pkg/util/errors/error_code_test.go @@ -51,4 +51,5 @@ func TestTransactionErrorCode(t *testing.T) { assert.Equal(t, int(PrepareFenceError), 24) assert.Equal(t, int(FenceBusinessError), 25) assert.Equal(t, int(FencePhaseError), 26) + assert.Equal(t, int(SQLUndoDirtyError), 27) } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org