This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 3d08001d793 Pipe: Fixed the deadlock caused by terminate event
reporting & Added sink.batch.max-delay-ms to enable delicate control over batch
delay & Fixed the bug of premature halt in extractor snapshot mode caused by
real-time-first transfer & Stabilized the trigger of default batch sending &
Added "isNeedToReport" getter in PipeRawTabletInsertionEvent for user-defined
plugins & Reduce logs from PipeEventCommitter (#15377) + Fix batch type cannot
be changed & Enhance close() in [...]
3d08001d793 is described below
commit 3d08001d793cc4f34d8b8b854b08a42cd04499a7
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 8 10:41:20 2025 +0800
Pipe: Fixed the deadlock caused by terminate event reporting & Added
sink.batch.max-delay-ms to enable delicate control over batch delay & Fixed the
bug of premature halt in extractor snapshot mode caused by real-time-first
transfer & Stabilized the trigger of default batch sending & Added
"isNeedToReport" getter in PipeRawTabletInsertionEvent for user-defined plugins
& Reduce logs from PipeEventCommitter (#15377) + Fix batch type cannot be
changed & Enhance close() in PipeTsFileResour [...]
* Pipe: Fixed the deadlock caused by terminate event reporting & Added
sink.batch.max-delay-ms to enable delicate control over batch delay & Fixed the
bug of premature halt in extractor snapshot mode caused by real-time-first
transfer & Stabilized the trigger of default batch sending & Added
"isNeedToReport" getter in PipeRawTabletInsertionEvent for user-defined plugins
& Reduce logs from PipeEventCommitter (#15377)
* Pipe: Fix batch type cannot be changed (Introduced in #15377) & Enhance
close() in PipeTsFileResource (#15401)
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../it/dual/IoTDBSubscriptionTopicIT.java | 6 ++--
.../task/builder/PipeDataNodeTaskBuilder.java | 40 +++++++++++++++++----
.../subtask/connector/PipeConnectorSubtask.java | 13 ++++---
.../batch/PipeTransferBatchReqBuilder.java | 41 +++++++++++-----------
.../PipeConsensusTransferBatchReqBuilder.java | 19 +++++++---
.../common/tablet/PipeRawTabletInsertionEvent.java | 5 +++
.../event/common/terminate/PipeTerminateEvent.java | 6 +++-
.../PipeHistoricalDataRegionTsFileExtractor.java | 21 ++---------
.../pipe/resource/tsfile/PipeTsFileResource.java | 14 ++++++--
.../resource/tsfile/PipeTsFileResourceManager.java | 2 +-
.../agent/task/progress/PipeEventCommitter.java | 28 ++++++++-------
.../config/constant/PipeConnectorConstant.java | 3 ++
.../pipe/connector/protocol/IoTDBConnector.java | 10 ++++++
13 files changed, 131 insertions(+), 77 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 6010430fe00..723090d433e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -833,7 +833,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
final String topicName = "topic11";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
- try (final SubscriptionTreeSession session = new
SubscriptionTreeSession(host, port)) {
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
final Properties config = new Properties();
config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE);
@@ -848,8 +848,8 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
final Thread thread =
new Thread(
() -> {
- try (final SubscriptionTreePullConsumer consumer =
- new SubscriptionTreePullConsumer.Builder()
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
.host(host)
.port(port)
.consumerId("c1")
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 1b517419c98..4b78fc49c30 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -51,6 +51,11 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY;
public class PipeDataNodeTaskBuilder {
@@ -171,14 +176,25 @@ public class PipeDataNodeTaskBuilder {
private void checkConflict(
final PipeParameters extractorParameters, final PipeParameters
connectorParameters) {
+ final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair;
+ final boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
try {
- final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
+ insertionDeletionListeningOptionPair =
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(extractorParameters);
- if (!insertionDeletionListeningOptionPair.right) {
+
+ final String extractorModeValue =
+ extractorParameters.getStringOrDefault(
+ Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
EXTRACTOR_MODE_DEFAULT_VALUE);
+ shouldTerminatePipeOnAllHistoricalEventsConsumed =
+ extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
+ ||
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+
+ if (!insertionDeletionListeningOptionPair.right
+ && !shouldTerminatePipeOnAllHistoricalEventsConsumed) {
return;
}
- } catch (IllegalPathException e) {
+ } catch (final IllegalPathException e) {
LOGGER.warn(
"PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion'
parameters: {}",
e.getMessage(),
@@ -192,14 +208,24 @@ public class PipeDataNodeTaskBuilder {
PipeConnectorConstant.SINK_REALTIME_FIRST_KEY);
if (isRealtime == null) {
connectorParameters.addAttribute(PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
"false");
- LOGGER.info(
- "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete',
'realtime-first' is defaulted to 'false' to prevent sync issues after
deletion.");
+ if (insertionDeletionListeningOptionPair.right) {
+ LOGGER.info(
+ "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete',
'realtime-first' is defaulted to 'false' to prevent sync issues after
deletion.");
+ } else {
+ LOGGER.info(
+ "PipeDataNodeTaskBuilder: When extractor uses snapshot model,
'realtime-first' is defaulted to 'false' to prevent premature halt before
transfer completion.");
+ }
return;
}
if (isRealtime) {
- LOGGER.warn(
- "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete',
'realtime-first' set to 'true' may result in data synchronization issues after
deletion.");
+ if (insertionDeletionListeningOptionPair.right) {
+ LOGGER.warn(
+ "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete',
'realtime-first' set to 'true' may result in data synchronization issues after
deletion.");
+ } else {
+ LOGGER.warn(
+ "PipeDataNodeTaskBuilder: When extractor uses snapshot model,
'realtime-first' set to 'true' may cause prevent premature halt before transfer
completion.");
+ }
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 18673931a96..5e77505186c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -44,6 +44,8 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Objects;
+
public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConnectorSubtask.class);
@@ -102,11 +104,12 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
}
try {
- if (event == null) {
- if (System.currentTimeMillis() - lastHeartbeatEventInjectTime
- > CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS) {
- transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
- }
+ if (System.currentTimeMillis() - lastHeartbeatEventInjectTime
+ > CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS) {
+ transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
+ }
+
+ if (Objects.isNull(event)) {
return false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 74ab3a1ebdc..50160ba61f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -45,6 +45,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
@@ -54,6 +55,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY;
@@ -87,34 +89,31 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY,
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE);
- final int requestMaxDelayInSeconds;
- if (usingTsFileBatch) {
- requestMaxDelayInSeconds =
+ final Integer requestMaxDelayInMillis =
+ parameters.getIntByKeys(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY,
SINK_IOTDB_BATCH_DELAY_MS_KEY);
+ if (Objects.isNull(requestMaxDelayInMillis)) {
+ final int requestMaxDelayInSeconds =
parameters.getIntOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
- CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE);
+ usingTsFileBatch
+ ? CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE
+ : CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE);
requestMaxDelayInMs =
requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE :
requestMaxDelayInSeconds * 1000;
- requestMaxBatchSizeInBytes =
- parameters.getLongOrDefault(
- Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
- CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE);
- this.defaultBatch =
- new PipeTabletEventTsFileBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes);
} else {
- requestMaxDelayInSeconds =
- parameters.getIntOrDefault(
- Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
- CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE);
requestMaxDelayInMs =
- requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE :
requestMaxDelayInSeconds * 1000;
- requestMaxBatchSizeInBytes =
- parameters.getLongOrDefault(
- Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
- CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
- this.defaultBatch =
- new PipeTabletEventPlainBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes);
+ requestMaxDelayInMillis < 0 ? Integer.MAX_VALUE :
requestMaxDelayInMillis;
}
+ requestMaxBatchSizeInBytes =
+ parameters.getLongOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
+ usingTsFileBatch
+ ? CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE
+ : CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
+ this.defaultBatch =
+ usingTsFileBatch
+ ? new PipeTabletEventTsFileBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes)
+ : new PipeTabletEventPlainBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes);
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index cdecc272772..870cea0ee87 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -47,10 +47,12 @@ import java.util.List;
import java.util.Objects;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
public abstract class PipeConsensusTransferBatchReqBuilder implements
AutoCloseable {
@@ -73,11 +75,18 @@ public abstract class PipeConsensusTransferBatchReqBuilder
implements AutoClosea
protected PipeConsensusTransferBatchReqBuilder(
PipeParameters parameters, TConsensusGroupId consensusGroupId, int
thisDataNodeId) {
- maxDelayInMs =
- parameters.getIntOrDefault(
- Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
- CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE)
- * 1000;
+ final Integer requestMaxDelayInMillis =
+ parameters.getIntByKeys(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY,
SINK_IOTDB_BATCH_DELAY_MS_KEY);
+ if (Objects.isNull(requestMaxDelayInMillis)) {
+ final int requestMaxDelayInSeconds =
+ parameters.getIntOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
+ CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE);
+ maxDelayInMs =
+ requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE :
requestMaxDelayInSeconds * 1000;
+ } else {
+ maxDelayInMs = requestMaxDelayInMillis < 0 ? Integer.MAX_VALUE :
requestMaxDelayInMillis;
+ }
this.consensusGroupId = consensusGroupId;
this.thisDataNodeId = thisDataNodeId;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index f7884076f26..ae9207c6343 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -231,6 +231,11 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent
this.needToReport = true;
}
+ // This getter is reserved for user-defined plugins
+ public boolean isNeedToReport() {
+ return needToReport;
+ }
+
public String getDeviceId() {
// NonNull indicates that the internallyDecreaseResourceReferenceCount has
not been called.
return Objects.nonNull(tablet) ? tablet.deviceId : deviceId;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index dbe8aada88d..1e3c9e4faf9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import java.util.concurrent.CompletableFuture;
+
/**
* The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls
the termination of pipe,
* that is, when the historical {@link PipeTsFileInsertionEvent}s are all
processed, this will be
@@ -91,7 +93,9 @@ public class PipeTerminateEvent extends EnrichedEvent {
@Override
public void reportProgress() {
- PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId);
+ // To avoid deadlock
+ CompletableFuture.runAsync(
+ () -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index dba9561105a..753075f3b20 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -26,7 +26,6 @@ import
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressInde
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
@@ -118,7 +117,6 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
private boolean shouldExtractInsertion;
private boolean shouldTransferModFile; // Whether to transfer mods
- private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
private boolean isTerminateSignalSent = false;
private volatile boolean hasBeenStarted = false;
@@ -329,19 +327,9 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
|| // Should extract deletion
listeningOptionPair.getRight());
- final String extractorModeValue =
- parameters.getStringOrDefault(
- Arrays.asList(
- PipeExtractorConstant.EXTRACTOR_MODE_KEY,
PipeExtractorConstant.SOURCE_MODE_KEY),
- PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE);
- shouldTerminatePipeOnAllHistoricalEventsConsumed =
-
extractorModeValue.equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE)
- || extractorModeValue.equalsIgnoreCase(
- PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE);
-
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
- "Pipe {}@{}: historical data extraction time range, start time
{}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should
transfer mod file {}, should terminate pipe on all historical events consumed
{}",
+ "Pipe {}@{}: historical data extraction time range, start time
{}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should
transfer mod file {}",
pipeName,
dataRegionId,
DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
@@ -350,8 +338,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
historicalDataExtractionEndTime,
sloppyPattern,
sloppyTimeRange,
- shouldTransferModFile,
- shouldTerminatePipeOnAllHistoricalEventsConsumed);
+ shouldTransferModFile);
}
}
@@ -672,9 +659,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
// If the pendingQueue is null when the function is called, it implies
that the extractor only
// extracts deletion thus the historical event has nothing to consume.
return hasBeenStarted
- && (Objects.isNull(pendingQueue)
- || pendingQueue.isEmpty()
- && (!shouldTerminatePipeOnAllHistoricalEventsConsumed ||
isTerminateSignalSent));
+ && (Objects.isNull(pendingQueue) || pendingQueue.isEmpty() &&
isTerminateSignalSent);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index dd3ca9c5bc0..bf789b9732d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -122,7 +122,7 @@ public class PipeTsFileResource implements AutoCloseable {
return finalReferenceCount;
}
- public synchronized boolean closeIfOutOfTimeToLive() throws IOException {
+ public synchronized boolean closeIfOutOfTimeToLive() {
if (referenceCount.get() <= 0
&& (deviceMeasurementsMap == null // Not cached yet.
|| System.currentTimeMillis() - lastUnpinToZeroTime.get()
@@ -135,7 +135,7 @@ public class PipeTsFileResource implements AutoCloseable {
}
@Override
- public synchronized void close() throws IOException {
+ public synchronized void close() {
if (deviceMeasurementsMap != null) {
deviceMeasurementsMap = null;
}
@@ -153,7 +153,15 @@ public class PipeTsFileResource implements AutoCloseable {
allocatedMemoryBlock = null;
}
- Files.deleteIfExists(hardlinkOrCopiedFile.toPath());
+ try {
+ Files.deleteIfExists(hardlinkOrCopiedFile.toPath());
+ } catch (final Exception e) {
+ LOGGER.error(
+ "PipeTsFileResource: Failed to delete tsfile {} when closing,
because {}. Please MANUALLY delete it.",
+ hardlinkOrCopiedFile,
+ e.getMessage(),
+ e);
+ }
LOGGER.info("PipeTsFileResource: Closed tsfile {} and cleaned up.",
hardlinkOrCopiedFile);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 19de7a107be..d00318ae561 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -104,7 +104,7 @@ public class PipeTsFileResourceManager {
entry.getValue().getReferenceCount(),
entry.getValue().getFileSize()));
}
- } catch (final IOException e) {
+ } catch (final Exception e) {
LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ",
e);
} finally {
segmentLock.unlock(new File(hardlinkOrCopiedFile));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
index ba61f03f841..9479c7a752a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
@@ -62,19 +62,21 @@ public class PipeEventCommitter {
final int commitQueueSizeBeforeCommit = commitQueue.size();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last
commit id {}, commit queue size {}",
- committerKey,
- event.getCommitId(),
- lastCommitId.get(),
- commitQueueSizeBeforeCommit);
- } else if (commitQueueSizeBeforeCommit != 0 && commitQueueSizeBeforeCommit
% 100 == 0) {
- LOGGER.info(
- "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last
commit id {}, commit queue size {}",
- committerKey,
- event.getCommitId(),
- lastCommitId.get(),
- commitQueueSizeBeforeCommit);
+ if (commitQueueSizeBeforeCommit != 0 && commitQueueSizeBeforeCommit %
100 == 0) {
+ LOGGER.info(
+ "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last
commit id {}, commit queue size {}",
+ committerKey,
+ event.getCommitId(),
+ lastCommitId.get(),
+ commitQueueSizeBeforeCommit);
+ } else {
+ LOGGER.debug(
+ "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last
commit id {}, commit queue size {}",
+ committerKey,
+ event.getCommitId(),
+ lastCommitId.get(),
+ commitQueueSizeBeforeCommit);
+ }
}
while (!commitQueue.isEmpty()) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 13a9cd8a7f9..8a04d133d60 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -68,6 +68,9 @@ public class PipeConnectorConstant {
public static final int CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE = 1;
public static final int CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE =
5;
+ public static final String CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY =
"connector.batch.max-delay-ms";
+ public static final String SINK_IOTDB_BATCH_DELAY_MS_KEY =
"sink.batch.max-delay-ms";
+
public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY =
"connector.batch.size-bytes";
public static final String SINK_IOTDB_BATCH_SIZE_KEY =
"sink.batch.size-bytes";
public static final long CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE = 16
* MB;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 9dc7bac704a..6af18db77e9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -72,6 +72,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
@@ -106,6 +108,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_HOST_KEY;
@@ -201,6 +205,12 @@ public abstract class IoTDBConnector implements
PipeConnector {
Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY),
false);
+ // Check coexistence of batch.max-delay-ms and batch.max-delay-seconds
+ validator.validateSynonymAttributes(
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY,
SINK_IOTDB_BATCH_DELAY_MS_KEY),
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
+ false);
+
username =
parameters.getStringOrDefault(
Arrays.asList(