This is an automated email from the ASF dual-hosted git repository.

jimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 34dfa822 test:  Improve test coverage  (#983)
34dfa822 is described below

commit 34dfa8221611f7489e0403abc6b88e4275749739
Author: Eric Wang <[email protected]>
AuthorDate: Mon Nov 17 05:00:50 2025 +0300

    test:  Improve test coverage  (#983)
---
 pkg/datasource/sql/undo/executor/executor_test.go  | 353 ++++++++++++++++++++
 .../executor/mysql_undo_delete_executor_test.go    | 145 +++++++++
 .../executor/mysql_undo_executor_holder_test.go    | 130 ++++++++
 .../executor/mysql_undo_insert_executor_test.go    | 130 ++++++++
 .../executor/mysql_undo_update_executor_test.go    | 159 +++++++++
 pkg/datasource/sql/undo/executor/sql_test.go       | 260 +++++++++++++++
 pkg/datasource/sql/undo/executor/utils_test.go     | 360 +++++++++++++++++++++
 pkg/datasource/sql/undo/mysql/default_test.go      |  62 ++++
 pkg/datasource/sql/undo/mysql/undo_test.go         | 337 +++++++++++++++++++
 .../sql/undo/parser/parser_cache_test.go           |  32 ++
 pkg/datasource/sql/undo/parser/parser_json_test.go |  40 +++
 pkg/datasource/sql/undo/parser/parser_protobuf.go  |   7 +
 .../sql/undo/parser/parser_protobuf_test.go        |  32 ++
 pkg/tm/transaction_executor_test.go                |   1 +
 14 files changed, 2048 insertions(+)

diff --git a/pkg/datasource/sql/undo/executor/executor_test.go 
b/pkg/datasource/sql/undo/executor/executor_test.go
index be1064a1..e3c36b87 100644
--- a/pkg/datasource/sql/undo/executor/executor_test.go
+++ b/pkg/datasource/sql/undo/executor/executor_test.go
@@ -21,8 +21,10 @@ import (
        "context"
        "database/sql"
        "encoding/json"
+       "fmt"
        "testing"
 
+       "github.com/DATA-DOG/go-sqlmock"
        "github.com/agiledragon/gomonkey/v2"
        "github.com/pkg/errors"
        "github.com/stretchr/testify/assert"
@@ -256,3 +258,354 @@ func TestDataValidationAndGoOn(t *testing.T) {
                })
        }
 }
