This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 87bd830fc28 Pipe: Set placeholder for required pipeMetaList &
Prevented the empty progress report when shutdown (#16388)
87bd830fc28 is described below
commit 87bd830fc2869a0096cb2cb2513a5f52e57a8b7b
Author: Caideyipi <[email protected]>
AuthorDate: Thu Sep 11 12:30:43 2025 +0800
Pipe: Set placeholder for required pipeMetaList & Prevented the empty
progress report when shutdown (#16388)
* fix
* refactor
---
.../manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java | 2 +-
.../manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java | 4 ++--
.../pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java | 3 ++-
.../org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 5 ++++-
.../db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java | 2 +-
5 files changed, 10 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
index 1e8f32eda76..cfbc35a446c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
@@ -92,7 +92,7 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
public void parseHeartbeat(
final int dataNodeId,
- /* @Nullable */ final List<ByteBuffer>
pipeMetaByteBufferListFromDataNode,
+ final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 547310ce49c..13a3c4b83d6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -35,11 +35,11 @@ public class PipeHeartbeat {
private final Map<PipeStaticMeta, Double> remainingTimeMap = new HashMap<>();
public PipeHeartbeat(
- /* @Nullable */ final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
+ final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
- // Pipe meta may be null for nodes shutting down, return empty heartbeat
+ // Shall not reach here, just in case
if (Objects.isNull(pipeMetaByteBufferListFromAgent)) {
return;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index ef9d0f50409..8f864b4d5c2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -118,7 +119,7 @@ public class PipeHeartbeatScheduler {
// config node heartbeat
try {
- final TPipeHeartbeatResp configNodeResp = new TPipeHeartbeatResp();
+ final TPipeHeartbeatResp configNodeResp = new TPipeHeartbeatResp(new
ArrayList<>());
PipeConfigNodeAgent.task().collectPipeMetaList(request, configNodeResp);
pipeHeartbeatParser.parseHeartbeat(
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 7e881401838..728e2801c8f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -646,8 +646,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
// Send request to some API server
- final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
+ final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(new
ArrayList<>());
collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
+ if (resp.getPipeMetaList().isEmpty()) {
+ return;
+ }
final TSStatus result =
configNodeClient.pushHeartbeat(
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 10ec5177722..f7a2fa87980 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1459,7 +1459,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) throws
TException {
- final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
+ final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(new ArrayList<>());
PipeDataNodeAgent.task().collectPipeMetaList(req, resp);
return resp;
}