This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-pipe-insertnode-oom in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8d46869e15b37f40e85f7ced193c982e8566f27a Author: Caideyipi <[email protected]> AuthorDate: Thu Nov 21 15:45:59 2024 +0800 Pipe: Reduce the memory usage & Enable insertNode memory control for stream mode degrade and stuck restart (#14102) Co-authored-by: Steve Yurong Su <[email protected]> (cherry picked from commit 3cb544e39b250b56bdc8d4c4e3ff0b50c2922e5f) --- .../pipe/runtime/PipeHandleMetaChangePlan.java | 8 +- .../response/pipe/task/PipeTableResp.java | 5 +- .../pipe/agent/task/PipeConfigNodeTaskAgent.java | 4 +- .../runtime/heartbeat/PipeHeartbeat.java | 3 +- .../runtime/heartbeat/PipeHeartbeatParser.java | 5 +- .../manager/pipe/metric/PipeConfigNodeMetrics.java | 4 +- ... => PipeTemporaryMetaInCoordinatorMetrics.java} | 29 +++--- .../confignode/persistence/pipe/PipeInfo.java | 16 +-- .../confignode/persistence/pipe/PipeTaskInfo.java | 4 +- .../db/pipe/agent/runtime/PipeAgentLauncher.java | 2 +- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 30 ++++-- .../async/IoTDBDataRegionAsyncConnector.java | 2 +- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 30 +++++- .../common/tablet/PipeRawTabletInsertionEvent.java | 6 +- .../PipeRealtimeDataRegionHybridExtractor.java | 12 ++- .../impl/DataNodeInternalRPCServiceImpl.java | 17 ++-- .../config/executor/ClusterConfigTaskExecutor.java | 6 +- .../dataregion/wal/utils/WALEntryHandler.java | 12 +-- .../TsFileResourceProgressIndexTest.java | 5 + .../commons/consensus/index/ProgressIndex.java | 12 ++- .../consensus/index/impl/HybridProgressIndex.java | 36 ++++--- .../consensus/index/impl/IoTProgressIndex.java | 12 +++ .../consensus/index/impl/MetaProgressIndex.java | 9 +- .../consensus/index/impl/MinimumProgressIndex.java | 5 + .../consensus/index/impl/RecoverProgressIndex.java | 13 +++ .../consensus/index/impl/SimpleProgressIndex.java | 9 +- .../consensus/index/impl/StateProgressIndex.java | 17 +++- .../index/impl/TimeWindowStateProgressIndex.java | 24 +++++ .../commons/pipe/agent/task/PipeTaskAgent.java | 
49 ++++++++++ .../commons/pipe/agent/task/meta/PipeMeta.java | 17 +++- .../pipe/agent/task/meta/PipeTemporaryMeta.java | 73 +------------- .../agent/task/meta/PipeTemporaryMetaInAgent.java | 107 +++++++++++++++++++++ ...ta.java => PipeTemporaryMetaInCoordinator.java} | 5 +- .../pipe/agent/task/progress/CommitterKey.java | 2 +- .../task/progress/PipeEventCommitManager.java | 12 ++- .../iotdb/commons/pipe/event/EnrichedEvent.java | 6 -- .../iotdb/commons/pipe/task/PipeMetaDeSerTest.java | 19 ++-- 37 files changed, 448 insertions(+), 179 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java index 07d70e92db4..74a3cf37c94 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java @@ -48,7 +48,7 @@ public class PipeHandleMetaChangePlan extends ConfigPhysicalPlan { } @Override - protected void serializeImpl(DataOutputStream stream) throws IOException { + protected void serializeImpl(final DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); stream.writeInt(pipeMetaList.size()); @@ -58,16 +58,16 @@ public class PipeHandleMetaChangePlan extends ConfigPhysicalPlan { } @Override - protected void deserializeImpl(ByteBuffer buffer) throws IOException { + protected void deserializeImpl(final ByteBuffer buffer) throws IOException { int size = buffer.getInt(); for (int i = 0; i < size; i++) { - PipeMeta pipeMeta = PipeMeta.deserialize(buffer); + PipeMeta pipeMeta = PipeMeta.deserialize4Coordinator(buffer); pipeMetaList.add(pipeMeta); } } @Override - public boolean equals(Object obj) { + public 
boolean equals(final Object obj) { if (this == obj) { return true; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java index ef2f9bea26b..d36478ee31c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java @@ -26,7 +26,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator; import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; @@ -177,7 +177,8 @@ public class PipeTableResp implements DataSet { staticMeta.getProcessorParameters().toString(), staticMeta.getConnectorParameters().toString(), exceptionMessageBuilder.toString()); - final PipeTemporaryMeta temporaryMeta = pipeMeta.getTemporaryMeta(); + final PipeTemporaryMetaInCoordinator temporaryMeta = + (PipeTemporaryMetaInCoordinator) pipeMeta.getTemporaryMeta(); final boolean canCalculateOnLocal = canCalculateOnLocal(pipeMeta); showPipeInfo.setRemainingEventCount( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java index 
d5204b1c99b..2451e055c72 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java @@ -121,7 +121,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent { final PipeMeta pipeMetaFromCoordinator) { try { return PipeConfigNodeAgent.runtime().isLeaderReady() - ? super.handleSinglePipeMetaChangesInternal(pipeMetaFromCoordinator.deepCopy()) + ? super.handleSinglePipeMetaChangesInternal(pipeMetaFromCoordinator.deepCopy4TaskAgent()) : null; } catch (final Exception e) { return new TPushPipeMetaRespExceptionMessage( @@ -152,7 +152,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent { .map( pipeMeta -> { try { - return pipeMeta.deepCopy(); + return pipeMeta.deepCopy4TaskAgent(); } catch (Exception e) { throw new PipeException("failed to deep copy pipeMeta", e); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java index 9c909149896..02ed8cca2fc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java @@ -42,7 +42,8 @@ public class PipeHeartbeat { /* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent, /* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) { for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) { - final PipeMeta pipeMeta = PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i)); + final PipeMeta pipeMeta = + PipeMeta.deserialize4TaskAgent(pipeMetaByteBufferListFromAgent.get(i)); 
pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta); isCompletedMap.put( pipeMeta.getStaticMeta(), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java index 16e2e6a5b73..e2303fecdea 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java @@ -28,7 +28,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp; import org.apache.iotdb.confignode.manager.ConfigManager; @@ -144,7 +144,8 @@ public class PipeHeartbeatParser { continue; } - final PipeTemporaryMeta temporaryMeta = pipeMetaFromCoordinator.getTemporaryMeta(); + final PipeTemporaryMetaInCoordinator temporaryMeta = + (PipeTemporaryMetaInCoordinator) pipeMetaFromCoordinator.getTemporaryMeta(); // Remove completed pipes final Boolean isPipeCompletedFromAgent = pipeHeartbeat.isCompleted(staticMeta); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java index a7a41f4f359..18b962eab89 
100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java @@ -41,7 +41,7 @@ public class PipeConfigNodeMetrics implements IMetricSet { PipeConfigRegionExtractorMetrics.getInstance().bindTo(metricService); PipeConfigRegionConnectorMetrics.getInstance().bindTo(metricService); PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService); - PipeTemporaryMetaMetrics.getInstance().bindTo(metricService); + PipeTemporaryMetaInCoordinatorMetrics.getInstance().bindTo(metricService); PipeConfigNodeReceiverMetrics.getInstance().bindTo(metricService); } @@ -53,7 +53,7 @@ public class PipeConfigNodeMetrics implements IMetricSet { PipeConfigRegionExtractorMetrics.getInstance().unbindFrom(metricService); PipeConfigRegionConnectorMetrics.getInstance().unbindFrom(metricService); PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService); - PipeTemporaryMetaMetrics.getInstance().unbindFrom(metricService); + PipeTemporaryMetaInCoordinatorMetrics.getInstance().unbindFrom(metricService); PipeConfigNodeReceiverMetrics.getInstance().unbindFrom(metricService); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaInCoordinatorMetrics.java similarity index 82% rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaInCoordinatorMetrics.java index 2296a53d91e..cc46a1a015e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java +++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaInCoordinatorMetrics.java @@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.manager.pipe.metric; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.metrics.AbstractMetricService; @@ -39,17 +40,19 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** - * The {@link PipeTemporaryMetaMetrics} is to calculate the pipe-statistics from the {@link - * PipeTemporaryMeta}. The class is lock-free and can only read from the thread-safe variables from - * the {@link PipeTemporaryMeta}. + * The {@link PipeTemporaryMetaInCoordinatorMetrics} is to calculate the pipe-statistics from the + * {@link PipeTemporaryMeta}. The class is lock-free and can only read from the thread-safe + * variables from the {@link PipeTemporaryMeta}. 
*/ -public class PipeTemporaryMetaMetrics implements IMetricSet { - private static final Logger LOGGER = LoggerFactory.getLogger(PipeTemporaryMetaMetrics.class); +public class PipeTemporaryMetaInCoordinatorMetrics implements IMetricSet { + private static final Logger LOGGER = + LoggerFactory.getLogger(PipeTemporaryMetaInCoordinatorMetrics.class); @SuppressWarnings("java:S3077") private volatile AbstractMetricService metricService; - private final Map<String, PipeTemporaryMeta> pipeTemporaryMetaMap = new ConcurrentHashMap<>(); + private final Map<String, PipeTemporaryMetaInCoordinator> pipeTemporaryMetaMap = + new ConcurrentHashMap<>(); //////////////////////////// bindTo & unbindFrom (metric framework) //////////////////////////// @@ -64,13 +67,13 @@ public class PipeTemporaryMetaMetrics implements IMetricSet { } private void createAutoGauge(final String pipeID) { - final PipeTemporaryMeta pipeTemporaryMeta = pipeTemporaryMetaMap.get(pipeID); + final PipeTemporaryMetaInCoordinator pipeTemporaryMeta = pipeTemporaryMetaMap.get(pipeID); final String[] pipeNameAndCreationTime = pipeID.split("_"); metricService.createAutoGauge( Metric.PIPE_GLOBAL_REMAINING_EVENT_COUNT.toString(), MetricLevel.IMPORTANT, pipeTemporaryMeta, - PipeTemporaryMeta::getGlobalRemainingEvents, + PipeTemporaryMetaInCoordinator::getGlobalRemainingEvents, Tag.NAME.toString(), pipeNameAndCreationTime[0], Tag.CREATION_TIME.toString(), @@ -79,7 +82,7 @@ public class PipeTemporaryMetaMetrics implements IMetricSet { Metric.PIPE_GLOBAL_REMAINING_TIME.toString(), MetricLevel.IMPORTANT, pipeTemporaryMeta, - PipeTemporaryMeta::getGlobalRemainingTime, + PipeTemporaryMetaInCoordinator::getGlobalRemainingTime, Tag.NAME.toString(), pipeNameAndCreationTime[0], Tag.CREATION_TIME.toString(), @@ -123,7 +126,8 @@ public class PipeTemporaryMetaMetrics implements IMetricSet { public void register(final PipeMeta pipeMeta) { final String taskID = pipeMeta.getStaticMeta().getPipeName() + "_" + 
pipeMeta.getStaticMeta().getCreationTime(); - pipeTemporaryMetaMap.putIfAbsent(taskID, pipeMeta.getTemporaryMeta()); + pipeTemporaryMetaMap.putIfAbsent( + taskID, (PipeTemporaryMetaInCoordinator) pipeMeta.getTemporaryMeta()); if (Objects.nonNull(metricService)) { createMetrics(taskID); } @@ -163,14 +167,15 @@ public class PipeTemporaryMetaMetrics implements IMetricSet { private static class PipeTemporaryMetaMetricsHolder { - private static final PipeTemporaryMetaMetrics INSTANCE = new PipeTemporaryMetaMetrics(); + private static final PipeTemporaryMetaInCoordinatorMetrics INSTANCE = + new PipeTemporaryMetaInCoordinatorMetrics(); private PipeTemporaryMetaMetricsHolder() { // Empty constructor } } - public static PipeTemporaryMetaMetrics getInstance() { + public static PipeTemporaryMetaInCoordinatorMetrics getInstance() { return PipeTemporaryMetaMetricsHolder.INSTANCE; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java index 48a9d705dc4..f1f15b02528 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java @@ -33,7 +33,7 @@ import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent; import org.apache.iotdb.confignode.manager.pipe.agent.runtime.PipeConfigRegionListener; import org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeSubtask; import org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeTaskAgent; -import org.apache.iotdb.confignode.manager.pipe.metric.PipeTemporaryMetaMetrics; +import org.apache.iotdb.confignode.manager.pipe.metric.PipeTemporaryMetaInCoordinatorMetrics; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage; import org.apache.iotdb.pipe.api.exception.PipeException; import 
org.apache.iotdb.rpc.TSStatusCode; @@ -94,7 +94,7 @@ public class PipeInfo implements SnapshotProcessor { throw new PipeException("Failed to increase listener reference", e); } }); - PipeTemporaryMetaMetrics.getInstance() + PipeTemporaryMetaInCoordinatorMetrics.getInstance() .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } else { @@ -114,7 +114,7 @@ public class PipeInfo implements SnapshotProcessor { PipeConfigNodeAgent.task() .handleSinglePipeMetaChanges(pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeName())); - PipeTemporaryMetaMetrics.getInstance() + PipeTemporaryMetaInCoordinatorMetrics.getInstance() .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (final Exception e) { @@ -143,7 +143,7 @@ public class PipeInfo implements SnapshotProcessor { throw new PipeException("Failed to decrease listener reference", e); } }); - PipeTemporaryMetaMetrics.getInstance() + PipeTemporaryMetaInCoordinatorMetrics.getInstance() .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } else { @@ -181,7 +181,7 @@ public class PipeInfo implements SnapshotProcessor { throw new PipeException("Failed to decrease listener reference", e); } }); - PipeTemporaryMetaMetrics.getInstance() + PipeTemporaryMetaInCoordinatorMetrics.getInstance() .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } else { @@ -206,7 +206,7 @@ public class PipeInfo implements SnapshotProcessor { public TSStatus operateMultiplePipes(final OperateMultiplePipesPlanV2 plans) { try { final TSStatus status = pipeTaskInfo.operateMultiplePipes(plans); - PipeTemporaryMetaMetrics.getInstance() + PipeTemporaryMetaInCoordinatorMetrics.getInstance() .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList()); return 
status; } catch (final Exception e) { @@ -225,7 +225,7 @@ public class PipeInfo implements SnapshotProcessor { pipeMetaListFromCoordinator.add(pipeMeta); } PipeConfigNodeAgent.task().handlePipeMetaChanges(pipeMetaListFromCoordinator); - PipeTemporaryMetaMetrics.getInstance() + PipeTemporaryMetaInCoordinatorMetrics.getInstance() .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (final Exception e) { @@ -245,7 +245,7 @@ public class PipeInfo implements SnapshotProcessor { pipeTaskInfo.getPipeMetaByPipeName(pipeMeta.getStaticMeta().getPipeName())); } PipeConfigNodeAgent.task().handlePipeMetaChanges(pipeMetaListFromCoordinator); - PipeTemporaryMetaMetrics.getInstance() + PipeTemporaryMetaInCoordinatorMetrics.getInstance() .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (final Exception e) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 936220f77b3..70d23a47cc7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType; import org.apache.iotdb.commons.pipe.config.PipeConfig; import 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant; @@ -602,7 +603,8 @@ public class PipeTaskInfo implements SnapshotProcessor { .get(consensusGroupId.getId()) .setLeaderNodeId(newLeader); // New region leader may contain un-transferred events - pipeMeta.getTemporaryMeta().markDataNodeUncompleted(newLeader); + ((PipeTemporaryMetaInCoordinator) pipeMeta.getTemporaryMeta()) + .markDataNodeUncompleted(newLeader); } else { consensusGroupIdToTaskMetaMap.remove(consensusGroupId.getId()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java index 0b8fb380cd9..4cfb7185def 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java @@ -168,7 +168,7 @@ class PipeAgentLauncher { getAllPipeInfoResp.getAllPipeInfo().stream() .map( byteBuffer -> { - final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer); + final PipeMeta pipeMeta = PipeMeta.deserialize4TaskAgent(byteBuffer); LOGGER.info( "Pulled pipe meta from config node: {}, recovering ...", pipeMeta); return pipeMeta; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index f55da4be67b..4686e302eb6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -530,15 +530,25 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } // Only restart the stream mode pipes for releasing memTables. 
- if (extractors.get(0).isStreamMode() - && extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles) - && (mayMemTablePinnedCountReachDangerousThreshold() - || mayWalSizeReachThrottleThreshold())) { - // Extractors of this pipe may be stuck and is pinning too many MemTables. - LOGGER.warn( - "Pipe {} needs to restart because too many memTables are pinned.", - pipeMeta.getStaticMeta()); - stuckPipes.add(pipeMeta); + if (extractors.get(0).isStreamMode()) { + if (extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles) + && (mayMemTablePinnedCountReachDangerousThreshold() + || mayWalSizeReachThrottleThreshold())) { + // Extractors of this pipe may be stuck and is pinning too many MemTables. + LOGGER.warn( + "Pipe {} needs to restart because too many memTables are pinned.", + pipeMeta.getStaticMeta()); + stuckPipes.add(pipeMeta); + } else if (getFloatingMemoryUsageInByte(pipeName) + >= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() + - PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes()) + / pipeMetaKeeper.getPipeMetaCount()) { + // Extractors of this pipe may have too many insert nodes + LOGGER.warn( + "Pipe {} needs to restart because too many insertNodes are extracted.", + pipeMeta.getStaticMeta()); + stuckPipes.add(pipeMeta); + } } } @@ -584,7 +594,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { acquireWriteLock(); try { final long startTime = System.currentTimeMillis(); - final PipeMeta originalPipeMeta = pipeMeta.deepCopy(); + final PipeMeta originalPipeMeta = pipeMeta.deepCopy4TaskAgent(); handleDropPipe(pipeMeta.getStaticMeta().getPipeName()); handleSinglePipeMetaChanges(originalPipeMeta); LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index f127b7d1734..7ced0ca87d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -389,7 +389,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { private void logOnClientException( final AsyncPipeDataTransferServiceClient client, final Exception e) { if (client == null) { - LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT, e); + LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT); } else { LOGGER.warn( String.format(THRIFT_ERROR_FORMATTER_WITH_ENDPOINT, client.getIp(), client.getPort()), e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 4979a3d2777..7a3ad4b1a20 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; @@ -33,11 +34,14 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler; +import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryPosition; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.tsfile.utils.Accountable; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.write.UnSupportedDataTypeException; import org.apache.tsfile.write.record.Tablet; import org.slf4j.Logger; @@ -48,14 +52,22 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.stream.Collectors; public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent - implements TabletInsertionEvent { + implements TabletInsertionEvent, Accountable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeInsertNodeTabletInsertionEvent.class); + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(PipeInsertNodeTabletInsertionEvent.class) + + RamUsageEstimator.shallowSizeOfInstance(WALEntryHandler.class) + + RamUsageEstimator.shallowSizeOfInstance(WALEntryPosition.class) + + RamUsageEstimator.shallowSizeOfInstance(AtomicInteger.class) + + RamUsageEstimator.shallowSizeOfInstance(AtomicBoolean.class); private final WALEntryHandler walEntryHandler; private final boolean isAligned; @@ -137,12 +149,13 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent if 
(Objects.nonNull(pipeName)) { PipeDataNodeRemainingEventAndTimeMetrics.getInstance() .increaseTabletEventCount(pipeName, creationTime); + PipeDataNodeAgent.task().addFloatingMemoryUsageInByte(pipeName, ramBytesUsed()); } return true; } catch (final Exception e) { LOGGER.warn( String.format( - "Increase reference count for memtable %d error. Holder Message: %s", + "Increase reference count for memTable %d error. Holder Message: %s", walEntryHandler.getMemTableId(), holderMessage), e); return false; @@ -168,6 +181,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent return false; } finally { if (Objects.nonNull(pipeName)) { + PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName, ramBytesUsed()); PipeDataNodeRemainingEventAndTimeMetrics.getInstance() .decreaseTabletEventCount(pipeName, creationTime); } @@ -413,4 +427,16 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent + " - " + super.coreReportMessage(); } + + // Notes: + // 1. We only consider insertion event's memory for degrade and restart, because degrade/restart + // may not be of use for releasing other events' memory. + // 2. We do not consider eventParsers because they may not exist and if it is invoked, the event + // will soon be released. + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + (Objects.nonNull(devicePath) ? PartialPath.estimateSize(devicePath) : 0) + + (Objects.nonNull(progressIndex) ? 
progressIndex.ramBytesUsed() : 0); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 24e3c7cd1d4..825bb543843 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -34,6 +34,7 @@ import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.write.record.Tablet; import java.util.Objects; @@ -41,6 +42,9 @@ import java.util.function.BiConsumer; public class PipeRawTabletInsertionEvent extends EnrichedEvent implements TabletInsertionEvent { + // For better calculation + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(PipeRawTabletInsertionEvent.class); private Tablet tablet; private String deviceId; // Only used when the tablet is released. 
private final boolean isAligned; @@ -115,7 +119,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet allocatedMemoryBlock = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry( - PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet)); + PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + INSTANCE_SIZE); if (Objects.nonNull(pipeName)) { PipeDataNodeRemainingEventAndTimeMetrics.getInstance() .increaseTabletEventCount(pipeName, creationTime); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java index ced920f378c..4db9249d73c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -214,7 +214,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio || mayMemTablePinnedCountReachDangerousThreshold() || isHistoricalTsFileEventCountExceededLimit() || isRealtimeTsFileEventCountExceededLimit() - || mayTsFileLinkedCountReachDangerousThreshold(); + || mayTsFileLinkedCountReachDangerousThreshold() + || mayInsertNodeMemoryReachDangerousThreshold(); } private boolean mayWalSizeReachThrottleThreshold() { @@ -245,6 +246,15 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount(); } + private boolean mayInsertNodeMemoryReachDangerousThreshold() { + return 3 + * PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName) + * PipeDataNodeAgent.task().getPipeCount() + >= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() + - 
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes()) + * 2; + } + @Override public Event supply() { PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) pendingQueue.directPoll(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index dcb2b852c36..467de8e3e80 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -1055,13 +1055,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TPushPipeMetaResp pushPipeMeta(TPushPipeMetaReq req) { - final List<PipeMeta> pipeMetas = new ArrayList<>(); - for (ByteBuffer byteBuffer : req.getPipeMetas()) { - pipeMetas.add(PipeMeta.deserialize(byteBuffer)); - } try { - List<TPushPipeMetaRespExceptionMessage> exceptionMessages = - PipeDataNodeAgent.task().handlePipeMetaChanges(pipeMetas); + final List<TPushPipeMetaRespExceptionMessage> exceptionMessages = + PipeDataNodeAgent.task() + .handlePipeMetaChanges( + req.getPipeMetas().stream() + .map(PipeMeta::deserialize4TaskAgent) + .collect(Collectors.toList())); return exceptionMessages.isEmpty() ? 
new TPushPipeMetaResp() @@ -1083,7 +1083,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface if (req.isSetPipeNameToDrop()) { exceptionMessage = PipeDataNodeAgent.task().handleDropPipe(req.getPipeNameToDrop()); } else if (req.isSetPipeMeta()) { - final PipeMeta pipeMeta = PipeMeta.deserialize(ByteBuffer.wrap(req.getPipeMeta())); + final PipeMeta pipeMeta = + PipeMeta.deserialize4TaskAgent(ByteBuffer.wrap(req.getPipeMeta())); exceptionMessage = PipeDataNodeAgent.task().handleSinglePipeMetaChanges(pipeMeta); } else { throw new Exception("Invalid TPushSinglePipeMetaReq"); @@ -1120,7 +1121,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface } } else if (req.isSetPipeMetas()) { for (ByteBuffer byteBuffer : req.getPipeMetas()) { - final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer); + final PipeMeta pipeMeta = PipeMeta.deserialize4TaskAgent(byteBuffer); TPushPipeMetaRespExceptionMessage message = PipeDataNodeAgent.task().handleSinglePipeMetaChanges(pipeMeta); exceptionMessages.add(message); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index d59d9deb55e..618a9cd53dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -1827,7 +1827,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { pipeMetaFromCoordinator = getAllPipeInfoResp.getAllPipeInfo().stream() - .map(PipeMeta::deserialize) + .map(PipeMeta::deserialize4Coordinator) .filter( pipeMeta -> pipeMeta @@ -1846,7 +1846,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor { new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode())); return future; } - } catch (Exception e) { + } catch (final Exception e) { final String exceptionMessage = String.format( "Failed to alter pipe %s, because %s", @@ -1906,7 +1906,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute()); } } - } catch (Exception e) { + } catch (final Exception e) { LOGGER.info("Failed to validate alter pipe statement, because {}", e.getMessage(), e); future.setException( new IoTDBException(e.getMessage(), TSStatusCode.PIPE_ERROR.getStatusCode())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java index ea9570033ae..edc67edccfa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java @@ -58,7 +58,7 @@ public class WALEntryHandler { private volatile boolean isHardlink = false; private final AtomicReference<File> hardlinkFile = new AtomicReference<>(); - public WALEntryHandler(WALEntryValue value) { + public WALEntryHandler(final WALEntryValue value) { this.value = value; } @@ -105,7 +105,7 @@ public class WALEntryHandler { */ public InsertNode getInsertNode() throws WALPipeException { // return local cache - WALEntryValue res = value; + final WALEntryValue res = value; if (res != null) { if (res instanceof InsertNode) { return (InsertNode) res; @@ -120,7 +120,7 @@ public class WALEntryHandler { synchronized (this) { this.wait(); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { logger.warn("Interrupted when waiting for result.", e); 
Thread.currentThread().interrupt(); } @@ -179,7 +179,7 @@ public class WALEntryHandler { } } - public void setWalNode(WALNode walNode, long memTableId) { + public void setWalNode(final WALNode walNode, final long memTableId) { this.walNode = walNode; this.memTableId = memTableId; walEntryPosition.setWalNode(walNode, memTableId); @@ -189,7 +189,7 @@ public class WALEntryHandler { return memTableId; } - public void setEntryPosition(long walFileVersionId, long position) { + public void setEntryPosition(final long walFileVersionId, final long position) { this.walEntryPosition.setEntryPosition(walFileVersionId, position, value); this.value = null; synchronized (this) { @@ -205,7 +205,7 @@ public class WALEntryHandler { return walEntryPosition.getSize(); } - public void setSize(int size) { + public void setSize(final int size) { this.walEntryPosition.setSize(size); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java index e82ac0e08e5..33bf80115a3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java @@ -224,6 +224,11 @@ public class TsFileResourceProgressIndexTest { public TotalOrderSumTuple getTotalOrderSumTuple() { return new TotalOrderSumTuple((long) val); } + + @Override + public long ramBytesUsed() { + return 0; + } } @Test diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java index 43f934c5b9b..b54d6db4dab 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java +++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.Accountable; +import org.apache.tsfile.utils.RamUsageEstimator; import javax.annotation.Nonnull; @@ -32,6 +34,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -53,7 +56,14 @@ import java.util.stream.LongStream; * immutability contract. This prevents unintended modifications to the underlying mutable state * from affecting other parts of the program. */ -public abstract class ProgressIndex { +public abstract class ProgressIndex implements Accountable { + + protected static final long LOCK_SIZE = + RamUsageEstimator.shallowSizeOfInstance(ReentrantReadWriteLock.class) + + RamUsageEstimator.shallowSizeOfInstance(ReentrantReadWriteLock.ReadLock.class) + + RamUsageEstimator.shallowSizeOfInstance(ReentrantReadWriteLock.WriteLock.class) + + ((long) RamUsageEstimator.NUM_BYTES_OBJECT_HEADER << 1) + + 64; /** Serialize this progress index to the given byte buffer. 
*/ public abstract void serialize(ByteBuffer byteBuffer); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java index ac0e50b317c..b27c2666cc9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; import com.google.common.collect.ImmutableMap; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nonnull; @@ -40,6 +41,11 @@ import java.util.stream.Collectors; public class HybridProgressIndex extends ProgressIndex { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(HybridProgressIndex.class) + ProgressIndex.LOCK_SIZE; + private static final long ENTRY_SIZE = + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + + RamUsageEstimator.alignObjectSize(Short.BYTES); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Map<Short, ProgressIndex> type2Index; @@ -48,11 +54,11 @@ public class HybridProgressIndex extends ProgressIndex { this(Collections.emptyMap()); } - public HybridProgressIndex(ProgressIndex progressIndex) { + public HybridProgressIndex(final ProgressIndex progressIndex) { this(Collections.singletonMap(progressIndex.getType().getType(), progressIndex)); } - private HybridProgressIndex(Map<Short, ProgressIndex> type2Index) { + private HybridProgressIndex(final Map<Short, ProgressIndex> type2Index) { this.type2Index = new HashMap<>(type2Index); } @@ -61,7 +67,7 @@ public class HybridProgressIndex extends ProgressIndex { } 
@Override - public void serialize(ByteBuffer byteBuffer) { + public void serialize(final ByteBuffer byteBuffer) { lock.readLock().lock(); try { ProgressIndexType.HYBRID_PROGRESS_INDEX.serialize(byteBuffer); @@ -77,7 +83,7 @@ public class HybridProgressIndex extends ProgressIndex { } @Override - public void serialize(OutputStream stream) throws IOException { + public void serialize(final OutputStream stream) throws IOException { lock.readLock().lock(); try { ProgressIndexType.HYBRID_PROGRESS_INDEX.serialize(stream); @@ -93,7 +99,7 @@ public class HybridProgressIndex extends ProgressIndex { } @Override - public boolean isAfter(@Nonnull ProgressIndex progressIndex) { + public boolean isAfter(@Nonnull final ProgressIndex progressIndex) { lock.readLock().lock(); try { if (progressIndex instanceof MinimumProgressIndex) { @@ -121,14 +127,14 @@ public class HybridProgressIndex extends ProgressIndex { } } - public boolean isGivenProgressIndexAfterSelf(ProgressIndex progressIndex) { + public boolean isGivenProgressIndexAfterSelf(final ProgressIndex progressIndex) { return type2Index.size() == 1 && type2Index.containsKey(progressIndex.getType().getType()) && progressIndex.isAfter(type2Index.get(progressIndex.getType().getType())); } @Override - public boolean equals(ProgressIndex progressIndex) { + public boolean equals(final ProgressIndex progressIndex) { lock.readLock().lock(); try { if (!(progressIndex instanceof HybridProgressIndex)) { @@ -152,7 +158,7 @@ public class HybridProgressIndex extends ProgressIndex { } @Override - public boolean equals(Object obj) { + public boolean equals(final Object obj) { if (obj == null) { return false; } @@ -171,7 +177,8 @@ public class HybridProgressIndex extends ProgressIndex { } @Override - public ProgressIndex updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) { + public ProgressIndex updateToMinimumEqualOrIsAfterProgressIndex( + final ProgressIndex progressIndex) { lock.writeLock().lock(); try { if (progressIndex 
== null || progressIndex instanceof MinimumProgressIndex) { @@ -229,7 +236,7 @@ public class HybridProgressIndex extends ProgressIndex { } } - public static HybridProgressIndex deserializeFrom(ByteBuffer byteBuffer) { + public static HybridProgressIndex deserializeFrom(final ByteBuffer byteBuffer) { final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex(); final int size = ReadWriteIOUtils.readInt(byteBuffer); for (int i = 0; i < size; i++) { @@ -240,7 +247,7 @@ public class HybridProgressIndex extends ProgressIndex { return hybridProgressIndex; } - public static HybridProgressIndex deserializeFrom(InputStream stream) throws IOException { + public static HybridProgressIndex deserializeFrom(final InputStream stream) throws IOException { final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex(); final int size = ReadWriteIOUtils.readInt(stream); for (int i = 0; i < size; i++) { @@ -255,4 +262,11 @@ public class HybridProgressIndex extends ProgressIndex { public String toString() { return "HybridProgressIndex{" + "type2Index=" + type2Index + '}'; } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + type2Index.size() * ENTRY_SIZE + + type2Index.values().stream().map(ProgressIndex::ramBytesUsed).reduce(0L, Long::sum); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java index 5c7ffb2cc81..8b02d85da5a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java @@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; +import 
org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nonnull; @@ -37,6 +38,12 @@ import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; public class IoTProgressIndex extends ProgressIndex { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(IoTProgressIndex.class) + ProgressIndex.LOCK_SIZE; + + // We assume that the integers are all cached, while the longs are all not + private static final long ENTRY_SIZE = + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + Long.BYTES; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -232,4 +239,9 @@ public class IoTProgressIndex extends ProgressIndex { public String toString() { return "IoTProgressIndex{" + "peerId2SearchIndex=" + peerId2SearchIndex + '}'; } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + peerId2SearchIndex.size() * ENTRY_SIZE; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java index c181345e4ad..75322152d45 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java @@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nonnull; @@ -34,7 +35,8 @@ import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; public class MetaProgressIndex extends ProgressIndex { - + private static final long 
INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(MetaProgressIndex.class) + ProgressIndex.LOCK_SIZE; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final long index; @@ -173,4 +175,9 @@ public class MetaProgressIndex extends ProgressIndex { public String toString() { return "MetaProgressIndex{" + "index=" + index + '}'; } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java index e476409a30b..e22f82c9fbb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java @@ -99,4 +99,9 @@ public class MinimumProgressIndex extends ProgressIndex { public String toString() { return "MinimumProgressIndex{}"; } + + @Override + public long ramBytesUsed() { + return 0; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java index c2511222eab..5756594abeb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; import com.google.common.collect.ImmutableMap; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nonnull; @@ -39,6 +40,9 @@ 
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; public class RecoverProgressIndex extends ProgressIndex { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(RecoverProgressIndex.class) + + +ProgressIndex.LOCK_SIZE; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -239,4 +243,13 @@ public class RecoverProgressIndex extends ProgressIndex { public String toString() { return "RecoverProgressIndex{" + "dataNodeId2LocalIndex=" + dataNodeId2LocalIndex + '}'; } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + dataNodeId2LocalIndex.size() * RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + + dataNodeId2LocalIndex.values().stream() + .map(SimpleProgressIndex::ramBytesUsed) + .reduce(0L, Long::sum); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java index 209fa21d321..26d37237256 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java @@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nonnull; @@ -34,7 +35,8 @@ import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; public class SimpleProgressIndex extends ProgressIndex { - + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SimpleProgressIndex.class) + ProgressIndex.LOCK_SIZE; private final 
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final int rebootTimes; @@ -203,4 +205,9 @@ public class SimpleProgressIndex extends ProgressIndex { + memtableFlushOrderId + '}'; } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java index 9eca0795e51..b6c51665e19 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndexType; import com.google.common.collect.ImmutableMap; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nonnull; @@ -44,7 +45,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * integrity and independence of the progress index instances. 
*/ public class StateProgressIndex extends ProgressIndex { - + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(StateProgressIndex.class) + ProgressIndex.LOCK_SIZE; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final long version; @@ -52,7 +54,7 @@ public class StateProgressIndex extends ProgressIndex { private final ProgressIndex innerProgressIndex; public StateProgressIndex( - long version, Map<String, Binary> state, ProgressIndex innerProgressIndex) { + final long version, final Map<String, Binary> state, final ProgressIndex innerProgressIndex) { this.version = version; this.state = new HashMap<>(state); this.innerProgressIndex = innerProgressIndex; @@ -242,4 +244,15 @@ public class StateProgressIndex extends ProgressIndex { + innerProgressIndex + '}'; } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + innerProgressIndex.ramBytesUsed() + + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY * state.size() + + state.entrySet().stream() + .map( + entry -> RamUsageEstimator.sizeOf(entry.getKey()) + entry.getValue().ramBytesUsed()) + .reduce(0L, Long::sum); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java index 37d6b6fdf38..bb1b2c1ce5e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndexType; import com.google.common.collect.ImmutableMap; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import 
javax.annotation.Nonnull; @@ -47,6 +48,13 @@ import java.util.stream.Collectors; */ public class TimeWindowStateProgressIndex extends ProgressIndex { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TimeWindowStateProgressIndex.class) + + ProgressIndex.LOCK_SIZE; + private static final long ENTRY_SIZE = + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + + RamUsageEstimator.shallowSizeOfInstance(Pair.class); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // Only the byteBuffer is nullable, the timeSeries, pair and timestamp must not be null @@ -292,4 +300,20 @@ public class TimeWindowStateProgressIndex extends ProgressIndex { + timeSeries2TimestampWindowBufferPairMap + "'}"; } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + timeSeries2TimestampWindowBufferPairMap.size() * ENTRY_SIZE + + timeSeries2TimestampWindowBufferPairMap.entrySet().stream() + .map( + entry -> + RamUsageEstimator.sizeOf(entry.getKey()) + + RamUsageEstimator.sizeOf(entry.getValue().getLeft()) + + (Objects.nonNull(entry.getValue().getRight()) + ? 
(RamUsageEstimator.shallowSizeOfInstance(ByteBuffer.class) + + RamUsageEstimator.sizeOf(entry.getValue().getRight().array())) + : 0)) + .reduce(0L, Long::sum); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 56069df8417..0a7e9f84aa9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -29,6 +29,9 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInAgent; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; +import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; @@ -79,6 +82,7 @@ public abstract class PipeTaskAgent { // Help PipeEndPointRateLimiter to check if the pipe is still alive PipeEndPointRateLimiter.setTaskAgent(this); + PipeEventCommitManager.getInstance().setTaskAgent(this); } ////////////////////////// PipeMeta Lock Control ////////////////////////// @@ -1020,8 +1024,53 @@ public abstract class PipeTaskAgent { protected abstract void collectPipeMetaListInternal( final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException; + ///////////////////////// Maintain meta info ///////////////////////// + public long getPipeCreationTime(final String pipeName) { final PipeMeta pipeMeta = 
pipeMetaKeeper.getPipeMeta(pipeName); return pipeMeta == null ? 0 : pipeMeta.getStaticMeta().getCreationTime(); } + + public String getPipeNameWithCreationTime(final String pipeName, final long creationTime) { + final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + return pipeMeta == null + ? pipeName + "_" + creationTime + : ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()).getPipeNameWithCreationTime(); + } + + public CommitterKey getCommitterKey( + final String pipeName, final long creationTime, final int regionId, final int restartTime) { + final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + return pipeMeta == null + ? new CommitterKey(pipeName, creationTime, regionId, restartTime) + : ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) + .getCommitterKey(pipeName, creationTime, regionId, restartTime); + } + + public long getFloatingMemoryUsageInByte(final String pipeName) { + final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + return pipeMeta == null + ? 
0 + : ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()).getFloatingMemoryUsageInByte(); + } + + public void addFloatingMemoryUsageInByte(final String pipeName, final long sizeInByte) { + final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + if (Objects.nonNull(pipeMeta)) { + ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) + .addFloatingMemoryUsageInByte(sizeInByte); + } + } + + public void decreaseFloatingMemoryUsageInByte(final String pipeName, final long sizeInByte) { + final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + if (Objects.nonNull(pipeMeta)) { + ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) + .decreaseFloatingMemoryUsageInByte(sizeInByte); + } + } + + public int getPipeCount() { + return pipeMetaKeeper.getPipeMetaCount(); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java index e536a3a7af2..997278010e9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java @@ -37,7 +37,7 @@ public class PipeMeta { private final PipeTemporaryMeta temporaryMeta; public PipeMeta(final PipeStaticMeta staticMeta, final PipeRuntimeMeta runtimeMeta) { - this(staticMeta, runtimeMeta, new PipeTemporaryMeta()); + this(staticMeta, runtimeMeta, new PipeTemporaryMetaInCoordinator()); } public PipeMeta( @@ -79,14 +79,23 @@ public class PipeMeta { return new PipeMeta(staticMeta, runtimeMeta); } - public static PipeMeta deserialize(final ByteBuffer byteBuffer) { + public static PipeMeta deserialize4TaskAgent(final ByteBuffer byteBuffer) { + final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer); + final PipeRuntimeMeta runtimeMeta = PipeRuntimeMeta.deserialize(byteBuffer); + return new PipeMeta( 
+ staticMeta, + runtimeMeta, + new PipeTemporaryMetaInAgent(staticMeta.getPipeName(), staticMeta.getCreationTime())); + } + + public static PipeMeta deserialize4Coordinator(final ByteBuffer byteBuffer) { final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer); final PipeRuntimeMeta runtimeMeta = PipeRuntimeMeta.deserialize(byteBuffer); return new PipeMeta(staticMeta, runtimeMeta); } - public PipeMeta deepCopy() throws IOException { - return PipeMeta.deserialize(serialize()); + public PipeMeta deepCopy4TaskAgent() throws IOException { + return PipeMeta.deserialize4TaskAgent(serialize()); } public String coreReportMessage() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java index aeffe7fb3d9..5363f47190b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java @@ -19,75 +19,4 @@ package org.apache.iotdb.commons.pipe.agent.task.meta; -import java.util.Collections; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -public class PipeTemporaryMeta { - - private final Set<Integer> completedDataNodeIds = - Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final ConcurrentMap<Integer, Long> nodeId2RemainingEventMap = new ConcurrentHashMap<>(); - private final ConcurrentMap<Integer, Double> nodeId2RemainingTimeMap = new ConcurrentHashMap<>(); - - public void markDataNodeCompleted(final int dataNodeId) { - completedDataNodeIds.add(dataNodeId); - } - - public void markDataNodeUncompleted(final int dataNodeId) { - completedDataNodeIds.remove(dataNodeId); - } - - public void setRemainingEvent(final int 
dataNodeId, final long remainingEventCount) { - nodeId2RemainingEventMap.put(dataNodeId, remainingEventCount); - } - - public void setRemainingTime(final int dataNodeId, final double remainingTime) { - nodeId2RemainingTimeMap.put(dataNodeId, remainingTime); - } - - public Set<Integer> getCompletedDataNodeIds() { - return completedDataNodeIds; - } - - public long getGlobalRemainingEvents() { - return nodeId2RemainingEventMap.values().stream().reduce(Long::sum).orElse(0L); - } - - public double getGlobalRemainingTime() { - return nodeId2RemainingTimeMap.values().stream().reduce(Math::max).orElse(0d); - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final PipeTemporaryMeta that = (PipeTemporaryMeta) o; - return Objects.equals(this.completedDataNodeIds, that.completedDataNodeIds) - && Objects.equals(this.nodeId2RemainingEventMap, that.nodeId2RemainingEventMap) - && Objects.equals(this.nodeId2RemainingTimeMap, that.nodeId2RemainingTimeMap); - } - - @Override - public int hashCode() { - return Objects.hash(completedDataNodeIds, nodeId2RemainingEventMap, nodeId2RemainingTimeMap); - } - - @Override - public String toString() { - return "PipeTemporaryMeta{" - + "completedDataNodeIds=" - + completedDataNodeIds - + ", nodeId2RemainingEventMap=" - + nodeId2RemainingEventMap - + ", nodeId2RemainingTimeMap" - + nodeId2RemainingTimeMap - + '}'; - } -} +public interface PipeTemporaryMeta {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java new file mode 100644 index 00000000000..23914fa8d84 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.task.meta;
+
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Temporary meta attached to a pipe on the task-agent side.
+ *
+ * <p>It holds a floating-memory-usage counter for the pipe and caches objects derived from the
+ * pipe's identity (the "pipeName_creationTime" string and one {@link CommitterKey} per region) so
+ * that callers can reuse them instead of allocating new ones.
+ */
+public class PipeTemporaryMetaInAgent implements PipeTemporaryMeta {
+
+  // Statistics
+  private final AtomicLong floatingMemoryUsageInByte = new AtomicLong(0L);
+
+  // Object pool
+  private final String pipeNameWithCreationTime;
+  private final Map<Integer, CommitterKey> regionId2CommitterKeyMap = new ConcurrentHashMap<>();
+
+  /**
+   * @param pipeName name of the pipe this meta belongs to
+   * @param creationTime creation time of the pipe, appended to the name after an underscore
+   */
+  PipeTemporaryMetaInAgent(final String pipeName, final long creationTime) {
+    this.pipeNameWithCreationTime = pipeName + "_" + creationTime;
+  }
+
+  /////////////////////////////// DataNode ///////////////////////////////
+
+  /** Atomically adds {@code usage} bytes to the floating memory usage counter. */
+  public void addFloatingMemoryUsageInByte(final long usage) {
+    floatingMemoryUsageInByte.addAndGet(usage);
+  }
+
+  /** Atomically subtracts {@code usage} bytes from the floating memory usage counter. */
+  public void decreaseFloatingMemoryUsageInByte(final long usage) {
+    floatingMemoryUsageInByte.addAndGet(-usage);
+  }
+
+  /** Returns the current value of the floating memory usage counter, in bytes. */
+  public long getFloatingMemoryUsageInByte() {
+    return floatingMemoryUsageInByte.get();
+  }
+
+  /** Returns the cached "pipeName_creationTime" string built in the constructor. */
+  public String getPipeNameWithCreationTime() {
+    return pipeNameWithCreationTime;
+  }
+
+  /**
+   * Returns a {@link CommitterKey} for the given region, reusing the cached one when possible.
+   *
+   * <p>The cached key is returned only when its restart times equal {@code restartTime}. When a
+   * cached key with a larger restart times exists, a fresh key is returned without touching the
+   * cache. In every other case (nothing cached, or the cached key is older) the fresh key replaces
+   * the cached one.
+   *
+   * @param pipeName pipe name used to build a new key
+   * @param creationTime pipe creation time used to build a new key
+   * @param regionId region whose key is requested; also the cache index
+   * @param restartTime restart times the returned key must carry
+   * @return a key carrying {@code restartTime} for {@code regionId}
+   */
+  public CommitterKey getCommitterKey(
+      final String pipeName, final long creationTime, final int regionId, final int restartTime) {
+    final CommitterKey key = regionId2CommitterKeyMap.get(regionId);
+    if (Objects.nonNull(key) && key.getRestartTimes() == restartTime) {
+      return key;
+    }
+    final CommitterKey newKey = new CommitterKey(pipeName, creationTime, regionId, restartTime);
+    if (Objects.nonNull(key) && restartTime < key.getRestartTimes()) {
+      // The request is older than the cached key: serve it, but keep the newer key cached
+      return newKey;
+    }
+    // No key cached yet, or restartTime > key.getRestartTimes()
+    regionId2CommitterKeyMap.put(regionId, newKey);
+    return newKey;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  // We assume that the "pipeNameWithCreationTime" does not contain extra information
+  // thus we do not consider it here
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final PipeTemporaryMetaInAgent that = (PipeTemporaryMetaInAgent) o;
+    return this.floatingMemoryUsageInByte.get() == that.floatingMemoryUsageInByte.get()
+        && Objects.equals(this.regionId2CommitterKeyMap, that.regionId2CommitterKeyMap);
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash the counter's value, not the AtomicLong object: AtomicLong does not override
+    // hashCode(), so hashing the object would give equal instances different hash codes
+    return Objects.hash(floatingMemoryUsageInByte.get(), regionId2CommitterKeyMap);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeTemporaryMeta{"
+        + "floatingMemoryUsage="
+        + floatingMemoryUsageInByte
+        + ", regionId2CommitterKeyMap="
+        + regionId2CommitterKeyMap
+        + '}';
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java
similarity index 94%
copy from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java index aeffe7fb3d9..ee127bbae44 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java @@ -25,8 +25,9 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -public class PipeTemporaryMeta { +public class PipeTemporaryMetaInCoordinator implements PipeTemporaryMeta { + // ConfigNode statistics private final Set<Integer> completedDataNodeIds = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final ConcurrentMap<Integer, Long> nodeId2RemainingEventMap = new ConcurrentHashMap<>(); @@ -68,7 +69,7 @@ public class PipeTemporaryMeta { if (o == null || getClass() != o.getClass()) { return false; } - final PipeTemporaryMeta that = (PipeTemporaryMeta) o; + final PipeTemporaryMetaInCoordinator that = (PipeTemporaryMetaInCoordinator) o; return Objects.equals(this.completedDataNodeIds, that.completedDataNodeIds) && Objects.equals(this.nodeId2RemainingEventMap, that.nodeId2RemainingEventMap) && Objects.equals(this.nodeId2RemainingTimeMap, that.nodeId2RemainingTimeMap); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitterKey.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitterKey.java index d135ba27a3e..8335d504551 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitterKey.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitterKey.java @@ -32,7 +32,7 @@ public class CommitterKey { this(pipeName, creationTime, regionId, -1); } - CommitterKey( + public CommitterKey( final String pipeName, 
final long creationTime, final int regionId, final int restartTimes) { this.pipeName = pipeName; this.creationTime = creationTime; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java index c429f3155c0..af84d89f3da 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.pipe.agent.task.progress; +import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics; @@ -34,6 +35,7 @@ public class PipeEventCommitManager { private static final Logger LOGGER = LoggerFactory.getLogger(PipeEventCommitManager.class); + private volatile PipeTaskAgent taskAgent; private final Map<CommitterKey, PipeEventCommitter> eventCommitterMap = new ConcurrentHashMap<>(); // the restartTimes in the committer key is always -1 @@ -104,7 +106,9 @@ public class PipeEventCommitManager { } if (Objects.nonNull(commitRateMarker)) { try { - commitRateMarker.accept(event.getPipeNameWithCreationTime(), event.isDataRegionEvent()); + commitRateMarker.accept( + taskAgent.getPipeNameWithCreationTime(event.getPipeName(), event.getCreationTime()), + event.isDataRegionEvent()); } catch (final Exception e) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( @@ -144,7 +148,7 @@ public class PipeEventCommitManager { private CommitterKey generateCommitterKey( final String pipeName, final long creationTime, final int regionId) { - return new CommitterKey( + return taskAgent.getCommitterKey( pipeName, creationTime, regionId, @@ -162,6 +166,10 @@ public class 
PipeEventCommitManager { committerKey.getPipeName(), committerKey.getCreationTime(), committerKey.getRegionId()); } + public void setTaskAgent(final PipeTaskAgent taskAgent) { + this.taskAgent = taskAgent; + } + public void setCommitRateMarker(final BiConsumer<String, Boolean> commitRateMarker) { this.commitRateMarker = commitRateMarker; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index 23b68f450a4..c04653adaa3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -53,7 +53,6 @@ public abstract class EnrichedEvent implements Event { protected final String pipeName; protected final long creationTime; - private final String pipeNameWithCreationTime; // cache for better performance protected final PipeTaskMeta pipeTaskMeta; @@ -84,7 +83,6 @@ public abstract class EnrichedEvent implements Event { isReleased = new AtomicBoolean(false); this.pipeName = pipeName; this.creationTime = creationTime; - this.pipeNameWithCreationTime = pipeName + "_" + creationTime; this.pipeTaskMeta = pipeTaskMeta; this.pipePattern = pipePattern; this.startTime = startTime; @@ -295,10 +293,6 @@ public abstract class EnrichedEvent implements Event { return creationTime; } - public String getPipeNameWithCreationTime() { - return pipeNameWithCreationTime; - } - public final int getRegionId() { // TODO: persist regionId in EnrichedEvent return committerKey == null ? 
-1 : committerKey.getRegionId(); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java index ad77c8c3901..8e04baf7cc0 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java @@ -51,7 +51,7 @@ public class PipeMetaDeSerTest { @Test public void test() throws IOException { - PipeStaticMeta pipeStaticMeta = + final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta( "pipeName", 123L, @@ -67,8 +67,8 @@ public class PipeMetaDeSerTest { } }, new HashMap<String, String>() {}); - ByteBuffer staticByteBuffer = pipeStaticMeta.serialize(); - PipeStaticMeta pipeStaticMeta1 = PipeStaticMeta.deserialize(staticByteBuffer); + final ByteBuffer staticByteBuffer = pipeStaticMeta.serialize(); + final PipeStaticMeta pipeStaticMeta1 = PipeStaticMeta.deserialize(staticByteBuffer); Assert.assertEquals(pipeStaticMeta, pipeStaticMeta1); HybridProgressIndex hybridProgressIndex = @@ -82,8 +82,9 @@ public class PipeMetaDeSerTest { hybridProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex( new IoTProgressIndex(3, 6L)); - Map<String, Pair<Long, ByteBuffer>> timeSeries2TimestampWindowBufferPairMap = new HashMap<>(); - ByteBuffer buffer; + final Map<String, Pair<Long, ByteBuffer>> timeSeries2TimestampWindowBufferPairMap = + new HashMap<>(); + final ByteBuffer buffer; try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { ReadWriteIOUtils.write("123", outputStream); @@ -92,7 +93,7 @@ public class PipeMetaDeSerTest { timeSeries2TimestampWindowBufferPairMap.put("root.test.a1", new Pair<>(123L, buffer)); final HybridProgressIndex finalHybridProgressIndex = hybridProgressIndex; - PipeRuntimeMeta 
pipeRuntimeMeta = + final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta( new ConcurrentHashMap<Integer, PipeTaskMeta>() { { @@ -145,9 +146,9 @@ public class PipeMetaDeSerTest { pipeRuntimeMeta1 = PipeRuntimeMeta.deserialize(runtimeByteBuffer); Assert.assertEquals(pipeRuntimeMeta, pipeRuntimeMeta1); - PipeMeta pipeMeta = new PipeMeta(pipeStaticMeta, pipeRuntimeMeta); - ByteBuffer byteBuffer = pipeMeta.serialize(); - PipeMeta pipeMeta1 = PipeMeta.deserialize(byteBuffer); + final PipeMeta pipeMeta = new PipeMeta(pipeStaticMeta, pipeRuntimeMeta); + final ByteBuffer byteBuffer = pipeMeta.serialize(); + final PipeMeta pipeMeta1 = PipeMeta.deserialize4Coordinator(byteBuffer); Assert.assertEquals(pipeMeta, pipeMeta1); } }
