This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 08ab2c9dcde Pipe: Optimized the degrading logger & Deleted useless UT
& Copied some historical filter logic from dev/1.3 (#16019)
08ab2c9dcde is described below
commit 08ab2c9dcde2748165660a3308d058f63be1b80f
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 23 18:58:38 2025 +0800
Pipe: Optimized the degrading logger & Deleted useless UT & Copied some
historical filter logic from dev/1.3 (#16019)
* logger
* fix-ut
* Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
---
...oricalDataRegionTsFileAndDeletionExtractor.java | 22 +++------
.../PipeRealtimeDataRegionHybridExtractor.java | 14 ++++--
.../PipeDataNodeRemainingEventAndTimeOperator.java | 4 ++
.../iotdb/db/pipe/connector/PipeConnectorTest.java | 54 ----------------------
4 files changed, 22 insertions(+), 72 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index 7b765c143a2..b9d536db936 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -51,6 +51,7 @@ import
org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -387,21 +388,6 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
}
}
- private void flushDataRegionAllTsFiles() {
- final DataRegion dataRegion =
- StorageEngine.getInstance().getDataRegion(new
DataRegionId(dataRegionId));
- if (Objects.isNull(dataRegion)) {
- return;
- }
-
- dataRegion.writeLock("Pipe: create historical TsFile extractor");
- try {
- dataRegion.syncCloseAllWorkingTsFileProcessors();
- } finally {
- dataRegion.writeUnlock();
- }
- }
-
/**
* IoTV2 will only resend event that contains un-replicated local write
data. So we only extract
* ProgressIndex containing local writes for comparison to prevent
misjudgment on whether
@@ -563,6 +549,9 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
// Some resource may not be closed due to the
control of
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore
them.
!resource.isClosed()
+ &&
Optional.ofNullable(resource.getProcessor())
+
.map(TsFileProcessor::alreadyMarkedClosing)
+ .orElse(true)
|| mayTsFileContainUnprocessedData(resource)
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
mayTsFileResourceOverlappedWithPattern(resource)))
@@ -585,6 +574,9 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
// Some resource may not be closed due to the
control of
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore
them.
!resource.isClosed()
+ &&
Optional.ofNullable(resource.getProcessor())
+
.map(TsFileProcessor::alreadyMarkedClosing)
+ .orElse(true)
|| mayTsFileContainUnprocessedData(resource)
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
mayTsFileResourceOverlappedWithPattern(resource)))
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 8ee2f7a15a1..f1ddff90c67 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -30,6 +30,8 @@ import
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
+import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.pipe.api.event.Event;
@@ -40,6 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
+import java.util.Optional;
public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegionExtractor {
@@ -218,13 +221,18 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
final boolean mayInsertNodeMemoryReachDangerousThreshold =
floatingMemoryUsageInByte * pipeCount >=
totalFloatingMemorySizeInBytes;
if (mayInsertNodeMemoryReachDangerousThreshold &&
event.mayExtractorUseTablets(this)) {
+ final PipeDataNodeRemainingEventAndTimeOperator operator =
+
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory
usage of the insert node {} has reached the dangerous threshold {}",
+ "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory
usage of the insert node {} has reached the dangerous threshold of single pipe
{}, event count: {}",
pipeName,
dataRegionId,
event.getTsFileEpoch().getFilePath(),
- floatingMemoryUsageInByte * pipeCount,
- totalFloatingMemorySizeInBytes);
+ floatingMemoryUsageInByte,
+ totalFloatingMemorySizeInBytes / pipeCount,
+ Optional.ofNullable(operator)
+
.map(PipeDataNodeRemainingEventAndTimeOperator::getInsertNodeEventCount)
+ .orElse(0));
}
return mayInsertNodeMemoryReachDangerousThreshold;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 0308e9b5b63..996b71cfc3d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -116,6 +116,10 @@ public class PipeDataNodeRemainingEventAndTimeOperator
extends PipeRemainingOper
return remainingEvents >= 0 ? remainingEvents : 0;
}
+ public int getInsertNodeEventCount() {
+ return insertNodeEventCount.get();
+ }
+
long getRemainingEvents() {
final long remainingEvents =
tsfileEventCount.get()
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
index f06c5de1168..5d9e444da24 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
@@ -26,7 +26,6 @@ import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionA
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.junit.Assert;
import org.junit.Test;
@@ -35,24 +34,6 @@ import java.util.HashMap;
public class PipeConnectorTest {
- @Test(expected = PipeParameterNotValidException.class)
- public void testIoTDBLegacyPipeConnectorToSelf() throws Exception {
- try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) {
- connector.validate(
- new PipeParameterValidator(
- new PipeParameters(
- new HashMap<String, String>() {
- {
- put(
- PipeConnectorConstant.CONNECTOR_KEY,
-
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName());
- put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY,
"127.0.0.1");
- put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY,
"6667");
- }
- })));
- }
- }
-
@Test
public void testIoTDBLegacyPipeConnectorToOthers() {
try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) {
@@ -73,24 +54,6 @@ public class PipeConnectorTest {
}
}
- @Test(expected = PipeParameterNotValidException.class)
- public void testIoTDBThriftSyncConnectorToSelf() throws Exception {
- try (IoTDBDataRegionSyncConnector connector = new
IoTDBDataRegionSyncConnector()) {
- connector.validate(
- new PipeParameterValidator(
- new PipeParameters(
- new HashMap<String, String>() {
- {
- put(
- PipeConnectorConstant.CONNECTOR_KEY,
-
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName());
- put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY,
"127.0.0.1");
- put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY,
"6667");
- }
- })));
- }
- }
-
@Test
public void testIoTDBThriftSyncConnectorToOthers() {
try (IoTDBDataRegionSyncConnector connector = new
IoTDBDataRegionSyncConnector()) {
@@ -111,23 +74,6 @@ public class PipeConnectorTest {
}
}
- @Test(expected = PipeParameterNotValidException.class)
- public void testIoTDBThriftAsyncConnectorToSelf() throws Exception {
- try (IoTDBDataRegionAsyncConnector connector = new
IoTDBDataRegionAsyncConnector()) {
- connector.validate(
- new PipeParameterValidator(
- new PipeParameters(
- new HashMap<String, String>() {
- {
- put(
- PipeConnectorConstant.CONNECTOR_KEY,
-
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
- put(PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY,
"127.0.0.1:6667");
- }
- })));
- }
- }
-
@Test
public void testIoTDBThriftAsyncConnectorToOthers() {
try (IoTDBDataRegionAsyncConnector connector = new
IoTDBDataRegionAsyncConnector()) {