This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 87bd830fc28 Pipe: Set placeholder for required pipeMetaList & Prevented the empty progress report when shutdown (#16388)
87bd830fc28 is described below

commit 87bd830fc2869a0096cb2cb2513a5f52e57a8b7b
Author: Caideyipi <[email protected]>
AuthorDate: Thu Sep 11 12:30:43 2025 +0800

    Pipe: Set placeholder for required pipeMetaList & Prevented the empty progress report when shutdown (#16388)
    
    * fix
    
    * refactor
---
 .../manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java     | 2 +-
 .../manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java    | 4 ++--
 .../pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java   | 3 ++-
 .../org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java   | 5 ++++-
 .../db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java      | 2 +-
 5 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
index 1e8f32eda76..cfbc35a446c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
@@ -92,7 +92,7 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
 
   public void parseHeartbeat(
       final int dataNodeId,
-      /* @Nullable */ final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
+      final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
       /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
       /* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
       /* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 547310ce49c..13a3c4b83d6 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -35,11 +35,11 @@ public class PipeHeartbeat {
   private final Map<PipeStaticMeta, Double> remainingTimeMap = new HashMap<>();
 
   public PipeHeartbeat(
-      /* @Nullable */ final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
+      final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
       /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
       /* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
       /* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
-    // Pipe meta may be null for nodes shutting down, return empty heartbeat
+    // Shall not reach here, just in case
     if (Objects.isNull(pipeMetaByteBufferListFromAgent)) {
       return;
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index ef9d0f50409..8f864b4d5c2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -118,7 +119,7 @@ public class PipeHeartbeatScheduler {
 
     // config node heartbeat
     try {
-      final TPipeHeartbeatResp configNodeResp = new TPipeHeartbeatResp();
+      final TPipeHeartbeatResp configNodeResp = new TPipeHeartbeatResp(new ArrayList<>());
       PipeConfigNodeAgent.task().collectPipeMetaList(request, configNodeResp);
       pipeHeartbeatParser.parseHeartbeat(
           ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 7e881401838..728e2801c8f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -646,8 +646,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     try (final ConfigNodeClient configNodeClient =
         ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
       // Send request to some API server
-      final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
+      final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(new ArrayList<>());
       collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
+      if (resp.getPipeMetaList().isEmpty()) {
+        return;
+      }
       final TSStatus result =
           configNodeClient.pushHeartbeat(
               IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 10ec5177722..f7a2fa87980 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1459,7 +1459,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
 
   @Override
   public TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) throws TException {
-    final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
+    final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(new ArrayList<>());
     PipeDataNodeAgent.task().collectPipeMetaList(req, resp);
     return resp;
   }

Reply via email to