This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 411b8e9d273 [To dev/1.3] Pipe: Add retry when TsFile parsing failed to
avoid race among processor threads (#15624, #15644) (#15659)
411b8e9d273 is described below
commit 411b8e9d27314039a03b8d13d8a7a1400b11401f
Author: Zikun Ma <[email protected]>
AuthorDate: Fri Jun 6 16:59:48 2025 +0800
[To dev/1.3] Pipe: Add retry when TsFile parsing failed to avoid race among
processor threads (#15624, #15644) (#15659)
* Pipe: Add retry when TsFile parsing failed to avoid race among processor
threads (#15624)
(cherry picked from commit 7ad757f58a2cc5b5edb41766ff655b31328c3e80)
* Pipe: Add retry when TsFile parsing failed to avoid race among processor
threads (#15624, #15644)
* refactor
* refactor
* refactor
(cherry picked from commit c28e50f7afadc0de48bb7e7cb19a4be9398979a3)
---
.../agent/task/connection/PipeEventCollector.java | 6 +-
.../protocol/websocket/WebSocketConnector.java | 13 +++--
.../common/tsfile/PipeTsFileInsertionEvent.java | 65 +++++++++++++++++++---
.../processor/aggregate/AggregateProcessor.java | 23 +++++++-
.../downsampling/DownSamplingProcessor.java | 25 +++++++--
.../batch/SubscriptionPipeTsFileEventBatch.java | 31 ++++++-----
6 files changed, 123 insertions(+), 40 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index fd863b2ab8a..66bc5ab2a50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -34,7 +34,6 @@ import
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtracto
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
@@ -141,9 +140,8 @@ public class PipeEventCollector implements EventCollector {
}
try {
- for (final TabletInsertionEvent parsedEvent :
sourceEvent.toTabletInsertionEvents()) {
- collectParsedRawTableEvent((PipeRawTabletInsertionEvent) parsedEvent);
- }
+ sourceEvent.consumeTabletInsertionEventsWithRetry(
+ this::collectParsedRawTableEvent,
"PipeEventCollector::parseAndCollectEvent");
} finally {
sourceEvent.close();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index cf258452346..92436366511 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -139,11 +139,14 @@ public class WebSocketConnector implements PipeConnector {
}
try {
- for (TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
- // Skip report if any tablet events is added
- ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
- transfer(event);
- }
+ ((PipeTsFileInsertionEvent) tsFileInsertionEvent)
+ .consumeTabletInsertionEventsWithRetry(
+ event -> {
+ // Skip report if any tablet event is added
+ ((PipeTsFileInsertionEvent)
tsFileInsertionEvent).skipReportOnCommit();
+ transfer(event);
+ },
+ "WebSocketConnector::transfer");
} finally {
tsFileInsertionEvent.close();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index b648af30e6f..a2843392636 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
@@ -50,11 +51,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class PipeTsFileInsertionEvent extends EnrichedEvent
@@ -413,6 +416,49 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
/////////////////////////// TsFileInsertionEvent ///////////////////////////
+ @FunctionalInterface
+ public interface TabletInsertionEventConsumer {
+ void consume(final PipeRawTabletInsertionEvent event);
+ }
+
+ public void consumeTabletInsertionEventsWithRetry(
+ final TabletInsertionEventConsumer consumer, final String callerName)
throws PipeException {
+ final Iterable<TabletInsertionEvent> iterable = toTabletInsertionEvents();
+ final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+ int tabletEventCount = 0;
+ while (iterator.hasNext()) {
+ final TabletInsertionEvent parsedEvent = iterator.next();
+ tabletEventCount++;
+ int retryCount = 0;
+ while (true) {
+ // If failed due to insufficient memory, retry until success to avoid
race among multiple
+ // processor threads
+ try {
+ consumer.consume((PipeRawTabletInsertionEvent) parsedEvent);
+ break;
+ } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+ if (retryCount++ % 100 == 0) {
+ LOGGER.warn(
+ "{}: failed to allocate memory for parsing TsFile {}, tablet
event no. {}, retry count is {}, will keep retrying.",
+ callerName,
+ getTsFile(),
+ tabletEventCount,
+ retryCount,
+ e);
+ } else if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "{}: failed to allocate memory for parsing TsFile {}, tablet
event no. {}, retry count is {}, will keep retrying.",
+ callerName,
+ getTsFile(),
+ tabletEventCount,
+ retryCount,
+ e);
+ }
+ }
+ }
+ }
+ }
+
@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() throws
PipeException {
// 20 - 40 seconds for waiting
@@ -528,18 +574,19 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent
}
public long count(final boolean skipReportOnCommit) throws IOException {
- long count = 0;
+ AtomicLong count = new AtomicLong();
if (shouldParseTime()) {
try {
- for (final TabletInsertionEvent event : toTabletInsertionEvents()) {
- final PipeRawTabletInsertionEvent rawEvent =
((PipeRawTabletInsertionEvent) event);
- count += rawEvent.count();
- if (skipReportOnCommit) {
- rawEvent.skipReportOnCommit();
- }
- }
- return count;
+ consumeTabletInsertionEventsWithRetry(
+ event -> {
+ count.addAndGet(event.count());
+ if (skipReportOnCommit) {
+ event.skipReportOnCommit();
+ }
+ },
+ "PipeTsFileInsertionEvent::count");
+ return count.get();
} finally {
close();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 157c6e2103d..dc5a7e4390f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -512,9 +512,26 @@ public class AggregateProcessor implements PipeProcessor {
final TsFileInsertionEvent tsFileInsertionEvent, final EventCollector
eventCollector)
throws Exception {
try {
- for (final TabletInsertionEvent tabletInsertionEvent :
- tsFileInsertionEvent.toTabletInsertionEvents()) {
- process(tabletInsertionEvent, eventCollector);
+ if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
+ final AtomicReference<Exception> ex = new AtomicReference<>();
+ ((PipeTsFileInsertionEvent) tsFileInsertionEvent)
+ .consumeTabletInsertionEventsWithRetry(
+ event -> {
+ try {
+ process(event, eventCollector);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ },
+ "AggregateProcessor::process");
+ if (ex.get() != null) {
+ throw ex.get();
+ }
+ } else {
+ for (final TabletInsertionEvent tabletInsertionEvent :
+ tsFileInsertionEvent.toTabletInsertionEvents()) {
+ process(tabletInsertionEvent, eventCollector);
+ }
}
} finally {
tsFileInsertionEvent.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
index fd631772b93..a8e0c270570 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.access.Row;
@@ -45,7 +46,6 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY;
public abstract class DownSamplingProcessor implements PipeProcessor {
-
protected long memoryLimitInBytes;
protected boolean shouldSplitFile;
@@ -149,9 +149,26 @@ public abstract class DownSamplingProcessor implements
PipeProcessor {
throws Exception {
if (shouldSplitFile) {
try {
- for (final TabletInsertionEvent tabletInsertionEvent :
- tsFileInsertionEvent.toTabletInsertionEvents()) {
- process(tabletInsertionEvent, eventCollector);
+ if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
+ final AtomicReference<Exception> ex = new AtomicReference<>();
+ ((PipeTsFileInsertionEvent) tsFileInsertionEvent)
+ .consumeTabletInsertionEventsWithRetry(
+ event -> {
+ try {
+ process(event, eventCollector);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ },
+ "DownSamplingProcessor::process");
+ if (ex.get() != null) {
+ throw ex.get();
+ }
+ } else {
+ for (final TabletInsertionEvent tabletInsertionEvent :
+ tsFileInsertionEvent.toTabletInsertionEvents()) {
+ process(tabletInsertionEvent, eventCollector);
+ }
}
} finally {
tsFileInsertionEvent.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 7b1a6caf610..15c93d7a5e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.subscription.event.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
-import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -95,20 +94,22 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
// TODO: parse tsfile event on the fly like
SubscriptionPipeTabletEventBatch
try {
- for (final TabletInsertionEvent parsedEvent :
event.toTabletInsertionEvents()) {
- if (!((PipeRawTabletInsertionEvent) parsedEvent)
- .increaseReferenceCount(this.getClass().getName())) {
- LOGGER.warn(
- "SubscriptionPipeTsFileEventBatch: Failed to increase the
reference count of event {}, skipping it.",
- ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
- } else {
- try {
- batch.onEvent(parsedEvent);
- } catch (final Exception ignored) {
- // no exceptions will be thrown
- }
- }
- }
+ ((PipeTsFileInsertionEvent) event)
+ .consumeTabletInsertionEventsWithRetry(
+ event1 -> {
+ if (!event1.increaseReferenceCount(this.getClass().getName()))
{
+ LOGGER.warn(
+ "SubscriptionPipeTsFileEventBatch: Failed to increase
the reference count of event {}, skipping it.",
+ event1.coreReportMessage());
+ } else {
+ try {
+ batch.onEvent(event1);
+ } catch (final Exception ignored) {
+ // no exceptions will be thrown
+ }
+ }
+ },
+ "SubscriptionPipeTsFileEventBatch::onTsFileInsertionEvent");
} finally {
try {
event.close();