This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch bugn
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f45e50b06ae509f422426ff958b6be1b7c3f8708
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 30 19:43:52 2026 +0800

    fix
---
 .../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  2 +-
 .../iotdb/commons/pipe/agent/task/PipeTaskAgent.java     | 16 ++++++++++++++--
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 031cfd3a62e..06d6a512b30 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -377,7 +377,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   ///////////////////////// Heartbeat /////////////////////////
 
   public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws 
TException {
-    if (!tryReadLockWithTimeOut(
+    if (!tryReadLockWithTimeOutInMs(
         
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 
3)) {
       return;
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 73b543592c1..99d45298794 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -130,6 +130,10 @@ public abstract class PipeTaskAgent {
     }
   }
 
+  protected boolean tryReadLockWithTimeOutInMs(final long timeOutInMs) {
+    return tryReadLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs));
+  }
+
   protected void releaseReadLock() {
     pipeMetaKeeper.releaseReadLock();
   }
@@ -148,10 +152,18 @@ public abstract class PipeTaskAgent {
     }
   }
 
+  protected boolean tryWriteLockWithTimeOutInMs(final long timeOutInMs) {
+    return tryWriteLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs));
+  }
+
   protected void releaseWriteLock() {
     pipeMetaKeeper.releaseWriteLock();
   }
 
+  private long convertMsToCeilSeconds(final long timeOutInMs) {
+    return Math.max(1L, (Math.max(0L, timeOutInMs) + 999L) / 1000L);
+  }
+
   ////////////////////////// Pipe Task Management Entry 
//////////////////////////
 
   public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges(
@@ -368,7 +380,7 @@ public abstract class PipeTaskAgent {
 
   public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges(
       final List<PipeMeta> pipeMetaListFromCoordinator) {
-    if (!tryWriteLockWithTimeOut(
+    if (!tryWriteLockWithTimeOutInMs(
         
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 
3)) {
       return null;
     }
@@ -1107,7 +1119,7 @@ public abstract class PipeTaskAgent {
 
   public void collectPipeMetaList(final TPipeHeartbeatReq req, final 
TPipeHeartbeatResp resp)
       throws TException {
-    if (!tryReadLockWithTimeOut(
+    if (!tryReadLockWithTimeOutInMs(
         
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 
3)) {
       return;
     }

Reply via email to