+
+func TestDataValidationAndGoOnDisabledValidation(t *testing.T) {
+       cfgPatch := gomonkey.ApplyGlobalVar(&undo.UndoConfig, 
undo.Config{DataValidation: false})
+       defer cfgPatch.Reset()
+
+       executor := &testableBaseExecutor{
+               BaseExecutor: BaseExecutor{
+                       sqlUndoLog: undo.SQLUndoLog{
+                               BeforeImage: &types.RecordImage{},
+                               AfterImage:  &types.RecordImage{},
+                       },
+               },
+       }
+
+       got, err := executor.dataValidationAndGoOn(context.Background(), nil)
+
+       assert.True(t, got)
+       assert.NoError(t, err)
+}
+
+func TestDataValidationAndGoOnIsRecordsEqualsError(t *testing.T) {
+       t.Skip("Skipping test that requires gomonkey function patching which 
doesn't work well with coverage mode")
+}
+
+func TestQueryCurrentRecordsNilUndoImage(t *testing.T) {
+       executor := &BaseExecutor{
+               undoImage: nil,
+       }
+
+       result, err := executor.queryCurrentRecords(context.Background(), nil)
+
+       assert.Nil(t, result)
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "undo image is nil")
+}
+
+func TestQueryCurrentRecordsEmptyPKValues(t *testing.T) {
+       tableMeta := types.TableMeta{
+               TableName: "t_user",
+               Columns: map[string]types.ColumnMeta{
+                       "id": {ColumnName: "id"},
+               },
+               Indexs: map[string]types.IndexMeta{
+                       "PRIMARY": {
+                               IType: types.IndexTypePrimaryKey,
+                               Columns: []types.ColumnMeta{
+                                       {ColumnName: "id"},
+                               },
+                       },
+               },
+       }
+
+       executor := &BaseExecutor{
+               undoImage: &types.RecordImage{
+                       TableName: "t_user",
+                       TableMeta: &tableMeta,
+                       Rows:      []types.RowImage{},
+               },
+       }
+
+       result, err := executor.queryCurrentRecords(context.Background(), nil)
+
+       assert.Nil(t, result)
+       assert.NoError(t, err)
+}
+
+func TestQueryCurrentRecordsSuccess(t *testing.T) {
+       db, mock, err := sqlmock.New()
+       assert.NoError(t, err)
+       defer db.Close()
+
+       conn, err := db.Conn(context.Background())
+       assert.NoError(t, err)
+       defer conn.Close()
+
+       tableMeta := types.TableMeta{
+               TableName: "t_user",
+               Columns: map[string]types.ColumnMeta{
+                       "id":   {ColumnName: "id"},
+                       "name": {ColumnName: "name"},
+               },
+               Indexs: map[string]types.IndexMeta{
+                       "PRIMARY": {
+                               IType: types.IndexTypePrimaryKey,
+                               Columns: []types.ColumnMeta{
+                                       {ColumnName: "id"},
+                               },
+                       },
+               },
+       }
+
+       executor := &BaseExecutor{
+               undoImage: &types.RecordImage{
+                       TableName: "t_user",
+                       TableMeta: &tableMeta,
+                       Rows: []types.RowImage{
+                               {Columns: []types.ColumnImage{
+                                       {ColumnName: "id", Value: 1},
+                                       {ColumnName: "name", Value: "test"},
+                               }},
+                       },
+               },
+       }
+
+       rows := sqlmock.NewRows([]string{"id", "name"}).
+               AddRow(1, "test_updated")
+
+       mock.ExpectQuery("SELECT \\* FROM t_user WHERE").
+               WithArgs(1).
+               WillReturnRows(rows)
+
+       result, err := executor.queryCurrentRecords(context.Background(), conn)
+
+       assert.NoError(t, err)
+       assert.NotNil(t, result)
+       assert.Equal(t, "t_user", result.TableName)
+       assert.Len(t, result.Rows, 1)
+       assert.Len(t, result.Rows[0].Columns, 2)
+       assert.Equal(t, "id", result.Rows[0].Columns[0].ColumnName)
+       assert.NotNil(t, result.Rows[0].Columns[0].Value)
+       assert.Equal(t, "name", result.Rows[0].Columns[1].ColumnName)
+       assert.NotNil(t, result.Rows[0].Columns[1].Value)
+
+       assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestQueryCurrentRecordsQueryError(t *testing.T) {
+       db, mock, err := sqlmock.New()
+       assert.NoError(t, err)
+       defer db.Close()
+
+       conn, err := db.Conn(context.Background())
+       assert.NoError(t, err)
+       defer conn.Close()
+
+       tableMeta := types.TableMeta{
+               TableName: "t_user",
+               Columns: map[string]types.ColumnMeta{
+                       "id": {ColumnName: "id"},
+               },
+               Indexs: map[string]types.IndexMeta{
+                       "PRIMARY": {
+                               IType: types.IndexTypePrimaryKey,
+                               Columns: []types.ColumnMeta{
+                                       {ColumnName: "id"},
+                               },
+                       },
+               },
+       }
+
+       executor := &BaseExecutor{
+               undoImage: &types.RecordImage{
+                       TableName: "t_user",
+                       TableMeta: &tableMeta,
+                       Rows: []types.RowImage{
+                               {Columns: []types.ColumnImage{
+                                       {ColumnName: "id", Value: 1},
+                               }},
+                       },
+               },
+       }
+
+       mock.ExpectQuery("SELECT \\* FROM t_user WHERE").
+               WithArgs(1).
+               WillReturnError(fmt.Errorf("database connection error"))
+
+       result, err := executor.queryCurrentRecords(context.Background(), conn)
+
+       assert.Nil(t, result)
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "database connection error")
+
+       assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestQueryCurrentRecordsCompositePrimaryKey(t *testing.T) {
+       db, mock, err := sqlmock.New()
+       assert.NoError(t, err)
+       defer db.Close()
+
+       conn, err := db.Conn(context.Background())
+       assert.NoError(t, err)
+       defer conn.Close()
+
+       tableMeta := types.TableMeta{
+               TableName: "t_order",
+               Columns: map[string]types.ColumnMeta{
+                       "order_id": {ColumnName: "order_id"},
+                       "user_id":  {ColumnName: "user_id"},
+                       "amount":   {ColumnName: "amount"},
+               },
+               Indexs: map[string]types.IndexMeta{
+                       "PRIMARY": {
+                               IType: types.IndexTypePrimaryKey,
+                               Columns: []types.ColumnMeta{
+                                       {ColumnName: "order_id"},
+                                       {ColumnName: "user_id"},
+                               },
+                       },
+               },
+       }
+
+       executor := &BaseExecutor{
+               undoImage: &types.RecordImage{
+                       TableName: "t_order",
+                       TableMeta: &tableMeta,
+                       Rows: []types.RowImage{
+                               {Columns: []types.ColumnImage{
+                                       {ColumnName: "order_id", Value: 100},
+                                       {ColumnName: "user_id", Value: 1},
+                                       {ColumnName: "amount", Value: 99.99},
+                               }},
+                       },
+               },
+       }
+
+       rows := sqlmock.NewRows([]string{"order_id", "user_id", "amount"}).
+               AddRow(100, 1, 199.99)
+
+       mock.ExpectQuery("SELECT \\* FROM t_order WHERE").
+               WillReturnRows(rows)
+
+       result, err := executor.queryCurrentRecords(context.Background(), conn)
+
+       assert.NoError(t, err)
+       assert.NotNil(t, result)
+       assert.Equal(t, "t_order", result.TableName)
+       assert.Len(t, result.Rows, 1)
+       assert.Len(t, result.Rows[0].Columns, 3)
+
+       assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestParsePkValuesSinglePK(t *testing.T) {
+       executor := &BaseExecutor{}
+
+       rows := []types.RowImage{
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "id", Value: 1},
+                       {ColumnName: "name", Value: "test1"},
+               }},
+       }
+
+       pkNameList := []string{"id"}
+
+       result := executor.parsePkValues(rows, pkNameList)
+
+       assert.NotNil(t, result)
+       assert.Len(t, result, 1)
+       assert.Contains(t, result, "id")
+       assert.Len(t, result["id"], 1)
+       assert.Equal(t, 1, result["id"][0].Value)
+}
+
+func TestParsePkValuesMultipleRows(t *testing.T) {
+       executor := &BaseExecutor{}
+
+       rows := []types.RowImage{
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "id", Value: 1},
+                       {ColumnName: "name", Value: "test1"},
+               }},
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "id", Value: 2},
+                       {ColumnName: "name", Value: "test2"},
+               }},
+       }
+
+       pkNameList := []string{"id"}
+
+       result := executor.parsePkValues(rows, pkNameList)
+
+       assert.NotNil(t, result)
+       assert.Contains(t, result, "id")
+}
+
+func TestParsePkValuesCompositePK(t *testing.T) {
+       executor := &BaseExecutor{}
+
+       rows := []types.RowImage{
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "order_id", Value: 100},
+                       {ColumnName: "user_id", Value: 1},
+                       {ColumnName: "amount", Value: 99.99},
+               }},
+       }
+
+       pkNameList := []string{"order_id", "user_id"}
+
+       result := executor.parsePkValues(rows, pkNameList)
+
+       assert.NotNil(t, result)
+       assert.Len(t, result, 2)
+       assert.Contains(t, result, "order_id")
+       assert.Contains(t, result, "user_id")
+       assert.Len(t, result["order_id"], 1)
+       assert.Len(t, result["user_id"], 1)
+       assert.Equal(t, 100, result["order_id"][0].Value)
+       assert.Equal(t, 1, result["user_id"][0].Value)
+}
+
+func TestParsePkValuesCaseInsensitive(t *testing.T) {
+       executor := &BaseExecutor{}
+
+       rows := []types.RowImage{
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "ID", Value: 1},
+                       {ColumnName: "Name", Value: "test"},
+               }},
+       }
+
+       pkNameList := []string{"id"}
+
+       result := executor.parsePkValues(rows, pkNameList)
+
+       assert.NotNil(t, result)
+       assert.Len(t, result, 1)
+       assert.Contains(t, result, "id")
+       assert.Len(t, result["id"], 1)
+       assert.Equal(t, 1, result["id"][0].Value)
+}
+
+func TestParsePkValuesEmptyRows(t *testing.T) {
+       executor := &BaseExecutor{}
+
+       rows := []types.RowImage{}
+       pkNameList := []string{"id"}
+
+       result := executor.parsePkValues(rows, pkNameList)
+
+       assert.NotNil(t, result)
+       assert.Len(t, result, 0)
+}
+
+func TestParsePkValuesNoMatchingPK(t *testing.T) {
+       executor := &BaseExecutor{}
+
+       rows := []types.RowImage{
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "name", Value: "test"},
+                       {ColumnName: "age", Value: 25},
+               }},
+       }
+
+       pkNameList := []string{"id"}
+
+       result := executor.parsePkValues(rows, pkNameList)
+
+       assert.NotNil(t, result)
+       assert.Len(t, result, 0)
+}
diff --git 
a/pkg/datasource/sql/undo/executor/mysql_undo_delete_executor_test.go 
b/pkg/datasource/sql/undo/executor/mysql_undo_delete_executor_test.go
new file mode 100644
index 00000000..7ebec5ce
--- /dev/null
+++ b/pkg/datasource/sql/undo/executor/mysql_undo_delete_executor_test.go
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package executor
+
+import (
+       "context"
+       "testing"
+
+       "github.com/DATA-DOG/go-sqlmock"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "seata.apache.org/seata-go/pkg/datasource/sql/types"
+       "seata.apache.org/seata-go/pkg/datasource/sql/undo"
+)
+
+func TestNewMySQLUndoDeleteExecutor(t *testing.T) {
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeDelete,
+       }
+
+       executor := newMySQLUndoDeleteExecutor(sqlUndoLog)
+       assert.NotNil(t, executor)
+       assert.NotNil(t, executor.baseExecutor)
+       assert.Equal(t, "test_table", executor.sqlUndoLog.TableName)
+}
+
+func TestMySQLUndoDeleteExecutor_BuildUndoSQL(t *testing.T) {
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeDelete,
+               BeforeImage: &types.RecordImage{
+                       TableName: "test_table",
+                       TableMeta: &types.TableMeta{
+                               TableName: "test_table",
+                               Indexs: map[string]types.IndexMeta{
+                                       "id": {
+                                               IType: 
types.IndexTypePrimaryKey,
+                                               Columns: []types.ColumnMeta{
+                                                       {ColumnName: "id"},
+                                               },
+                                       },
+                               },
+                       },
+                       Rows: []types.RowImage{
+                               {
+                                       Columns: []types.ColumnImage{
+                                               {ColumnName: "id", KeyType: 
types.IndexTypePrimaryKey, Value: 1},
+                                               {ColumnName: "name", KeyType: 
types.IndexTypeNull, Value: "test"},
+                                       },
+                               },
+                       },
+               },
+       }
+
+       executor := newMySQLUndoDeleteExecutor(sqlUndoLog)
+       sql, err := executor.buildUndoSQL(types.DBTypeMySQL)
+       assert.NoError(t, err)
+       assert.Contains(t, sql, "INSERT INTO")
+       assert.Contains(t, sql, "test_table")
+       assert.Contains(t, sql, "VALUES")
+}
+
+func TestMySQLUndoDeleteExecutor_BuildUndoSQL_EmptyRows(t *testing.T) {
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeDelete,
+               BeforeImage: &types.RecordImage{
+                       TableName: "test_table",
+                       Rows:      []types.RowImage{},
+               },
+       }
+
+       executor := newMySQLUndoDeleteExecutor(sqlUndoLog)
+       sql, err := executor.buildUndoSQL(types.DBTypeMySQL)
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "invalid undo log")
+       assert.Empty(t, sql)
+}
+
+func TestMySQLUndoDeleteExecutor_ExecuteOn(t *testing.T) {
+       db, mock, err := sqlmock.New()
+       require.NoError(t, err)
+       defer db.Close()
+
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeDelete,
+               BeforeImage: &types.RecordImage{
+                       TableName: "test_table",
+                       TableMeta: &types.TableMeta{
+                               TableName: "test_table",
+                               Indexs: map[string]types.IndexMeta{
+                                       "id": {
+                                               IType: 
types.IndexTypePrimaryKey,
+                                               Columns: []types.ColumnMeta{
+                                                       {ColumnName: "id"},
+                                               },
+                                       },
+                               },
+                       },
+                       Rows: []types.RowImage{
+                               {
+                                       Columns: []types.ColumnImage{
+                                               {ColumnName: "id", KeyType: 
types.IndexTypePrimaryKey, Value: 1},
+                                               {ColumnName: "name", KeyType: 
types.IndexTypeNull, Value: "test"},
+                                       },
+                               },
+                       },
+               },
+       }
+
+       executor := newMySQLUndoDeleteExecutor(sqlUndoLog)
+
+       // Mock the prepare and exec
+       mock.ExpectPrepare("INSERT INTO (.+)").
+               ExpectExec().
+               WithArgs("test", 1).
+               WillReturnResult(sqlmock.NewResult(1, 1))
+
+       ctx := context.Background()
+       conn, err := db.Conn(ctx)
+       require.NoError(t, err)
+       defer conn.Close()
+
+       err = executor.ExecuteOn(ctx, types.DBTypeMySQL, conn)
+       assert.NoError(t, err)
+       assert.NoError(t, mock.ExpectationsWereMet())
+}
diff --git 
a/pkg/datasource/sql/undo/executor/mysql_undo_executor_holder_test.go 
b/pkg/datasource/sql/undo/executor/mysql_undo_executor_holder_test.go
new file mode 100644
index 00000000..8effe15e
--- /dev/null
+++ b/pkg/datasource/sql/undo/executor/mysql_undo_executor_holder_test.go
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package executor
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "seata.apache.org/seata-go/pkg/datasource/sql/types"
+       "seata.apache.org/seata-go/pkg/datasource/sql/undo"
+)
+
+func TestNewMySQLUndoExecutorHolder(t *testing.T) {
+       holder := NewMySQLUndoExecutorHolder()
+       assert.NotNil(t, holder)
+       assert.IsType(t, &MySQLUndoExecutorHolder{}, holder)
+}
+
+func TestMySQLUndoExecutorHolder_GetInsertExecutor(t *testing.T) {
+       holder := NewMySQLUndoExecutorHolder()
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeInsert,
+               AfterImage: &types.RecordImage{
+                       TableName: "test_table",
+               },
+       }
+
+       executor := holder.GetInsertExecutor(sqlUndoLog)
+       assert.NotNil(t, executor)
+       assert.IsType(t, &mySQLUndoInsertExecutor{}, executor)
+}
+
+func TestMySQLUndoExecutorHolder_GetUpdateExecutor(t *testing.T) {
+       holder := NewMySQLUndoExecutorHolder()
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeUpdate,
+               BeforeImage: &types.RecordImage{
+                       TableName: "test_table",
+               },
+               AfterImage: &types.RecordImage{
+                       TableName: "test_table",
+               },
+       }
+
+       executor := holder.GetUpdateExecutor(sqlUndoLog)
+       assert.NotNil(t, executor)
+       assert.IsType(t, &mySQLUndoUpdateExecutor{}, executor)
+}
+
+func TestMySQLUndoExecutorHolder_GetDeleteExecutor(t *testing.T) {
+       holder := NewMySQLUndoExecutorHolder()
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeDelete,
+               BeforeImage: &types.RecordImage{
+                       TableName: "test_table",
+               },
+       }
+
+       executor := holder.GetDeleteExecutor(sqlUndoLog)
+       assert.NotNil(t, executor)
+       assert.IsType(t, &mySQLUndoDeleteExecutor{}, executor)
+}
+
+func TestMySQLUndoExecutorHolder_InterfaceImplementation(t *testing.T) {
+       var holder undo.UndoExecutorHolder = NewMySQLUndoExecutorHolder()
+       assert.NotNil(t, holder)
+       assert.Implements(t, (*undo.UndoExecutorHolder)(nil), holder)
+}
+
+func TestMySQLUndoExecutorHolder_AllExecutorTypes(t *testing.T) {
+       holder := NewMySQLUndoExecutorHolder()
+
+       testCases := []struct {
+               name     string
+               sqlType  types.SQLType
+               getFunc  func(undo.SQLUndoLog) undo.UndoExecutor
+               expected interface{}
+       }{
+               {
+                       name:     "Insert Executor",
+                       sqlType:  types.SQLTypeInsert,
+                       getFunc:  holder.GetInsertExecutor,
+                       expected: &mySQLUndoInsertExecutor{},
+               },
+               {
+                       name:     "Update Executor",
+                       sqlType:  types.SQLTypeUpdate,
+                       getFunc:  holder.GetUpdateExecutor,
+                       expected: &mySQLUndoUpdateExecutor{},
+               },
+               {
+                       name:     "Delete Executor",
+                       sqlType:  types.SQLTypeDelete,
+                       getFunc:  holder.GetDeleteExecutor,
+                       expected: &mySQLUndoDeleteExecutor{},
+               },
+       }
+
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       sqlUndoLog := undo.SQLUndoLog{
+                               TableName: "test_table",
+                               SQLType:   tc.sqlType,
+                       }
+
+                       executor := tc.getFunc(sqlUndoLog)
+                       assert.NotNil(t, executor)
+                       assert.IsType(t, tc.expected, executor)
+               })
+       }
+}
diff --git 
a/pkg/datasource/sql/undo/executor/mysql_undo_insert_executor_test.go 
b/pkg/datasource/sql/undo/executor/mysql_undo_insert_executor_test.go
new file mode 100644
index 00000000..5abf4711
--- /dev/null
+++ b/pkg/datasource/sql/undo/executor/mysql_undo_insert_executor_test.go
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package executor
+
+import (
+       "context"
+       "testing"
+
+       "github.com/DATA-DOG/go-sqlmock"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "seata.apache.org/seata-go/pkg/datasource/sql/types"
+       "seata.apache.org/seata-go/pkg/datasource/sql/undo"
+)
+
+func TestNewMySQLUndoInsertExecutor(t *testing.T) {
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeInsert,
+       }
+
+       executor := newMySQLUndoInsertExecutor(sqlUndoLog)
+       assert.NotNil(t, executor)
+       assert.Equal(t, "test_table", executor.sqlUndoLog.TableName)
+}
+
+func TestMySQLUndoInsertExecutor_BuildUndoSQL(t *testing.T) {
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeInsert,
+               AfterImage: &types.RecordImage{
+                       TableName: "test_table",
+                       TableMeta: &types.TableMeta{
+                               TableName: "test_table",
+                       },
+                       Rows: []types.RowImage{
+                               {
+                                       Columns: []types.ColumnImage{
+                                               {ColumnName: "id", KeyType: 
types.IndexTypePrimaryKey, Value: 1},
+                                               {ColumnName: "name", KeyType: 
types.IndexTypeNull, Value: "test"},
+                                       },
+                               },
+                       },
+               },
+       }
+
+       executor := newMySQLUndoInsertExecutor(sqlUndoLog)
+       sql, err := executor.buildUndoSQL(types.DBTypeMySQL)
+       assert.NoError(t, err)
+       assert.Contains(t, sql, "DELETE FROM")
+       assert.Contains(t, sql, "test_table")
+       assert.Contains(t, sql, "WHERE")
+}
+
+func TestMySQLUndoInsertExecutor_BuildUndoSQL_EmptyRows(t *testing.T) {
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeInsert,
+               AfterImage: &types.RecordImage{
+                       TableName: "test_table",
+                       Rows:      []types.RowImage{},
+               },
+       }
+
+       executor := newMySQLUndoInsertExecutor(sqlUndoLog)
+       sql, err := executor.buildUndoSQL(types.DBTypeMySQL)
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "invalid undo log")
+       assert.Empty(t, sql)
+}
+
+func TestMySQLUndoInsertExecutor_ExecuteOn(t *testing.T) {
+       db, mock, err := sqlmock.New()
+       require.NoError(t, err)
+       defer db.Close()
+
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeInsert,
+               AfterImage: &types.RecordImage{
+                       TableName: "test_table",
+                       TableMeta: &types.TableMeta{
+                               TableName: "test_table",
+                       },
+                       Rows: []types.RowImage{
+                               {
+                                       Columns: []types.ColumnImage{
+                                               {ColumnName: "id", KeyType: 
types.IndexTypePrimaryKey, Value: 1},
+                                               {ColumnName: "name", KeyType: 
types.IndexTypeNull, Value: "test"},
+                                       },
+                               },
+                       },
+               },
+       }
+
+       executor := newMySQLUndoInsertExecutor(sqlUndoLog)
+
+       // Mock the prepare and exec
+       mock.ExpectPrepare("DELETE FROM (.+)").
+               ExpectExec().
+               WithArgs(1).
+               WillReturnResult(sqlmock.NewResult(0, 1))
+
+       ctx := context.Background()
+       conn, err := db.Conn(ctx)
+       require.NoError(t, err)
+       defer conn.Close()
+
+       err = executor.ExecuteOn(ctx, types.DBTypeMySQL, conn)
+       // Note: BaseExecutor.ExecuteOn is nil, so this will likely succeed
+       // We're testing that the method is callable and handles the input
+       assert.NoError(t, err)
+       assert.NoError(t, mock.ExpectationsWereMet())
+}
diff --git 
a/pkg/datasource/sql/undo/executor/mysql_undo_update_executor_test.go 
b/pkg/datasource/sql/undo/executor/mysql_undo_update_executor_test.go
new file mode 100644
index 00000000..1c5c82c8
--- /dev/null
+++ b/pkg/datasource/sql/undo/executor/mysql_undo_update_executor_test.go
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package executor
+
+import (
+       "context"
+       "testing"
+
+       "github.com/DATA-DOG/go-sqlmock"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "seata.apache.org/seata-go/pkg/datasource/sql/types"
+       "seata.apache.org/seata-go/pkg/datasource/sql/undo"
+)
+
+func TestNewMySQLUndoUpdateExecutor(t *testing.T) {
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeUpdate,
+       }
+
+       executor := newMySQLUndoUpdateExecutor(sqlUndoLog)
+       assert.NotNil(t, executor)
+       assert.NotNil(t, executor.baseExecutor)
+       assert.Equal(t, "test_table", executor.sqlUndoLog.TableName)
+}
+
+func TestMySQLUndoUpdateExecutor_BuildUndoSQL(t *testing.T) {
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeUpdate,
+               BeforeImage: &types.RecordImage{
+                       TableName: "test_table",
+                       TableMeta: &types.TableMeta{
+                               TableName: "test_table",
+                       },
+                       Rows: []types.RowImage{
+                               {
+                                       Columns: []types.ColumnImage{
+                                               {ColumnName: "id", KeyType: 
types.IndexTypePrimaryKey, Value: 1},
+                                               {ColumnName: "name", KeyType: 
types.IndexTypeNull, Value: "old_name"},
+                                       },
+                               },
+                       },
+               },
+               AfterImage: &types.RecordImage{
+                       TableName: "test_table",
+                       Rows: []types.RowImage{
+                               {
+                                       Columns: []types.ColumnImage{
+                                               {ColumnName: "id", KeyType: 
types.IndexTypePrimaryKey, Value: 1},
+                                               {ColumnName: "name", KeyType: 
types.IndexTypeNull, Value: "new_name"},
+                                       },
+                               },
+                       },
+               },
+       }
+
+       executor := newMySQLUndoUpdateExecutor(sqlUndoLog)
+       sql, err := executor.buildUndoSQL(types.DBTypeMySQL)
+       assert.NoError(t, err)
+       assert.Contains(t, sql, "UPDATE")
+       assert.Contains(t, sql, "test_table")
+       assert.Contains(t, sql, "SET")
+       assert.Contains(t, sql, "WHERE")
+}
+
+func TestMySQLUndoUpdateExecutor_ExecuteOn_DataValidationDisabled(t 
*testing.T) {
+       db, mock, err := sqlmock.New()
+       require.NoError(t, err)
+       defer db.Close()
+
+       // Disable data validation
+       originalConfig := undo.UndoConfig
+       undo.UndoConfig.DataValidation = false
+       defer func() { undo.UndoConfig = originalConfig }()
+
+       sqlUndoLog := undo.SQLUndoLog{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeUpdate,
+               BeforeImage: &types.RecordImage{
+                       TableName: "test_table",
+                       TableMeta: &types.TableMeta{
+                               TableName: "test_table",
+                               Indexs: map[string]types.IndexMeta{
+                                       "id": {
+                                               IType: 
types.IndexTypePrimaryKey,
+                                               Columns: []types.ColumnMeta{
+                                                       {ColumnName: "id"},
+                                               },
+                                       },
+                               },
+                       },
+                       Rows: []types.RowImage{
+                               {
+                                       Columns: []types.ColumnImage{
+                                               {ColumnName: "id", KeyType: 
types.IndexTypePrimaryKey, Value: 1},
+                                               {ColumnName: "name", KeyType: 
types.IndexTypeNull, Value: "old"},
+                                       },
+                               },
+                       },
+               },
+               AfterImage: &types.RecordImage{
+                       TableName: "test_table",
+                       TableMeta: &types.TableMeta{
+                               TableName: "test_table",
+                               Indexs: map[string]types.IndexMeta{
+                                       "id": {
+                                               IType: 
types.IndexTypePrimaryKey,
+                                               Columns: []types.ColumnMeta{
+                                                       {ColumnName: "id"},
+                                               },
+                                       },
+                               },
+                       },
+                       Rows: []types.RowImage{
+                               {
+                                       Columns: []types.ColumnImage{
+                                               {ColumnName: "id", KeyType: 
types.IndexTypePrimaryKey, Value: 1},
+                                               {ColumnName: "name", KeyType: 
types.IndexTypeNull, Value: "new"},
+                                       },
+                               },
+                       },
+               },
+       }
+
+       executor := newMySQLUndoUpdateExecutor(sqlUndoLog)
+
+       // Mock the prepare and exec
+       mock.ExpectPrepare("UPDATE (.+)").
+               ExpectExec().
+               WithArgs("old", 1).
+               WillReturnResult(sqlmock.NewResult(0, 1))
+
+       ctx := context.Background()
+       conn, err := db.Conn(ctx)
+       require.NoError(t, err)
+       defer conn.Close()
+
+       err = executor.ExecuteOn(ctx, types.DBTypeMySQL, conn)
+       assert.NoError(t, err)
+       assert.NoError(t, mock.ExpectationsWereMet())
+}
diff --git a/pkg/datasource/sql/undo/executor/sql_test.go 
b/pkg/datasource/sql/undo/executor/sql_test.go
index 00aaf7f4..667d3f5f 100644
--- a/pkg/datasource/sql/undo/executor/sql_test.go
+++ b/pkg/datasource/sql/undo/executor/sql_test.go
@@ -54,3 +54,263 @@ func TestAddEscape(t *testing.T) {
                assert.Equal(t, "`"+v+"`", res)
        }
 }
