This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3cb544e39b2 Pipe: Reduce the memory usage & Enable insertNode memory
control for stream mode degrade and stuck restart (#14102)
3cb544e39b2 is described below
commit 3cb544e39b250b56bdc8d4c4e3ff0b50c2922e5f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Nov 21 15:45:59 2024 +0800
Pipe: Reduce the memory usage & Enable insertNode memory control for stream
mode degrade and stuck restart (#14102)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/runtime/PipeHandleMetaChangePlan.java | 8 +-
.../response/pipe/task/PipeTableResp.java | 5 +-
.../pipe/agent/task/PipeConfigNodeTaskAgent.java | 4 +-
.../runtime/heartbeat/PipeHeartbeat.java | 3 +-
.../runtime/heartbeat/PipeHeartbeatParser.java | 5 +-
.../manager/pipe/metric/PipeConfigNodeMetrics.java | 4 +-
... => PipeTemporaryMetaInCoordinatorMetrics.java} | 29 +++---
.../confignode/persistence/pipe/PipeInfo.java | 16 +--
.../confignode/persistence/pipe/PipeTaskInfo.java | 4 +-
.../db/pipe/agent/runtime/PipeAgentLauncher.java | 2 +-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 30 ++++--
.../evolvable/batch/PipeTabletEventPlainBatch.java | 10 +-
.../async/IoTDBDataRegionAsyncConnector.java | 2 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 30 +++++-
.../common/tablet/PipeRawTabletInsertionEvent.java | 6 +-
.../PipeRealtimeDataRegionHybridExtractor.java | 12 ++-
.../impl/DataNodeInternalRPCServiceImpl.java | 7 +-
.../config/executor/ClusterConfigTaskExecutor.java | 6 +-
.../dataregion/wal/utils/WALEntryHandler.java | 12 +--
.../TsFileResourceProgressIndexTest.java | 5 +
.../commons/consensus/index/ProgressIndex.java | 12 ++-
.../consensus/index/impl/HybridProgressIndex.java | 36 ++++---
.../consensus/index/impl/IoTProgressIndex.java | 12 +++
.../consensus/index/impl/MetaProgressIndex.java | 9 +-
.../consensus/index/impl/MinimumProgressIndex.java | 5 +
.../consensus/index/impl/RecoverProgressIndex.java | 13 +++
.../consensus/index/impl/SimpleProgressIndex.java | 9 +-
.../consensus/index/impl/StateProgressIndex.java | 17 +++-
.../index/impl/TimeWindowStateProgressIndex.java | 24 +++++
.../commons/pipe/agent/task/PipeTaskAgent.java | 49 ++++++++++
.../commons/pipe/agent/task/meta/PipeMeta.java | 17 +++-
.../pipe/agent/task/meta/PipeTemporaryMeta.java | 73 +-------------
.../agent/task/meta/PipeTemporaryMetaInAgent.java | 107 +++++++++++++++++++++
...ta.java => PipeTemporaryMetaInCoordinator.java} | 5 +-
.../pipe/agent/task/progress/CommitterKey.java | 2 +-
.../task/progress/PipeEventCommitManager.java | 12 ++-
.../iotdb/commons/pipe/event/EnrichedEvent.java | 6 --
.../iotdb/commons/pipe/task/PipeMetaDeSerTest.java | 19 ++--
38 files changed, 447 insertions(+), 180 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java
index 07d70e92db4..74a3cf37c94 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java
@@ -48,7 +48,7 @@ public class PipeHandleMetaChangePlan extends
ConfigPhysicalPlan {
}
@Override
- protected void serializeImpl(DataOutputStream stream) throws IOException {
+ protected void serializeImpl(final DataOutputStream stream) throws
IOException {
stream.writeShort(getType().getPlanType());
stream.writeInt(pipeMetaList.size());
@@ -58,16 +58,16 @@ public class PipeHandleMetaChangePlan extends
ConfigPhysicalPlan {
}
@Override
- protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
int size = buffer.getInt();
for (int i = 0; i < size; i++) {
- PipeMeta pipeMeta = PipeMeta.deserialize(buffer);
+ PipeMeta pipeMeta = PipeMeta.deserialize4Coordinator(buffer);
pipeMetaList.add(pipeMeta);
}
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index f2cbe60cb97..020690f2465 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
+import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import
org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
@@ -175,7 +175,8 @@ public class PipeTableResp implements DataSet {
staticMeta.getProcessorParameters().toString(),
staticMeta.getConnectorParameters().toString(),
exceptionMessageBuilder.toString());
- final PipeTemporaryMeta temporaryMeta = pipeMeta.getTemporaryMeta();
+ final PipeTemporaryMetaInCoordinator temporaryMeta =
+ (PipeTemporaryMetaInCoordinator) pipeMeta.getTemporaryMeta();
final boolean canCalculateOnLocal = canCalculateOnLocal(pipeMeta);
showPipeInfo.setRemainingEventCount(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index d5204b1c99b..2451e055c72 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -121,7 +121,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
final PipeMeta pipeMetaFromCoordinator) {
try {
return PipeConfigNodeAgent.runtime().isLeaderReady()
- ?
super.handleSinglePipeMetaChangesInternal(pipeMetaFromCoordinator.deepCopy())
+ ?
super.handleSinglePipeMetaChangesInternal(pipeMetaFromCoordinator.deepCopy4TaskAgent())
: null;
} catch (final Exception e) {
return new TPushPipeMetaRespExceptionMessage(
@@ -152,7 +152,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
.map(
pipeMeta -> {
try {
- return pipeMeta.deepCopy();
+ return pipeMeta.deepCopy4TaskAgent();
} catch (Exception e) {
throw new PipeException("failed to deep copy
pipeMeta", e);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 9c909149896..02ed8cca2fc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -42,7 +42,8 @@ public class PipeHeartbeat {
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) {
- final PipeMeta pipeMeta =
PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i));
+ final PipeMeta pipeMeta =
+
PipeMeta.deserialize4TaskAgent(pipeMetaByteBufferListFromAgent.get(i));
pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta);
isCompletedMap.put(
pipeMeta.getStaticMeta(),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 16e2e6a5b73..e2303fecdea 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -28,7 +28,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
+import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
@@ -144,7 +144,8 @@ public class PipeHeartbeatParser {
continue;
}
- final PipeTemporaryMeta temporaryMeta =
pipeMetaFromCoordinator.getTemporaryMeta();
+ final PipeTemporaryMetaInCoordinator temporaryMeta =
+ (PipeTemporaryMetaInCoordinator)
pipeMetaFromCoordinator.getTemporaryMeta();
// Remove completed pipes
final Boolean isPipeCompletedFromAgent =
pipeHeartbeat.isCompleted(staticMeta);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
index 264ed997b84..cc9652cb6f4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
@@ -41,7 +41,7 @@ public class PipeConfigNodeMetrics implements IMetricSet {
PipeConfigRegionExtractorMetrics.getInstance().bindTo(metricService);
PipeConfigRegionConnectorMetrics.getInstance().bindTo(metricService);
PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService);
- PipeTemporaryMetaMetrics.getInstance().bindTo(metricService);
+ PipeTemporaryMetaInCoordinatorMetrics.getInstance().bindTo(metricService);
PipeConfigNodeReceiverMetrics.getInstance().bindTo(metricService);
PipeConfigNodeResourceMetrics.getInstance().bindTo(metricService);
}
@@ -54,7 +54,7 @@ public class PipeConfigNodeMetrics implements IMetricSet {
PipeConfigRegionExtractorMetrics.getInstance().unbindFrom(metricService);
PipeConfigRegionConnectorMetrics.getInstance().unbindFrom(metricService);
PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService);
- PipeTemporaryMetaMetrics.getInstance().unbindFrom(metricService);
+
PipeTemporaryMetaInCoordinatorMetrics.getInstance().unbindFrom(metricService);
PipeConfigNodeReceiverMetrics.getInstance().unbindFrom(metricService);
PipeConfigNodeResourceMetrics.getInstance().unbindFrom(metricService);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaInCoordinatorMetrics.java
similarity index 82%
rename from
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java
rename to
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaInCoordinatorMetrics.java
index 2296a53d91e..cc46a1a015e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaInCoordinatorMetrics.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.manager.pipe.metric;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
+import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.metrics.AbstractMetricService;
@@ -39,17 +40,19 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
- * The {@link PipeTemporaryMetaMetrics} is to calculate the pipe-statistics
from the {@link
- * PipeTemporaryMeta}. The class is lock-free and can only read from the
thread-safe variables from
- * the {@link PipeTemporaryMeta}.
+ * The {@link PipeTemporaryMetaInCoordinatorMetrics} calculates the
pipe statistics from the
+ * {@link PipeTemporaryMeta}. The class is lock-free and only reads
the thread-safe
+ * variables of the {@link PipeTemporaryMeta}.
*/
-public class PipeTemporaryMetaMetrics implements IMetricSet {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTemporaryMetaMetrics.class);
+public class PipeTemporaryMetaInCoordinatorMetrics implements IMetricSet {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PipeTemporaryMetaInCoordinatorMetrics.class);
@SuppressWarnings("java:S3077")
private volatile AbstractMetricService metricService;
- private final Map<String, PipeTemporaryMeta> pipeTemporaryMetaMap = new
ConcurrentHashMap<>();
+ private final Map<String, PipeTemporaryMetaInCoordinator>
pipeTemporaryMetaMap =
+ new ConcurrentHashMap<>();
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@@ -64,13 +67,13 @@ public class PipeTemporaryMetaMetrics implements IMetricSet
{
}
private void createAutoGauge(final String pipeID) {
- final PipeTemporaryMeta pipeTemporaryMeta =
pipeTemporaryMetaMap.get(pipeID);
+ final PipeTemporaryMetaInCoordinator pipeTemporaryMeta =
pipeTemporaryMetaMap.get(pipeID);
final String[] pipeNameAndCreationTime = pipeID.split("_");
metricService.createAutoGauge(
Metric.PIPE_GLOBAL_REMAINING_EVENT_COUNT.toString(),
MetricLevel.IMPORTANT,
pipeTemporaryMeta,
- PipeTemporaryMeta::getGlobalRemainingEvents,
+ PipeTemporaryMetaInCoordinator::getGlobalRemainingEvents,
Tag.NAME.toString(),
pipeNameAndCreationTime[0],
Tag.CREATION_TIME.toString(),
@@ -79,7 +82,7 @@ public class PipeTemporaryMetaMetrics implements IMetricSet {
Metric.PIPE_GLOBAL_REMAINING_TIME.toString(),
MetricLevel.IMPORTANT,
pipeTemporaryMeta,
- PipeTemporaryMeta::getGlobalRemainingTime,
+ PipeTemporaryMetaInCoordinator::getGlobalRemainingTime,
Tag.NAME.toString(),
pipeNameAndCreationTime[0],
Tag.CREATION_TIME.toString(),
@@ -123,7 +126,8 @@ public class PipeTemporaryMetaMetrics implements IMetricSet
{
public void register(final PipeMeta pipeMeta) {
final String taskID =
pipeMeta.getStaticMeta().getPipeName() + "_" +
pipeMeta.getStaticMeta().getCreationTime();
- pipeTemporaryMetaMap.putIfAbsent(taskID, pipeMeta.getTemporaryMeta());
+ pipeTemporaryMetaMap.putIfAbsent(
+ taskID, (PipeTemporaryMetaInCoordinator) pipeMeta.getTemporaryMeta());
if (Objects.nonNull(metricService)) {
createMetrics(taskID);
}
@@ -163,14 +167,15 @@ public class PipeTemporaryMetaMetrics implements
IMetricSet {
private static class PipeTemporaryMetaMetricsHolder {
- private static final PipeTemporaryMetaMetrics INSTANCE = new
PipeTemporaryMetaMetrics();
+ private static final PipeTemporaryMetaInCoordinatorMetrics INSTANCE =
+ new PipeTemporaryMetaInCoordinatorMetrics();
private PipeTemporaryMetaMetricsHolder() {
// Empty constructor
}
}
- public static PipeTemporaryMetaMetrics getInstance() {
+ public static PipeTemporaryMetaInCoordinatorMetrics getInstance() {
return PipeTemporaryMetaMetricsHolder.INSTANCE;
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index 48a9d705dc4..f1f15b02528 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -33,7 +33,7 @@ import
org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import
org.apache.iotdb.confignode.manager.pipe.agent.runtime.PipeConfigRegionListener;
import
org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeSubtask;
import
org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeTaskAgent;
-import
org.apache.iotdb.confignode.manager.pipe.metric.PipeTemporaryMetaMetrics;
+import
org.apache.iotdb.confignode.manager.pipe.metric.PipeTemporaryMetaInCoordinatorMetrics;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -94,7 +94,7 @@ public class PipeInfo implements SnapshotProcessor {
throw new PipeException("Failed to increase listener
reference", e);
}
});
- PipeTemporaryMetaMetrics.getInstance()
+ PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
@@ -114,7 +114,7 @@ public class PipeInfo implements SnapshotProcessor {
PipeConfigNodeAgent.task()
.handleSinglePipeMetaChanges(pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeName()));
- PipeTemporaryMetaMetrics.getInstance()
+ PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final Exception e) {
@@ -143,7 +143,7 @@ public class PipeInfo implements SnapshotProcessor {
throw new PipeException("Failed to decrease listener
reference", e);
}
});
- PipeTemporaryMetaMetrics.getInstance()
+ PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
@@ -181,7 +181,7 @@ public class PipeInfo implements SnapshotProcessor {
throw new PipeException("Failed to decrease listener
reference", e);
}
});
- PipeTemporaryMetaMetrics.getInstance()
+ PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
@@ -206,7 +206,7 @@ public class PipeInfo implements SnapshotProcessor {
public TSStatus operateMultiplePipes(final OperateMultiplePipesPlanV2 plans)
{
try {
final TSStatus status = pipeTaskInfo.operateMultiplePipes(plans);
- PipeTemporaryMetaMetrics.getInstance()
+ PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return status;
} catch (final Exception e) {
@@ -225,7 +225,7 @@ public class PipeInfo implements SnapshotProcessor {
pipeMetaListFromCoordinator.add(pipeMeta);
}
PipeConfigNodeAgent.task().handlePipeMetaChanges(pipeMetaListFromCoordinator);
- PipeTemporaryMetaMetrics.getInstance()
+ PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final Exception e) {
@@ -245,7 +245,7 @@ public class PipeInfo implements SnapshotProcessor {
pipeTaskInfo.getPipeMetaByPipeName(pipeMeta.getStaticMeta().getPipeName()));
}
PipeConfigNodeAgent.task().handlePipeMetaChanges(pipeMetaListFromCoordinator);
- PipeTemporaryMetaMetrics.getInstance()
+ PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final Exception e) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 936220f77b3..70d23a47cc7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
+import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
@@ -602,7 +603,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
.get(consensusGroupId.getId())
.setLeaderNodeId(newLeader);
// New region leader may contain un-transferred
events
-
pipeMeta.getTemporaryMeta().markDataNodeUncompleted(newLeader);
+ ((PipeTemporaryMetaInCoordinator)
pipeMeta.getTemporaryMeta())
+ .markDataNodeUncompleted(newLeader);
} else {
consensusGroupIdToTaskMetaMap.remove(consensusGroupId.getId());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 0b8fb380cd9..4cfb7185def 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -168,7 +168,7 @@ class PipeAgentLauncher {
getAllPipeInfoResp.getAllPipeInfo().stream()
.map(
byteBuffer -> {
- final PipeMeta pipeMeta =
PipeMeta.deserialize(byteBuffer);
+ final PipeMeta pipeMeta =
PipeMeta.deserialize4TaskAgent(byteBuffer);
LOGGER.info(
"Pulled pipe meta from config node: {}, recovering
...", pipeMeta);
return pipeMeta;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 5e5143e5f06..a25029f6a58 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -528,15 +528,25 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
// Only restart the stream mode pipes for releasing memTables.
- if (extractors.get(0).isStreamMode()
- &&
extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)
- && (mayMemTablePinnedCountReachDangerousThreshold()
- || mayWalSizeReachThrottleThreshold())) {
- // Extractors of this pipe may be stuck and is pinning too many
MemTables.
- LOGGER.warn(
- "Pipe {} needs to restart because too many memTables are pinned.",
- pipeMeta.getStaticMeta());
- stuckPipes.add(pipeMeta);
+ if (extractors.get(0).isStreamMode()) {
+ if
(extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)
+ && (mayMemTablePinnedCountReachDangerousThreshold()
+ || mayWalSizeReachThrottleThreshold())) {
+ // Extractors of this pipe may be stuck and are pinning too many
MemTables.
+ LOGGER.warn(
+ "Pipe {} needs to restart because too many memTables are
pinned.",
+ pipeMeta.getStaticMeta());
+ stuckPipes.add(pipeMeta);
+ } else if (getFloatingMemoryUsageInByte(pipeName)
+ >=
(PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+ -
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
+ / pipeMetaKeeper.getPipeMetaCount()) {
+ // Extractors of this pipe may have too many insert nodes
+ LOGGER.warn(
+ "Pipe {} needs to restart because too many insertNodes are
extracted.",
+ pipeMeta.getStaticMeta());
+ stuckPipes.add(pipeMeta);
+ }
}
}
@@ -582,7 +592,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
acquireWriteLock();
try {
final long startTime = System.currentTimeMillis();
- final PipeMeta originalPipeMeta = pipeMeta.deepCopy();
+ final PipeMeta originalPipeMeta = pipeMeta.deepCopy4TaskAgent();
handleDropPipe(pipeMeta.getStaticMeta().getPipeName());
handleSinglePipeMetaChanges(originalPipeMeta);
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index fae2338a215..ce865b1fb21 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -54,7 +54,7 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
private static final String TREE_MODEL_DATABASE_PLACEHOLDER = null;
private final List<String> binaryDataBases = new ArrayList<>();
- private final List<String> inertNodeDataBases = new ArrayList<>();
+ private final List<String> insertNodeDataBases = new ArrayList<>();
private final List<String> tabletDataBases = new ArrayList<>();
// limit in buffer size
@@ -111,7 +111,7 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
tabletBuffers.clear();
binaryDataBases.clear();
- inertNodeDataBases.clear();
+ insertNodeDataBases.clear();
tabletDataBases.clear();
pipe2BytesAccumulated.clear();
@@ -123,7 +123,7 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
insertNodeBuffers,
tabletBuffers,
binaryDataBases,
- inertNodeDataBases,
+ insertNodeDataBases,
tabletDataBases);
}
@@ -168,10 +168,10 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
databaseEstimateSize =
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName().length();
-
inertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
+
insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
} else {
databaseEstimateSize = 4;
- inertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
+ insertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
}
}
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index e473017a489..ca5982d9ea5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -396,7 +396,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private void logOnClientException(
final AsyncPipeDataTransferServiceClient client, final Exception e) {
if (client == null) {
- LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT, e);
+ LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT);
} else {
LOGGER.warn(
String.format(THRIFT_ERROR_FORMATTER_WITH_ENDPOINT, client.getIp(),
client.getPort()), e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 616ce606fc0..3cd8cc72c25 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventParser;
@@ -42,11 +43,14 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryPosition;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
@@ -63,10 +67,16 @@ import java.util.function.BiConsumer;
import java.util.stream.Collectors;
public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
- implements TabletInsertionEvent, ReferenceTrackableEvent {
+ implements TabletInsertionEvent, ReferenceTrackableEvent, Accountable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeInsertNodeTabletInsertionEvent.class);
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(PipeInsertNodeTabletInsertionEvent.class)
+ + RamUsageEstimator.shallowSizeOfInstance(WALEntryHandler.class)
+ + RamUsageEstimator.shallowSizeOfInstance(WALEntryPosition.class)
+ + RamUsageEstimator.shallowSizeOfInstance(AtomicInteger.class)
+ + RamUsageEstimator.shallowSizeOfInstance(AtomicBoolean.class);
private final WALEntryHandler walEntryHandler;
private final boolean isAligned;
@@ -164,12 +174,13 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseTabletEventCount(pipeName, creationTime);
+ PipeDataNodeAgent.task().addFloatingMemoryUsageInByte(pipeName,
ramBytesUsed());
}
return true;
} catch (final Exception e) {
LOGGER.warn(
String.format(
- "Increase reference count for memtable %d error. Holder Message:
%s",
+ "Increase reference count for memTable %d error. Holder Message:
%s",
walEntryHandler.getMemTableId(), holderMessage),
e);
return false;
@@ -180,7 +191,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
try {
PipeDataNodeResourceManager.wal().unpin(walEntryHandler);
- // Release the containers' memory.
+ // Release the parsers' memory.
if (eventParsers != null) {
eventParsers.clear();
eventParsers = null;
@@ -195,6 +206,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
return false;
} finally {
if (Objects.nonNull(pipeName)) {
+ PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName,
ramBytesUsed());
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseTabletEventCount(pipeName, creationTime);
}
@@ -481,6 +493,18 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
this.isReleased, this.referenceCount, this.walEntryHandler);
}
+ // Notes:
+ // 1. Only insertion events' memory is counted for degrade and restart, because
+ // degrading or restarting may not help release the memory of other events.
+ // 2. eventParsers are not counted, because they may not exist and, if parsing
+ // has been invoked, the event will soon be released.
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + (Objects.nonNull(devicePath) ? PartialPath.estimateSize(devicePath)
: 0)
+ + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0);
+ }
+
private static class PipeInsertNodeTabletInsertionEventResource extends
PipeEventResource {
private final WALEntryHandler walEntryHandler;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 5157ece8810..2782e04c9aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.record.Tablet;
import java.util.Objects;
@@ -51,6 +52,9 @@ import java.util.function.BiConsumer;
public class PipeRawTabletInsertionEvent extends PipeInsertionEvent
implements TabletInsertionEvent, ReferenceTrackableEvent {
+ // Shallow size of this event object, added to the tablet size when allocating memory
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(PipeRawTabletInsertionEvent.class);
private Tablet tablet;
private String deviceId; // Only used when the tablet is released.
private final boolean isAligned;
@@ -168,7 +172,7 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
allocatedMemoryBlock =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
- PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
+ PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) +
INSTANCE_SIZE);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseTabletEventCount(pipeName, creationTime);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index bf2a5b6a966..95437f162b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -214,7 +214,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
|| mayMemTablePinnedCountReachDangerousThreshold()
|| isHistoricalTsFileEventCountExceededLimit()
|| isRealtimeTsFileEventCountExceededLimit()
- || mayTsFileLinkedCountReachDangerousThreshold();
+ || mayTsFileLinkedCountReachDangerousThreshold()
+ || mayInsertNodeMemoryReachDangerousThreshold();
}
private boolean mayWalSizeReachThrottleThreshold() {
@@ -245,6 +246,15 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
>= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
}
+ private boolean mayInsertNodeMemoryReachDangerousThreshold() {
+ return 3
+ * PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName)
+ * PipeDataNodeAgent.task().getPipeCount()
+ >= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+ -
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
+ * 2;
+ }
+
@Override
public Event supply() {
PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent)
pendingQueue.directPoll();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 3d40633bdf1..d028fb1f182 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1118,7 +1118,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
PipeDataNodeAgent.task()
.handlePipeMetaChanges(
req.getPipeMetas().stream()
- .map(PipeMeta::deserialize)
+ .map(PipeMeta::deserialize4TaskAgent)
.collect(Collectors.toList()));
return exceptionMessages.isEmpty()
@@ -1141,7 +1141,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
if (req.isSetPipeNameToDrop()) {
exceptionMessage =
PipeDataNodeAgent.task().handleDropPipe(req.getPipeNameToDrop());
} else if (req.isSetPipeMeta()) {
- final PipeMeta pipeMeta =
PipeMeta.deserialize(ByteBuffer.wrap(req.getPipeMeta()));
+ final PipeMeta pipeMeta =
+ PipeMeta.deserialize4TaskAgent(ByteBuffer.wrap(req.getPipeMeta()));
exceptionMessage =
PipeDataNodeAgent.task().handleSinglePipeMetaChanges(pipeMeta);
} else {
throw new Exception("Invalid TPushSinglePipeMetaReq");
@@ -1178,7 +1179,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
} else if (req.isSetPipeMetas()) {
for (ByteBuffer byteBuffer : req.getPipeMetas()) {
- final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
+ final PipeMeta pipeMeta = PipeMeta.deserialize4TaskAgent(byteBuffer);
TPushPipeMetaRespExceptionMessage message =
PipeDataNodeAgent.task().handleSinglePipeMetaChanges(pipeMeta);
exceptionMessages.add(message);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 0b2fad3a64d..8bde753ee72 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1945,7 +1945,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
pipeMetaFromCoordinator =
getAllPipeInfoResp.getAllPipeInfo().stream()
- .map(PipeMeta::deserialize)
+ .map(PipeMeta::deserialize4Coordinator)
.filter(
pipeMeta ->
pipeMeta
@@ -1964,7 +1964,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new IoTDBException(exceptionMessage,
TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}
- } catch (Exception e) {
+ } catch (final Exception e) {
final String exceptionMessage =
String.format(
"Failed to alter pipe %s, because %s",
@@ -2024,7 +2024,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute());
}
}
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.info("Failed to validate alter pipe statement, because {}",
e.getMessage(), e);
future.setException(
new IoTDBException(e.getMessage(),
TSStatusCode.PIPE_ERROR.getStatusCode()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index ea9570033ae..edc67edccfa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -58,7 +58,7 @@ public class WALEntryHandler {
private volatile boolean isHardlink = false;
private final AtomicReference<File> hardlinkFile = new AtomicReference<>();
- public WALEntryHandler(WALEntryValue value) {
+ public WALEntryHandler(final WALEntryValue value) {
this.value = value;
}
@@ -105,7 +105,7 @@ public class WALEntryHandler {
*/
public InsertNode getInsertNode() throws WALPipeException {
// return local cache
- WALEntryValue res = value;
+ final WALEntryValue res = value;
if (res != null) {
if (res instanceof InsertNode) {
return (InsertNode) res;
@@ -120,7 +120,7 @@ public class WALEntryHandler {
synchronized (this) {
this.wait();
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.warn("Interrupted when waiting for result.", e);
Thread.currentThread().interrupt();
}
@@ -179,7 +179,7 @@ public class WALEntryHandler {
}
}
- public void setWalNode(WALNode walNode, long memTableId) {
+ public void setWalNode(final WALNode walNode, final long memTableId) {
this.walNode = walNode;
this.memTableId = memTableId;
walEntryPosition.setWalNode(walNode, memTableId);
@@ -189,7 +189,7 @@ public class WALEntryHandler {
return memTableId;
}
- public void setEntryPosition(long walFileVersionId, long position) {
+ public void setEntryPosition(final long walFileVersionId, final long
position) {
this.walEntryPosition.setEntryPosition(walFileVersionId, position, value);
this.value = null;
synchronized (this) {
@@ -205,7 +205,7 @@ public class WALEntryHandler {
return walEntryPosition.getSize();
}
- public void setSize(int size) {
+ public void setSize(final int size) {
this.walEntryPosition.setSize(size);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
index 4d1f2b8bb3e..07ae50a5f88 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
@@ -227,6 +227,11 @@ public class TsFileResourceProgressIndexTest {
public TotalOrderSumTuple getTotalOrderSumTuple() {
return new TotalOrderSumTuple((long) val);
}
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
}
@Test
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index 43f934c5b9b..b54d6db4dab 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -24,6 +24,8 @@ import
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
import javax.annotation.Nonnull;
@@ -32,6 +34,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
@@ -53,7 +56,14 @@ import java.util.stream.LongStream;
* immutability contract. This prevents unintended modifications to the
underlying mutable state
* from affecting other parts of the program.
*/
-public abstract class ProgressIndex {
+public abstract class ProgressIndex implements Accountable {
+
+ protected static final long LOCK_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(ReentrantReadWriteLock.class)
+ +
RamUsageEstimator.shallowSizeOfInstance(ReentrantReadWriteLock.ReadLock.class)
+ +
RamUsageEstimator.shallowSizeOfInstance(ReentrantReadWriteLock.WriteLock.class)
+ + ((long) RamUsageEstimator.NUM_BYTES_OBJECT_HEADER << 1)
+ + 64;
/** Serialize this progress index to the given byte buffer. */
public abstract void serialize(ByteBuffer byteBuffer);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
index ac0e50b317c..b27c2666cc9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nonnull;
@@ -40,6 +41,11 @@ import java.util.stream.Collectors;
public class HybridProgressIndex extends ProgressIndex {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(HybridProgressIndex.class) +
ProgressIndex.LOCK_SIZE;
+ private static final long ENTRY_SIZE =
+ RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+ + RamUsageEstimator.alignObjectSize(Short.BYTES);
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<Short, ProgressIndex> type2Index;
@@ -48,11 +54,11 @@ public class HybridProgressIndex extends ProgressIndex {
this(Collections.emptyMap());
}
- public HybridProgressIndex(ProgressIndex progressIndex) {
+ public HybridProgressIndex(final ProgressIndex progressIndex) {
this(Collections.singletonMap(progressIndex.getType().getType(),
progressIndex));
}
- private HybridProgressIndex(Map<Short, ProgressIndex> type2Index) {
+ private HybridProgressIndex(final Map<Short, ProgressIndex> type2Index) {
this.type2Index = new HashMap<>(type2Index);
}
@@ -61,7 +67,7 @@ public class HybridProgressIndex extends ProgressIndex {
}
@Override
- public void serialize(ByteBuffer byteBuffer) {
+ public void serialize(final ByteBuffer byteBuffer) {
lock.readLock().lock();
try {
ProgressIndexType.HYBRID_PROGRESS_INDEX.serialize(byteBuffer);
@@ -77,7 +83,7 @@ public class HybridProgressIndex extends ProgressIndex {
}
@Override
- public void serialize(OutputStream stream) throws IOException {
+ public void serialize(final OutputStream stream) throws IOException {
lock.readLock().lock();
try {
ProgressIndexType.HYBRID_PROGRESS_INDEX.serialize(stream);
@@ -93,7 +99,7 @@ public class HybridProgressIndex extends ProgressIndex {
}
@Override
- public boolean isAfter(@Nonnull ProgressIndex progressIndex) {
+ public boolean isAfter(@Nonnull final ProgressIndex progressIndex) {
lock.readLock().lock();
try {
if (progressIndex instanceof MinimumProgressIndex) {
@@ -121,14 +127,14 @@ public class HybridProgressIndex extends ProgressIndex {
}
}
- public boolean isGivenProgressIndexAfterSelf(ProgressIndex progressIndex) {
+ public boolean isGivenProgressIndexAfterSelf(final ProgressIndex
progressIndex) {
return type2Index.size() == 1
&& type2Index.containsKey(progressIndex.getType().getType())
&&
progressIndex.isAfter(type2Index.get(progressIndex.getType().getType()));
}
@Override
- public boolean equals(ProgressIndex progressIndex) {
+ public boolean equals(final ProgressIndex progressIndex) {
lock.readLock().lock();
try {
if (!(progressIndex instanceof HybridProgressIndex)) {
@@ -152,7 +158,7 @@ public class HybridProgressIndex extends ProgressIndex {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (obj == null) {
return false;
}
@@ -171,7 +177,8 @@ public class HybridProgressIndex extends ProgressIndex {
}
@Override
- public ProgressIndex
updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
+ public ProgressIndex updateToMinimumEqualOrIsAfterProgressIndex(
+ final ProgressIndex progressIndex) {
lock.writeLock().lock();
try {
if (progressIndex == null || progressIndex instanceof
MinimumProgressIndex) {
@@ -229,7 +236,7 @@ public class HybridProgressIndex extends ProgressIndex {
}
}
- public static HybridProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+ public static HybridProgressIndex deserializeFrom(final ByteBuffer
byteBuffer) {
final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
final int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; i++) {
@@ -240,7 +247,7 @@ public class HybridProgressIndex extends ProgressIndex {
return hybridProgressIndex;
}
- public static HybridProgressIndex deserializeFrom(InputStream stream) throws
IOException {
+ public static HybridProgressIndex deserializeFrom(final InputStream stream)
throws IOException {
final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
final int size = ReadWriteIOUtils.readInt(stream);
for (int i = 0; i < size; i++) {
@@ -255,4 +262,11 @@ public class HybridProgressIndex extends ProgressIndex {
public String toString() {
return "HybridProgressIndex{" + "type2Index=" + type2Index + '}';
}
+
+ /**
+ * Estimates the heap footprint of this index: the fixed instance overhead, one map entry per
+ * contained type, and the footprint reported by each nested progress index.
+ *
+ * @return the estimated number of bytes used
+ */
+ @Override
+ public long ramBytesUsed() {
+ // type2Index is a plain HashMap, so hold the read lock like the other readers of this class
+ // while traversing it.
+ lock.readLock().lock();
+ try {
+ return INSTANCE_SIZE
+ + type2Index.size() * ENTRY_SIZE
+ + type2Index.values().stream().mapToLong(ProgressIndex::ramBytesUsed).sum();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
index 5c7ffb2cc81..8b02d85da5a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nonnull;
@@ -37,6 +38,12 @@ import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class IoTProgressIndex extends ProgressIndex {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(IoTProgressIndex.class) +
ProgressIndex.LOCK_SIZE;
+
+ // We assume that all the integers are cached, while none of the longs are
+ private static final long ENTRY_SIZE =
+ RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + Long.BYTES;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -232,4 +239,9 @@ public class IoTProgressIndex extends ProgressIndex {
public String toString() {
return "IoTProgressIndex{" + "peerId2SearchIndex=" + peerId2SearchIndex +
'}';
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE + peerId2SearchIndex.size() * ENTRY_SIZE;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
index c181345e4ad..75322152d45 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nonnull;
@@ -34,7 +35,8 @@ import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class MetaProgressIndex extends ProgressIndex {
-
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(MetaProgressIndex.class) +
ProgressIndex.LOCK_SIZE;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final long index;
@@ -173,4 +175,9 @@ public class MetaProgressIndex extends ProgressIndex {
public String toString() {
return "MetaProgressIndex{" + "index=" + index + '}';
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
index e476409a30b..e22f82c9fbb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
@@ -99,4 +99,9 @@ public class MinimumProgressIndex extends ProgressIndex {
public String toString() {
return "MinimumProgressIndex{}";
}
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
index c2511222eab..5756594abeb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nonnull;
@@ -39,6 +40,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class RecoverProgressIndex extends ProgressIndex {
+ // Fixed footprint: the shallow size of this instance plus ProgressIndex's estimate for the lock.
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(RecoverProgressIndex.class)
+ + ProgressIndex.LOCK_SIZE;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -239,4 +243,13 @@ public class RecoverProgressIndex extends ProgressIndex {
public String toString() {
return "RecoverProgressIndex{" + "dataNodeId2LocalIndex=" +
dataNodeId2LocalIndex + '}';
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + dataNodeId2LocalIndex.size() *
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+ + dataNodeId2LocalIndex.values().stream()
+ .map(SimpleProgressIndex::ramBytesUsed)
+ .reduce(0L, Long::sum);
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
index d852f0cc001..162dc9f128d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nonnull;
@@ -34,7 +35,8 @@ import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SimpleProgressIndex extends ProgressIndex {
-
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(SimpleProgressIndex.class) +
ProgressIndex.LOCK_SIZE;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final int rebootTimes;
@@ -211,4 +213,9 @@ public class SimpleProgressIndex extends ProgressIndex {
+ memTableFlushOrderId
+ '}';
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
index 9eca0795e51..b6c51665e19 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nonnull;
@@ -44,7 +45,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* integrity and independence of the progress index instances.
*/
public class StateProgressIndex extends ProgressIndex {
-
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(StateProgressIndex.class) +
ProgressIndex.LOCK_SIZE;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final long version;
@@ -52,7 +54,7 @@ public class StateProgressIndex extends ProgressIndex {
private final ProgressIndex innerProgressIndex;
public StateProgressIndex(
- long version, Map<String, Binary> state, ProgressIndex
innerProgressIndex) {
+ final long version, final Map<String, Binary> state, final ProgressIndex
innerProgressIndex) {
this.version = version;
this.state = new HashMap<>(state);
this.innerProgressIndex = innerProgressIndex;
@@ -242,4 +244,15 @@ public class StateProgressIndex extends ProgressIndex {
+ innerProgressIndex
+ '}';
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + innerProgressIndex.ramBytesUsed()
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY * state.size()
+ + state.entrySet().stream()
+ .map(
+ entry -> RamUsageEstimator.sizeOf(entry.getKey()) +
entry.getValue().ramBytesUsed())
+ .reduce(0L, Long::sum);
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
index 37d6b6fdf38..bb1b2c1ce5e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nonnull;
@@ -47,6 +48,13 @@ import java.util.stream.Collectors;
*/
public class TimeWindowStateProgressIndex extends ProgressIndex {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(TimeWindowStateProgressIndex.class)
+ + ProgressIndex.LOCK_SIZE;
+ private static final long ENTRY_SIZE =
+ RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+ + RamUsageEstimator.shallowSizeOfInstance(Pair.class);
+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// Only the byteBuffer is nullable, the timeSeries, pair and timestamp must
not be null
@@ -292,4 +300,20 @@ public class TimeWindowStateProgressIndex extends
ProgressIndex {
+ timeSeries2TimestampWindowBufferPairMap
+ "'}";
}
+
+ /**
+ * Estimates the heap footprint of this index: the fixed instance overhead, one map entry and
+ * pair per time series, and for each entry its key, its timestamp and, when present, its buffer.
+ *
+ * @return the estimated number of bytes used
+ */
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + timeSeries2TimestampWindowBufferPairMap.size() * ENTRY_SIZE
+ + timeSeries2TimestampWindowBufferPairMap.entrySet().stream()
+ .mapToLong(
+ entry -> {
+ final ByteBuffer buffer = entry.getValue().getRight();
+ long size =
+ RamUsageEstimator.sizeOf(entry.getKey())
+ + RamUsageEstimator.sizeOf(entry.getValue().getLeft());
+ if (Objects.nonNull(buffer)) {
+ // array() throws for direct or read-only buffers, so fall back to the capacity.
+ size +=
+ RamUsageEstimator.shallowSizeOfInstance(ByteBuffer.class)
+ + (buffer.hasArray()
+ ? RamUsageEstimator.sizeOf(buffer.array())
+ : buffer.capacity());
+ }
+ return size;
+ })
+ .sum();
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 00afcee7280..a42b7d5ea7e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -29,6 +29,9 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInAgent;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
@@ -81,6 +84,7 @@ public abstract class PipeTaskAgent {
// Help PipeEndPointRateLimiter to check if the pipe is still alive
PipeEndPointRateLimiter.setTaskAgent(this);
+ PipeEventCommitManager.getInstance().setTaskAgent(this);
}
////////////////////////// PipeMeta Lock Control //////////////////////////
@@ -1030,8 +1034,53 @@ public abstract class PipeTaskAgent {
protected abstract void collectPipeMetaListInternal(
final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws
TException;
+ ///////////////////////// Maintain meta info /////////////////////////
+
public long getPipeCreationTime(final String pipeName) {
final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
return pipeMeta == null ? 0 : pipeMeta.getStaticMeta().getCreationTime();
}
+
+  /**
+   * Returns the {@code pipeName_creationTime} identifier for the given pipe.
+   *
+   * <p>The identifier cached in the pipe's {@link PipeTemporaryMetaInAgent} is reused only when
+   * the currently registered pipe has exactly the requested creation time. Otherwise (no such
+   * pipe, or a pipe with the same name but another creation time) the identifier is built from
+   * the arguments, so the result always reflects the {@code creationTime} that was passed in.
+   *
+   * @param pipeName name of the pipe
+   * @param creationTime creation time of the pipe the caller refers to
+   * @return {@code pipeName + "_" + creationTime}
+   */
+  public String getPipeNameWithCreationTime(final String pipeName, final long creationTime) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    // The cached string describes the pipe registered right now; it must not be handed out for
+    // a different creation time.
+    if (pipeMeta == null || pipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+      return pipeName + "_" + creationTime;
+    }
+    return ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta())
+        .getPipeNameWithCreationTime();
+  }
+
+  /**
+   * Returns a {@link CommitterKey} for the given pipe, region and restart time.
+   *
+   * <p>The per-region key pool of the pipe's {@link PipeTemporaryMetaInAgent} is consulted only
+   * when the currently registered pipe has exactly the requested creation time. Otherwise (no
+   * such pipe, or a pipe with the same name but another creation time) a fresh key is created
+   * and the pool is left untouched, so a key for one creation time is never returned or cached
+   * for another.
+   *
+   * @param pipeName name of the pipe
+   * @param creationTime creation time of the pipe the caller refers to
+   * @param regionId id of the region the committer belongs to
+   * @param restartTime restart counter to embed in the key
+   * @return a key equal to {@code new CommitterKey(pipeName, creationTime, regionId, restartTime)}
+   */
+  public CommitterKey getCommitterKey(
+      final String pipeName, final long creationTime, final int regionId, final int restartTime) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    if (pipeMeta == null || pipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+      return new CommitterKey(pipeName, creationTime, regionId, restartTime);
+    }
+    return ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta())
+        .getCommitterKey(pipeName, creationTime, regionId, restartTime);
+  }
+
+  /**
+   * Reads the floating memory usage counter of a pipe.
+   *
+   * @param pipeName name of the pipe
+   * @return the counter value in bytes, or {@code 0} when no pipe with that name is registered
+   */
+  public long getFloatingMemoryUsageInByte(final String pipeName) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    if (pipeMeta == null) {
+      return 0;
+    }
+    final PipeTemporaryMetaInAgent temporaryMeta =
+        (PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta();
+    return temporaryMeta.getFloatingMemoryUsageInByte();
+  }
+
+  /**
+   * Adds {@code sizeInByte} to the floating memory usage counter of a pipe. Does nothing when no
+   * pipe with that name is registered.
+   *
+   * @param pipeName name of the pipe
+   * @param sizeInByte amount to add, in bytes
+   */
+  public void addFloatingMemoryUsageInByte(final String pipeName, final long sizeInByte) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    if (pipeMeta == null) {
+      return;
+    }
+    final PipeTemporaryMetaInAgent temporaryMeta =
+        (PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta();
+    temporaryMeta.addFloatingMemoryUsageInByte(sizeInByte);
+  }
+
+  /**
+   * Subtracts {@code sizeInByte} from the floating memory usage counter of a pipe. Does nothing
+   * when no pipe with that name is registered.
+   *
+   * @param pipeName name of the pipe
+   * @param sizeInByte amount to subtract, in bytes
+   */
+  public void decreaseFloatingMemoryUsageInByte(final String pipeName, final long sizeInByte) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    if (pipeMeta == null) {
+      return;
+    }
+    final PipeTemporaryMetaInAgent temporaryMeta =
+        (PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta();
+    temporaryMeta.decreaseFloatingMemoryUsageInByte(sizeInByte);
+  }
+
+  /**
+   * Returns the pipe meta count reported by the pipe meta keeper.
+   *
+   * @return the value of {@code pipeMetaKeeper.getPipeMetaCount()}
+   */
+  public int getPipeCount() {
+    final int pipeMetaCount = pipeMetaKeeper.getPipeMetaCount();
+    return pipeMetaCount;
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
index e536a3a7af2..997278010e9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
@@ -37,7 +37,7 @@ public class PipeMeta {
private final PipeTemporaryMeta temporaryMeta;
public PipeMeta(final PipeStaticMeta staticMeta, final PipeRuntimeMeta
runtimeMeta) {
- this(staticMeta, runtimeMeta, new PipeTemporaryMeta());
+ this(staticMeta, runtimeMeta, new PipeTemporaryMetaInCoordinator());
}
public PipeMeta(
@@ -79,14 +79,23 @@ public class PipeMeta {
return new PipeMeta(staticMeta, runtimeMeta);
}
- public static PipeMeta deserialize(final ByteBuffer byteBuffer) {
+  /**
+   * Deserializes a {@link PipeMeta} for use by a task agent.
+   *
+   * <p>The static meta is read first and the runtime meta second, both from {@code byteBuffer}.
+   * The temporary meta is not read from the buffer: a new {@link PipeTemporaryMetaInAgent} is
+   * created from the pipe name and creation time of the static meta.
+   *
+   * @param byteBuffer buffer holding the serialized static meta followed by the runtime meta
+   * @return the deserialized pipe meta with an agent-side temporary meta attached
+   */
+  public static PipeMeta deserialize4TaskAgent(final ByteBuffer byteBuffer) {
+    final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer);
+    final PipeRuntimeMeta runtimeMeta = PipeRuntimeMeta.deserialize(byteBuffer);
+    final PipeTemporaryMetaInAgent temporaryMetaInAgent =
+        new PipeTemporaryMetaInAgent(staticMeta.getPipeName(), staticMeta.getCreationTime());
+    return new PipeMeta(staticMeta, runtimeMeta, temporaryMetaInAgent);
+  }
+
+ public static PipeMeta deserialize4Coordinator(final ByteBuffer byteBuffer) {
final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer);
final PipeRuntimeMeta runtimeMeta =
PipeRuntimeMeta.deserialize(byteBuffer);
return new PipeMeta(staticMeta, runtimeMeta);
}
- public PipeMeta deepCopy() throws IOException {
- return PipeMeta.deserialize(serialize());
+ public PipeMeta deepCopy4TaskAgent() throws IOException {
+ return PipeMeta.deserialize4TaskAgent(serialize());
}
public String coreReportMessage() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
index aeffe7fb3d9..5363f47190b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
@@ -19,75 +19,4 @@
package org.apache.iotdb.commons.pipe.agent.task.meta;
-import java.util.Collections;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class PipeTemporaryMeta {
-
- private final Set<Integer> completedDataNodeIds =
- Collections.newSetFromMap(new ConcurrentHashMap<>());
- private final ConcurrentMap<Integer, Long> nodeId2RemainingEventMap = new
ConcurrentHashMap<>();
- private final ConcurrentMap<Integer, Double> nodeId2RemainingTimeMap = new
ConcurrentHashMap<>();
-
- public void markDataNodeCompleted(final int dataNodeId) {
- completedDataNodeIds.add(dataNodeId);
- }
-
- public void markDataNodeUncompleted(final int dataNodeId) {
- completedDataNodeIds.remove(dataNodeId);
- }
-
- public void setRemainingEvent(final int dataNodeId, final long
remainingEventCount) {
- nodeId2RemainingEventMap.put(dataNodeId, remainingEventCount);
- }
-
- public void setRemainingTime(final int dataNodeId, final double
remainingTime) {
- nodeId2RemainingTimeMap.put(dataNodeId, remainingTime);
- }
-
- public Set<Integer> getCompletedDataNodeIds() {
- return completedDataNodeIds;
- }
-
- public long getGlobalRemainingEvents() {
- return
nodeId2RemainingEventMap.values().stream().reduce(Long::sum).orElse(0L);
- }
-
- public double getGlobalRemainingTime() {
- return
nodeId2RemainingTimeMap.values().stream().reduce(Math::max).orElse(0d);
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- final PipeTemporaryMeta that = (PipeTemporaryMeta) o;
- return Objects.equals(this.completedDataNodeIds, that.completedDataNodeIds)
- && Objects.equals(this.nodeId2RemainingEventMap,
that.nodeId2RemainingEventMap)
- && Objects.equals(this.nodeId2RemainingTimeMap,
that.nodeId2RemainingTimeMap);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(completedDataNodeIds, nodeId2RemainingEventMap,
nodeId2RemainingTimeMap);
- }
-
- @Override
- public String toString() {
- return "PipeTemporaryMeta{"
- + "completedDataNodeIds="
- + completedDataNodeIds
- + ", nodeId2RemainingEventMap="
- + nodeId2RemainingEventMap
- + ", nodeId2RemainingTimeMap"
- + nodeId2RemainingTimeMap
- + '}';
- }
-}
+/**
+ * Common type of the temporary meta attached to a {@code PipeMeta}. It declares no members;
+ * callers cast to the concrete implementation they expect.
+ */
+public interface PipeTemporaryMeta {}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java
new file mode 100644
index 00000000000..23914fa8d84
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.task.meta;
+
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Temporary meta of a pipe as kept by a task agent.
+ *
+ * <p>It holds a floating memory usage counter and acts as a small object pool: the {@code
+ * pipeName_creationTime} string is built once, and one {@link CommitterKey} per region id is
+ * cached for reuse.
+ */
+public class PipeTemporaryMetaInAgent implements PipeTemporaryMeta {
+
+  // Statistics
+  private final AtomicLong floatingMemoryUsageInByte = new AtomicLong(0L);
+
+  // Object pool
+  private final String pipeNameWithCreationTime;
+  private final Map<Integer, CommitterKey> regionId2CommitterKeyMap = new ConcurrentHashMap<>();
+
+  /**
+   * @param pipeName name of the pipe
+   * @param creationTime creation time of the pipe; joined to the name with {@code "_"}
+   */
+  PipeTemporaryMetaInAgent(final String pipeName, final long creationTime) {
+    this.pipeNameWithCreationTime = pipeName + "_" + creationTime;
+  }
+
+  /////////////////////////////// DataNode ///////////////////////////////
+
+  /** Atomically adds {@code usage} bytes to the floating memory usage counter. */
+  public void addFloatingMemoryUsageInByte(final long usage) {
+    floatingMemoryUsageInByte.addAndGet(usage);
+  }
+
+  /** Atomically subtracts {@code usage} bytes from the floating memory usage counter. */
+  public void decreaseFloatingMemoryUsageInByte(final long usage) {
+    floatingMemoryUsageInByte.addAndGet(-usage);
+  }
+
+  /** Returns the current value of the floating memory usage counter, in bytes. */
+  public long getFloatingMemoryUsageInByte() {
+    return floatingMemoryUsageInByte.get();
+  }
+
+  /** Returns the {@code pipeName_creationTime} string built in the constructor. */
+  public String getPipeNameWithCreationTime() {
+    return pipeNameWithCreationTime;
+  }
+
+  /**
+   * Returns a {@link CommitterKey} for the given region and restart time, reusing the cached
+   * instance when possible.
+   *
+   * <ul>
+   *   <li>If the cached key of the region has the same restart time, it is returned as is.
+   *   <li>If the requested restart time is lower than the cached one, a new key is returned but
+   *       not cached, so the cache keeps the key with the higher restart time.
+   *   <li>Otherwise (nothing cached yet, or a higher restart time) a new key is created, cached
+   *       and returned.
+   * </ul>
+   *
+   * <p>The lookup and the update are two separate map operations, not one atomic step.
+   *
+   * @param pipeName pipe name used when a new key has to be created
+   * @param creationTime pipe creation time used when a new key has to be created
+   * @param regionId region id; also the cache key
+   * @param restartTime restart counter to embed in the key
+   * @return the cached or newly created key
+   */
+  public CommitterKey getCommitterKey(
+      final String pipeName, final long creationTime, final int regionId, final int restartTime) {
+    final CommitterKey key = regionId2CommitterKeyMap.get(regionId);
+    if (Objects.nonNull(key) && key.getRestartTimes() == restartTime) {
+      return key;
+    }
+    final CommitterKey newKey = new CommitterKey(pipeName, creationTime, regionId, restartTime);
+    if (Objects.nonNull(key) && restartTime < key.getRestartTimes()) {
+      return newKey;
+    }
+    // key == null || restartTime > key.getRestartTimes()
+    regionId2CommitterKeyMap.put(regionId, newKey);
+    return newKey;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  // We assume that the "pipeNameWithCreationTime" does not contain extra information
+  // thus we do not consider it here
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final PipeTemporaryMetaInAgent that = (PipeTemporaryMetaInAgent) o;
+    return Objects.equals(
+            this.floatingMemoryUsageInByte.get(), that.floatingMemoryUsageInByte.get())
+        && Objects.equals(this.regionId2CommitterKeyMap, that.regionId2CommitterKeyMap);
+  }
+
+  @Override
+  public int hashCode() {
+    // AtomicLong does not override hashCode(), so hashing the AtomicLong itself would give
+    // equal instances different hash codes. Hash the current value instead, mirroring equals().
+    return Objects.hash(floatingMemoryUsageInByte.get(), regionId2CommitterKeyMap);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeTemporaryMeta{"
+        + "floatingMemoryUsage="
+        + floatingMemoryUsageInByte
+        + ", regionId2CommitterKeyMap="
+        + regionId2CommitterKeyMap
+        + '}';
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java
similarity index 94%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java
index aeffe7fb3d9..ee127bbae44 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java
@@ -25,8 +25,9 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-public class PipeTemporaryMeta {
+public class PipeTemporaryMetaInCoordinator implements PipeTemporaryMeta {
+ // ConfigNode statistics
private final Set<Integer> completedDataNodeIds =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ConcurrentMap<Integer, Long> nodeId2RemainingEventMap = new
ConcurrentHashMap<>();
@@ -68,7 +69,7 @@ public class PipeTemporaryMeta {
if (o == null || getClass() != o.getClass()) {
return false;
}
- final PipeTemporaryMeta that = (PipeTemporaryMeta) o;
+ final PipeTemporaryMetaInCoordinator that =
(PipeTemporaryMetaInCoordinator) o;
return Objects.equals(this.completedDataNodeIds, that.completedDataNodeIds)
&& Objects.equals(this.nodeId2RemainingEventMap,
that.nodeId2RemainingEventMap)
&& Objects.equals(this.nodeId2RemainingTimeMap,
that.nodeId2RemainingTimeMap);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitterKey.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitterKey.java
index d135ba27a3e..8335d504551 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitterKey.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitterKey.java
@@ -32,7 +32,7 @@ public class CommitterKey {
this(pipeName, creationTime, regionId, -1);
}
- CommitterKey(
+ public CommitterKey(
final String pipeName, final long creationTime, final int regionId,
final int restartTimes) {
this.pipeName = pipeName;
this.creationTime = creationTime;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
index c429f3155c0..af84d89f3da 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.pipe.agent.task.progress;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics;
@@ -34,6 +35,7 @@ public class PipeEventCommitManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeEventCommitManager.class);
+ private volatile PipeTaskAgent taskAgent;
private final Map<CommitterKey, PipeEventCommitter> eventCommitterMap = new
ConcurrentHashMap<>();
// the restartTimes in the committer key is always -1
@@ -104,7 +106,9 @@ public class PipeEventCommitManager {
}
if (Objects.nonNull(commitRateMarker)) {
try {
- commitRateMarker.accept(event.getPipeNameWithCreationTime(),
event.isDataRegionEvent());
+ commitRateMarker.accept(
+ taskAgent.getPipeNameWithCreationTime(event.getPipeName(),
event.getCreationTime()),
+ event.isDataRegionEvent());
} catch (final Exception e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
@@ -144,7 +148,7 @@ public class PipeEventCommitManager {
private CommitterKey generateCommitterKey(
final String pipeName, final long creationTime, final int regionId) {
- return new CommitterKey(
+ return taskAgent.getCommitterKey(
pipeName,
creationTime,
regionId,
@@ -162,6 +166,10 @@ public class PipeEventCommitManager {
committerKey.getPipeName(), committerKey.getCreationTime(),
committerKey.getRegionId());
}
+  /**
+   * Stores the task agent this manager queries for pipe name identifiers and committer keys.
+   *
+   * @param taskAgent the task agent to use from now on
+   */
+  public void setTaskAgent(final PipeTaskAgent taskAgent) {
+    this.taskAgent = taskAgent;
+  }
+
public void setCommitRateMarker(final BiConsumer<String, Boolean>
commitRateMarker) {
this.commitRateMarker = commitRateMarker;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 2fdef0730a8..30c1d1dca6b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -55,7 +55,6 @@ public abstract class EnrichedEvent implements Event {
protected final String pipeName;
protected final long creationTime;
- private final String pipeNameWithCreationTime; // cache for better
performance
protected final PipeTaskMeta pipeTaskMeta;
@@ -89,7 +88,6 @@ public abstract class EnrichedEvent implements Event {
this.pipeName = pipeName;
this.creationTime = creationTime;
- this.pipeNameWithCreationTime = pipeName + "_" + creationTime;
this.pipeTaskMeta = pipeTaskMeta;
this.treePattern = treePattern;
this.tablePattern = tablePattern;
@@ -313,10 +311,6 @@ public abstract class EnrichedEvent implements Event {
return creationTime;
}
- public String getPipeNameWithCreationTime() {
- return pipeNameWithCreationTime;
- }
-
public final int getRegionId() {
// TODO: persist regionId in EnrichedEvent
return committerKey == null ? -1 : committerKey.getRegionId();
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
index ad77c8c3901..8e04baf7cc0 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
@@ -51,7 +51,7 @@ public class PipeMetaDeSerTest {
@Test
public void test() throws IOException {
- PipeStaticMeta pipeStaticMeta =
+ final PipeStaticMeta pipeStaticMeta =
new PipeStaticMeta(
"pipeName",
123L,
@@ -67,8 +67,8 @@ public class PipeMetaDeSerTest {
}
},
new HashMap<String, String>() {});
- ByteBuffer staticByteBuffer = pipeStaticMeta.serialize();
- PipeStaticMeta pipeStaticMeta1 =
PipeStaticMeta.deserialize(staticByteBuffer);
+ final ByteBuffer staticByteBuffer = pipeStaticMeta.serialize();
+ final PipeStaticMeta pipeStaticMeta1 =
PipeStaticMeta.deserialize(staticByteBuffer);
Assert.assertEquals(pipeStaticMeta, pipeStaticMeta1);
HybridProgressIndex hybridProgressIndex =
@@ -82,8 +82,9 @@ public class PipeMetaDeSerTest {
hybridProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(
new IoTProgressIndex(3, 6L));
- Map<String, Pair<Long, ByteBuffer>>
timeSeries2TimestampWindowBufferPairMap = new HashMap<>();
- ByteBuffer buffer;
+ final Map<String, Pair<Long, ByteBuffer>>
timeSeries2TimestampWindowBufferPairMap =
+ new HashMap<>();
+ final ByteBuffer buffer;
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write("123", outputStream);
@@ -92,7 +93,7 @@ public class PipeMetaDeSerTest {
timeSeries2TimestampWindowBufferPairMap.put("root.test.a1", new
Pair<>(123L, buffer));
final HybridProgressIndex finalHybridProgressIndex = hybridProgressIndex;
- PipeRuntimeMeta pipeRuntimeMeta =
+ final PipeRuntimeMeta pipeRuntimeMeta =
new PipeRuntimeMeta(
new ConcurrentHashMap<Integer, PipeTaskMeta>() {
{
@@ -145,9 +146,9 @@ public class PipeMetaDeSerTest {
pipeRuntimeMeta1 = PipeRuntimeMeta.deserialize(runtimeByteBuffer);
Assert.assertEquals(pipeRuntimeMeta, pipeRuntimeMeta1);
- PipeMeta pipeMeta = new PipeMeta(pipeStaticMeta, pipeRuntimeMeta);
- ByteBuffer byteBuffer = pipeMeta.serialize();
- PipeMeta pipeMeta1 = PipeMeta.deserialize(byteBuffer);
+ final PipeMeta pipeMeta = new PipeMeta(pipeStaticMeta, pipeRuntimeMeta);
+ final ByteBuffer byteBuffer = pipeMeta.serialize();
+ final PipeMeta pipeMeta1 = PipeMeta.deserialize4Coordinator(byteBuffer);
Assert.assertEquals(pipeMeta, pipeMeta1);
}
}