This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new 46c9b63728 feature: Implement scheduled handling for end status
transaction (#7133)
46c9b63728 is described below
commit 46c9b63728e47af2b0a0fab5deb9baef66b1d36a
Author: Yongjun Hong <[email protected]>
AuthorDate: Tue Feb 18 14:36:47 2025 +0900
feature: Implement scheduled handling for end status transaction (#7133)
---
changes/en-us/2.x.md | 1 +
changes/zh-cn/2.x.md | 1 +
.../org/apache/seata/common/ConfigurationKeys.java | 10 ++++
.../java/org/apache/seata/common/Constants.java | 4 ++
.../org/apache/seata/common/DefaultValues.java | 10 ++++
script/config-center/config.txt | 1 +
.../properties/server/ServerProperties.java | 10 ++++
.../properties/server/ServerPropertiesTest.java | 2 +
.../server/coordinator/DefaultCoordinator.java | 70 +++++++++++++++++++++-
.../apache/seata/server/session/GlobalSession.java | 28 ++++++++-
.../apache/seata/server/session/SessionHelper.java | 17 ++++++
server/src/main/resources/application.example.yml | 1 +
.../main/resources/application.raft.example.yml | 1 +
13 files changed, 153 insertions(+), 3 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 1ae5a80cfd..5c999299f2 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -9,6 +9,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7037](https://github.com/apache/incubator-seata/pull/7037)] support fury
undolog parser
- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster
mode supports address translation
- [[#7038](https://github.com/apache/incubator-seata/pull/7038)] support fury
serializer
+- [[#7133](https://github.com/apache/incubator-seata/pull/7133)] Implement
scheduled handling for end status transaction
### bugfix:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 24d392898a..0141ea7a08 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -9,6 +9,7 @@
- [[#7037](https://github.com/apache/incubator-seata/pull/7037)]
支持UndoLog的fury序列化方式
- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换
- [[#7038](https://github.com/apache/incubator-seata/pull/7038)] 支持Fury序列化器
+- [[#7133](https://github.com/apache/incubator-seata/pull/7133)]
实现对残留的end状态事务定时处理
### bugfix:
diff --git
a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
index 47d9d48af3..0bcb83cf3c 100644
--- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
+++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
@@ -475,6 +475,11 @@ public interface ConfigurationKeys {
*/
String ROLLBACKING_RETRY_PERIOD = RECOVERY_PREFIX +
"rollbackingRetryPeriod";
+ /**
+ * The constant END_STATUS_RETRY_PERIOD.
+ */
+ String END_STATUS_RETRY_PERIOD = RECOVERY_PREFIX + "endstatusRetryPeriod";
+
/**
* The constant TIMEOUT_RETRY_PERIOD.
*/
@@ -599,6 +604,11 @@ public interface ConfigurationKeys {
*/
String RETRY_DEAD_THRESHOLD = SERVER_PREFIX + "retryDeadThreshold";
+ /**
+ * the constant END_STATE_RETRY_DEAD_THRESHOLD
+ */
+ String END_STATE_RETRY_DEAD_THRESHOLD = SERVER_PREFIX +
"endStateRetryDeadThreshold";
+
/**
* the constant DISTRIBUTED_LOCK_EXPIRE_TIME
*/
diff --git a/common/src/main/java/org/apache/seata/common/Constants.java
b/common/src/main/java/org/apache/seata/common/Constants.java
index 43da1827e0..812a3605e4 100644
--- a/common/src/main/java/org/apache/seata/common/Constants.java
+++ b/common/src/main/java/org/apache/seata/common/Constants.java
@@ -180,6 +180,10 @@ public interface Constants {
*/
String ROLLBACKING = "Rollbacking";
+ /**
+ * The constant END
+ */
+ String END = "END";
/**
* The constant AUTO_COMMIT
*/
diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java
b/common/src/main/java/org/apache/seata/common/DefaultValues.java
index fab0f3f40e..049d625548 100644
--- a/common/src/main/java/org/apache/seata/common/DefaultValues.java
+++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java
@@ -310,6 +310,11 @@ public interface DefaultValues {
*/
int DEFAULT_RETRY_DEAD_THRESHOLD = 2 * 60 * 1000 + 10 * 1000;
+ /**
+ * the constant DEFAULT_END_STATE_RETRY_DEAD_THRESHOLD
+ */
+ int DEFAULT_END_STATE_RETRY_DEAD_THRESHOLD = 10 * 1000;
+
/**
* the constant TM_INTERCEPTOR_ORDER
*/
@@ -397,6 +402,11 @@ public interface DefaultValues {
*/
int DEFAULT_ROLLBACKING_RETRY_PERIOD = 1000;
+ /**
+ * the constant DEFAULT_END_STATUS_RETRY_PERIOD
+ */
+ int DEFAULT_END_STATUS_RETRY_PERIOD = 30 * 1000;
+
/**
* the constant DEFAULT_TIMEOUT_RETRY_PERIOD
*/
diff --git a/script/config-center/config.txt b/script/config-center/config.txt
index f4794626c3..6ded35ed83 100644
--- a/script/config-center/config.txt
+++ b/script/config-center/config.txt
@@ -153,6 +153,7 @@ store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
+server.recovery.endstatusRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
diff --git
a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerProperties.java
b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerProperties.java
index f273b741d6..33f1b93892 100644
---
a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerProperties.java
+++
b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerProperties.java
@@ -27,6 +27,7 @@ import static
org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER
public class ServerProperties {
private long maxCommitRetryTimeout = -1L;
private long maxRollbackRetryTimeout = -1L;
+ private long maxEndStatusRetryTimeout = -1L;
private Boolean rollbackRetryTimeoutUnlockEnable = false;
private Boolean enableCheckAuth = true;
private Boolean enableParallelRequestHandle = true;
@@ -56,6 +57,15 @@ public class ServerProperties {
return this;
}
+ public long getMaxEndStatusRetryTimeout() {
+ return maxEndStatusRetryTimeout;
+ }
+
+ public ServerProperties setMaxEndStatusRetryTimeout(long
maxEndStatusRetryTimeout) {
+ this.maxEndStatusRetryTimeout = maxEndStatusRetryTimeout;
+ return this;
+ }
+
public Boolean getRollbackRetryTimeoutUnlockEnable() {
return rollbackRetryTimeoutUnlockEnable;
}
diff --git
a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerPropertiesTest.java
b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerPropertiesTest.java
index fc9161a07c..60d8db4a12 100644
---
a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerPropertiesTest.java
+++
b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerPropertiesTest.java
@@ -35,6 +35,7 @@ public class ServerPropertiesTest {
serverProperties.setRollbackRetryTimeoutUnlockEnable(true);
serverProperties.setMaxCommitRetryTimeout(1L);
serverProperties.setMaxRollbackRetryTimeout(1L);
+ serverProperties.setMaxEndStatusRetryTimeout(1L);
Assertions.assertEquals(1, serverProperties.getXaerNotaRetryTimeout());
Assertions.assertEquals(1, serverProperties.getRetryDeadThreshold());
@@ -47,5 +48,6 @@ public class ServerPropertiesTest {
Assertions.assertTrue(serverProperties.getRollbackRetryTimeoutUnlockEnable());
Assertions.assertEquals(1L,
serverProperties.getMaxCommitRetryTimeout());
Assertions.assertEquals(1L,
serverProperties.getMaxRollbackRetryTimeout());
+ Assertions.assertEquals(1L,
serverProperties.getMaxEndStatusRetryTimeout());
}
}
diff --git
a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java
b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java
index 1a2c523437..f2d09f8a6d 100644
---
a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java
+++
b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java
@@ -83,6 +83,7 @@ import static org.apache.seata.common.Constants.COMMITTING;
import static org.apache.seata.common.Constants.RETRY_COMMITTING;
import static org.apache.seata.common.Constants.RETRY_ROLLBACKING;
import static org.apache.seata.common.Constants.ROLLBACKING;
+import static org.apache.seata.common.Constants.END;
import static org.apache.seata.common.Constants.SYNC_PROCESSING;
import static org.apache.seata.common.Constants.TX_TIMEOUT_CHECK;
import static org.apache.seata.common.Constants.UNDOLOG_DELETE;
@@ -95,6 +96,7 @@ import static
org.apache.seata.common.DefaultValues.DEFAULT_ROLLBACKING_RETRY_PE
import static
org.apache.seata.common.DefaultValues.DEFAULT_ROLLBACK_FAILED_UNLOCK_ENABLE;
import static
org.apache.seata.common.DefaultValues.DEFAULT_TIMEOUT_RETRY_PERIOD;
import static
org.apache.seata.common.DefaultValues.DEFAULT_UNDO_LOG_DELETE_PERIOD;
+import static
org.apache.seata.common.DefaultValues.DEFAULT_END_STATUS_RETRY_PERIOD;
/**
* The type Default coordinator.
@@ -125,6 +127,12 @@ public class DefaultCoordinator extends
AbstractTCInboundHandler implements Tran
protected static final long ROLLBACKING_RETRY_PERIOD =
CONFIG.getLong(ConfigurationKeys.ROLLBACKING_RETRY_PERIOD,
DEFAULT_ROLLBACKING_RETRY_PERIOD);
+ /**
+ * The constant END_STATUS_RETRY_PERIOD.
+ */
+ protected static final long END_STATUS_RETRY_PERIOD =
CONFIG.getLong(ConfigurationKeys.END_STATUS_RETRY_PERIOD,
+ DEFAULT_END_STATUS_RETRY_PERIOD);
+
/**
* The constant TIMEOUT_RETRY_PERIOD.
*/
@@ -189,12 +197,15 @@ public class DefaultCoordinator extends
AbstractTCInboundHandler implements Tran
private final GlobalStatus[] retryRollbackingStatuses = new GlobalStatus[]
{
GlobalStatus.TimeoutRollbacking,
- GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.RollbackRetrying};
+ GlobalStatus.TimeoutRollbackRetrying,
+ GlobalStatus.RollbackRetrying
+ };
- private final GlobalStatus[] retryCommittingStatuses = new GlobalStatus[]
{GlobalStatus.CommitRetrying, GlobalStatus.Committed};
+ private final GlobalStatus[] retryCommittingStatuses = new GlobalStatus[]
{GlobalStatus.CommitRetrying};
private final GlobalStatus[] rollbackingStatuses = new GlobalStatus[]
{GlobalStatus.Rollbacking};
private final GlobalStatus[] committingStatuses = new GlobalStatus[]
{GlobalStatus.Committing};
+ private final GlobalStatus[] endStatuses = new GlobalStatus[]
{GlobalStatus.Rollbacked, GlobalStatus.TimeoutRollbacked,
GlobalStatus.Committed, GlobalStatus.Finished};
private final ThreadPoolExecutor branchRemoveExecutor;
@@ -432,6 +443,7 @@ public class DefaultCoordinator extends
AbstractTCInboundHandler implements Tran
//The function of this 'return' is 'continue'.
return;
}
+
core.doGlobalRollback(rollbackingSession, true);
} catch (TransactionException ex) {
LOGGER.error("Failed to retry rollbacking [{}] {} {}",
rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
@@ -631,6 +643,58 @@ public class DefaultCoordinator extends
AbstractTCInboundHandler implements Tran
delay, TimeUnit.MILLISECONDS);
}
+ /**
+ * Handle end status by scheduled.
+ */
+ protected void handleEndStatesByScheduled() {
+ SessionCondition sessionCondition = new SessionCondition(endStatuses);
+ sessionCondition.setLazyLoadBranch(true);
+ List<GlobalSession> endStatusSessions =
+
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
+ if (CollectionUtils.isEmpty(endStatusSessions)) {
+ endSchedule(RETRY_DEAD_THRESHOLD);
+ return;
+ }
+ long delay = END_STATUS_RETRY_PERIOD;
+
endStatusSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime));
+ List<GlobalSession> needDoEndStatusSessions = new ArrayList<>();
+ for (GlobalSession endStatusSession : endStatusSessions) {
+ long time = endStatusSession.timeToDeadSession();
+ if (time <= 0) {
+ needDoEndStatusSessions.add(endStatusSession);
+ } else {
+ delay = Math.max(time, END_STATUS_RETRY_PERIOD);
+ break;
+ }
+ }
+ long now = System.currentTimeMillis();
+ SessionHelper.forEach(needDoEndStatusSessions, endStatusSession -> {
+ try {
+ if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT,
endStatusSession.getBeginTime())) {
+ handleEndStateSession(endStatusSession);
+ }
+ } catch (TransactionException ex) {
+ LOGGER.error("Failed to handle end status session [{}] {} {}",
endStatusSession.getXid(), ex.getCode(), ex.getMessage());
+ }
+ });
+ endSchedule(delay);
+ }
+
+ private void handleEndStateSession(GlobalSession globalSession) throws
TransactionException {
+ SessionHelper.processEndState(globalSession);
+ }
+
+ private void endSchedule(long delay) {
+ syncProcessing.schedule(
+ () -> {
+ boolean called = SessionHolder.distributedLockAndExecute(END,
this::handleEndStatesByScheduled);
+ if (!called) {
+ endSchedule(END_STATUS_RETRY_PERIOD);
+ }
+ },
+ delay, TimeUnit.MILLISECONDS);
+ }
+
/**
* Init.
*/
@@ -658,6 +722,8 @@ public class DefaultCoordinator extends
AbstractTCInboundHandler implements Tran
rollbackingSchedule(0);
committingSchedule(0);
+
+ endSchedule(0);
}
@Override
diff --git
a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java
b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java
index f6488027e2..a674b1e7dc 100644
--- a/server/src/main/java/org/apache/seata/server/session/GlobalSession.java
+++ b/server/src/main/java/org/apache/seata/server/session/GlobalSession.java
@@ -19,6 +19,7 @@ package org.apache.seata.server.session;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -80,6 +81,13 @@ public class GlobalSession implements SessionLifecycle,
SessionStorable {
private static final int RETRY_DEAD_THRESHOLD =
ConfigurationFactory.getInstance()
.getInt(ConfigurationKeys.RETRY_DEAD_THRESHOLD,
DefaultValues.DEFAULT_RETRY_DEAD_THRESHOLD);
+ /**
+ * If the global session's status is in an end state and currentTime -
createTime >= END_STATE_RETRY_DEAD_THRESHOLD
+ * then the tx will be remand as need to retry rollback
+ */
+ private static final int END_STATE_RETRY_DEAD_THRESHOLD =
ConfigurationFactory.getInstance()
+ .getInt(ConfigurationKeys.END_STATE_RETRY_DEAD_THRESHOLD,
DefaultValues.DEFAULT_END_STATE_RETRY_DEAD_THRESHOLD);
+
private String xid;
private long transactionId;
@@ -203,13 +211,31 @@ public class GlobalSession implements SessionLifecycle,
SessionStorable {
}
/**
- * prevent could not handle committing and rollbacking transaction
+ * prevent could not handle committing, rollbacking and rollbacked
transaction
+ *
+ * If the global session's status is in end status, it returns the
remaining time until the session reaches
+ * the end state retry dead threshold.
+ * For other statuses, it returns the remaining time until the session
reaches the retry dead threshold.
+ *
* @return time to dead session. if not greater than 0, then deadSession
*/
public long timeToDeadSession() {
+ if (isEndStatus()) {
+ return beginTime + END_STATE_RETRY_DEAD_THRESHOLD -
System.currentTimeMillis();
+ }
return beginTime + RETRY_DEAD_THRESHOLD - System.currentTimeMillis();
}
+ private boolean isEndStatus() {
+ EnumSet<GlobalStatus> endStatuses = EnumSet.of(
+ GlobalStatus.Rollbacked,
+ GlobalStatus.TimeoutRollbacked,
+ GlobalStatus.Committed,
+ GlobalStatus.Finished
+ );
+ return endStatuses.contains(this.status);
+ }
+
@Override
public void begin() throws TransactionException {
this.status = GlobalStatus.Begin;
diff --git
a/server/src/main/java/org/apache/seata/server/session/SessionHelper.java
b/server/src/main/java/org/apache/seata/server/session/SessionHelper.java
index 356a50cb10..fa18023e3b 100644
--- a/server/src/main/java/org/apache/seata/server/session/SessionHelper.java
+++ b/server/src/main/java/org/apache/seata/server/session/SessionHelper.java
@@ -454,6 +454,23 @@ public class SessionHelper {
}
}
+ public static void processEndState(GlobalSession globalSession) throws
TransactionException {
+ GlobalStatus globalStatus = globalSession.getStatus();
+
+ switch (globalStatus) {
+ case Committed:
+ case Finished:
+ endCommitted(globalSession, true);
+ return;
+ case Rollbacked:
+ case TimeoutRollbacked:
+ endRollbacked(globalSession, true);
+ return;
+ default:
+ throw new TransactionException("Unsupported GlobalStatus:" +
globalStatus);
+ }
+ }
+
/**
* if true, enable delete the branch asynchronously
*
diff --git a/server/src/main/resources/application.example.yml
b/server/src/main/resources/application.example.yml
index e13b1efc50..881cea9b51 100644
--- a/server/src/main/resources/application.example.yml
+++ b/server/src/main/resources/application.example.yml
@@ -152,6 +152,7 @@ seata:
committing-retry-period: 1000
async-committing-retry-period: 1000
rollbacking-retry-period: 1000
+ end-status-retry-period: 1000
timeout-retry-period: 1000
undo:
log-save-days: 7
diff --git a/server/src/main/resources/application.raft.example.yml
b/server/src/main/resources/application.raft.example.yml
index dea10b382f..cc727ea02a 100644
--- a/server/src/main/resources/application.raft.example.yml
+++ b/server/src/main/resources/application.raft.example.yml
@@ -122,6 +122,7 @@ seata:
committing-retry-period: 1000
async-committing-retry-period: 1000
rollbacking-retry-period: 1000
+ end-status-retry-period: 1000
timeout-retry-period: 1000
undo:
log-save-days: 7
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]