This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 977086e33bfd3e3a4fad5140e19eb2458e55a45a Author: Caideyipi <[email protected]> AuthorDate: Thu Jul 10 12:07:50 2025 +0800 [To dev/1.3] Pipe: Enabled waiting for pipes to finish & progress index persist to config node in shutdown hook (#15896)(#15901) * Pipe: Enabled waiting for pipes to finish & progress index persist to config node in shutdown hook (#15896) * persist in shutdown hook * Update PipeTaskAgent.java * Update PipeConfigNodeTaskAgent.java * Update DataNodeShutdownHook.java * Fix * Fix2 * Fix3 * Update PipeHeartbeatScheduler.java * Update ShowPipeTask.java --- .../rpc/DataNodeAsyncRequestRPCHandler.java | 2 +- .../handlers/rpc/PipeHeartbeatRPCHandler.java | 2 +- .../iotdb/confignode/manager/ConfigManager.java | 18 ++++++++++ .../apache/iotdb/confignode/manager/IManager.java | 3 ++ .../pipe/agent/task/PipeConfigNodeTaskAgent.java | 2 +- .../runtime/heartbeat/PipeHeartbeatScheduler.java | 2 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 6 ++++ .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 41 ++++++++++++++++------ .../subtask/processor/PipeProcessorSubtask.java | 4 +-- .../event/common/heartbeat/PipeHeartbeatEvent.java | 6 ++-- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 6 ++-- .../common/tablet/PipeRawTabletInsertionEvent.java | 6 ++-- .../common/tsfile/PipeTsFileInsertionEvent.java | 6 ++-- .../dataregion/IoTDBDataRegionExtractor.java | 4 +-- .../PipeRealtimeDataRegionHybridExtractor.java | 5 ++- .../schemaregion/IoTDBSchemaRegionExtractor.java | 4 +-- .../iotdb/db/pipe/metric/PipeDataNodeMetrics.java | 6 ++-- .../PipeDataNodeRemainingEventAndTimeOperator.java | 18 +++++++++- ...ics.java => PipeDataNodeSinglePipeMetrics.java} | 15 ++++---- .../iotdb/db/protocol/client/ConfigNodeClient.java | 8 +++++ .../impl/DataNodeInternalRPCServiceImpl.java | 2 +- .../execution/config/sys/pipe/ShowPipeTask.java | 4 +-- .../iotdb/db/service/DataNodeShutdownHook.java | 37 +++++++++++++++++++ .../apache/iotdb/commons/conf/CommonConfig.java | 14 
++++++++ .../commons/pipe/agent/task/PipeTaskAgent.java | 2 +- .../iotdb/commons/pipe/config/PipeConfig.java | 5 +++ .../iotdb/commons/pipe/config/PipeDescriptor.java | 5 +++ .../thrift-commons/src/main/thrift/common.thrift | 7 ++++ .../src/main/thrift/confignode.thrift | 3 ++ .../src/main/thrift/datanode.thrift | 9 +---- 30 files changed, 193 insertions(+), 59 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java index 6e2a9dc97cc..69021eaec1a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.client.async.handlers.rpc; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp; import org.apache.iotdb.commons.client.request.AsyncRequestContext; @@ -32,7 +33,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateResp; import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp; import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp; import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp; diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java index 569424afbff..e5fa157961d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java @@ -20,8 +20,8 @@ package org.apache.iotdb.confignode.client.async.handlers.rpc; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 0a207ab7238..8bb92f9d2bb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TFlushReq; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSchemaNode; @@ -2562,6 +2563,23 @@ public class ConfigManager implements IManager { : new TThrottleQuotaResp(status); } + @Override + public TSStatus 
pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) { + final TSStatus status = confirmLeader(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + pipeManager + .getPipeRuntimeCoordinator() + .parseHeartbeat( + dataNodeId, + resp.getPipeMetaList(), + resp.getPipeCompletedList(), + resp.getPipeRemainingEventCountList(), + resp.getPipeRemainingTimeList()); + return StatusUtils.OK; + } + @Override public DataSet registerAINode(TAINodeRegisterReq req) { TSStatus status = confirmLeader(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 657f068e674..88731538be2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.manager; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TFlushReq; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; @@ -823,4 +824,6 @@ public interface IManager { /** Set space quota. 
*/ TSStatus setSpaceQuota(TSetSpaceQuotaReq req); + + TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java index 0e90174a7f2..b115ba25b4a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.manager.pipe.agent.task; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; @@ -35,7 +36,6 @@ import org.apache.iotdb.confignode.manager.pipe.metric.overview.PipeConfigNodeRe import org.apache.iotdb.confignode.manager.pipe.metric.source.PipeConfigRegionExtractorMetrics; import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage; import org.apache.iotdb.pipe.api.exception.PipeException; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java index 3533b40158d..120ddb65f86 100644 --- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; @@ -31,7 +32,6 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index b8ec6f773bd..21dbccd07a7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import 
org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; @@ -1298,4 +1299,9 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac public TThrottleQuotaResp getThrottleQuota() { return configManager.getThrottleQuota(); } + + @Override + public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) { + return configManager.pushHeartbeat(dataNodeId, resp); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index ef0d4c6b4d9..1513d8ebdeb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.pipe.agent.task; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; @@ -48,11 +50,14 @@ import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter; import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics; import 
org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode; import org.apache.iotdb.db.schemaengine.SchemaEngine; @@ -63,10 +68,10 @@ import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.SystemMetric; import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import com.google.common.collect.ImmutableMap; import org.apache.thrift.TException; @@ -306,13 +311,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { @Override protected void thawRate(final String pipeName, final long creationTime) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(pipeName + "_" + creationTime); + PipeDataNodeSinglePipeMetrics.getInstance().thawRate(pipeName + "_" + creationTime); } @Override protected void freezeRate(final String pipeName, final long creationTime) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .freezeRate(pipeName + "_" + creationTime); + PipeDataNodeSinglePipeMetrics.getInstance().freezeRate(pipeName + "_" + creationTime); } @Override @@ -323,7 +327,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { final String taskId = pipeName + "_" + creationTime; 
PipeTsFileToTabletsMetrics.getInstance().deregister(taskId); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId); + PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId); return true; } @@ -351,7 +355,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { final long creationTime = pipeMeta.getStaticMeta().getCreationTime(); final String taskId = pipeName + "_" + creationTime; PipeTsFileToTabletsMetrics.getInstance().deregister(taskId); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId); + PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId); // When the pipe contains no pipe tasks, there is no corresponding prefetching queue for the // subscribed pipe, so the subscription needs to be manually marked as completed. if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) { @@ -445,7 +449,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop; final Pair<Long, Double> remainingEventAndTime = - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime()); pipeCompletedList.add(isCompleted); pipeRemainingEventCountList.add(remainingEventAndTime.getLeft()); @@ -475,7 +479,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { protected void collectPipeMetaListInternal( final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException { // Do nothing if data node is removing or removed, or request does not need pipe meta list - if (PipeDataNodeAgent.runtime().isShutdown()) { + if (PipeDataNodeAgent.runtime().isShutdown() && req.heartbeatId != Long.MIN_VALUE) { return; } LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId); @@ -528,7 +532,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { final boolean isCompleted = 
isAllDataRegionCompleted && includeDataAndNeedDrop; final Pair<Long, Double> remainingEventAndTime = - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime()); pipeCompletedList.add(isCompleted); pipeRemainingEventCountList.add(remainingEventAndTime.getLeft()); @@ -817,6 +821,23 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } } + public void persistAllProgressIndex() { + try (final ConfigNodeClient configNodeClient = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to some API server + final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(); + collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp); + final TSStatus result = + configNodeClient.pushHeartbeat( + IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) { + LOGGER.warn("Failed to persist progress index to configNode, status: {}", result); + } + } catch (final Exception e) { + LOGGER.warn(e.getMessage()); + } + } + ///////////////////////// Pipe Consensus ///////////////////////// public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final int consensusGroupId) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index c8a0acf32fa..d39feb96fe0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -31,7 +31,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import 
org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector; import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics; import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.db.storageengine.StorageEngine; @@ -140,7 +140,7 @@ public class PipeProcessorSubtask extends PipeReportableSubtask { } else if (event instanceof TsFileInsertionEvent) { pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector); PipeProcessorMetrics.getInstance().markTsFileEvent(taskID); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .markTsFileCollectInvocationCount( pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount()); } else if (event instanceof PipeHeartbeatEvent) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java index 19c3b0ca996..ad29b285442 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java @@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPend import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; 
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.pipe.api.event.Event; @@ -83,7 +83,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { @Override public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .increaseHeartbeatEventCount(pipeName, creationTime); } return true; @@ -94,7 +94,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { // PipeName == null indicates that the event is the raw event at disruptor, // not the event copied and passed to the extractor if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .decreaseHeartbeatEventCount(pipeName, creationTime); if (shouldPrintMessage && LOGGER.isDebugEnabled()) { LOGGER.debug(this.toString()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index e7814916e5d..80ac86310b0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -28,7 +28,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent; -import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock; @@ -160,7 +160,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent try { PipeDataNodeResourceManager.wal().pin(walEntryHandler); if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .increaseInsertNodeEventCount(pipeName, creationTime); PipeDataNodeAgent.task().addFloatingMemoryUsageInByte(pipeName, ramBytesUsed()); } @@ -196,7 +196,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent } finally { if (Objects.nonNull(pipeName)) { PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName, ramBytesUsed()); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .decreaseInsertNodeEventCount(pipeName, creationTime, System.nanoTime() - extractTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 95a532708e0..9f3ad7a1bdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -28,7 +28,7 @@ import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.Pi import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent; import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock; @@ -130,7 +130,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent allocatedMemoryBlock, PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + INSTANCE_SIZE); if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .increaseRawTabletEventCount(pipeName, creationTime); } return true; @@ -139,7 +139,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent @Override public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .decreaseRawTabletEventCount(pipeName, creationTime); } allocatedMemoryBlock.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 3aa9b4a6b13..41931131eb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -32,7 +32,7 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager; @@ -266,7 +266,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent return false; } finally { if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .increaseTsFileEventCount(pipeName, creationTime); } } @@ -290,7 +290,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent return false; } finally { if (Objects.nonNull(pipeName)) { - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .decreaseTsFileEventCount(pipeName, creationTime, System.nanoTime() - extractTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index 808df12dfac..aa2f849f56f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -34,7 +34,7 @@ import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor; import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics; import org.apache.iotdb.db.storageengine.StorageEngine; @@ -358,7 +358,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { // register metric after generating taskID PipeDataRegionExtractorMetrics.getInstance().register(this); PipeTsFileToTabletsMetrics.getInstance().register(this); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this); + PipeDataNodeSinglePipeMetrics.getInstance().register(this); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java index c9d16163fc8..500a195e600 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -31,7 +31,7 @@ import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; 
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; @@ -239,8 +239,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio private boolean mayRemainingInsertNodeEventExceedLimit(final PipeRealtimeEvent event) { final boolean mayRemainingInsertEventExceedLimit = - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .mayRemainingInsertEventExceedLimit(pipeID); + PipeDataNodeSinglePipeMetrics.getInstance().mayRemainingInsertEventExceedLimit(pipeID); if (mayRemainingInsertEventExceedLimit && event.mayExtractorUseTablets(this)) { logByLogManager( l -> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java index b1153071bab..92fc23775e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java @@ -32,7 +32,7 @@ import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionExtractorMetrics; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -75,7 +75,7 @@ public class IoTDBSchemaRegionExtractor extends IoTDBNonDataRegionExtractor { listenedTypeSet = SchemaRegionListeningFilter.parseListeningPlanTypeSet(parameters); PipeSchemaRegionExtractorMetrics.getInstance().register(this); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this); + PipeDataNodeSinglePipeMetrics.getInstance().register(this); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java index 3f03ce580fd..dd3873feaf4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.pipe.metric; import org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; @@ -53,7 +53,7 @@ public class PipeDataNodeMetrics implements IMetricSet { PipeSchemaRegionListenerMetrics.getInstance().bindTo(metricService); PipeSchemaRegionExtractorMetrics.getInstance().bindTo(metricService); PipeSchemaRegionConnectorMetrics.getInstance().bindTo(metricService); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().bindTo(metricService); + PipeDataNodeSinglePipeMetrics.getInstance().bindTo(metricService); PipeDataNodeReceiverMetrics.getInstance().bindTo(metricService); PipeTsFileToTabletsMetrics.getInstance().bindTo(metricService); } @@ -71,7 +71,7 @@ public class 
PipeDataNodeMetrics implements IMetricSet { PipeSchemaRegionListenerMetrics.getInstance().unbindFrom(metricService); PipeSchemaRegionExtractorMetrics.getInstance().unbindFrom(metricService); PipeSchemaRegionConnectorMetrics.getInstance().unbindFrom(metricService); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance().unbindFrom(metricService); + PipeDataNodeSinglePipeMetrics.getInstance().unbindFrom(metricService); PipeDataNodeReceiverMetrics.getInstance().unbindFrom(metricService); PipeTsFileToTabletsMetrics.getInstance().unbindFrom(metricService); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java index 86368acf353..73d58285345 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java @@ -40,7 +40,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { +public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { // Calculate from schema region extractors directly for it requires less computation private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors = @@ -107,6 +107,22 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { return insertNodeEventCountEMA.insertNodeEMAValue; } + public long getRemainingNonHeartbeatEvents() { + final long remainingEvents = + tsfileEventCount.get() + + rawTabletEventCount.get() + + insertNodeEventCount.get() + + schemaRegionExtractors.stream() + 
.map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount) + .reduce(Long::sum) + .orElse(0L); + + // There are cases where the indicator is negative. For example, after the Pipe is restarted, + // the Processor SubTask is still collecting Events, resulting in a negative count. This + // situation cannot be avoided because the Pipe may be restarted internally. + return remainingEvents >= 0 ? remainingEvents : 0; + } + long getRemainingEvents() { final long remainingEvents = tsfileEventCount.get() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java similarity index 96% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java index 354a980edfd..677d758a162 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java @@ -42,15 +42,14 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -public class PipeDataNodeRemainingEventAndTimeMetrics implements IMetricSet { +public class PipeDataNodeSinglePipeMetrics implements IMetricSet { - private static final Logger LOGGER = - LoggerFactory.getLogger(PipeDataNodeRemainingEventAndTimeMetrics.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeSinglePipeMetrics.class); @SuppressWarnings("java:S3077") private volatile AbstractMetricService metricService; - private final Map<String, PipeDataNodeRemainingEventAndTimeOperator> + public final Map<String, 
PipeDataNodeRemainingEventAndTimeOperator> remainingEventAndTimeOperatorMap = new ConcurrentHashMap<>(); private static Histogram PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM = @@ -381,19 +380,19 @@ public class PipeDataNodeRemainingEventAndTimeMetrics implements IMetricSet { private static class PipeDataNodeRemainingEventAndTimeMetricsHolder { - private static final PipeDataNodeRemainingEventAndTimeMetrics INSTANCE = - new PipeDataNodeRemainingEventAndTimeMetrics(); + private static final PipeDataNodeSinglePipeMetrics INSTANCE = + new PipeDataNodeSinglePipeMetrics(); private PipeDataNodeRemainingEventAndTimeMetricsHolder() { // Empty constructor } } - public static PipeDataNodeRemainingEventAndTimeMetrics getInstance() { + public static PipeDataNodeSinglePipeMetrics getInstance() { return PipeDataNodeRemainingEventAndTimeMetricsHolder.INSTANCE; } - private PipeDataNodeRemainingEventAndTimeMetrics() { + private PipeDataNodeSinglePipeMetrics() { PipeEventCommitManager.getInstance().setCommitRateMarker(this::markRegionCommit); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 979c18320ca..2825eefaf4c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; @@ -1271,6 +1272,13 @@ public 
class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie () -> client.getThrottleQuota(), resp -> !updateConfigNodeLeader(resp.status)); } + @Override + public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) + throws TException { + return executeRemoteCallWithRetry( + () -> client.pushHeartbeat(dataNodeId, resp), status -> !updateConfigNodeLeader(status)); + } + public static class Factory extends ThriftClientFactory<ConfigRegionId, ConfigNodeClient> { public Factory( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 9eb5446f0cb..ec1c3686f24 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TLoadSample; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSender; import org.apache.iotdb.common.rpc.thrift.TServiceType; @@ -211,7 +212,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadResp; import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq; import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp; import 
org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaRespExceptionMessage; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java index 687a438f9b3..16ee005a089 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; -import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; @@ -97,7 +97,7 @@ public class ShowPipeTask implements IConfigTask { if (remainingEventCount == -1 && remainingTime == -1) { final Pair<Long, Double> remainingEventAndTime = - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + PipeDataNodeSinglePipeMetrics.getInstance() .getRemainingEventAndTime(tPipeInfo.getId(), tPipeInfo.getCreationTime()); remainingEventCount = remainingEventAndTime.getLeft(); remainingTime = remainingEventAndTime.getRight(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java index 13b87f5bb83..8dd3d39c92c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java @@ -25,10 +25,14 @@ import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; +import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; @@ -42,6 +46,8 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + public class DataNodeShutdownHook extends Thread { private static final Logger logger = LoggerFactory.getLogger(DataNodeShutdownHook.class); @@ -86,6 +92,37 @@ public class DataNodeShutdownHook extends Thread { triggerSnapshotForAllDataRegion(); } + long startTime = System.currentTimeMillis(); + if (PipeDataNodeAgent.task().getPipeCount() != 0) { + for (Map.Entry<String, PipeDataNodeRemainingEventAndTimeOperator> entry : + PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.entrySet()) { + boolean timeout = false; + while (true) { + if (entry.getValue().getRemainingNonHeartbeatEvents() == 0) { + logger.info( + "Successfully waited for pipe {} to finish.", entry.getValue().getPipeName()); + break; + } + if (System.currentTimeMillis() - startTime + > 
PipeConfig.getInstance().getPipeMaxWaitFinishTime()) { + timeout = true; + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.info("Interrupted when waiting for pipe to finish"); + } + } + if (timeout) { + logger.info("Timed out when waiting for pipes to finish, will break"); + break; + } + } + } + // Persist progress index before shutdown to ensure accurate recovery after restart + PipeDataNodeAgent.task().persistAllProgressIndex(); // Shutdown pipe progressIndex background service PipePeriodicalJobExecutor.shutdownBackgroundService(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 038581bf1f5..97312b61400 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -232,6 +232,8 @@ public class CommonConfig { private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20; private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE; + private long pipeMaxWaitFinishTime = 10 * 1000; + private int pipeExtractorAssignerDisruptorRingBufferSize = 65536; private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; // 50B private int pipeExtractorMatcherCacheSize = 1024; @@ -1380,6 +1382,18 @@ public class CommonConfig { pipeSubtaskExecutorForcedRestartIntervalMs); } + public long getPipeMaxWaitFinishTime() { + return pipeMaxWaitFinishTime; + } + + public void setPipeMaxWaitFinishTime(long pipeMaxWaitFinishTime) { + if (this.pipeMaxWaitFinishTime == pipeMaxWaitFinishTime) { + return; + } + this.pipeMaxWaitFinishTime = pipeMaxWaitFinishTime; + logger.info("pipeMaxWaitFinishTime is set to {}.", pipeMaxWaitFinishTime); + } + public int getPipeRealTimeQueuePollTsFileThreshold() { return 
pipeRealTimeQueuePollTsFileThreshold; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 353da8c84e4..a88232161c8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.pipe.agent.task; +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; @@ -35,7 +36,6 @@ import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index be0c70d7f42..94a65c01cf0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -147,6 +147,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeSubtaskExecutorForcedRestartIntervalMs(); } + public long getPipeMaxWaitFinishTime() { + return 
COMMON_CONFIG.getPipeMaxWaitFinishTime(); + } + /////////////////////////////// Extractor /////////////////////////////// public int getPipeExtractorAssignerDisruptorRingBufferSize() { @@ -515,6 +519,7 @@ public class PipeConfig { LOGGER.info( "PipeSubtaskExecutorForcedRestartIntervalMs: {}", getPipeSubtaskExecutorForcedRestartIntervalMs()); + LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime()); LOGGER.info( "PipeExtractorAssignerDisruptorRingBufferSize: {}", diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 7086ff731bf..8bc3c384b01 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -591,6 +591,11 @@ public class PipeDescriptor { "pipe_threshold_allocation_strategy_high_usage_threshold", String.valueOf( config.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold())))); + + config.setPipeMaxWaitFinishTime( + Long.parseLong( + properties.getProperty( + "pipe_max_wait_finish_time", String.valueOf(config.getPipeMaxWaitFinishTime())))); } public static void loadPipeExternalConfig( diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift index aed1b5f88f5..73aef5c4c59 100644 --- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift +++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift @@ -196,6 +196,13 @@ struct TSetThrottleQuotaReq { 2: required TThrottleQuota throttleQuota } +struct TPipeHeartbeatResp { + 1: required list<binary> pipeMetaList + 2: optional list<bool> pipeCompletedList + 3: optional list<i64> pipeRemainingEventCountList + 4: optional list<double> pipeRemainingTimeList +} + struct TLicense { 1: required i64 
licenseIssueTimestamp 2: required i64 expireTimestamp diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 4642f3e20af..19e8e2a7d1f 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -1771,5 +1771,8 @@ service IConfigNodeRPCService { /** Get throttle quota information */ TThrottleQuotaResp getThrottleQuota() + + /** Push heartbeat in shutdown */ + common.TSStatus pushHeartbeat(i32 dataNodeId, common.TPipeHeartbeatResp resp) } diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index d58e1eb1591..f7a9c4c3854 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -313,13 +313,6 @@ struct TPipeHeartbeatReq { 1: required i64 heartbeatId } -struct TPipeHeartbeatResp { - 1: required list<binary> pipeMetaList - 2: optional list<bool> pipeCompletedList - 3: optional list<i64> pipeRemainingEventCountList - 4: optional list<double> pipeRemainingTimeList -} - enum TSchemaLimitLevel{ DEVICE, TIMESERIES @@ -1041,7 +1034,7 @@ service IDataNodeRPCService { /** * ConfigNode will ask DataNode for pipe meta in every few seconds **/ - TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) + common.TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) /** * Execute CQ on DataNode
