This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new e3b9b364fae [To dev/1.3] Pipe: Set placeholder for required
pipeMetaList & Prevented the empty progress report when shutdown (#16388)
(#16396)
e3b9b364fae is described below
commit e3b9b364fae383c7716cd96d54ce33ba71c7289f
Author: Caideyipi <[email protected]>
AuthorDate: Fri Sep 12 10:07:54 2025 +0800
[To dev/1.3] Pipe: Set placeholder for required pipeMetaList & Prevented
the empty progress report when shutdown (#16388) (#16396)
* fix
* refactor
---
.../manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java | 2 +-
.../manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java | 4 ++--
.../pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java | 3 ++-
.../org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 5 ++++-
.../db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java | 2 +-
5 files changed, 10 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
index 5b6369a2c3c..eae99126523 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
@@ -104,7 +104,7 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
public void parseHeartbeat(
final int dataNodeId,
- /* @Nullable */ final List<ByteBuffer>
pipeMetaByteBufferListFromDataNode,
+ final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 547310ce49c..13a3c4b83d6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -35,11 +35,11 @@ public class PipeHeartbeat {
private final Map<PipeStaticMeta, Double> remainingTimeMap = new HashMap<>();
public PipeHeartbeat(
- /* @Nullable */ final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
+ final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
- // Pipe meta may be null for nodes shutting down, return empty heartbeat
+ // Shall not reach here, just in case
if (Objects.isNull(pipeMetaByteBufferListFromAgent)) {
return;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index ef9d0f50409..8f864b4d5c2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -118,7 +119,7 @@ public class PipeHeartbeatScheduler {
// config node heartbeat
try {
- final TPipeHeartbeatResp configNodeResp = new TPipeHeartbeatResp();
+ final TPipeHeartbeatResp configNodeResp = new TPipeHeartbeatResp(new
ArrayList<>());
PipeConfigNodeAgent.task().collectPipeMetaList(request, configNodeResp);
pipeHeartbeatParser.parseHeartbeat(
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 6baa48c8d68..828cd6eb691 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -632,8 +632,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
// Send request to some API server
- final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
+ final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(new
ArrayList<>());
collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
+ if (resp.getPipeMetaList().isEmpty()) {
+ return;
+ }
final TSStatus result =
configNodeClient.pushHeartbeat(
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 6ba0c45fe7b..c7c51ed5f4b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1374,7 +1374,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) throws
TException {
- final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
+ final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(new ArrayList<>());
PipeDataNodeAgent.task().collectPipeMetaList(req, resp);
return resp;
}