This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 282147773bd Pipe: Reduced the logs in ConfigNode caused by pipeMeta
sync and heartbeat report (#12813)
282147773bd is described below
commit 282147773bd1d7a8a6cea3aed5bc5b751b9c4e3b
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 26 22:02:18 2024 +0800
Pipe: Reduced the logs in ConfigNode caused by pipeMeta sync and heartbeat
report (#12813)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/agent/task/PipeConfigNodeTaskAgent.java | 37 ++++++++--------
.../runtime/heartbeat/PipeHeartbeat.java | 4 ++
.../runtime/heartbeat/PipeHeartbeatParser.java | 39 +++++++++++------
.../pipe/event/PipeConfigRegionSnapshotEvent.java | 9 +---
.../resource/PipeConfigNodeResourceManager.java | 51 ++++++++++++++++++++++
.../PipeConfigNodeSnapshotResourceManager.java | 11 +----
.../confignode/persistence/pipe/PipeTaskInfo.java | 16 ++++++-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 10 ++---
.../IoTDBDataNodeCacheLeaderClientManager.java | 8 ++--
.../evolvable/batch/PipeTabletEventPlainBatch.java | 4 +-
.../PipeConsensusTransferBatchReqBuilder.java | 4 +-
.../schema/PipeSchemaRegionSnapshotEvent.java | 4 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 6 +--
.../common/tablet/PipeRawTabletInsertionEvent.java | 4 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 12 ++---
.../tsfile/TsFileInsertionDataContainer.java | 7 +--
.../PipeHistoricalDataRegionTsFileExtractor.java | 11 ++---
.../PipeRealtimeDataRegionHybridExtractor.java | 6 +--
.../realtime/assigner/DisruptorQueue.java | 12 ++---
.../iotdb/db/pipe/metric/PipeResourceMetrics.java | 14 +++---
.../downsampling/PartialPathLastObjectCache.java | 4 +-
...nager.java => PipeDataNodeResourceManager.java} | 8 ++--
.../db/pipe/resource/memory/PipeMemoryBlock.java | 18 ++++----
.../pipe/resource/tsfile/PipeTsFileResource.java | 10 +++--
.../resource/tsfile/PipeTsFileResourceManager.java | 4 +-
.../pipe/resource/wal/PipeWALResourceManager.java | 34 +++++++--------
.../hardlink/PipeWALHardlinkResourceManager.java | 23 +++++-----
.../selfhost/PipeWALSelfHostResourceManager.java | 8 ++--
.../dataregion/wal/utils/WALInsertNodeCache.java | 4 +-
.../event/SubscriptionEventBinaryCache.java | 8 ++--
.../PipeDataNodeSnapshotResourceManagerTest.java | 20 ++++-----
.../commons}/pipe/resource/log/PipeLogManager.java | 7 ++-
.../commons}/pipe/resource/log/PipeLogStatus.java | 6 +--
33 files changed, 250 insertions(+), 173 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index abfea097dbf..4536789daf6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.PipeTask;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -32,6 +33,7 @@ import
org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import
org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
import
org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeRemainingTimeMetrics;
import
org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigRegionExtractorMetrics;
+import
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTask;
import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTaskBuilder;
import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTaskStage;
@@ -50,6 +52,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -57,8 +60,6 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfigNodeTaskAgent.class);
- private final AtomicLong lastLogPrintedTime = new AtomicLong(0);
-
@Override
protected boolean isShutdown() {
return PipeConfigNodeAgent.runtime().isShutdown();
@@ -214,11 +215,13 @@ public class PipeConfigNodeTaskAgent extends
PipeTaskAgent {
final List<Long> pipeRemainingEventCountList = new ArrayList<>();
final List<Double> pipeRemainingTimeList = new ArrayList<>();
try {
- final boolean shouldPrintLog =
- System.currentTimeMillis() - lastLogPrintedTime.get() > 1000 * 60 *
10; // 10 minutes
- if (shouldPrintLog) {
- lastLogPrintedTime.set(System.currentTimeMillis());
- }
+ final Optional<Logger> logger =
+ PipeConfigNodeResourceManager.log()
+ .schedule(
+ PipeConfigNodeTaskAgent.class,
+
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+ pipeMetaKeeper.getPipeMetaCount());
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
@@ -227,20 +230,20 @@ public class PipeConfigNodeTaskAgent extends
PipeTaskAgent {
final long remainingEventCount =
PipeConfigRegionExtractorMetrics.getInstance()
.getRemainingEventCount(staticMeta.getPipeName(),
staticMeta.getCreationTime());
- final double remainingTime =
+ final double estimatedRemainingTime =
PipeConfigNodeRemainingTimeMetrics.getInstance()
.getRemainingTime(staticMeta.getPipeName(),
staticMeta.getCreationTime());
pipeRemainingEventCountList.add(remainingEventCount);
- pipeRemainingTimeList.add(remainingTime);
-
- if (shouldPrintLog) {
- LOGGER.info(
- "Reporting pipe meta: {}, remainingEventCount: {},
remainingTime: {}",
- pipeMeta.coreReportMessage(),
- remainingEventCount,
- remainingTime);
- }
+ pipeRemainingTimeList.add(estimatedRemainingTime);
+
+ logger.ifPresent(
+ l ->
+ l.info(
+ "Reporting pipe meta: {}, remainingEventCount: {},
estimatedRemainingTime: {}",
+ pipeMeta.coreReportMessage(),
+ remainingEventCount,
+ estimatedRemainingTime));
}
LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
} catch (final IOException e) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 4489bf5b957..1f54b8745d5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -63,6 +63,10 @@ public class PipeHeartbeat {
}
}
+ public int getPipeMetaSize() {
+ return pipeMetaMap.size();
+ }
+
public PipeMeta getPipeMeta(final PipeStaticMeta pipeStaticMeta) {
return pipeMetaMap.get(pipeStaticMeta);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 790799cae72..18c964f06c1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -30,6 +32,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
+import
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.slf4j.Logger;
@@ -153,7 +156,11 @@ public class PipeHeartbeatParser {
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
if (uncompletedDataNodeIds.isEmpty()) {
-
pipeTaskInfo.get().removePipeMeta(pipeMetaFromCoordinator.getStaticMeta().getPipeName());
+ pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName());
+ LOGGER.info(
+ "Detected completion of pipe {}, static meta: {}, remove it.",
+ staticMeta.getPipeName(),
+ staticMeta);
needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(true);
continue;
@@ -193,20 +200,26 @@ public class PipeHeartbeatParser {
.getValue()
.getProgressIndex()
.equals(runtimeMetaFromAgent.getProgressIndex()))) {
- LOGGER.info(
- "Updating progress index for (pipe name: {}, consensus group id:
{}) ... "
- + "Progress index on coordinator: {}, progress index from
agent: {}",
- pipeMetaFromCoordinator.getStaticMeta().getPipeName(),
- runtimeMetaFromCoordinator.getKey(),
- runtimeMetaFromCoordinator.getValue().getProgressIndex(),
- runtimeMetaFromAgent.getProgressIndex());
- LOGGER.info(
- "Progress index for (pipe name: {}, consensus group id: {}) is
updated to {}",
- pipeMetaFromCoordinator.getStaticMeta().getPipeName(),
- runtimeMetaFromCoordinator.getKey(),
+ final ProgressIndex updatedProgressIndex =
runtimeMetaFromCoordinator
.getValue()
-
.updateProgressIndex(runtimeMetaFromAgent.getProgressIndex()));
+
.updateProgressIndex(runtimeMetaFromAgent.getProgressIndex());
+ PipeConfigNodeResourceManager.log()
+ .schedule(
+ PipeHeartbeatParser.class,
+
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+ pipeHeartbeat.getPipeMetaSize())
+ .ifPresent(
+ l ->
+ l.info(
+ "Updated progress index for (pipe name: {},
consensus group id: {}) ... "
+ + "Progress index on coordinator: {}, progress
index from agent: {}, updated progressIndex: {}",
+
pipeMetaFromCoordinator.getStaticMeta().getPipeName(),
+ runtimeMetaFromCoordinator.getKey(),
+
runtimeMetaFromCoordinator.getValue().getProgressIndex(),
+ runtimeMetaFromAgent.getProgressIndex(),
+ updatedProgressIndex));
needWriteConsensusOnConfigNodes.set(true);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index 56c9d012c3c..40918e1b6a7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
-import
org.apache.iotdb.confignode.manager.pipe.resource.snapshot.PipeConfigNodeSnapshotResourceManager;
+import
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -101,12 +101,7 @@ public class PipeConfigRegionSnapshotEvent extends
PipeSnapshotEvent {
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern) {
- super(
- pipeName,
- creationTime,
- pipeTaskMeta,
- pattern,
- PipeConfigNodeSnapshotResourceManager.getInstance());
+ super(pipeName, creationTime, pipeTaskMeta, pattern,
PipeConfigNodeResourceManager.snapshot());
this.snapshotPath = snapshotPath;
this.templateFilePath = Objects.nonNull(templateFilePath) ?
templateFilePath : "";
this.fileType = type;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
new file mode 100644
index 00000000000..3500d26e3c2
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.resource;
+
+import org.apache.iotdb.commons.pipe.resource.PipeSnapshotResourceManager;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager;
+import
org.apache.iotdb.confignode.manager.pipe.resource.snapshot.PipeConfigNodeSnapshotResourceManager;
+
+public class PipeConfigNodeResourceManager {
+
+ private final PipeSnapshotResourceManager pipeSnapshotResourceManager;
+ private final PipeLogManager pipeLogManager;
+
+ public static PipeSnapshotResourceManager snapshot() {
+ return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE
+ .pipeSnapshotResourceManager;
+ }
+
+ public static PipeLogManager log() {
+ return
PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE.pipeLogManager;
+ }
+
+ ///////////////////////////// SINGLETON /////////////////////////////
+
+ private PipeConfigNodeResourceManager() {
+ pipeSnapshotResourceManager = new PipeConfigNodeSnapshotResourceManager();
+ pipeLogManager = new PipeLogManager();
+ }
+
+ private static class PipeResourceManagerHolder {
+ private static final PipeConfigNodeResourceManager INSTANCE =
+ new PipeConfigNodeResourceManager();
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
index 4ab5a540940..17769654560 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
@@ -27,16 +27,7 @@ import java.util.HashSet;
public class PipeConfigNodeSnapshotResourceManager extends
PipeSnapshotResourceManager {
- private PipeConfigNodeSnapshotResourceManager() {
+ public PipeConfigNodeSnapshotResourceManager() {
super(new
HashSet<>(Collections.singletonList(IoTDBConstant.CONSENSUS_FOLDER_NAME)));
}
-
- private static class PipeConfigNodeSnapshotResourceManagerHolder {
- private static final PipeConfigNodeSnapshotResourceManager INSTANCE =
- new PipeConfigNodeSnapshotResourceManager();
- }
-
- public static synchronized PipeConfigNodeSnapshotResourceManager
getInstance() {
- return PipeConfigNodeSnapshotResourceManagerHolder.INSTANCE;
- }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 843bea6d8e3..5996545461d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
@@ -41,6 +42,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePla
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
+import
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
@@ -63,6 +65,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -154,7 +157,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- private void checkBeforeCreatePipeInternal(TCreatePipeReq createPipeRequest)
+ private void checkBeforeCreatePipeInternal(final TCreatePipeReq
createPipeRequest)
throws PipeException {
if (!isPipeExisted(createPipeRequest.getPipeName())) {
return;
@@ -556,11 +559,20 @@ public class PipeTaskInfo implements SnapshotProcessor {
pipeMetaKeeper.clear();
+ // This method is only triggered by pipe sync / meta report currently
+ // And is guaranteed to print log finally
+ final Optional<Logger> logger =
+ PipeConfigNodeResourceManager.log()
+ .schedule(
+ PipeTaskInfo.class,
+ PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+ pipeMetaKeeper.getPipeMetaCount());
plan.getPipeMetaList()
.forEach(
pipeMeta -> {
pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
- LOGGER.info("Recording pipe meta: {}", pipeMeta);
+ logger.ifPresent(l -> l.info("Recording pipe meta: {}",
pipeMeta));
});
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index f1fd633c769..c91c76d36be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -47,7 +47,7 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInser
import
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
import
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeBuilder;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskBuilder;
@@ -314,7 +314,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final List<Double> pipeRemainingTimeList = new ArrayList<>();
try {
final Optional<Logger> logger =
- PipeResourceManager.log()
+ PipeDataNodeResourceManager.log()
.schedule(
PipeDataNodeTaskAgent.class,
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
@@ -393,7 +393,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final List<Double> pipeRemainingTimeList = new ArrayList<>();
try {
final Optional<Logger> logger =
- PipeResourceManager.log()
+ PipeDataNodeResourceManager.log()
.schedule(
PipeDataNodeTaskAgent.class,
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
@@ -515,7 +515,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
private boolean mayDeletedTsFileSizeReachDangerousThreshold() {
try {
final long linkedButDeletedTsFileSize =
- PipeResourceManager.tsfile().getTotalLinkedButDeletedTsfileSize();
+
PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileSize();
final double totalDisk =
MetricService.getInstance()
.getAutoGauge(
@@ -538,7 +538,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
private boolean mayMemTablePinnedCountReachDangerousThreshold() {
- return PipeResourceManager.wal().getPinnedWalCount()
+ return PipeDataNodeResourceManager.wal().getPinnedWalCount()
>= 10 *
PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
index 965a2a068bf..e45954e7e4d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.connector.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import com.github.benmanes.caffeine.cache.Cache;
@@ -51,15 +51,15 @@ public interface IoTDBDataNodeCacheLeaderClientManager {
public LeaderCacheManager() {
final long initMemorySizeInBytes =
- PipeResourceManager.memory().getTotalMemorySizeInBytes() / 10;
+ PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() /
10;
final long maxMemorySizeInBytes =
(long)
- (PipeResourceManager.memory().getTotalMemorySizeInBytes()
+ (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
* CONFIG.getPipeLeaderCacheMemoryUsagePercentage());
// properties required by pipe memory control framework
final PipeMemoryBlock allocatedMemoryBlock =
- PipeResourceManager.memory()
+ PipeDataNodeResourceManager.memory()
.tryAllocate(initMemorySizeInBytes)
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
.setShrinkCallback(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 0b97487b04e..a2d7315ae0a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -61,7 +61,7 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
PipeTabletEventPlainBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
super(maxDelayInMs);
this.allocatedMemoryBlock =
- PipeResourceManager.memory()
+ PipeDataNodeResourceManager.memory()
.tryAllocate(requestMaxBatchSizeInBytes)
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
.setShrinkCallback(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index bf116c36798..cdecc272772 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -28,7 +28,7 @@ import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -88,7 +88,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder
implements AutoClosea
CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
allocatedMemoryBlock =
- PipeResourceManager.memory()
+ PipeDataNodeResourceManager.memory()
.tryAllocate(requestMaxBatchSizeInBytes)
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
.setShrinkCallback(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index 60bbf95feee..39bd51d5f53 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
@@ -89,7 +89,7 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern) {
- super(pipeName, creationTime, pipeTaskMeta, pattern,
PipeResourceManager.snapshot());
+ super(pipeName, creationTime, pipeTaskMeta, pattern,
PipeDataNodeResourceManager.snapshot());
this.mTreeSnapshotPath = mTreeSnapshotPath;
this.tagLogSnapshotPath = Objects.nonNull(tagLogSnapshotPath) ?
tagLogSnapshotPath : "";
this.databaseName = databaseName;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index faba8482c4d..de3e39ed412 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -132,7 +132,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
try {
- PipeResourceManager.wal().pin(walEntryHandler);
+ PipeDataNodeResourceManager.wal().pin(walEntryHandler);
return true;
} catch (final Exception e) {
LOGGER.warn(
@@ -147,7 +147,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
try {
- PipeResourceManager.wal().unpin(walEntryHandler);
+ PipeDataNodeResourceManager.wal().unpin(walEntryHandler);
// Release the containers' memory.
if (dataContainers != null) {
dataContainers.clear();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 9769299ae68..05227ce0408 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -109,7 +109,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
@Override
public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
- allocatedMemoryBlock =
PipeResourceManager.memory().forceAllocateWithRetry(tablet);
+ allocatedMemoryBlock =
PipeDataNodeResourceManager.memory().forceAllocateWithRetry(tablet);
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index b7a883712df..05a5a6ff0f9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -196,9 +196,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
@Override
public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
try {
- tsFile = PipeResourceManager.tsfile().increaseFileReference(tsFile,
true, resource);
+ tsFile =
PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true,
resource);
if (isWithMod) {
- modFile = PipeResourceManager.tsfile().increaseFileReference(modFile,
false, null);
+ modFile =
PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false,
null);
}
return true;
} catch (final Exception e) {
@@ -214,9 +214,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
@Override
public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
try {
- PipeResourceManager.tsfile().decreaseFileReference(tsFile);
+ PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile);
if (isWithMod) {
- PipeResourceManager.tsfile().decreaseFileReference(modFile);
+ PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile);
}
return true;
} catch (final Exception e) {
@@ -291,7 +291,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
try {
final Map<IDeviceID, Boolean> deviceIsAlignedMap =
- PipeResourceManager.tsfile()
+ PipeDataNodeResourceManager.tsfile()
.getDeviceIsAlignedMapFromCache(
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
final Set<IDeviceID> deviceSet =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 6366cf8bb95..4e324bb8afe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
@@ -107,7 +107,7 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
this.sourceEvent = sourceEvent;
try {
- final PipeTsFileResourceManager tsFileResourceManager =
PipeResourceManager.tsfile();
+ final PipeTsFileResourceManager tsFileResourceManager =
PipeDataNodeResourceManager.tsfile();
final Map<IDeviceID, List<String>> deviceMeasurementsMap;
// TsFileReader is not thread-safe, so we need to create it here and
close it later.
@@ -138,7 +138,8 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
}
- allocatedMemoryBlock =
PipeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
+ allocatedMemoryBlock =
+
PipeDataNodeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
// Filter again to get the final deviceMeasurementsMap that exactly
matches the pattern.
deviceMeasurementsMapIterator =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 1f36e98bec0..34d331a0b6b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -446,7 +446,8 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
// Pin the resource, in case the file is removed by compaction
or anything.
// Will unpin it after the PipeTsFileInsertionEvent is created
and pinned.
try {
- PipeResourceManager.tsfile().pinTsFileResource(resource,
shouldTransferModFile);
+ PipeDataNodeResourceManager.tsfile()
+ .pinTsFileResource(resource, shouldTransferModFile);
} catch (final IOException e) {
LOGGER.warn("Pipe: failed to pin TsFileResource {}",
resource.getTsFilePath());
}
@@ -507,7 +508,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
final Set<IDeviceID> deviceSet;
try {
final Map<IDeviceID, Boolean> deviceIsAlignedMap =
- PipeResourceManager.tsfile()
+ PipeDataNodeResourceManager.tsfile()
.getDeviceIsAlignedMapFromCache(
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
deviceSet =
@@ -595,7 +596,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
try {
- PipeResourceManager.tsfile().unpinTsFileResource(resource);
+ PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource);
} catch (final IOException e) {
LOGGER.warn(
"Pipe {}@{}: failed to unpin TsFileResource after creating event,
original path: {}",
@@ -627,7 +628,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
pendingQueue.forEach(
resource -> {
try {
- PipeResourceManager.tsfile().unpinTsFileResource(resource);
+
PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource);
} catch (final IOException e) {
LOGGER.warn(
"Pipe {}@{}: failed to unpin TsFileResource after dropping
pipe, original path: {}",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 9854fa963a2..15a020cd577 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -30,7 +30,7 @@ import
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -222,7 +222,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
private boolean mayMemTablePinnedCountReachDangerousThreshold() {
- return PipeResourceManager.wal().getPinnedWalCount()
+ return PipeDataNodeResourceManager.wal().getPinnedWalCount()
>= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
}
@@ -240,7 +240,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
private boolean mayTsFileLinkedCountReachDangerousThreshold() {
- return PipeResourceManager.tsfile().getLinkedTsfileCount()
+ return PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount()
>= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
index 3c738a73deb..0a576142a39 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import com.lmax.disruptor.BlockingWaitStrategy;
@@ -48,14 +48,14 @@ public class DisruptorQueue {
private final PipeEventCounter eventCounter = new
PipeDataRegionEventCounter();
- public DisruptorQueue(EventHandler<PipeRealtimeEvent> eventHandler) {
+ public DisruptorQueue(final EventHandler<PipeRealtimeEvent> eventHandler) {
final PipeConfig config = PipeConfig.getInstance();
final int ringBufferSize =
config.getPipeExtractorAssignerDisruptorRingBufferSize();
final long ringBufferEntrySizeInBytes =
config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
allocatedMemoryBlock =
- PipeResourceManager.memory()
+ PipeDataNodeResourceManager.memory()
.tryAllocate(
ringBufferSize * ringBufferEntrySizeInBytes, currentSize ->
currentSize / 2);
@@ -80,8 +80,8 @@ public class DisruptorQueue {
ringBuffer = disruptor.start();
}
- public void publish(PipeRealtimeEvent event) {
- EnrichedEvent internalEvent = event.getEvent();
+ public void publish(final PipeRealtimeEvent event) {
+ final EnrichedEvent internalEvent = event.getEvent();
if (internalEvent instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) internalEvent).recordDisruptorSize(ringBuffer);
}
@@ -104,7 +104,7 @@ public class DisruptorQueue {
return event;
}
- public void setEvent(PipeRealtimeEvent event) {
+ public void setEvent(final PipeRealtimeEvent event) {
this.event = event;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
index 3c28e30e437..04fb647a825 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.metric;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
@@ -39,19 +39,19 @@ public class PipeResourceMetrics implements IMetricSet {
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@Override
- public void bindTo(AbstractMetricService metricService) {
+ public void bindTo(final AbstractMetricService metricService) {
// pipe memory related
metricService.createAutoGauge(
Metric.PIPE_MEM.toString(),
MetricLevel.IMPORTANT,
- PipeResourceManager.memory(),
+ PipeDataNodeResourceManager.memory(),
PipeMemoryManager::getUsedMemorySizeInBytes,
Tag.NAME.toString(),
PIPE_USED_MEMORY);
metricService.createAutoGauge(
Metric.PIPE_MEM.toString(),
MetricLevel.IMPORTANT,
- PipeResourceManager.memory(),
+ PipeDataNodeResourceManager.memory(),
PipeMemoryManager::getTotalMemorySizeInBytes,
Tag.NAME.toString(),
PIPE_TOTAL_MEMORY);
@@ -59,17 +59,17 @@ public class PipeResourceMetrics implements IMetricSet {
metricService.createAutoGauge(
Metric.PIPE_PINNED_MEMTABLE_COUNT.toString(),
MetricLevel.IMPORTANT,
- PipeResourceManager.wal(),
+ PipeDataNodeResourceManager.wal(),
PipeWALResourceManager::getPinnedWalCount);
metricService.createAutoGauge(
Metric.PIPE_LINKED_TSFILE_COUNT.toString(),
MetricLevel.IMPORTANT,
- PipeResourceManager.tsfile(),
+ PipeDataNodeResourceManager.tsfile(),
PipeTsFileResourceManager::getLinkedTsfileCount);
}
@Override
- public void unbindFrom(AbstractMetricService metricService) {
+ public void unbindFrom(final AbstractMetricService metricService) {
// pipe memory related
metricService.remove(
MetricType.AUTO_GAUGE, Metric.PIPE_MEM.toString(),
Tag.NAME.toString(), PIPE_USED_MEMORY);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
index ece3963c6f4..3e423f5c437 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.processor.downsampling;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.utils.MemUtils;
@@ -42,7 +42,7 @@ public abstract class PartialPathLastObjectCache<T>
implements AutoCloseable {
protected PartialPathLastObjectCache(final long memoryLimitInBytes) {
allocatedMemoryBlock =
- PipeResourceManager.memory()
+ PipeDataNodeResourceManager.memory()
.tryAllocate(memoryLimitInBytes)
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
.setShrinkCallback(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
similarity index 93%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
index 9307b46a549..83901ddcfca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.resource;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.PipeSnapshotResourceManager;
-import org.apache.iotdb.db.pipe.resource.log.PipeLogManager;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import
org.apache.iotdb.db.pipe.resource.snapshot.PipeDataNodeSnapshotResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
@@ -31,7 +31,7 @@ import
org.apache.iotdb.db.pipe.resource.wal.selfhost.PipeWALSelfHostResourceMan
import java.util.concurrent.atomic.AtomicReference;
-public class PipeResourceManager {
+public class PipeDataNodeResourceManager {
private final PipeTsFileResourceManager pipeTsFileResourceManager;
private final AtomicReference<PipeWALResourceManager> pipeWALResourceManager;
@@ -71,7 +71,7 @@ public class PipeResourceManager {
///////////////////////////// SINGLETON /////////////////////////////
- private PipeResourceManager() {
+ private PipeDataNodeResourceManager() {
pipeTsFileResourceManager = new PipeTsFileResourceManager();
pipeWALResourceManager = new AtomicReference<>();
pipeSnapshotResourceManager = new PipeDataNodeSnapshotResourceManager();
@@ -80,6 +80,6 @@ public class PipeResourceManager {
}
private static class PipeResourceManagerHolder {
- private static final PipeResourceManager INSTANCE = new
PipeResourceManager();
+ private static final PipeDataNodeResourceManager INSTANCE = new
PipeDataNodeResourceManager();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 06dfc0f64a8..8ebe90b388c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.resource.memory;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +35,7 @@ public class PipeMemoryBlock implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeMemoryBlock.class);
- private final PipeMemoryManager pipeMemoryManager =
PipeResourceManager.memory();
+ private final PipeMemoryManager pipeMemoryManager =
PipeDataNodeResourceManager.memory();
private final ReentrantLock lock = new ReentrantLock();
@@ -48,7 +48,7 @@ public class PipeMemoryBlock implements AutoCloseable {
private volatile boolean isReleased = false;
- public PipeMemoryBlock(long memoryUsageInBytes) {
+ public PipeMemoryBlock(final long memoryUsageInBytes) {
this.memoryUsageInBytes.set(memoryUsageInBytes);
}
@@ -56,26 +56,26 @@ public class PipeMemoryBlock implements AutoCloseable {
return memoryUsageInBytes.get();
}
- public void setMemoryUsageInBytes(long memoryUsageInBytes) {
+ public void setMemoryUsageInBytes(final long memoryUsageInBytes) {
this.memoryUsageInBytes.set(memoryUsageInBytes);
}
- public PipeMemoryBlock setShrinkMethod(LongUnaryOperator shrinkMethod) {
+ public PipeMemoryBlock setShrinkMethod(final LongUnaryOperator shrinkMethod)
{
this.shrinkMethod.set(shrinkMethod);
return this;
}
- public PipeMemoryBlock setShrinkCallback(BiConsumer<Long, Long>
shrinkCallback) {
+ public PipeMemoryBlock setShrinkCallback(final BiConsumer<Long, Long>
shrinkCallback) {
this.shrinkCallback.set(shrinkCallback);
return this;
}
- public PipeMemoryBlock setExpandMethod(LongUnaryOperator extendMethod) {
+ public PipeMemoryBlock setExpandMethod(final LongUnaryOperator extendMethod)
{
this.expandMethod.set(extendMethod);
return this;
}
- public PipeMemoryBlock setExpandCallback(BiConsumer<Long, Long>
expandCallback) {
+ public PipeMemoryBlock setExpandCallback(final BiConsumer<Long, Long>
expandCallback) {
this.expandCallback.set(expandCallback);
return this;
}
@@ -180,7 +180,7 @@ public class PipeMemoryBlock implements AutoCloseable {
lock.unlock();
}
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("Interrupted while waiting for the lock.", e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index 090af91fb3c..95be9281d5b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.resource.tsfile;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -67,7 +67,9 @@ public class PipeTsFileResource implements AutoCloseable {
private Map<String, TSDataType> measurementDataTypeMap = null;
public PipeTsFileResource(
- File hardlinkOrCopiedFile, boolean isTsFile, TsFileResource
tsFileResource) {
+ final File hardlinkOrCopiedFile,
+ final boolean isTsFile,
+ final TsFileResource tsFileResource) {
this.hardlinkOrCopiedFile = hardlinkOrCopiedFile;
this.isTsFile = isTsFile;
this.tsFileResource = tsFileResource;
@@ -190,7 +192,7 @@ public class PipeTsFileResource implements AutoCloseable {
// Only allocate when pipe memory used is less than 50%, because memory
here
// is hard to shrink and may consume too much memory.
allocatedMemoryBlock =
- PipeResourceManager.memory()
+ PipeDataNodeResourceManager.memory()
.forceAllocateIfSufficient(
PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes(),
MEMORY_SUFFICIENT_THRESHOLD);
@@ -226,7 +228,7 @@ public class PipeTsFileResource implements AutoCloseable {
// Allocate again for the cached objects.
allocatedMemoryBlock =
- PipeResourceManager.memory()
+ PipeDataNodeResourceManager.memory()
.forceAllocateIfSufficient(memoryRequiredInBytes,
MEMORY_SUFFICIENT_THRESHOLD);
if (allocatedMemoryBlock == null) {
LOGGER.info(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 1cf6732786e..e40bafe03b7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.runtime.PipePeriodicalJobExecutor;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -81,7 +81,7 @@ public class PipeTsFileResourceManager {
final Iterator<Map.Entry<String, PipeTsFileResource>> iterator =
hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator();
final Optional<Logger> logger =
- PipeResourceManager.log()
+ PipeDataNodeResourceManager.log()
.schedule(
PipeTsFileResourceManager.class,
PipeConfig.getInstance().getPipeTsFilePinMaxLogNumPerRound(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 06b5d68ef96..9adb976c354 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.resource.wal;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
import org.slf4j.Logger;
@@ -43,15 +43,15 @@ public abstract class PipeWALResourceManager {
protected final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
private static final int SEGMENT_LOCK_COUNT = 32;
- private final ReentrantLock[] memtableIdSegmentLocks;
+ private final ReentrantLock[] memTableIdSegmentLocks;
protected PipeWALResourceManager() {
- // memtableIdToPipeWALResourceMap can be concurrently accessed by multiple
threads
+ // memTableIdToPipeWALResourceMap can be concurrently accessed by multiple
threads
memtableIdToPipeWALResourceMap = new ConcurrentHashMap<>();
- memtableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT];
+ memTableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT];
for (int i = 0; i < SEGMENT_LOCK_COUNT; i++) {
- memtableIdSegmentLocks[i] = new ReentrantLock();
+ memTableIdSegmentLocks[i] = new ReentrantLock();
}
PipeDataNodeAgent.runtime()
@@ -65,7 +65,7 @@ public abstract class PipeWALResourceManager {
final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
memtableIdToPipeWALResourceMap.entrySet().iterator();
final Optional<Logger> logger =
- PipeResourceManager.log()
+ PipeDataNodeResourceManager.log()
.schedule(
PipeWALResourceManager.class,
PipeConfig.getInstance().getPipeWalPinMaxLogNumPerRound(),
@@ -76,7 +76,7 @@ public abstract class PipeWALResourceManager {
while (iterator.hasNext()) {
final Map.Entry<Long, PipeWALResource> entry = iterator.next();
final ReentrantLock lock =
- memtableIdSegmentLocks[(int) (entry.getKey() %
SEGMENT_LOCK_COUNT)];
+ memTableIdSegmentLocks[(int) (entry.getKey() %
SEGMENT_LOCK_COUNT)];
lock.lock();
try {
@@ -94,7 +94,7 @@ public abstract class PipeWALResourceManager {
lock.unlock();
}
}
- } catch (ConcurrentModificationException e) {
+ } catch (final ConcurrentModificationException e) {
LOGGER.error(
"Concurrent modification issues happened, skipping the WAL in this
round of ttl check",
e);
@@ -102,34 +102,34 @@ public abstract class PipeWALResourceManager {
}
public final void pin(final WALEntryHandler walEntryHandler) throws
IOException {
- final long memtableId = walEntryHandler.getMemTableId();
- final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId %
SEGMENT_LOCK_COUNT)];
+ final long memTableId = walEntryHandler.getMemTableId();
+ final ReentrantLock lock = memTableIdSegmentLocks[(int) (memTableId %
SEGMENT_LOCK_COUNT)];
lock.lock();
try {
- pinInternal(memtableId, walEntryHandler);
+ pinInternal(memTableId, walEntryHandler);
} finally {
lock.unlock();
}
}
- protected abstract void pinInternal(long memtableId, WALEntryHandler
walEntryHandler)
+ protected abstract void pinInternal(final long memTableId, final
WALEntryHandler walEntryHandler)
throws IOException;
public final void unpin(final WALEntryHandler walEntryHandler) throws
IOException {
- final long memtableId = walEntryHandler.getMemTableId();
- final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId %
SEGMENT_LOCK_COUNT)];
+ final long memTableId = walEntryHandler.getMemTableId();
+ final ReentrantLock lock = memTableIdSegmentLocks[(int) (memTableId %
SEGMENT_LOCK_COUNT)];
lock.lock();
try {
- unpinInternal(memtableId, walEntryHandler);
+ unpinInternal(memTableId, walEntryHandler);
} finally {
lock.unlock();
}
}
- protected abstract void unpinInternal(long memtableId, WALEntryHandler
walEntryHandler)
- throws IOException;
+ protected abstract void unpinInternal(
+ final long memTableId, final WALEntryHandler walEntryHandler) throws
IOException;
public int getPinnedWalCount() {
return Objects.nonNull(memtableIdToPipeWALResourceMap)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
index acd64cf23b4..7570b83fc73 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
@@ -36,15 +36,15 @@ import java.util.Map;
public class PipeWALHardlinkResourceManager extends PipeWALResourceManager {
@Override
- protected void pinInternal(long memtableId, WALEntryHandler walEntryHandler)
{
+ protected void pinInternal(final long memTableId, final WALEntryHandler
walEntryHandler) {
memtableIdToPipeWALResourceMap
- .computeIfAbsent(memtableId, id -> new
PipeWALHardlinkResource(walEntryHandler, this))
+ .computeIfAbsent(memTableId, id -> new
PipeWALHardlinkResource(walEntryHandler, this))
.pin();
}
@Override
- protected void unpinInternal(long memtableId, WALEntryHandler
walEntryHandler) {
- memtableIdToPipeWALResourceMap.get(memtableId).unpin();
+ protected void unpinInternal(final long memTableId, final WALEntryHandler
walEntryHandler) {
+ memtableIdToPipeWALResourceMap.get(memTableId).unpin();
}
//////////////////////////// hardlink related ////////////////////////////
@@ -64,7 +64,7 @@ public class PipeWALHardlinkResourceManager extends
PipeWALResourceManager {
* @return the hardlink
* @throws IOException when create hardlink failed
*/
- public synchronized File increaseFileReference(File file) throws IOException
{
+ public synchronized File increaseFileReference(final File file) throws
IOException {
// if the file is already a hardlink, just increase reference count and
return it
if (increaseReferenceIfExists(file.getPath())) {
return file;
@@ -83,17 +83,17 @@ public class PipeWALHardlinkResourceManager extends
PipeWALResourceManager {
return createHardlink(file, hardlink);
}
- private boolean increaseReferenceIfExists(String path) {
+ private boolean increaseReferenceIfExists(final String path) {
hardlinkToReferenceMap.computeIfPresent(path, (k, v) -> v + 1);
return hardlinkToReferenceMap.containsKey(path);
}
// TODO: Check me! Make sure the file is not a hardlink.
// TODO: IF user specify a wal by config, will the method work?
- private static File getHardlinkInPipeWALDir(File file) throws IOException {
+ private static File getHardlinkInPipeWALDir(final File file) throws
IOException {
try {
return new File(getPipeWALDirPath(file), getRelativeFilePath(file));
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new IOException(
String.format(
"failed to get hardlink in pipe dir " + "for file %s, it is not
a wal",
@@ -128,7 +128,8 @@ public class PipeWALHardlinkResourceManager extends
PipeWALResourceManager {
return builder.toString();
}
- private static File createHardlink(File sourceFile, File hardlink) throws
IOException {
+ private static File createHardlink(final File sourceFile, final File
hardlink)
+ throws IOException {
if (!hardlink.getParentFile().exists() &&
!hardlink.getParentFile().mkdirs()) {
throw new IOException(
String.format(
@@ -149,7 +150,7 @@ public class PipeWALHardlinkResourceManager extends
PipeWALResourceManager {
* @param hardlink the hardlinked file
* @throws IOException when delete file failed
*/
- public synchronized void decreaseFileReference(File hardlink) throws
IOException {
+ public synchronized void decreaseFileReference(final File hardlink) throws
IOException {
final Integer updatedReference =
hardlinkToReferenceMap.computeIfPresent(
hardlink.getPath(), (file, reference) -> reference - 1);
@@ -161,7 +162,7 @@ public class PipeWALHardlinkResourceManager extends
PipeWALResourceManager {
}
@TestOnly
- public synchronized int getFileReferenceCount(File hardlink) {
+ public synchronized int getFileReferenceCount(final File hardlink) {
return hardlinkToReferenceMap.getOrDefault(hardlink.getPath(), 0);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
index 94404eafbed..c7fe0accda2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
@@ -25,14 +25,14 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
public class PipeWALSelfHostResourceManager extends PipeWALResourceManager {
@Override
- protected void pinInternal(long memtableId, WALEntryHandler walEntryHandler)
{
+ protected void pinInternal(final long memTableId, final WALEntryHandler
walEntryHandler) {
memtableIdToPipeWALResourceMap
- .computeIfAbsent(memtableId, id -> new
PipeWALSelfHostResource(walEntryHandler))
+ .computeIfAbsent(memTableId, id -> new
PipeWALSelfHostResource(walEntryHandler))
.pin();
}
@Override
- protected void unpinInternal(long memtableId, WALEntryHandler
walEntryHandler) {
- memtableIdToPipeWALResourceMap.get(memtableId).unpin();
+ protected void unpinInternal(final long memTableId, final WALEntryHandler
walEntryHandler) {
+ memtableIdToPipeWALResourceMap.get(memTableId).unpin();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 38e010db7db..777545c738b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.metric.PipeWALInsertNodeCacheMetrics;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -78,7 +78,7 @@ public class WALInsertNodeCache {
(double) 2 * CONFIG.getWalFileSizeThresholdInByte(),
CONFIG.getAllocateMemoryForPipe() * 0.8 / 5);
allocatedMemoryBlock =
- PipeResourceManager.memory()
+ PipeDataNodeResourceManager.memory()
.tryAllocate(requestedAllocateSize)
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
.setShrinkCallback(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
index 5f7ea9adc5c..b8bf26723c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.subscription.event;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -93,15 +93,15 @@ public class SubscriptionEventBinaryCache {
private SubscriptionEventBinaryCache() {
final long initMemorySizeInBytes =
- PipeResourceManager.memory().getTotalMemorySizeInBytes() / 20;
+ PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() / 20;
final long maxMemorySizeInBytes =
(long)
- (PipeResourceManager.memory().getTotalMemorySizeInBytes()
+ (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
*
PipeConfig.getInstance().getSubscriptionCacheMemoryUsagePercentage());
// properties required by pipe memory control framework
this.allocatedMemoryBlock =
- PipeResourceManager.memory()
+ PipeDataNodeResourceManager.memory()
.tryAllocate(initMemorySizeInBytes)
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
.setShrinkCallback(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeSnapshotResourceManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeSnapshotResourceManagerTest.java
index e56027ca432..cf0971c9bb9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeSnapshotResourceManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeSnapshotResourceManagerTest.java
@@ -57,7 +57,7 @@ public class PipeDataNodeSnapshotResourceManagerTest {
@Before
public void setUp() throws Exception {
- File testDir = new File(ROOT_DIR);
+ final File testDir = new File(ROOT_DIR);
if (testDir.exists()) {
FileUtils.deleteFileOrDirectory(testDir);
}
@@ -70,7 +70,7 @@ public class PipeDataNodeSnapshotResourceManagerTest {
@After
public void tearDown() throws Exception {
- File testDir = new File(ROOT_DIR);
+ final File testDir = new File(ROOT_DIR);
if (testDir.exists()) {
FileUtils.deleteFileOrDirectory(testDir);
}
@@ -79,37 +79,37 @@ public class PipeDataNodeSnapshotResourceManagerTest {
@Test
public void test() {
try {
- PipeResourceManager.snapshot()
+ PipeDataNodeResourceManager.snapshot()
.increaseSnapshotReference(CONSENSUS_SNAPSHOT_DIR + File.separator +
FILE);
- } catch (IOException e) {
+ } catch (final IOException e) {
Assert.fail();
}
Assert.assertEquals(
1,
- PipeResourceManager.snapshot()
+ PipeDataNodeResourceManager.snapshot()
.getSnapshotReferenceCount(PIPE_CONSENSUS_SNAPSHOT_DIR +
File.separator + FILE));
Assert.assertTrue(new File(PIPE_CONSENSUS_SNAPSHOT_DIR, FILE).exists());
- PipeResourceManager.snapshot()
+ PipeDataNodeResourceManager.snapshot()
.decreaseSnapshotReference(PIPE_CONSENSUS_SNAPSHOT_DIR +
File.separator + FILE);
Assert.assertEquals(
0,
- PipeResourceManager.snapshot()
+ PipeDataNodeResourceManager.snapshot()
.getSnapshotReferenceCount(PIPE_CONSENSUS_SNAPSHOT_DIR +
File.separator + FILE));
Assert.assertFalse(new File(PIPE_CONSENSUS_SNAPSHOT_DIR, FILE).exists());
try {
- PipeResourceManager.snapshot()
+ PipeDataNodeResourceManager.snapshot()
.increaseSnapshotReference(WRONG_SNAPSHOT_DIR + File.separator +
FILE);
Assert.fail();
- } catch (IOException e) {
+ } catch (final IOException e) {
}
Assert.assertEquals(
0,
- PipeResourceManager.snapshot()
+ PipeDataNodeResourceManager.snapshot()
.getSnapshotReferenceCount(PIPE_WRONG_SNAPSHOT_DIR +
File.separator + FILE));
Assert.assertFalse(new File(PIPE_WRONG_SNAPSHOT_DIR, FILE).exists());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
similarity index 88%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
index c7f6e445b6a..8e18fcfa546 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.resource.log;
+package org.apache.iotdb.commons.pipe.resource.log;
import org.slf4j.Logger;
@@ -31,7 +31,10 @@ public class PipeLogManager {
new ConcurrentHashMap<>();
public Optional<Logger> schedule(
- Class<?> logClass, int maxAverageScale, int maxLogInterval, int scale) {
+ final Class<?> logClass,
+ final int maxAverageScale,
+ final int maxLogInterval,
+ final int scale) {
return logClass2LogStatusMap
.computeIfAbsent(
logClass, k -> new PipeLogStatus(logClass, maxAverageScale,
maxLogInterval))
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
similarity index 87%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
index 67355dcc01a..9348708281f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.resource.log;
+package org.apache.iotdb.commons.pipe.resource.log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,14 +33,14 @@ class PipeLogStatus {
private final int maxLogInterval;
private final AtomicLong currentRounds = new AtomicLong(0);
- PipeLogStatus(Class<?> logClass, int maxAverageScale, int maxLogInterval) {
+ PipeLogStatus(final Class<?> logClass, final int maxAverageScale, final int
maxLogInterval) {
logger = LoggerFactory.getLogger(logClass);
this.maxAverageScale = maxAverageScale;
this.maxLogInterval = maxLogInterval;
}
- synchronized Optional<Logger> schedule(int scale) {
+ synchronized Optional<Logger> schedule(final int scale) {
if (currentRounds.incrementAndGet()
>= Math.min((int) Math.ceil((double) scale / maxAverageScale),
maxLogInterval)) {
currentRounds.set(0);