This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 0cd141caf0 refactor: replace synchronization logic in AbstractNettyRemotingClient (#7719)
0cd141caf0 is described below

commit 0cd141caf017a0b304dceb7f19bb1bfa900ae61f
Author: lokidundun <[email protected]>
AuthorDate: Thu Oct 23 10:27:50 2025 +0800

    refactor: replace synchronization logic in AbstractNettyRemotingClient (#7719)
---
 changes/en-us/2.x.md                               |  1 +
 changes/zh-cn/2.x.md                               |  1 +
 .../core/rpc/netty/AbstractNettyRemoting.java      | 28 +++++++++++-------
 .../rpc/netty/AbstractNettyRemotingClient.java     | 33 +++++++++++++++-------
 .../rpc/netty/AbstractNettyRemotingServer.java     |  7 +++--
 5 files changed, 48 insertions(+), 22 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 7705bc0bfa..69194e5613 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -94,6 +94,7 @@ Add changes here for all PR submitted to the 2.x branch.
 
 - [[#7615](https://github.com/seata/seata/pull/7615)] Refactor DataSourceProxy
 - [[#7617](https://github.com/seata/seata/pull/7617)] Refactor Alibaba Dubbo and HSF
+- [[#7719](https://github.com/apache/incubator-seata/pull/7719)] Replace synchronized with ReentrantLock in AbstractNettyRemotingClient to support virtual threads
 
 
 ### doc:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index a34a0e3b19..c409d29287 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -92,6 +92,7 @@
 
 - [[#7615](https://github.com/seata/seata/pull/7615)] 重构 DataSourceProxy
 - [[#7617](https://github.com/seata/seata/pull/7617)] 重构 Alibaba Dubbo 和 HSF 模块
+- [[#7719](https://github.com/apache/incubator-seata/pull/7719)] 替换 AbstractNettyRemotingClient 中的 synchronized 为 ReentrantLock,以支持虚拟线程
 
 
 ### doc:
diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java
index 6baaa0e302..8b5c96c56e 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java
@@ -53,6 +53,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * The abstract netty remoting.
@@ -92,7 +94,8 @@ public abstract class AbstractNettyRemoting implements 
Disposable {
     protected volatile long nowMills = 0;
 
     private static final int TIMEOUT_CHECK_INTERVAL = 3000;
-    protected final Object lock = new Object();
+    protected final ReentrantLock writabilityLock = new ReentrantLock();
+    protected final Condition writabilityCondition = 
writabilityLock.newCondition();
     /**
      * The Is sending.
      */
@@ -379,21 +382,26 @@ public abstract class AbstractNettyRemoting implements 
Disposable {
 
     private void channelWritableCheck(Channel channel, Object msg) {
         int tryTimes = 0;
-        synchronized (lock) {
+        writabilityLock.lock();
+        try {
             while (!channel.isWritable()) {
+                tryTimes++;
+                if (tryTimes > NettyClientConfig.getMaxNotWriteableRetry()) {
+                    destroyChannel(channel);
+                    throw new FrameworkException(
+                            "msg:" + ((msg == null) ? "null" : msg.toString()),
+                            FrameworkErrorCode.ChannelIsNotWritable);
+                }
                 try {
-                    tryTimes++;
-                    if (tryTimes > 
NettyClientConfig.getMaxNotWriteableRetry()) {
-                        destroyChannel(channel);
-                        throw new FrameworkException(
-                                "msg:" + ((msg == null) ? "null" : 
msg.toString()),
-                                FrameworkErrorCode.ChannelIsNotWritable);
-                    }
-                    lock.wait(NOT_WRITEABLE_CHECK_MILLS);
+                    writabilityCondition.await(NOT_WRITEABLE_CHECK_MILLS, 
TimeUnit.MILLISECONDS);
                 } catch (InterruptedException exx) {
                     LOGGER.error(exx.getMessage());
+                    Thread.currentThread().interrupt();
+                    throw new FrameworkException(exx);
                 }
             }
+        } finally {
+            writabilityLock.unlock();
         }
     }
 
diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
index 1e313c8c9f..9a42092277 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
@@ -67,6 +67,8 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 
 import static 
org.apache.seata.common.exception.FrameworkErrorCode.NoAvailableService;
@@ -90,7 +92,9 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
 
     private final CopyOnWriteArrayList<ChannelEventListener> 
channelEventListeners = new CopyOnWriteArrayList<>();
 
-    protected final Object mergeLock = new Object();
+    protected final ReentrantLock mergeLock = new ReentrantLock();
+    protected final Condition mergeCondition = mergeLock.newCondition();
+    protected volatile boolean isSending = false;
 
     /**
      * When sending message type is {@link MergeMessage}, will be stored to 
mergeMsgMap.
@@ -183,8 +187,11 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
                 LOGGER.debug("offer message: {}", rpcMessage.getBody());
             }
             if (!isSending) {
-                synchronized (mergeLock) {
-                    mergeLock.notifyAll();
+                mergeLock.lock();
+                try {
+                    mergeCondition.signalAll();
+                } finally {
+                    mergeLock.unlock();
                 }
             }
 
@@ -578,11 +585,14 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
         @Override
         public void run() {
             while (true) {
-                synchronized (mergeLock) {
-                    try {
-                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
-                    } catch (InterruptedException e) {
-                    }
+                mergeLock.lock();
+                try {
+                    mergeCondition.await(MAX_MERGE_SEND_MILLS, 
TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    LOGGER.warn("MergedSendRunnable wait interrupted", e);
+                } finally {
+                    mergeLock.unlock();
                 }
                 isSending = true;
                 basketMap.forEach((address, basket) -> {
@@ -665,10 +675,13 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
 
         @Override
         public void channelWritabilityChanged(ChannelHandlerContext ctx) {
-            synchronized (lock) {
+            AbstractNettyRemotingClient.super.writabilityLock.lock();
+            try {
                 if (ctx.channel().isWritable()) {
-                    lock.notifyAll();
+                    
AbstractNettyRemotingClient.super.writabilityCondition.signalAll();
                 }
+            } finally {
+                AbstractNettyRemotingClient.super.writabilityLock.unlock();
             }
             ctx.fireChannelWritabilityChanged();
         }
diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
index 3943587bd3..1bcad8c6f7 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
@@ -180,10 +180,13 @@ public abstract class AbstractNettyRemotingServer extends 
AbstractNettyRemoting
 
         @Override
         public void channelWritabilityChanged(ChannelHandlerContext ctx) {
-            synchronized (lock) {
+            AbstractNettyRemotingServer.super.writabilityLock.lock();
+            try {
                 if (ctx.channel().isWritable()) {
-                    lock.notifyAll();
+                    
AbstractNettyRemotingServer.super.writabilityCondition.signalAll();
                 }
+            } finally {
+                AbstractNettyRemotingServer.super.writabilityLock.unlock();
             }
             ctx.fireChannelWritabilityChanged();
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to