This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new 0cd141caf0 refactor: replace synchronization logic in AbstractNettyRemotingClient (#7719)
0cd141caf0 is described below
commit 0cd141caf017a0b304dceb7f19bb1bfa900ae61f
Author: lokidundun <[email protected]>
AuthorDate: Thu Oct 23 10:27:50 2025 +0800
refactor: replace synchronization logic in AbstractNettyRemotingClient (#7719)
---
changes/en-us/2.x.md | 1 +
changes/zh-cn/2.x.md | 1 +
.../core/rpc/netty/AbstractNettyRemoting.java | 28 +++++++++++-------
.../rpc/netty/AbstractNettyRemotingClient.java | 33 +++++++++++++++-------
.../rpc/netty/AbstractNettyRemotingServer.java | 7 +++--
5 files changed, 48 insertions(+), 22 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 7705bc0bfa..69194e5613 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -94,6 +94,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7615](https://github.com/seata/seata/pull/7615)] Refactor DataSourceProxy
- [[#7617](https://github.com/seata/seata/pull/7617)] Refactor Alibaba Dubbo
and HSF
+- [[#7719](https://github.com/apache/incubator-seata/pull/7719)] Replace synchronized with ReentrantLock in AbstractNettyRemotingClient to support virtual threads
### doc:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index a34a0e3b19..c409d29287 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -92,6 +92,7 @@
- [[#7615](https://github.com/seata/seata/pull/7615)] 重构 DataSourceProxy
- [[#7617](https://github.com/seata/seata/pull/7617)] 重构 Alibaba Dubbo 和 HSF 模块
+- [[#7719](https://github.com/apache/incubator-seata/pull/7719)] 替换 AbstractNettyRemotingClient 中的 synchronized 为 ReentrantLock,以支持虚拟线程
### doc:
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java
index 6baaa0e302..8b5c96c56e 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java
@@ -53,6 +53,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
/**
* The abstract netty remoting.
@@ -92,7 +94,8 @@ public abstract class AbstractNettyRemoting implements
Disposable {
protected volatile long nowMills = 0;
private static final int TIMEOUT_CHECK_INTERVAL = 3000;
- protected final Object lock = new Object();
+ protected final ReentrantLock writabilityLock = new ReentrantLock();
+ protected final Condition writabilityCondition =
writabilityLock.newCondition();
/**
* The Is sending.
*/
@@ -379,21 +382,26 @@ public abstract class AbstractNettyRemoting implements
Disposable {
private void channelWritableCheck(Channel channel, Object msg) {
int tryTimes = 0;
- synchronized (lock) {
+ writabilityLock.lock();
+ try {
while (!channel.isWritable()) {
+ tryTimes++;
+ if (tryTimes > NettyClientConfig.getMaxNotWriteableRetry()) {
+ destroyChannel(channel);
+ throw new FrameworkException(
+ "msg:" + ((msg == null) ? "null" : msg.toString()),
+ FrameworkErrorCode.ChannelIsNotWritable);
+ }
try {
- tryTimes++;
- if (tryTimes >
NettyClientConfig.getMaxNotWriteableRetry()) {
- destroyChannel(channel);
- throw new FrameworkException(
- "msg:" + ((msg == null) ? "null" :
msg.toString()),
- FrameworkErrorCode.ChannelIsNotWritable);
- }
- lock.wait(NOT_WRITEABLE_CHECK_MILLS);
+ writabilityCondition.await(NOT_WRITEABLE_CHECK_MILLS,
TimeUnit.MILLISECONDS);
} catch (InterruptedException exx) {
LOGGER.error(exx.getMessage());
+ Thread.currentThread().interrupt();
+ throw new FrameworkException(exx);
}
}
+ } finally {
+ writabilityLock.unlock();
}
}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
index 1e313c8c9f..9a42092277 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
@@ -67,6 +67,8 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import static
org.apache.seata.common.exception.FrameworkErrorCode.NoAvailableService;
@@ -90,7 +92,9 @@ public abstract class AbstractNettyRemotingClient extends
AbstractNettyRemoting
private final CopyOnWriteArrayList<ChannelEventListener>
channelEventListeners = new CopyOnWriteArrayList<>();
- protected final Object mergeLock = new Object();
+ protected final ReentrantLock mergeLock = new ReentrantLock();
+ protected final Condition mergeCondition = mergeLock.newCondition();
+ protected volatile boolean isSending = false;
/**
* When sending message type is {@link MergeMessage}, will be stored to
mergeMsgMap.
@@ -183,8 +187,11 @@ public abstract class AbstractNettyRemotingClient extends
AbstractNettyRemoting
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
if (!isSending) {
- synchronized (mergeLock) {
- mergeLock.notifyAll();
+ mergeLock.lock();
+ try {
+ mergeCondition.signalAll();
+ } finally {
+ mergeLock.unlock();
}
}
@@ -578,11 +585,14 @@ public abstract class AbstractNettyRemotingClient extends
AbstractNettyRemoting
@Override
public void run() {
while (true) {
- synchronized (mergeLock) {
- try {
- mergeLock.wait(MAX_MERGE_SEND_MILLS);
- } catch (InterruptedException e) {
- }
+ mergeLock.lock();
+ try {
+ mergeCondition.await(MAX_MERGE_SEND_MILLS,
TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("MergedSendRunnable wait interrupted", e);
+ } finally {
+ mergeLock.unlock();
}
isSending = true;
basketMap.forEach((address, basket) -> {
@@ -665,10 +675,13 @@ public abstract class AbstractNettyRemotingClient extends
AbstractNettyRemoting
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
- synchronized (lock) {
+ AbstractNettyRemotingClient.super.writabilityLock.lock();
+ try {
if (ctx.channel().isWritable()) {
- lock.notifyAll();
+
AbstractNettyRemotingClient.super.writabilityCondition.signalAll();
}
+ } finally {
+ AbstractNettyRemotingClient.super.writabilityLock.unlock();
}
ctx.fireChannelWritabilityChanged();
}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
index 3943587bd3..1bcad8c6f7 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
@@ -180,10 +180,13 @@ public abstract class AbstractNettyRemotingServer extends
AbstractNettyRemoting
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
- synchronized (lock) {
+ AbstractNettyRemotingServer.super.writabilityLock.lock();
+ try {
if (ctx.channel().isWritable()) {
- lock.notifyAll();
+
AbstractNettyRemotingServer.super.writabilityCondition.signalAll();
}
+ } finally {
+ AbstractNettyRemotingServer.super.writabilityLock.unlock();
}
ctx.fireChannelWritabilityChanged();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]