This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 46c9b63728 feature: Implement scheduled handling for end status 
transaction (#7133)
46c9b63728 is described below

commit 46c9b63728e47af2b0a0fab5deb9baef66b1d36a
Author: Yongjun Hong <[email protected]>
AuthorDate: Tue Feb 18 14:36:47 2025 +0900

    feature: Implement scheduled handling for end status transaction (#7133)
---
 changes/en-us/2.x.md                               |  1 +
 changes/zh-cn/2.x.md                               |  1 +
 .../org/apache/seata/common/ConfigurationKeys.java | 10 ++++
 .../java/org/apache/seata/common/Constants.java    |  4 ++
 .../org/apache/seata/common/DefaultValues.java     | 10 ++++
 script/config-center/config.txt                    |  1 +
 .../properties/server/ServerProperties.java        | 10 ++++
 .../properties/server/ServerPropertiesTest.java    |  2 +
 .../server/coordinator/DefaultCoordinator.java     | 70 +++++++++++++++++++++-
 .../apache/seata/server/session/GlobalSession.java | 28 ++++++++-
 .../apache/seata/server/session/SessionHelper.java | 17 ++++++
 server/src/main/resources/application.example.yml  |  1 +
 .../main/resources/application.raft.example.yml    |  1 +
 13 files changed, 153 insertions(+), 3 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 1ae5a80cfd..5c999299f2 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -9,6 +9,7 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] support fury 
undolog parser
 - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster 
mode supports address translation
 - [[#7038](https://github.com/apache/incubator-seata/pull/7038)] support fury 
serializer
+- [[#7133](https://github.com/apache/incubator-seata/pull/7133)] Implement 
scheduled handling for end status transaction
 
 ### bugfix:
 
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 24d392898a..0141ea7a08 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -9,6 +9,7 @@
 - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] 
支持UndoLog的fury序列化方式
 - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换
 - [[#7038](https://github.com/apache/incubator-seata/pull/7038)] 支持Fury序列化器
+- [[#7133](https://github.com/apache/incubator-seata/pull/7133)] 
实现对残留的end状态事务定时处理
 
 ### bugfix:
 
diff --git 
a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java 
b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
index 47d9d48af3..0bcb83cf3c 100644
--- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
+++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
@@ -475,6 +475,11 @@ public interface ConfigurationKeys {
      */
     String ROLLBACKING_RETRY_PERIOD = RECOVERY_PREFIX + 
"rollbackingRetryPeriod";
 
+    /**
+     * The constant END_STATUS_RETRY_PERIOD.
+     */
+    String END_STATUS_RETRY_PERIOD = RECOVERY_PREFIX + "endstatusRetryPeriod";
+
     /**
      * The constant TIMEOUT_RETRY_PERIOD.
      */
@@ -599,6 +604,11 @@ public interface ConfigurationKeys {
      */
     String RETRY_DEAD_THRESHOLD = SERVER_PREFIX + "retryDeadThreshold";
 
+    /**
+     * the constant END_STATE_RETRY_DEAD_THRESHOLD
+     */
+    String END_STATE_RETRY_DEAD_THRESHOLD = SERVER_PREFIX + 
"endStateRetryDeadThreshold";
+
     /**
      * the constant DISTRIBUTED_LOCK_EXPIRE_TIME
      */
diff --git a/common/src/main/java/org/apache/seata/common/Constants.java 
b/common/src/main/java/org/apache/seata/common/Constants.java
index 43da1827e0..812a3605e4 100644
--- a/common/src/main/java/org/apache/seata/common/Constants.java
+++ b/common/src/main/java/org/apache/seata/common/Constants.java
@@ -180,6 +180,10 @@ public interface Constants {
      */
     String ROLLBACKING = "Rollbacking";
 
+    /**
+     * The constant END
+     */
+    String END = "END";
     /**
      * The constant AUTO_COMMIT
      */
diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java 
b/common/src/main/java/org/apache/seata/common/DefaultValues.java
index fab0f3f40e..049d625548 100644
--- a/common/src/main/java/org/apache/seata/common/DefaultValues.java
+++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java
@@ -310,6 +310,11 @@ public interface DefaultValues {
      */
     int DEFAULT_RETRY_DEAD_THRESHOLD = 2 * 60 * 1000 + 10 * 1000;
 
+    /**
+     * the constant DEFAULT_END_STATE_RETRY_DEAD_THRESHOLD
+     */
+    int DEFAULT_END_STATE_RETRY_DEAD_THRESHOLD = 10 * 1000;
+
     /**
      * the constant TM_INTERCEPTOR_ORDER
      */
@@ -397,6 +402,11 @@ public interface DefaultValues {
      */
     int DEFAULT_ROLLBACKING_RETRY_PERIOD = 1000;
 
+    /**
+     * the constant DEFAULT_END_STATUS_RETRY_PERIOD
+     */
+    int DEFAULT_END_STATUS_RETRY_PERIOD = 30 * 1000;
+
     /**
      * the constant DEFAULT_TIMEOUT_RETRY_PERIOD
      */
diff --git a/script/config-center/config.txt b/script/config-center/config.txt
index f4794626c3..6ded35ed83 100644
--- a/script/config-center/config.txt
+++ b/script/config-center/config.txt
@@ -153,6 +153,7 @@ store.redis.queryLimit=100
 server.recovery.committingRetryPeriod=1000
 server.recovery.asynCommittingRetryPeriod=1000
 server.recovery.rollbackingRetryPeriod=1000
+server.recovery.endstatusRetryPeriod=1000
 server.recovery.timeoutRetryPeriod=1000
 server.maxCommitRetryTimeout=-1
 server.maxRollbackRetryTimeout=-1
diff --git 
a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerProperties.java
 
b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerProperties.java
index f273b741d6..33f1b93892 100644
--- 
a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerProperties.java
+++ 
b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerProperties.java
@@ -27,6 +27,7 @@ import static 
org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER
 public class ServerProperties {
     private long maxCommitRetryTimeout = -1L;
     private long maxRollbackRetryTimeout = -1L;
+    private long maxEndStatusRetryTimeout = -1L;
     private Boolean rollbackRetryTimeoutUnlockEnable = false;
     private Boolean enableCheckAuth = true;
     private Boolean enableParallelRequestHandle = true;
@@ -56,6 +57,15 @@ public class ServerProperties {
         return this;
     }
 
+    public long getMaxEndStatusRetryTimeout() {
+        return maxEndStatusRetryTimeout;
+    }
+
+    public ServerProperties setMaxEndStatusRetryTimeout(long 
maxEndStatusRetryTimeout) {
+        this.maxEndStatusRetryTimeout = maxEndStatusRetryTimeout;
+        return this;
+    }
+
     public Boolean getRollbackRetryTimeoutUnlockEnable() {
         return rollbackRetryTimeoutUnlockEnable;
     }
diff --git 
a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerPropertiesTest.java
 
b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerPropertiesTest.java
index fc9161a07c..60d8db4a12 100644
--- 
a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerPropertiesTest.java
+++ 
b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerPropertiesTest.java
@@ -35,6 +35,7 @@ public class ServerPropertiesTest {
         serverProperties.setRollbackRetryTimeoutUnlockEnable(true);
         serverProperties.setMaxCommitRetryTimeout(1L);
         serverProperties.setMaxRollbackRetryTimeout(1L);
+        serverProperties.setMaxEndStatusRetryTimeout(1L);
 
         Assertions.assertEquals(1, serverProperties.getXaerNotaRetryTimeout());
         Assertions.assertEquals(1, serverProperties.getRetryDeadThreshold());
@@ -47,5 +48,6 @@ public class ServerPropertiesTest {
         
Assertions.assertTrue(serverProperties.getRollbackRetryTimeoutUnlockEnable());
         Assertions.assertEquals(1L, 
serverProperties.getMaxCommitRetryTimeout());
         Assertions.assertEquals(1L, 
serverProperties.getMaxRollbackRetryTimeout());
+        Assertions.assertEquals(1L, 
serverProperties.getMaxEndStatusRetryTimeout());
     }
 }
diff --git 
a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java
 
b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java
index 1a2c523437..f2d09f8a6d 100644
--- 
a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java
+++ 
b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java
@@ -83,6 +83,7 @@ import static org.apache.seata.common.Constants.COMMITTING;
 import static org.apache.seata.common.Constants.RETRY_COMMITTING;
 import static org.apache.seata.common.Constants.RETRY_ROLLBACKING;
 import static org.apache.seata.common.Constants.ROLLBACKING;
+import static org.apache.seata.common.Constants.END;
 import static org.apache.seata.common.Constants.SYNC_PROCESSING;
 import static org.apache.seata.common.Constants.TX_TIMEOUT_CHECK;
 import static org.apache.seata.common.Constants.UNDOLOG_DELETE;
@@ -95,6 +96,7 @@ import static 
org.apache.seata.common.DefaultValues.DEFAULT_ROLLBACKING_RETRY_PE
 import static 
org.apache.seata.common.DefaultValues.DEFAULT_ROLLBACK_FAILED_UNLOCK_ENABLE;
 import static 
org.apache.seata.common.DefaultValues.DEFAULT_TIMEOUT_RETRY_PERIOD;
 import static 
org.apache.seata.common.DefaultValues.DEFAULT_UNDO_LOG_DELETE_PERIOD;
+import static 
org.apache.seata.common.DefaultValues.DEFAULT_END_STATUS_RETRY_PERIOD;
 
 /**
  * The type Default coordinator.
@@ -125,6 +127,12 @@ public class DefaultCoordinator extends 
AbstractTCInboundHandler implements Tran
     protected static final long ROLLBACKING_RETRY_PERIOD = 
CONFIG.getLong(ConfigurationKeys.ROLLBACKING_RETRY_PERIOD,
             DEFAULT_ROLLBACKING_RETRY_PERIOD);
 
+    /**
+     * The constant END_STATUS_RETRY_PERIOD.
+     */
+    protected static final long END_STATUS_RETRY_PERIOD = 
CONFIG.getLong(ConfigurationKeys.END_STATUS_RETRY_PERIOD,
+        DEFAULT_END_STATUS_RETRY_PERIOD);
+
     /**
      * The constant TIMEOUT_RETRY_PERIOD.
      */
@@ -189,12 +197,15 @@ public class DefaultCoordinator extends 
AbstractTCInboundHandler implements Tran
 
     private final GlobalStatus[] retryRollbackingStatuses = new GlobalStatus[] 
{
         GlobalStatus.TimeoutRollbacking,
-        GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.RollbackRetrying};
+        GlobalStatus.TimeoutRollbackRetrying,
+        GlobalStatus.RollbackRetrying
+    };
 
-    private final GlobalStatus[] retryCommittingStatuses = new GlobalStatus[] 
{GlobalStatus.CommitRetrying, GlobalStatus.Committed};
+    private final GlobalStatus[] retryCommittingStatuses = new GlobalStatus[] 
{GlobalStatus.CommitRetrying};
 
     private final GlobalStatus[] rollbackingStatuses = new GlobalStatus[] 
{GlobalStatus.Rollbacking};
     private final GlobalStatus[] committingStatuses = new GlobalStatus[] 
{GlobalStatus.Committing};
+    private final GlobalStatus[] endStatuses = new GlobalStatus[] 
{GlobalStatus.Rollbacked, GlobalStatus.TimeoutRollbacked, 
GlobalStatus.Committed, GlobalStatus.Finished};
 
     private final ThreadPoolExecutor branchRemoveExecutor;
 
@@ -432,6 +443,7 @@ public class DefaultCoordinator extends 
AbstractTCInboundHandler implements Tran
                     //The function of this 'return' is 'continue'.
                     return;
                 }
+
                 core.doGlobalRollback(rollbackingSession, true);
             } catch (TransactionException ex) {
                 LOGGER.error("Failed to retry rollbacking [{}] {} {}", 
rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
@@ -631,6 +643,58 @@ public class DefaultCoordinator extends 
AbstractTCInboundHandler implements Tran
             delay, TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * Handle end status by scheduled.
+     */
+    protected void handleEndStatesByScheduled() {
+        SessionCondition sessionCondition = new SessionCondition(endStatuses);
+        sessionCondition.setLazyLoadBranch(true);
+        List<GlobalSession> endStatusSessions =
+            
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
+        if (CollectionUtils.isEmpty(endStatusSessions)) {
+            endSchedule(RETRY_DEAD_THRESHOLD);
+            return;
+        }
+        long delay = END_STATUS_RETRY_PERIOD;
+        
endStatusSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime));
+        List<GlobalSession> needDoEndStatusSessions = new ArrayList<>();
+        for (GlobalSession endStatusSession : endStatusSessions) {
+            long time = endStatusSession.timeToDeadSession();
+            if (time <= 0) {
+                needDoEndStatusSessions.add(endStatusSession);
+            } else {
+                delay = Math.max(time, END_STATUS_RETRY_PERIOD);
+                break;
+            }
+        }
+        long now = System.currentTimeMillis();
+        SessionHelper.forEach(needDoEndStatusSessions, endStatusSession -> {
+            try {
+                if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, 
endStatusSession.getBeginTime())) {
+                    handleEndStateSession(endStatusSession);
+                }
+            } catch (TransactionException ex) {
+                LOGGER.error("Failed to handle end status session [{}] {} {}", 
endStatusSession.getXid(), ex.getCode(), ex.getMessage());
+            }
+        });
+        endSchedule(delay);
+    }
+
+    private void handleEndStateSession(GlobalSession globalSession) throws 
TransactionException {
+        SessionHelper.processEndState(globalSession);
+    }
+
+    private void endSchedule(long delay) {
+        syncProcessing.schedule(
+            () -> {
+                boolean called = SessionHolder.distributedLockAndExecute(END, 
this::handleEndStatesByScheduled);
+                if (!called) {
+                    endSchedule(END_STATUS_RETRY_PERIOD);
+                }
+            },
+            delay, TimeUnit.MILLISECONDS);
+    }
+
     /**
      * Init.
      */
@@ -658,6 +722,8 @@ public class DefaultCoordinator extends 
AbstractTCInboundHandler implements Tran
         rollbackingSchedule(0);
 
         committingSchedule(0);
+
+        endSchedule(0);
     }
 
     @Override
diff --git 
a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java 
b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java
index f6488027e2..a674b1e7dc 100644
--- a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java
+++ b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java
@@ -19,6 +19,7 @@ package org.apache.seata.server.session;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -80,6 +81,13 @@ public class GlobalSession implements SessionLifecycle, 
SessionStorable {
     private static final int RETRY_DEAD_THRESHOLD = 
ConfigurationFactory.getInstance()
             .getInt(ConfigurationKeys.RETRY_DEAD_THRESHOLD, 
DefaultValues.DEFAULT_RETRY_DEAD_THRESHOLD);
 
+    /**
+     * If the global session's status is in an end state and currentTime - 
beginTime >= END_STATE_RETRY_DEAD_THRESHOLD,
+     * the session is treated as dead and becomes eligible for the scheduled end-status handling
+     */
+    private static final int END_STATE_RETRY_DEAD_THRESHOLD = 
ConfigurationFactory.getInstance()
+        .getInt(ConfigurationKeys.END_STATE_RETRY_DEAD_THRESHOLD, 
DefaultValues.DEFAULT_END_STATE_RETRY_DEAD_THRESHOLD);
+
     private String xid;
 
     private long transactionId;
@@ -203,13 +211,31 @@ public class GlobalSession implements SessionLifecycle, 
SessionStorable {
     }
 
     /**
-     * prevent could not handle committing and rollbacking transaction
+     * Guards against committing, rollbacking and rollbacked transactions 
being left unhandled.
+     *
+     * If the global session's status is in end status, it returns the 
remaining time until the session reaches
+     * the end state retry dead threshold.
+     * For other statuses, it returns the remaining time until the session 
reaches the retry dead threshold.
+     *
      * @return time to dead session. if not greater than 0, then deadSession
      */
     public long timeToDeadSession() {
+        if (isEndStatus()) {
+            return beginTime + END_STATE_RETRY_DEAD_THRESHOLD - 
System.currentTimeMillis();
+        }
         return beginTime + RETRY_DEAD_THRESHOLD - System.currentTimeMillis();
     }
 
+    private boolean isEndStatus() {
+        EnumSet<GlobalStatus> endStatuses = EnumSet.of(
+            GlobalStatus.Rollbacked,
+            GlobalStatus.TimeoutRollbacked,
+            GlobalStatus.Committed,
+            GlobalStatus.Finished
+        );
+        return endStatuses.contains(this.status);
+    }
+
     @Override
     public void begin() throws TransactionException {
         this.status = GlobalStatus.Begin;
diff --git 
a/server/src/main/java/org/apache/seata/server/session/SessionHelper.java 
b/server/src/main/java/org/apache/seata/server/session/SessionHelper.java
index 356a50cb10..fa18023e3b 100644
--- a/server/src/main/java/org/apache/seata/server/session/SessionHelper.java
+++ b/server/src/main/java/org/apache/seata/server/session/SessionHelper.java
@@ -454,6 +454,23 @@ public class SessionHelper {
         }
     }
 
+    public static void processEndState(GlobalSession globalSession) throws 
TransactionException {
+        GlobalStatus globalStatus = globalSession.getStatus();
+
+        switch (globalStatus) {
+            case Committed:
+            case Finished:
+                endCommitted(globalSession, true);
+                return;
+            case Rollbacked:
+            case TimeoutRollbacked:
+                endRollbacked(globalSession, true);
+                return;
+            default:
+                throw new TransactionException("Unsupported GlobalStatus:" + 
globalStatus);
+        }
+    }
+
     /**
      * if true, enable delete the branch asynchronously
      *
diff --git a/server/src/main/resources/application.example.yml 
b/server/src/main/resources/application.example.yml
index e13b1efc50..881cea9b51 100644
--- a/server/src/main/resources/application.example.yml
+++ b/server/src/main/resources/application.example.yml
@@ -152,6 +152,7 @@ seata:
       committing-retry-period: 1000
       async-committing-retry-period: 1000
       rollbacking-retry-period: 1000
+      end-status-retry-period: 1000
       timeout-retry-period: 1000
     undo:
       log-save-days: 7
diff --git a/server/src/main/resources/application.raft.example.yml 
b/server/src/main/resources/application.raft.example.yml
index dea10b382f..cc727ea02a 100644
--- a/server/src/main/resources/application.raft.example.yml
+++ b/server/src/main/resources/application.raft.example.yml
@@ -122,6 +122,7 @@ seata:
       committing-retry-period: 1000
       async-committing-retry-period: 1000
       rollbacking-retry-period: 1000
+      end-status-retry-period: 1000
       timeout-retry-period: 1000
     undo:
       log-save-days: 7


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to