+
+func TestDelEscapeEmptyString(t *testing.T) {
+       result := DelEscape("", types.DBTypeMySQL)
+       assert.Equal(t, "", result)
+}
+
+func TestDelEscapeNoEscape(t *testing.T) {
+       result := DelEscape("scheme.id", types.DBTypeMySQL)
+       assert.Equal(t, "scheme.id", result)
+}
+
+func TestDelEscapePartialEscapeSchemaOnly(t *testing.T) {
+       result := DelEscape(`"scheme".id`, types.DBTypeMySQL)
+       assert.Equal(t, "scheme.id", result)
+
+       result = DelEscape("`scheme`.id", types.DBTypeMySQL)
+       assert.Equal(t, "scheme.id", result)
+}
+
+func TestDelEscapePartialEscapeColumnOnly(t *testing.T) {
+       result := DelEscape(`scheme."id"`, types.DBTypeMySQL)
+       assert.Equal(t, "scheme.id", result)
+
+       result = DelEscape("scheme.`id`", types.DBTypeMySQL)
+       assert.Equal(t, "scheme.id", result)
+}
+
+func TestDelEscapeSingleColumn(t *testing.T) {
+       result := DelEscape(`"id"`, types.DBTypeMySQL)
+       assert.Equal(t, "id", result)
+
+       result = DelEscape("`id`", types.DBTypeMySQL)
+       assert.Equal(t, "id", result)
+}
+
+func TestAddEscapeEmptyString(t *testing.T) {
+       result := AddEscape("", types.DBTypeMySQL)
+       assert.Equal(t, "", result)
+}
+
+func TestAddEscapeAlreadyEscaped(t *testing.T) {
+       result := AddEscape("`ALTER`", types.DBTypeMySQL)
+       assert.Equal(t, "`ALTER`", result)
+
+       result = AddEscape(`"ALTER"`, types.DBTypePostgreSQL)
+       assert.Equal(t, `"ALTER"`, result)
+}
+
+func TestAddEscapeNonKeyword(t *testing.T) {
+       result := AddEscape("my_column", types.DBTypeMySQL)
+       assert.Equal(t, "my_column", result)
+
+       result = AddEscape("user_name", types.DBTypeMySQL)
+       assert.Equal(t, "user_name", result)
+}
+
+func TestAddEscapeSchemaColumnKeyword(t *testing.T) {
+       result := AddEscape("ALTER", types.DBTypeMySQL)
+       assert.Equal(t, "`ALTER`", result)
+
+       result = AddEscape("SELECT", types.DBTypeMySQL)
+       assert.Equal(t, "`SELECT`", result)
+}
+
+func TestAddEscapePostgreSQL(t *testing.T) {
+       result := AddEscape("user", types.DBTypePostgreSQL)
+       assert.Equal(t, `"user"`, result)
+
+       result = AddEscape("my_table.user", types.DBTypePostgreSQL)
+       assert.Equal(t, `"my_table"."user"`, result)
+}
+
+func TestAddEscapeOracle(t *testing.T) {
+       result := AddEscape("user", types.DBTypeOracle)
+       assert.Equal(t, `"user"`, result)
+}
+
+func TestAddEscapeSQLServer(t *testing.T) {
+       result := AddEscape("user", types.DBTypeSQLServer)
+       assert.Equal(t, `"user"`, result)
+}
+
+func TestCheckEscapeMySQLKeywords(t *testing.T) {
+       keywords := []string{"SELECT", "INSERT", "UPDATE", "DELETE", "ALTER", 
"CREATE", "DROP", "TABLE"}
+       for _, keyword := range keywords {
+               result := checkEscape(keyword, types.DBTypeMySQL)
+               assert.True(t, result, "Expected %s to be a MySQL keyword", 
keyword)
+
+               lowerKeyword := keyword
+               result = checkEscape(lowerKeyword, types.DBTypeMySQL)
+               assert.True(t, result, "Expected %s to be a MySQL keyword 
(case-insensitive)", lowerKeyword)
+       }
+}
+
+func TestCheckEscapeNonKeyword(t *testing.T) {
+       result := checkEscape("my_column", types.DBTypeMySQL)
+       assert.False(t, result)
+
+       result = checkEscape("user_name", types.DBTypeMySQL)
+       assert.False(t, result)
+}
+
+func TestCheckEscapeNonMySQL(t *testing.T) {
+       result := checkEscape("anything", types.DBTypePostgreSQL)
+       assert.True(t, result)
+
+       result = checkEscape("anything", types.DBTypeOracle)
+       assert.True(t, result)
+
+       result = checkEscape("anything", types.DBTypeSQLServer)
+       assert.True(t, result)
+}
+
+func TestGetOrderedPkListSinglePK(t *testing.T) {
+       tableMeta := types.TableMeta{
+               TableName: "t_user",
+               Columns: map[string]types.ColumnMeta{
+                       "id":   {ColumnName: "id"},
+                       "name": {ColumnName: "name"},
+               },
+               Indexs: map[string]types.IndexMeta{
+                       "PRIMARY": {
+                               IType: types.IndexTypePrimaryKey,
+                               Columns: []types.ColumnMeta{
+                                       {ColumnName: "id"},
+                               },
+                       },
+               },
+       }
+
+       image := &types.RecordImage{
+               TableName: "t_user",
+               TableMeta: &tableMeta,
+       }
+
+       row := types.RowImage{
+               Columns: []types.ColumnImage{
+                       {ColumnName: "id", Value: 1, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "name", Value: "test", KeyType: 
types.IndexTypeNull},
+               },
+       }
+
+       result, err := GetOrderedPkList(image, row, types.DBTypeMySQL)
+
+       assert.NoError(t, err)
+       assert.NotNil(t, result)
+       assert.Len(t, result, 1)
+       assert.Equal(t, "id", result[0].ColumnName)
+       assert.Equal(t, 1, result[0].Value)
+}
+
+func TestGetOrderedPkListCompositePK(t *testing.T) {
+       tableMeta := types.TableMeta{
+               TableName: "t_order",
+               Columns: map[string]types.ColumnMeta{
+                       "order_id": {ColumnName: "order_id"},
+                       "user_id":  {ColumnName: "user_id"},
+                       "amount":   {ColumnName: "amount"},
+               },
+               Indexs: map[string]types.IndexMeta{
+                       "PRIMARY": {
+                               IType: types.IndexTypePrimaryKey,
+                               Columns: []types.ColumnMeta{
+                                       {ColumnName: "order_id"},
+                                       {ColumnName: "user_id"},
+                               },
+                       },
+               },
+       }
+
+       image := &types.RecordImage{
+               TableName: "t_order",
+               TableMeta: &tableMeta,
+       }
+
+       row := types.RowImage{
+               Columns: []types.ColumnImage{
+                       {ColumnName: "user_id", Value: 1, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "order_id", Value: 100, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "amount", Value: 99.99, KeyType: 
types.IndexTypeNull},
+               },
+       }
+
+       result, err := GetOrderedPkList(image, row, types.DBTypeMySQL)
+
+       assert.NoError(t, err)
+       assert.NotNil(t, result)
+       assert.Len(t, result, 2)
+       assert.Equal(t, "order_id", result[0].ColumnName)
+       assert.Equal(t, 100, result[0].Value)
+       assert.Equal(t, "user_id", result[1].ColumnName)
+       assert.Equal(t, 1, result[1].Value)
+}
+
+func TestGetOrderedPkListWithEscapedNames(t *testing.T) {
+       tableMeta := types.TableMeta{
+               TableName: "t_user",
+               Columns: map[string]types.ColumnMeta{
+                       "id": {ColumnName: "id"},
+               },
+               Indexs: map[string]types.IndexMeta{
+                       "PRIMARY": {
+                               IType: types.IndexTypePrimaryKey,
+                               Columns: []types.ColumnMeta{
+                                       {ColumnName: "id"},
+                               },
+                       },
+               },
+       }
+
+       image := &types.RecordImage{
+               TableName: "t_user",
+               TableMeta: &tableMeta,
+       }
+
+       row := types.RowImage{
+               Columns: []types.ColumnImage{
+                       {ColumnName: "`id`", Value: 1, KeyType: 
types.IndexTypePrimaryKey},
+               },
+       }
+
+       result, err := GetOrderedPkList(image, row, types.DBTypeMySQL)
+
+       assert.NoError(t, err)
+       assert.NotNil(t, result)
+       assert.Len(t, result, 1)
+       assert.Equal(t, "id", result[0].ColumnName)
+}
+
+func TestGetOrderedPkListEmptyRow(t *testing.T) {
+       tableMeta := types.TableMeta{
+               TableName: "t_user",
+               Columns: map[string]types.ColumnMeta{
+                       "id": {ColumnName: "id"},
+               },
+               Indexs: map[string]types.IndexMeta{
+                       "PRIMARY": {
+                               IType: types.IndexTypePrimaryKey,
+                               Columns: []types.ColumnMeta{
+                                       {ColumnName: "id"},
+                               },
+                       },
+               },
+       }
+
+       image := &types.RecordImage{
+               TableName: "t_user",
+               TableMeta: &tableMeta,
+       }
+
+       row := types.RowImage{
+               Columns: []types.ColumnImage{},
+       }
+
+       result, err := GetOrderedPkList(image, row, types.DBTypeMySQL)
+
+       assert.NoError(t, err)
+       assert.NotNil(t, result)
+       assert.Len(t, result, 0)
+}
diff --git a/pkg/datasource/sql/undo/executor/utils_test.go 
b/pkg/datasource/sql/undo/executor/utils_test.go
new file mode 100644
index 00000000..edc99a1f
--- /dev/null
+++ b/pkg/datasource/sql/undo/executor/utils_test.go
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package executor
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "seata.apache.org/seata-go/pkg/datasource/sql/types"
+)
+
+func TestIsRecordsEquals_BothNil(t *testing.T) {
+       equal, err := IsRecordsEquals(nil, nil)
+       assert.NoError(t, err)
+       assert.True(t, equal)
+}
+
+func TestIsRecordsEquals_OneNil(t *testing.T) {
+       image := &types.RecordImage{
+               TableName: "test",
+               Rows:      []types.RowImage{},
+       }
+
+       equal, err := IsRecordsEquals(image, nil)
+       assert.NoError(t, err)
+       assert.False(t, equal)
+
+       equal, err = IsRecordsEquals(nil, image)
+       assert.NoError(t, err)
+       assert.False(t, equal)
+}
+
+func TestIsRecordsEquals_DifferentTableNames(t *testing.T) {
+       image1 := &types.RecordImage{
+               TableName: "table1",
+               Rows:      []types.RowImage{},
+       }
+       image2 := &types.RecordImage{
+               TableName: "table2",
+               Rows:      []types.RowImage{},
+       }
+
+       equal, err := IsRecordsEquals(image1, image2)
+       assert.NoError(t, err)
+       assert.False(t, equal)
+}
+
+func TestIsRecordsEquals_DifferentRowCounts(t *testing.T) {
+       image1 := &types.RecordImage{
+               TableName: "test",
+               Rows: []types.RowImage{
+                       {Columns: []types.ColumnImage{{ColumnName: "id", Value: 
1}}},
+               },
+       }
+       image2 := &types.RecordImage{
+               TableName: "test",
+               Rows: []types.RowImage{
+                       {Columns: []types.ColumnImage{{ColumnName: "id", Value: 
1}}},
+                       {Columns: []types.ColumnImage{{ColumnName: "id", Value: 
2}}},
+               },
+       }
+
+       equal, err := IsRecordsEquals(image1, image2)
+       assert.NoError(t, err)
+       assert.False(t, equal)
+}
+
+func TestIsRecordsEquals_EmptyRows(t *testing.T) {
+       image1 := &types.RecordImage{
+               TableName: "test",
+               Rows:      []types.RowImage{},
+       }
+       image2 := &types.RecordImage{
+               TableName: "test",
+               Rows:      []types.RowImage{},
+       }
+
+       equal, err := IsRecordsEquals(image1, image2)
+       assert.NoError(t, err)
+       assert.True(t, equal)
+}
+
+func TestIsRecordsEquals_SameData(t *testing.T) {
+       tableMeta := &types.TableMeta{
+               TableName: "test_table",
+               Columns: map[string]types.ColumnMeta{
+                       "id": {ColumnName: "id"},
+               },
+               Indexs: map[string]types.IndexMeta{
+                       "PRIMARY": {
+                               Name:    "PRIMARY",
+                               Columns: []types.ColumnMeta{{ColumnName: "id"}},
+                               IType:   types.IndexTypePrimaryKey,
+                       },
+               },
+       }
+
+       image1 := &types.RecordImage{
+               TableName: "test_table",
+               TableMeta: tableMeta,
+               Rows: []types.RowImage{
+                       {Columns: []types.ColumnImage{
+                               {ColumnName: "id", Value: 1, KeyType: 
types.IndexTypePrimaryKey},
+                               {ColumnName: "name", Value: "test"},
+                       }},
+               },
+       }
+       image2 := &types.RecordImage{
+               TableName: "test_table",
+               TableMeta: tableMeta,
+               Rows: []types.RowImage{
+                       {Columns: []types.ColumnImage{
+                               {ColumnName: "id", Value: 1, KeyType: 
types.IndexTypePrimaryKey},
+                               {ColumnName: "name", Value: "test"},
+                       }},
+               },
+       }
+
+       equal, err := IsRecordsEquals(image1, image2)
+       assert.NoError(t, err)
+       assert.True(t, equal)
+}
+
+func TestIsRecordsEquals_DifferentData(t *testing.T) {
+       tableMeta := &types.TableMeta{
+               TableName: "test_table",
+               Columns: map[string]types.ColumnMeta{
+                       "id": {ColumnName: "id"},
+               },
+               Indexs: map[string]types.IndexMeta{
+                       "PRIMARY": {
+                               Name:    "PRIMARY",
+                               Columns: []types.ColumnMeta{{ColumnName: "id"}},
+                               IType:   types.IndexTypePrimaryKey,
+                       },
+               },
+       }
+
+       image1 := &types.RecordImage{
+               TableName: "test_table",
+               TableMeta: tableMeta,
+               Rows: []types.RowImage{
+                       {Columns: []types.ColumnImage{
+                               {ColumnName: "id", Value: 1, KeyType: 
types.IndexTypePrimaryKey},
+                               {ColumnName: "name", Value: "test1"},
+                       }},
+               },
+       }
+       image2 := &types.RecordImage{
+               TableName: "test_table",
+               TableMeta: tableMeta,
+               Rows: []types.RowImage{
+                       {Columns: []types.ColumnImage{
+                               {ColumnName: "id", Value: 1, KeyType: 
types.IndexTypePrimaryKey},
+                               {ColumnName: "name", Value: "test2"},
+                       }},
+               },
+       }
+
+       equal, err := IsRecordsEquals(image1, image2)
+       assert.NoError(t, err)
+       assert.False(t, equal)
+}
+
+func TestCompareRows_DifferentPrimaryKeys(t *testing.T) {
+       tableMeta := types.TableMeta{
+               TableName: "test_table",
+               Columns: map[string]types.ColumnMeta{
+                       "id": {ColumnName: "id"},
+               },
+               Indexs: map[string]types.IndexMeta{
+                       "PRIMARY": {
+                               Name:    "PRIMARY",
+                               Columns: []types.ColumnMeta{{ColumnName: "id"}},
+                               IType:   types.IndexTypePrimaryKey,
+                       },
+               },
+       }
+
+       oldRows := []types.RowImage{
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "id", Value: 1, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "name", Value: "test"},
+               }},
+       }
+       newRows := []types.RowImage{
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "id", Value: 2, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "name", Value: "test"},
+               }},
+       }
+
+       equal, err := compareRows(tableMeta, oldRows, newRows)
+       assert.NoError(t, err)
+       assert.False(t, equal)
+}
+
+func TestRowListToMap(t *testing.T) {
+       rows := []types.RowImage{
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "id", Value: 1, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "name", Value: "test"},
+               }},
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "id", Value: 2, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "name", Value: "test2"},
+               }},
+       }
+
+       primaryKeyList := []string{"id"}
+       rowMap := rowListToMap(rows, primaryKeyList)
+
+       assert.NotNil(t, rowMap)
+       assert.Equal(t, 2, len(rowMap))
+       assert.NotNil(t, rowMap["1"])
+       assert.NotNil(t, rowMap["2"])
+       assert.Equal(t, "test", rowMap["1"]["NAME"])
+       assert.Equal(t, "test2", rowMap["2"]["NAME"])
+}
+
+func TestRowListToMap_CompositePrimaryKey(t *testing.T) {
+       rows := []types.RowImage{
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "id", Value: 1, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "user_id", Value: 100, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "name", Value: "test"},
+               }},
+       }
+
+       primaryKeyList := []string{"id", "user_id"}
+       rowMap := rowListToMap(rows, primaryKeyList)
+
+       assert.NotNil(t, rowMap)
+       assert.Equal(t, 1, len(rowMap))
+       // The key should be composite: "1_##$$_100"
+       var foundKey string
+       for k := range rowMap {
+               foundKey = k
+               break
+       }
+       assert.Contains(t, foundKey, "1")
+       assert.Contains(t, foundKey, "100")
+}
+
+func TestBuildWhereConditionByPKs_SingleBatch(t *testing.T) {
+       pkNameList := []string{"id"}
+       rowSize := 5
+       maxInSize := 1000
+
+       whereSQL := buildWhereConditionByPKs(pkNameList, rowSize, maxInSize)
+
+       assert.NotEmpty(t, whereSQL)
+       assert.Contains(t, whereSQL, "(`id`) IN")
+       assert.Contains(t, whereSQL, "(?)")     // Should have 5 placeholders
+       assert.NotContains(t, whereSQL, " OR ") // Single batch, no OR
+}
+
+func TestBuildWhereConditionByPKs_MultipleBatches(t *testing.T) {
+       pkNameList := []string{"id"}
+       rowSize := 2500
+       maxInSize := 1000
+
+       whereSQL := buildWhereConditionByPKs(pkNameList, rowSize, maxInSize)
+
+       assert.NotEmpty(t, whereSQL)
+       assert.Contains(t, whereSQL, "(`id`) IN")
+       assert.Contains(t, whereSQL, " OR ") // Multiple batches
+}
+
+func TestBuildWhereConditionByPKs_CompositePK(t *testing.T) {
+       pkNameList := []string{"id", "user_id"}
+       rowSize := 3
+       maxInSize := 1000
+
+       whereSQL := buildWhereConditionByPKs(pkNameList, rowSize, maxInSize)
+
+       assert.NotEmpty(t, whereSQL)
+       assert.Contains(t, whereSQL, "(`id`,`user_id`) IN")
+       assert.Contains(t, whereSQL, "(?,?)")
+}
+
+func TestBuildWhereConditionByPKs_ExactMultiple(t *testing.T) {
+       pkNameList := []string{"id"}
+       rowSize := 2000
+       maxInSize := 1000
+
+       whereSQL := buildWhereConditionByPKs(pkNameList, rowSize, maxInSize)
+
+       assert.NotEmpty(t, whereSQL)
+       assert.Contains(t, whereSQL, " OR ") // Should have 2 batches
+}
+
+func TestBuildPKParams(t *testing.T) {
+       rows := []types.RowImage{
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "id", Value: 1, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "name", Value: "test1"},
+               }},
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "id", Value: 2, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "name", Value: "test2"},
+               }},
+       }
+
+       pkNameList := []string{"id"}
+       params := buildPKParams(rows, pkNameList)
+
+       assert.NotNil(t, params)
+       assert.Equal(t, 2, len(params))
+       assert.Equal(t, 1, params[0])
+       assert.Equal(t, 2, params[1])
+}
+
+func TestBuildPKParams_CompositePK(t *testing.T) {
+       rows := []types.RowImage{
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "id", Value: 1, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "user_id", Value: 100, KeyType: 
types.IndexTypePrimaryKey},
+                       {ColumnName: "name", Value: "test"},
+               }},
+       }
+
+       pkNameList := []string{"id", "user_id"}
+       params := buildPKParams(rows, pkNameList)
+
+       assert.NotNil(t, params)
+       assert.Equal(t, 2, len(params))
+       assert.Equal(t, 1, params[0])
+       assert.Equal(t, 100, params[1])
+}
+
+func TestBuildPKParams_NoPK(t *testing.T) {
+       rows := []types.RowImage{
+               {Columns: []types.ColumnImage{
+                       {ColumnName: "name", Value: "test"},
+               }},
+       }
+
+       pkNameList := []string{"id"}
+       params := buildPKParams(rows, pkNameList)
+
+       assert.NotNil(t, params)
+       assert.Equal(t, 0, len(params))
+}
diff --git a/pkg/datasource/sql/undo/mysql/default_test.go 
b/pkg/datasource/sql/undo/mysql/default_test.go
new file mode 100644
index 00000000..c51351fe
--- /dev/null
+++ b/pkg/datasource/sql/undo/mysql/default_test.go
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mysql
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "seata.apache.org/seata-go/pkg/datasource/sql/types"
+       "seata.apache.org/seata-go/pkg/datasource/sql/undo"
+)
+
+func TestInitUndoLogManager(t *testing.T) {
+       // Call InitUndoLogManager
+       InitUndoLogManager()
+
+       // Verify that the manager was registered
+       manager, err := undo.GetUndoLogManager(types.DBTypeMySQL)
+       assert.NoError(t, err)
+       assert.NotNil(t, manager)
+       assert.Equal(t, types.DBTypeMySQL, manager.DBType())
+}
+
+func TestInitUndoLogManager_Multiple(t *testing.T) {
+       // Calling InitUndoLogManager multiple times should not panic
+       // The second call will be a no-op since the manager is already 
registered
+       InitUndoLogManager()
+       InitUndoLogManager()
+
+       // Verify manager is still accessible
+       manager, err := undo.GetUndoLogManager(types.DBTypeMySQL)
+       assert.NoError(t, err)
+       assert.NotNil(t, manager)
+}
+
+func TestInitUndoLogManager_ManagerType(t *testing.T) {
+       InitUndoLogManager()
+
+       manager, err := undo.GetUndoLogManager(types.DBTypeMySQL)
+       assert.NoError(t, err)
+       assert.NotNil(t, manager)
+
+       // Verify it's the correct type
+       _, ok := manager.(*undoLogManager)
+       assert.True(t, ok, "Manager should be of type *undoLogManager")
+}
diff --git a/pkg/datasource/sql/undo/mysql/undo_test.go 
b/pkg/datasource/sql/undo/mysql/undo_test.go
new file mode 100644
index 00000000..2557a140
--- /dev/null
+++ b/pkg/datasource/sql/undo/mysql/undo_test.go
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mysql
+
+import (
+       "context"
+       "database/sql"
+       "database/sql/driver"
+       "testing"
+
+       "github.com/DATA-DOG/go-sqlmock"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "seata.apache.org/seata-go/pkg/datasource/sql/types"
+       "seata.apache.org/seata-go/pkg/datasource/sql/undo"
+)
+
+func TestNewUndoLogManager(t *testing.T) {
+       manager := NewUndoLogManager()
+       assert.NotNil(t, manager)
+       assert.NotNil(t, manager.Base)
+}
+
+func TestUndoLogManager_Init(t *testing.T) {
+       manager := NewUndoLogManager()
+       // Init should not panic
+       manager.Init()
+}
+
+func TestUndoLogManager_DBType(t *testing.T) {
+       manager := NewUndoLogManager()
+       dbType := manager.DBType()
+       assert.Equal(t, types.DBTypeMySQL, dbType)
+}
+
+func TestUndoLogManager_DeleteUndoLog(t *testing.T) {
+       db, mock, err := sqlmock.New()
+       require.NoError(t, err)
+       defer db.Close()
+
+       manager := NewUndoLogManager()
+       ctx := context.Background()
+       xid := "test-xid-123"
+       branchID := int64(456)
+
+       // Mock the connection
+       mock.ExpectBegin()
+       mock.ExpectPrepare("DELETE FROM (.+) WHERE branch_id = \\? AND xid = 
\\?").
+               ExpectExec().
+               WithArgs(branchID, xid).
+               WillReturnResult(sqlmock.NewResult(0, 1))
+       mock.ExpectCommit()
+
+       conn, err := db.Conn(ctx)
+       require.NoError(t, err)
+       defer conn.Close()
+
+       // Start transaction to get the conn in proper state
+       tx, err := conn.BeginTx(ctx, nil)
+       require.NoError(t, err)
+
+       err = manager.DeleteUndoLog(ctx, xid, branchID, conn)
+       assert.NoError(t, err)
+
+       tx.Commit()
+       assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestUndoLogManager_DeleteUndoLog_PrepareError(t *testing.T) {
+       db, mock, err := sqlmock.New()
+       require.NoError(t, err)
+       defer db.Close()
+
+       manager := NewUndoLogManager()
+       ctx := context.Background()
+       xid := "test-xid-123"
+       branchID := int64(456)
+
+       // Mock connection and prepare error
+       mock.ExpectBegin()
+       mock.ExpectPrepare("DELETE FROM (.+) WHERE branch_id = \\? AND xid = 
\\?").
+               WillReturnError(sql.ErrConnDone)
+       mock.ExpectRollback()
+
+       conn, err := db.Conn(ctx)
+       require.NoError(t, err)
+       defer conn.Close()
+
+       tx, err := conn.BeginTx(ctx, nil)
+       require.NoError(t, err)
+
+       err = manager.DeleteUndoLog(ctx, xid, branchID, conn)
+       assert.Error(t, err)
+
+       tx.Rollback()
+}
+
+func TestUndoLogManager_BatchDeleteUndoLog(t *testing.T) {
+       db, mock, err := sqlmock.New()
+       require.NoError(t, err)
+       defer db.Close()
+
+       manager := NewUndoLogManager()
+       ctx := context.Background()
+       xids := []string{"xid-1", "xid-2"}
+       branchIDs := []int64{100, 200}
+
+       // Mock the batch delete
+       mock.ExpectBegin()
+       mock.ExpectPrepare("DELETE FROM (.+) WHERE branch_id IN (.+) AND xid IN 
(.+)").
+               ExpectExec().
+               WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()).
+               WillReturnResult(sqlmock.NewResult(0, 2))
+       mock.ExpectCommit()
+
+       conn, err := db.Conn(ctx)
+       require.NoError(t, err)
+       defer conn.Close()
+
+       tx, err := conn.BeginTx(ctx, nil)
+       require.NoError(t, err)
+
+       err = manager.BatchDeleteUndoLog(xids, branchIDs, conn)
+       assert.NoError(t, err)
+
+       tx.Commit()
+       assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestUndoLogManager_BatchDeleteUndoLog_EmptySlices(t *testing.T) {
+       db, _, err := sqlmock.New()
+       require.NoError(t, err)
+       defer db.Close()
+
+       manager := NewUndoLogManager()
+       ctx := context.Background()
+
+       conn, err := db.Conn(ctx)
+       require.NoError(t, err)
+       defer conn.Close()
+
+       // Test with empty slices
+       err = manager.BatchDeleteUndoLog([]string{}, []int64{}, conn)
+       assert.Error(t, err)
+}
+
+func TestUndoLogManager_FlushUndoLog(t *testing.T) {
+       db, mock, err := sqlmock.New()
+       require.NoError(t, err)
+       defer db.Close()
+
+       manager := NewUndoLogManager()
+
+       // Create a mock driver.Conn
+       driverConn, err := db.Conn(context.Background())
+       require.NoError(t, err)
+       defer driverConn.Close()
+
+       // Create transaction context with empty round images
+       tranCtx := &types.TransactionContext{
+               XID:         "test-xid",
+               BranchID:    123,
+               RoundImages: &types.RoundRecordImage{},
+       }
+
+       // Get raw driver connection
+       var rawConn driver.Conn
+       err = driverConn.Raw(func(dc interface{}) error {
+               rawConn = dc.(driver.Conn)
+               return nil
+       })
+       require.NoError(t, err)
+
+       // FlushUndoLog should return nil for empty images
+       err = manager.FlushUndoLog(tranCtx, rawConn)
+       assert.NoError(t, err)
+
+       assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestUndoLogManager_FlushUndoLog_WithImages(t *testing.T) {
+       db, mock, err := sqlmock.New()
+       require.NoError(t, err)
+       defer db.Close()
+
+       // Save original config and restore after test
+       originalSerialization := undo.UndoConfig.LogSerialization
+       originalCompressType := undo.UndoConfig.CompressConfig.Type
+       defer func() {
+               undo.UndoConfig.LogSerialization = originalSerialization
+               undo.UndoConfig.CompressConfig.Type = originalCompressType
+       }()
+
+       undo.UndoConfig.LogSerialization = "json"
+       undo.UndoConfig.CompressConfig.Type = "none"
+
+       manager := NewUndoLogManager()
+
+       // Create a mock driver.Conn
+       driverConn, err := db.Conn(context.Background())
+       require.NoError(t, err)
+       defer driverConn.Close()
+
+       // Create transaction context with images
+       roundImages := &types.RoundRecordImage{}
+
+       // Add before images
+       beforeImage := &types.RecordImage{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeInsert,
+               Rows:      []types.RowImage{},
+       }
+       roundImages.AppendBeofreImage(beforeImage)
+
+       // Add after images
+       afterImage := &types.RecordImage{
+               TableName: "test_table",
+               SQLType:   types.SQLTypeInsert,
+               Rows: []types.RowImage{
+                       {
+                               Columns: []types.ColumnImage{
+                                       {ColumnName: "id", Value: 1},
+                               },
+                       },
+               },
+       }
+       roundImages.AppendAfterImage(afterImage)
+
+       tranCtx := &types.TransactionContext{
+               XID:         "test-xid",
+               BranchID:    123,
+               RoundImages: roundImages,
+       }
+
+       // Mock the insert operation
+       mock.ExpectPrepare("INSERT INTO (.+) VALUES").
+               ExpectExec().
+               WillReturnResult(sqlmock.NewResult(1, 1))
+
+       // Get raw driver connection
+       var rawConn driver.Conn
+       err = driverConn.Raw(func(dc interface{}) error {
+               rawConn = dc.(driver.Conn)
+               return nil
+       })
+       require.NoError(t, err)
+
+       // FlushUndoLog should succeed
+       err = manager.FlushUndoLog(tranCtx, rawConn)
+       // Error expected as we're using sqlmock which doesn't fully implement 
all driver features
+       // The test verifies the method is callable and handles the input
+       assert.NoError(t, err)
+       assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestUndoLogManager_RunUndo(t *testing.T) {
+       db, mock, err := sqlmock.New()
+       require.NoError(t, err)
+       defer db.Close()
+
+       // Save original config and restore after test
+       originalSerialization := undo.UndoConfig.LogSerialization
+       defer func() {
+               undo.UndoConfig.LogSerialization = originalSerialization
+       }()
+
+       undo.UndoConfig.LogSerialization = "json"
+
+       manager := NewUndoLogManager()
+       ctx := context.Background()
+       xid := "test-xid"
+       branchID := int64(123)
+       dbName := "test_db"
+
+       // Mock the connection for Undo operation - expect full flow
+       mock.ExpectBegin()
+       mock.ExpectPrepare("SELECT (.+) FROM (.+) WHERE branch_id = \\? AND xid 
= \\? FOR UPDATE").
+               ExpectQuery().
+               WithArgs(branchID, xid).
+               WillReturnRows(sqlmock.NewRows([]string{"branch_id", "xid", 
"context", "rollback_info", "log_status"}))
+       // Expect INSERT for undo with global finished status
+       mock.ExpectPrepare("INSERT INTO undo_log").
+               ExpectExec().
+               WillReturnResult(sqlmock.NewResult(1, 1))
+       mock.ExpectCommit()
+
+       err = manager.RunUndo(ctx, xid, branchID, db, dbName)
+       assert.NoError(t, err)
+       // The method will execute and attempt database operations
+       // We're testing that it's callable and delegates to Base.Undo
+       assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestUndoLogManager_HasUndoLogTable(t *testing.T) {
+       db, mock, err := sqlmock.New()
+       require.NoError(t, err)
+       defer db.Close()
+
+       manager := NewUndoLogManager()
+       ctx := context.Background()
+
+       // Mock successful query (table exists)
+       mock.ExpectQuery("SELECT 1 FROM (.+) LIMIT 1").
+               WillReturnRows(sqlmock.NewRows([]string{"1"}).AddRow(1))
+
+       conn, err := db.Conn(ctx)
+       require.NoError(t, err)
+       defer conn.Close()
+
+       exists, err := manager.HasUndoLogTable(ctx, conn)
+       assert.NoError(t, err)
+       assert.True(t, exists)
+
+       assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestUndoLogManager_InterfaceImplementation(t *testing.T) {
+       var _ undo.UndoLogManager = (*undoLogManager)(nil)
+       manager := NewUndoLogManager()
+       assert.Implements(t, (*undo.UndoLogManager)(nil), manager)
+}
diff --git a/pkg/datasource/sql/undo/parser/parser_cache_test.go 
b/pkg/datasource/sql/undo/parser/parser_cache_test.go
index a1b80918..5aebb39b 100644
--- a/pkg/datasource/sql/undo/parser/parser_cache_test.go
+++ b/pkg/datasource/sql/undo/parser/parser_cache_test.go
@@ -45,3 +45,35 @@ func TestLoad(t *testing.T) {
        assert.Nil(t, err)
        assert.NotNil(t, jsonParser)
 }
+
+func TestLoad_NotFound(t *testing.T) {
+       parser, err := GetCache().Load("nonexistent")
+       assert.NotNil(t, err)
+       assert.Nil(t, parser)
+       assert.Contains(t, err.Error(), "not found")
+}
+
+func TestLoad_Protobuf(t *testing.T) {
+       protobufParser, err := GetCache().Load("protobuf")
+       assert.Nil(t, err)
+       assert.NotNil(t, protobufParser)
+       assert.Equal(t, "protobuf", protobufParser.GetName())
+}
+
+func TestUndoLogParserCache_Store(t *testing.T) {
+       cache := GetCache()
+       assert.NotNil(t, cache)
+
+       // Verify both parsers are stored
+       jsonParser, err := cache.Load("json")
+       assert.NoError(t, err)
+       assert.NotNil(t, jsonParser)
+
+       protobufParser, err := cache.Load("protobuf")
+       assert.NoError(t, err)
+       assert.NotNil(t, protobufParser)
+}
+
+func TestDefaultSerializer(t *testing.T) {
+       assert.Equal(t, "json", DefaultSerializer)
+}
diff --git a/pkg/datasource/sql/undo/parser/parser_json_test.go 
b/pkg/datasource/sql/undo/parser/parser_json_test.go
index 2123f082..dc4eb095 100644
--- a/pkg/datasource/sql/undo/parser/parser_json_test.go
+++ b/pkg/datasource/sql/undo/parser/parser_json_test.go
@@ -103,3 +103,43 @@ func TestJsonDecode(t *testing.T) {
        }
 
 }
+
+func TestJsonDecode_InvalidJSON(t *testing.T) {
+       logParser := &JsonParser{}
+       undoLog, err := logParser.Decode([]byte("invalid json"))
+       assert.NotNil(t, err)
+       assert.Nil(t, undoLog)
+}
+
+func TestJsonDecode_EmptyBytes(t *testing.T) {
+       logParser := &JsonParser{}
+       undoLog, err := logParser.Decode([]byte(""))
+       assert.NotNil(t, err)
+       assert.Nil(t, undoLog)
+}
+
+func TestJsonEncode_WithSQLUndoLogs(t *testing.T) {
+       logParser := &JsonParser{}
+       branchUndoLog := &undo.BranchUndoLog{
+               Xid:      "test-xid",
+               BranchID: 12345,
+               Logs: []undo.SQLUndoLog{
+                       {
+                               TableName: "test_table",
+                       },
+               },
+       }
+
+       bytes, err := logParser.Encode(branchUndoLog)
+       assert.Nil(t, err)
+       assert.NotNil(t, bytes)
+       assert.Contains(t, string(bytes), "test-xid")
+       assert.Contains(t, string(bytes), "test_table")
+}
+
+func TestJsonParser_Interface(t *testing.T) {
+       var parser UndoLogParser = &JsonParser{}
+       assert.NotNil(t, parser)
+       assert.Equal(t, "json", parser.GetName())
+       assert.Equal(t, []byte("{}"), parser.GetDefaultContent())
+}
diff --git a/pkg/datasource/sql/undo/parser/parser_protobuf.go 
b/pkg/datasource/sql/undo/parser/parser_protobuf.go
index 7fd56082..c9f4529b 100644
--- a/pkg/datasource/sql/undo/parser/parser_protobuf.go
+++ b/pkg/datasource/sql/undo/parser/parser_protobuf.go
@@ -19,6 +19,7 @@ package parser
 
 import (
        "encoding/json"
+       "fmt"
 
        "github.com/golang/protobuf/ptypes/any"
        "github.com/golang/protobuf/ptypes/wrappers"
@@ -44,6 +45,9 @@ func (p *ProtobufParser) GetDefaultContent() []byte {
 
 // Encode branch undo log to byte array
 func (p *ProtobufParser) Encode(branchUndoLog *undo.BranchUndoLog) ([]byte, 
error) {
+       if branchUndoLog == nil {
+               return nil, fmt.Errorf("branchUndoLog cannot be nil")
+       }
        protoLog := ConvertToProto(branchUndoLog)
        return proto.Marshal(protoLog)
 }
@@ -60,6 +64,9 @@ func (p *ProtobufParser) Decode(data []byte) 
(*undo.BranchUndoLog, error) {
 }
 
 func ConvertToProto(intreeLog *undo.BranchUndoLog) *BranchUndoLog {
+       if intreeLog == nil {
+               return nil
+       }
        protoLog := &BranchUndoLog{
                Xid:      intreeLog.Xid,
                BranchID: intreeLog.BranchID,
diff --git a/pkg/datasource/sql/undo/parser/parser_protobuf_test.go 
b/pkg/datasource/sql/undo/parser/parser_protobuf_test.go
index e8cb0d52..7427d590 100644
--- a/pkg/datasource/sql/undo/parser/parser_protobuf_test.go
+++ b/pkg/datasource/sql/undo/parser/parser_protobuf_test.go
@@ -89,3 +89,35 @@ func TestConvertInterfaceToAnyAndBack(t *testing.T) {
 
        assert.Equal(t, originalValue, convertedValue, "The converted value 
should match the original")
 }
+
+func TestProtobufParser_Interface(t *testing.T) {
+       var parser UndoLogParser = &ProtobufParser{}
+       assert.NotNil(t, parser)
+       assert.Equal(t, "protobuf", parser.GetName())
+       assert.Equal(t, []byte{}, parser.GetDefaultContent())
+}
+
+func TestProtobufDecode_InvalidData(t *testing.T) {
+       parser := &ProtobufParser{}
+       undoLog, err := parser.Decode([]byte("invalid protobuf data"))
+       assert.NotNil(t, err)
+       assert.Nil(t, undoLog)
+}
+
+func TestProtobufDecode_EmptyData(t *testing.T) {
+       parser := &ProtobufParser{}
+       undoLog, err := parser.Decode([]byte{})
+       // Empty data decodes to empty BranchUndoLog (protobuf behavior)
+       assert.NoError(t, err)
+       assert.NotNil(t, undoLog)
+       assert.Empty(t, undoLog.Xid)
+       assert.Equal(t, uint64(0), undoLog.BranchID)
+       assert.Empty(t, undoLog.Logs)
+}
+
+func TestProtobufEncode_Nil(t *testing.T) {
+       parser := &ProtobufParser{}
+       bytes, err := parser.Encode(nil)
+       assert.NotNil(t, err)
+       assert.Nil(t, bytes)
+}
diff --git a/pkg/tm/transaction_executor_test.go 
b/pkg/tm/transaction_executor_test.go
index 277f2397..cec6e24d 100644
--- a/pkg/tm/transaction_executor_test.go
+++ b/pkg/tm/transaction_executor_test.go
@@ -276,6 +276,7 @@ func TestCommitOrRollback(t *testing.T) {
                {
                        ctx: context.Background(),
                        tx: GlobalTransaction{
+                               Xid:    "test-xid-rollback",
                                TxRole: Launcher,
                        },
                        ok:                 false,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to