This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch ms
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6e7e73357b37391f9c7dab0d277e960e9f355c8c
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 6 10:36:23 2026 +0800

    Pipe: Fixed the bug that the timeout ms is regarded as s (#17590)
---
 .../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  2 +-
 .../iotdb/commons/pipe/agent/task/PipeTaskAgent.java     | 16 ++++++++++++++--
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index c437507f4b0..9038e7c3a71 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -374,7 +374,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   ///////////////////////// Heartbeat /////////////////////////
 
   public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TException {
-    if (!tryReadLockWithTimeOut(
+    if (!tryReadLockWithTimeOutInMs(
         CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) {
       return;
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 726bc1b6a1f..fcff317b3f3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -125,6 +125,10 @@ public abstract class PipeTaskAgent {
     }
   }
 
+  protected boolean tryReadLockWithTimeOutInMs(final long timeOutInMs) {
+    return tryReadLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs));
+  }
+
   protected void releaseReadLock() {
     pipeMetaKeeper.releaseReadLock();
   }
@@ -143,10 +147,18 @@ public abstract class PipeTaskAgent {
     }
   }
 
+  protected boolean tryWriteLockWithTimeOutInMs(final long timeOutInMs) {
+    return tryWriteLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs));
+  }
+
   protected void releaseWriteLock() {
     pipeMetaKeeper.releaseWriteLock();
   }
 
+  private long convertMsToCeilSeconds(final long timeOutInMs) {
+    return Math.max(1L, (Math.max(0L, timeOutInMs) + 999L) / 1000L);
+  }
+
   ////////////////////////// Pipe Task Management Entry //////////////////////////
 
   public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges(
@@ -363,7 +375,7 @@ public abstract class PipeTaskAgent {
 
   public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges(
       final List<PipeMeta> pipeMetaListFromCoordinator) {
-    if (!tryWriteLockWithTimeOut(
+    if (!tryWriteLockWithTimeOutInMs(
         CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) {
       return null;
     }
@@ -1091,7 +1103,7 @@ public abstract class PipeTaskAgent {
 
   public void collectPipeMetaList(final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp)
       throws TException {
-    if (!tryReadLockWithTimeOut(
+    if (!tryReadLockWithTimeOutInMs(
         CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) {
       return;
     }

Reply via email to