This is an automated email from the ASF dual-hosted git repository.
lxfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/master by this push:
new a4815531 fix:fix bugs detected by doubao (#883)
a4815531 is described below
commit a4815531d483437aab2a033e34f9e567ccfaf29e
Author: heliang <[email protected]>
AuthorDate: Sun Sep 14 11:50:59 2025 +0800
fix:fix bugs detected by doubao (#883)
* fix:fix bugs detected by doubao
* fix: Fix CI lint errors and test interface issues
---
pkg/datasource/sql/conn_at_test.go | 35 ++++++++++--
pkg/datasource/sql/conn_xa.go | 2 +-
pkg/datasource/sql/conn_xa_test.go | 34 ++++-------
pkg/datasource/sql/db.go | 4 +-
pkg/datasource/sql/driver.go | 8 +--
pkg/datasource/sql/exec/at/base_executor.go | 66 ++++++++++++++++++++--
pkg/datasource/sql/exec/hook.go | 11 +++-
pkg/datasource/sql/tx.go | 15 +++--
pkg/datasource/sql/tx_at.go | 4 +-
pkg/datasource/sql/tx_xa.go | 4 +-
pkg/datasource/sql/undo/base/undo.go | 8 +--
...ql_insertonduplicate_update_undo_log_builder.go | 2 +-
pkg/discovery/etcd3.go | 10 ++--
pkg/discovery/file.go | 2 +-
pkg/rm/tcc/fence/fence_driver.go | 2 +-
pkg/tm/global_transaction.go | 2 +-
pkg/tm/transaction_executor.go | 2 +-
pkg/util/log/logging.go | 27 ++++++++-
18 files changed, 172 insertions(+), 66 deletions(-)
diff --git a/pkg/datasource/sql/conn_at_test.go
b/pkg/datasource/sql/conn_at_test.go
index 33961a70..d4999aa6 100644
--- a/pkg/datasource/sql/conn_at_test.go
+++ b/pkg/datasource/sql/conn_at_test.go
@@ -107,9 +107,10 @@ func TestATConn_ExecContext(t *testing.T) {
mi.before = beforeHook
var comitCnt int32
- beforeCommit := func(tx *Tx) {
+ beforeCommit := func(tx *Tx) error {
atomic.AddInt32(&comitCnt, 1)
assert.Equal(t, types.ATMode,
tx.tranCtx.TransactionMode)
+ return nil
}
ti.beforeCommit = beforeCommit
@@ -131,8 +132,9 @@ func TestATConn_ExecContext(t *testing.T) {
}
var comitCnt int32
- ti.beforeCommit = func(tx *Tx) {
+ ti.beforeCommit = func(tx *Tx) error {
atomic.AddInt32(&comitCnt, 1)
+ return nil
}
conn, err := db.Conn(context.Background())
@@ -168,8 +170,9 @@ func TestATConn_BeginTx(t *testing.T) {
}
var comitCnt int32
- ti.beforeCommit = func(tx *Tx) {
+ ti.beforeCommit = func(tx *Tx) error {
atomic.AddInt32(&comitCnt, 1)
+ return nil
}
_, err = tx.ExecContext(context.Background(), "SELECT * FROM
user")
@@ -194,8 +197,9 @@ func TestATConn_BeginTx(t *testing.T) {
}
var comitCnt int32
- ti.beforeCommit = func(tx *Tx) {
+ ti.beforeCommit = func(tx *Tx) error {
atomic.AddInt32(&comitCnt, 1)
+ return nil
}
_, err = tx.ExecContext(context.Background(), "SELECT * FROM
user")
@@ -222,8 +226,9 @@ func TestATConn_BeginTx(t *testing.T) {
}
var comitCnt int32
- ti.beforeCommit = func(tx *Tx) {
+ ti.beforeCommit = func(tx *Tx) error {
atomic.AddInt32(&comitCnt, 1)
+ return nil
}
_, err = tx.ExecContext(context.Background(), "SELECT * FROM
user")
@@ -238,3 +243,23 @@ func TestATConn_BeginTx(t *testing.T) {
assert.Equal(t, int32(1), atomic.LoadInt32(&comitCnt))
})
}
+
+type mockTxHook struct {
+ beforeCommit func(tx *Tx) error
+ beforeRollback func(tx *Tx)
+}
+
+// BeforeCommit
+func (mi *mockTxHook) BeforeCommit(tx *Tx) error {
+ if mi.beforeCommit != nil {
+ return mi.beforeCommit(tx)
+ }
+ return nil
+}
+
+// BeforeRollback
+func (mi *mockTxHook) BeforeRollback(tx *Tx) {
+ if mi.beforeRollback != nil {
+ mi.beforeRollback(tx)
+ }
+}
diff --git a/pkg/datasource/sql/conn_xa.go b/pkg/datasource/sql/conn_xa.go
index a7751cd4..9dc00bda 100644
--- a/pkg/datasource/sql/conn_xa.go
+++ b/pkg/datasource/sql/conn_xa.go
@@ -209,7 +209,7 @@ func (c *XAConn) createNewTxOnExecIfNeed(ctx
context.Context, f func() (types.Ex
if err != nil {
// XA End & Rollback
if rollbackErr := c.Rollback(ctx); rollbackErr != nil {
- log.Errorf("failed to rollback xa branch of :%s,
err:%w", c.txCtx.XID, rollbackErr)
+ log.Errorf("failed to rollback xa branch of :%s,
err:%v", c.txCtx.XID, rollbackErr)
}
return nil, err
}
diff --git a/pkg/datasource/sql/conn_xa_test.go
b/pkg/datasource/sql/conn_xa_test.go
index 624bf1c2..ff860a0f 100644
--- a/pkg/datasource/sql/conn_xa_test.go
+++ b/pkg/datasource/sql/conn_xa_test.go
@@ -101,25 +101,6 @@ func (mi *mockSQLInterceptor) After(ctx context.Context,
execCtx *types.ExecCont
return nil
}
-type mockTxHook struct {
- beforeCommit func(tx *Tx)
- beforeRollback func(tx *Tx)
-}
-
-// BeforeCommit
-func (mi *mockTxHook) BeforeCommit(tx *Tx) {
- if mi.beforeCommit != nil {
- mi.beforeCommit(tx)
- }
-}
-
-// BeforeRollback
-func (mi *mockTxHook) BeforeRollback(tx *Tx) {
- if mi.beforeRollback != nil {
- mi.beforeRollback(tx)
- }
-}
-
// simulateExecContextError allows tests to inject driver errors for certain
SQL strings.
// When set, baseMockConn will call this hook for each ExecContext.
var simulateExecContextError func(query string) error
@@ -209,9 +190,10 @@ func TestXAConn_ExecContext(t *testing.T) {
mi.before = before
var comitCnt int32
- beforeCommit := func(tx *Tx) {
+ beforeCommit := func(tx *Tx) error {
atomic.AddInt32(&comitCnt, 1)
assert.Equal(t, tx.tranCtx.TransactionMode,
types.XAMode)
+ return nil
}
ti.beforeCommit = beforeCommit
@@ -235,8 +217,9 @@ func TestXAConn_ExecContext(t *testing.T) {
mi.before = before
var comitCnt int32
- beforeCommit := func(tx *Tx) {
+ beforeCommit := func(tx *Tx) error {
atomic.AddInt32(&comitCnt, 1)
+ return nil
}
ti.beforeCommit = beforeCommit
@@ -273,8 +256,9 @@ func TestXAConn_BeginTx(t *testing.T) {
}
var comitCnt int32
- ti.beforeCommit = func(tx *Tx) {
+ ti.beforeCommit = func(tx *Tx) error {
atomic.AddInt32(&comitCnt, 1)
+ return nil
}
_, err = tx.ExecContext(context.Background(), "SELECT * FROM
user")
@@ -299,8 +283,9 @@ func TestXAConn_BeginTx(t *testing.T) {
}
var comitCnt int32
- ti.beforeCommit = func(tx *Tx) {
+ ti.beforeCommit = func(tx *Tx) error {
atomic.AddInt32(&comitCnt, 1)
+ return nil
}
_, err = tx.ExecContext(context.Background(), "SELECT * FROM
user")
@@ -327,8 +312,9 @@ func TestXAConn_BeginTx(t *testing.T) {
}
var comitCnt int32
- ti.beforeCommit = func(tx *Tx) {
+ ti.beforeCommit = func(tx *Tx) error {
atomic.AddInt32(&comitCnt, 1)
+ return nil
}
_, err = tx.ExecContext(context.Background(), "SELECT * FROM
user")
diff --git a/pkg/datasource/sql/db.go b/pkg/datasource/sql/db.go
index df162d10..d347b15b 100644
--- a/pkg/datasource/sql/db.go
+++ b/pkg/datasource/sql/db.go
@@ -128,11 +128,11 @@ func (db *DBResource) init() {
ctx := context.Background()
conn, err := db.connector.Connect(ctx)
if err != nil {
- log.Errorf("connect: %w", err)
+ log.Errorf("connect: %v", err)
}
version, err := selectDBVersion(ctx, conn)
if err != nil {
- log.Errorf("select db version: %w", err)
+ log.Errorf("select db version: %v", err)
}
db.SetDbVersion(version)
db.checkDbVersion()
diff --git a/pkg/datasource/sql/driver.go b/pkg/datasource/sql/driver.go
index 905d81d3..be69f38b 100644
--- a/pkg/datasource/sql/driver.go
+++ b/pkg/datasource/sql/driver.go
@@ -120,7 +120,7 @@ func (d *seataDriver) OpenConnector(name string) (c
driver.Connector, err error)
if driverCtx, ok := d.target.(driver.DriverContext); ok {
c, err = driverCtx.OpenConnector(name)
if err != nil {
- log.Errorf("open connector: %w", err)
+ log.Errorf("open connector: %v", err)
return nil, err
}
}
@@ -132,7 +132,7 @@ func (d *seataDriver) OpenConnector(name string) (c
driver.Connector, err error)
proxy, err := d.getOpenConnectorProxy(c, dbType, sql.OpenDB(c), name)
if err != nil {
- log.Errorf("register resource: %w", err)
+ log.Errorf("register resource: %v", err)
return nil, err
}
@@ -152,12 +152,12 @@ func (d *seataDriver) getOpenConnectorProxy(connector
driver.Connector, dbType t
}
res, err := newResource(options...)
if err != nil {
- log.Errorf("create new resource: %w", err)
+ log.Errorf("create new resource: %v", err)
return nil, err
}
datasource.RegisterTableCache(types.DBTypeMySQL,
mysql2.NewTableMetaInstance(db, cfg))
if err =
datasource.GetDataSourceManager(d.branchType).RegisterResource(res); err != nil
{
- log.Errorf("regisiter resource: %w", err)
+ log.Errorf("register resource: %v", err)
return nil, err
}
return &seataConnector{
diff --git a/pkg/datasource/sql/exec/at/base_executor.go
b/pkg/datasource/sql/exec/at/base_executor.go
index 76f3e424..13ac4fe8 100644
--- a/pkg/datasource/sql/exec/at/base_executor.go
+++ b/pkg/datasource/sql/exec/at/base_executor.go
@@ -34,6 +34,7 @@ import (
"seata.apache.org/seata-go/pkg/datasource/sql/types"
"seata.apache.org/seata-go/pkg/datasource/sql/undo"
"seata.apache.org/seata-go/pkg/datasource/sql/util"
+ "seata.apache.org/seata-go/pkg/util/log"
"seata.apache.org/seata-go/pkg/util/reflectx"
)
@@ -41,16 +42,22 @@ type baseExecutor struct {
hooks []exec.SQLHook
}
-func (b *baseExecutor) beforeHooks(ctx context.Context, execCtx
*types.ExecContext) {
+func (b *baseExecutor) beforeHooks(ctx context.Context, execCtx
*types.ExecContext) error {
for _, hook := range b.hooks {
- hook.Before(ctx, execCtx)
+ if err := hook.Before(ctx, execCtx); err != nil {
+ return err
+ }
}
+ return nil
}
-func (b *baseExecutor) afterHooks(ctx context.Context, execCtx
*types.ExecContext) {
+func (b *baseExecutor) afterHooks(ctx context.Context, execCtx
*types.ExecContext) error {
for _, hook := range b.hooks {
- hook.After(ctx, execCtx)
+ if err := hook.After(ctx, execCtx); err != nil {
+ log.Errorf("after hook failed: %v", err)
+ }
}
+ return nil
}
// GetScanSlice get the column type for scan
@@ -160,6 +167,57 @@ func (b *baseExecutor) traversalArgs(node ast.Node,
argsIndex *[]int32) {
b.traversalArgs(exprs.On.Expr, argsIndex)
}
break
+ case *ast.UnaryOperationExpr:
+ expr := node.(*ast.UnaryOperationExpr)
+ b.traversalArgs(expr.V, argsIndex)
+ break
+ case *ast.FuncCallExpr:
+ expr := node.(*ast.FuncCallExpr)
+ for _, arg := range expr.Args {
+ b.traversalArgs(arg, argsIndex)
+ }
+ break
+ case *ast.SubqueryExpr:
+ expr := node.(*ast.SubqueryExpr)
+ if expr.Query != nil {
+ b.traversalArgs(expr.Query, argsIndex)
+ }
+ break
+ case *ast.ExistsSubqueryExpr:
+ expr := node.(*ast.ExistsSubqueryExpr)
+ if expr.Sel != nil {
+ b.traversalArgs(expr.Sel, argsIndex)
+ }
+ break
+ case *ast.CompareSubqueryExpr:
+ expr := node.(*ast.CompareSubqueryExpr)
+ b.traversalArgs(expr.L, argsIndex)
+ if expr.R != nil {
+ b.traversalArgs(expr.R, argsIndex)
+ }
+ break
+ case *ast.PatternLikeExpr:
+ expr := node.(*ast.PatternLikeExpr)
+ b.traversalArgs(expr.Expr, argsIndex)
+ b.traversalArgs(expr.Pattern, argsIndex)
+ break
+ case *ast.IsNullExpr:
+ expr := node.(*ast.IsNullExpr)
+ b.traversalArgs(expr.Expr, argsIndex)
+ break
+ case *ast.CaseExpr:
+ expr := node.(*ast.CaseExpr)
+ if expr.Value != nil {
+ b.traversalArgs(expr.Value, argsIndex)
+ }
+ for _, whenClause := range expr.WhenClauses {
+ b.traversalArgs(whenClause.Expr, argsIndex)
+ b.traversalArgs(whenClause.Result, argsIndex)
+ }
+ if expr.ElseClause != nil {
+ b.traversalArgs(expr.ElseClause, argsIndex)
+ }
+ break
case *test_driver.ParamMarkerExpr:
*argsIndex = append(*argsIndex,
int32(node.(*test_driver.ParamMarkerExpr).Order))
break
diff --git a/pkg/datasource/sql/exec/hook.go b/pkg/datasource/sql/exec/hook.go
index 88140b49..40dfeaa9 100644
--- a/pkg/datasource/sql/exec/hook.go
+++ b/pkg/datasource/sql/exec/hook.go
@@ -53,9 +53,14 @@ func RegisterHook(hook SQLHook) {
}
// SQLHook SQL execution front and back interceptor
-// case 1. Used to intercept SQL to achieve the generation of front and rear
mirrors
-// case 2. Burning point to report
-// case 3. SQL black and white list
+//
+// Use cases:
+// 1. Used to intercept SQL to achieve the generation of front and rear mirrors
+// 2. Burning point to report
+// 3. SQL black and white list
+//
+// Before hook errors will prevent SQL execution
+// After hook errors are logged but don't affect main execution flow
type SQLHook interface {
Type() types.SQLType
Before(ctx context.Context, execCtx *types.ExecContext) error
diff --git a/pkg/datasource/sql/tx.go b/pkg/datasource/sql/tx.go
index db55e97d..c495dbce 100644
--- a/pkg/datasource/sql/tx.go
+++ b/pkg/datasource/sql/tx.go
@@ -55,7 +55,7 @@ type (
txOption func(tx *Tx)
txHook interface {
- BeforeCommit(tx *Tx)
+ BeforeCommit(tx *Tx) error
BeforeRollback(tx *Tx)
}
@@ -105,19 +105,24 @@ type Tx struct {
// Commit do commit action
func (tx *Tx) Commit() error {
- tx.beforeCommit()
+ if err := tx.beforeCommit(); err != nil {
+ return err
+ }
return tx.commitOnLocal()
}
-func (tx *Tx) beforeCommit() {
+func (tx *Tx) beforeCommit() error {
if len(txHooks) != 0 {
hl.RLock()
defer hl.RUnlock()
for i := range txHooks {
- txHooks[i].BeforeCommit(tx)
+ if err := txHooks[i].BeforeCommit(tx); err != nil {
+ return err
+ }
}
}
+ return nil
}
func (tx *Tx) Rollback() error {
@@ -209,7 +214,7 @@ func (tx *Tx) report(success bool) error {
if err = dataSourceManager.BranchReport(context.Background(),
request); err == nil {
break
}
- log.Infof("Failed to report [%s / %s] commit done [%s] Retry
Countdown: %s", tx.tranCtx.BranchID, tx.tranCtx.XID, success, retry)
+ log.Infof("Failed to report [%d / %s] commit done [%v] Retry
Countdown: %s", tx.tranCtx.BranchID, tx.tranCtx.XID, success, retry)
retry.Wait()
}
return err
diff --git a/pkg/datasource/sql/tx_at.go b/pkg/datasource/sql/tx_at.go
index 40c8b7ee..f4e3478d 100644
--- a/pkg/datasource/sql/tx_at.go
+++ b/pkg/datasource/sql/tx_at.go
@@ -33,7 +33,9 @@ type ATTx struct {
// case 2. not need flush undolog, is XA mode, do local transaction commit
// case 3. need run AT transaction
func (tx *ATTx) Commit() error {
- tx.tx.beforeCommit()
+ if err := tx.tx.beforeCommit(); err != nil {
+ return err
+ }
return tx.commitOnAT()
}
diff --git a/pkg/datasource/sql/tx_xa.go b/pkg/datasource/sql/tx_xa.go
index cdc87f8c..d3380e57 100644
--- a/pkg/datasource/sql/tx_xa.go
+++ b/pkg/datasource/sql/tx_xa.go
@@ -26,7 +26,9 @@ type XATx struct {
// case 2. not need flush undolog, is XA mode, do local transaction commit
// case 3. need run AT transaction
func (tx *XATx) Commit() error {
- tx.tx.beforeCommit()
+ if err := tx.tx.beforeCommit(); err != nil {
+ return err
+ }
return tx.commitOnXA()
}
diff --git a/pkg/datasource/sql/undo/base/undo.go
b/pkg/datasource/sql/undo/base/undo.go
index b196eee1..a3aab8d4 100644
--- a/pkg/datasource/sql/undo/base/undo.go
+++ b/pkg/datasource/sql/undo/base/undo.go
@@ -261,7 +261,7 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context,
dbType types.DBType, xid
defer func() {
if err != nil {
if err = tx.Rollback(); err != nil {
- log.Errorf("rollback fail, xid: %s, branchID:%s
err:%v", xid, branchID, err)
+ log.Errorf("rollback fail, xid: %s, branchID:%d
err:%v", xid, branchID, err)
return
}
}
@@ -274,7 +274,7 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context,
dbType types.DBType, xid
}
defer func() {
if err = stmt.Close(); err != nil {
- log.Errorf("stmt close fail, xid: %s, branchID:%s
err:%v", xid, branchID, err)
+ log.Errorf("stmt close fail, xid: %s, branchID:%d
err:%v", xid, branchID, err)
return
}
}()
@@ -286,7 +286,7 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context,
dbType types.DBType, xid
}
defer func() {
if err = rows.Close(); err != nil {
- log.Errorf("rows close fail, xid: %s, branchID:%s
err:%v", xid, branchID, err)
+ log.Errorf("rows close fail, xid: %s, branchID:%d
err:%v", xid, branchID, err)
return
}
}()
@@ -301,7 +301,7 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context,
dbType types.DBType, xid
undoLogRecords = append(undoLogRecords, record)
}
if err := rows.Err(); err != nil {
- log.Errorf("read rows next fail, xid: %s, branchID:%s err:%v",
xid, branchID, err)
+ log.Errorf("read rows next fail, xid: %s, branchID:%d err:%v",
xid, branchID, err)
return err
}
var exists bool
diff --git
a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go
b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go
index 6b82d537..56de4a36 100644
---
a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go
+++
b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go
@@ -233,7 +233,7 @@ func checkDuplicateKeyUpdate(insert *ast.InsertStmt,
metaData types.TableMeta) e
}
for name, col := range index.Columns {
if duplicateColsMap[strings.ToLower(col.ColumnName)] {
- log.Errorf("update pk value is not supported!
index name:%s update column name: %s", name, col.ColumnName)
+ log.Errorf("update pk value is not supported!
index name:%d update column name: %s", name, col.ColumnName)
return fmt.Errorf("update pk value is not
supported! ")
}
}
diff --git a/pkg/discovery/etcd3.go b/pkg/discovery/etcd3.go
index 49ff35be..38acca9c 100644
--- a/pkg/discovery/etcd3.go
+++ b/pkg/discovery/etcd3.go
@@ -92,12 +92,12 @@ func (s *EtcdRegistryService) watch(key string) {
v := kv.Value
clusterName, err := getClusterName(k)
if err != nil {
- log.Errorf("etcd key has an incorrect format:
", err)
+ log.Errorf("etcd key has an incorrect format:
%v", err)
return
}
serverInstance, err := getServerInstance(v)
if err != nil {
- log.Errorf("etcd value has an incorrect format:
", err)
+ log.Errorf("etcd value has an incorrect format:
%v", err)
return
}
s.rwLock.Lock()
@@ -129,12 +129,12 @@ func (s *EtcdRegistryService) watch(key string) {
v := event.Kv.Value
clusterName, err := getClusterName(k)
if err != nil {
- log.Errorf("etcd key err: ",
err)
+ log.Errorf("etcd key err: %v",
err)
return
}
serverInstance, err :=
getServerInstance(v)
if err != nil {
- log.Errorf("etcd value err: ",
err)
+ log.Errorf("etcd value err:
%v", err)
return
}
@@ -156,7 +156,7 @@ func (s *EtcdRegistryService) watch(key string) {
cluster, ip, port, err :=
getClusterAndAddress(event.Kv.Key)
if err != nil {
- log.Errorf("etcd key err: ",
err)
+ log.Errorf("etcd key err: %v",
err)
return
}
diff --git a/pkg/discovery/file.go b/pkg/discovery/file.go
index 3be8b560..e1cf2423 100644
--- a/pkg/discovery/file.go
+++ b/pkg/discovery/file.go
@@ -58,7 +58,7 @@ func (s *FileRegistryService) Lookup(key string)
([]*ServiceInstance, error) {
addrStr = v
}
if addrStr == "" {
- log.Errorf("endpoint is empty. key: %s group: %s", group)
+ log.Errorf("endpoint is empty. key: %s group: %s", key, group)
return nil, fmt.Errorf("endpoint is empty. key: %s group: %s",
key, group)
}
diff --git a/pkg/rm/tcc/fence/fence_driver.go b/pkg/rm/tcc/fence/fence_driver.go
index 07cccb91..0dbbaeeb 100644
--- a/pkg/rm/tcc/fence/fence_driver.go
+++ b/pkg/rm/tcc/fence/fence_driver.go
@@ -53,7 +53,7 @@ func (fd *FenceDriver) OpenConnector(name string) (connector
driver.Connector, r
if driverCtx, ok := fd.TargetDriver.(driver.DriverContext); ok {
connector, re = driverCtx.OpenConnector(name)
if re != nil {
- log.Errorf("open connector: %w", re)
+ log.Errorf("open connector: %v", re)
return nil, re
}
}
diff --git a/pkg/tm/global_transaction.go b/pkg/tm/global_transaction.go
index 8846e799..c645f3d1 100644
--- a/pkg/tm/global_transaction.go
+++ b/pkg/tm/global_transaction.go
@@ -74,7 +74,7 @@ func (g *GlobalTransactionManager) Commit(ctx
context.Context, gtr *GlobalTransa
if isTimeout(ctx) {
log.Infof("Rollback: tm detected timeout in global gtr %s",
gtr.Xid)
if err := GetGlobalTransactionManager().Rollback(ctx, gtr); err
!= nil {
- log.Errorf("Rollback transaction failed, error: %v in
global gtr % s", err, gtr.Xid)
+ log.Errorf("Rollback transaction failed, error: %v in
global gtr %s", err, gtr.Xid)
return err
}
return nil
diff --git a/pkg/tm/transaction_executor.go b/pkg/tm/transaction_executor.go
index 69df09ed..35f0a1fb 100644
--- a/pkg/tm/transaction_executor.go
+++ b/pkg/tm/transaction_executor.go
@@ -69,7 +69,7 @@ func WithGlobalTx(ctx context.Context, gc *GtxConfig,
business CallbackWithCtx)
if IsGlobalTx(ctx) {
// business maybe to throw panic, so need to recover it
here.
if err = commitOrRollback(ctx, deferErr == nil && re ==
nil); err != nil {
- log.Errorf("global transaction xid %s, name %s
second phase error", GetXID(ctx), GetTxName(ctx), err)
+ log.Errorf("global transaction xid %s, name %s
second phase error: %v", GetXID(ctx), GetTxName(ctx), err)
}
}
diff --git a/pkg/util/log/logging.go b/pkg/util/log/logging.go
index 3119eafa..ae333e70 100644
--- a/pkg/util/log/logging.go
+++ b/pkg/util/log/logging.go
@@ -20,6 +20,7 @@ package log
import (
"bytes"
"fmt"
+ "os"
"time"
getty "github.com/apache/dubbo-getty"
@@ -229,7 +230,13 @@ func Warnf(format string, v ...interface{}) {
// Error ...
func Error(v ...interface{}) {
+ defer func() {
+ if r := recover(); r != nil {
+ fmt.Fprintf(os.Stderr, "ERROR: %v\n", v)
+ }
+ }()
if log == nil {
+ fmt.Fprintf(os.Stderr, "ERROR: %v\n", v)
return
}
log.Error(v...)
@@ -237,7 +244,17 @@ func Error(v ...interface{}) {
// Errorf ...
func Errorf(format string, v ...interface{}) {
+ defer func() {
+ if r := recover(); r != nil {
+ fmt.Fprintf(os.Stderr, "ERROR: ")
+ fmt.Fprintf(os.Stderr, format, v...)
+ fmt.Fprintf(os.Stderr, "\n")
+ }
+ }()
if log == nil {
+ fmt.Fprintf(os.Stderr, "ERROR: ")
+ fmt.Fprintf(os.Stderr, format, v...)
+ fmt.Fprintf(os.Stderr, "\n")
return
}
log.Errorf(format, v...)
@@ -246,7 +263,7 @@ func Errorf(format string, v ...interface{}) {
// Panic ...
func Panic(v ...interface{}) {
if log == nil {
- return
+ panic(v)
}
log.Panic(v...)
}
@@ -254,7 +271,7 @@ func Panic(v ...interface{}) {
// Panicf ...
func Panicf(format string, v ...interface{}) {
if log == nil {
- return
+ panic(fmt.Sprintf(format, v...))
}
log.Panicf(format, v...)
}
@@ -262,6 +279,8 @@ func Panicf(format string, v ...interface{}) {
// Fatal ...
func Fatal(v ...interface{}) {
if log == nil {
+ fmt.Fprintf(os.Stderr, "FATAL: %v\n", v)
+ os.Exit(1)
return
}
log.Fatal(v...)
@@ -270,6 +289,10 @@ func Fatal(v ...interface{}) {
// Fatalf ...
func Fatalf(format string, v ...interface{}) {
if log == nil {
+ fmt.Fprintf(os.Stderr, "FATAL: ")
+ fmt.Fprintf(os.Stderr, format, v...)
+ fmt.Fprintf(os.Stderr, "\n")
+ os.Exit(1)
return
}
log.Fatalf(format, v...)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]