This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c28e50f7afa Pipe: Add retry when TsFile parsing failed to avoid race
among processor threads (follow up #15624) (#15644)
c28e50f7afa is described below
commit c28e50f7afadc0de48bb7e7cb19a4be9398979a3
Author: Zikun Ma <[email protected]>
AuthorDate: Fri Jun 6 10:29:21 2025 +0800
Pipe: Add retry when TsFile parsing failed to avoid race among processor
threads (follow up #15624) (#15644)
* Pipe: Add retry when TsFile parsing failed to avoid race among processor
threads
* refactor
* refactor
* refactor
---
.../agent/task/connection/PipeEventCollector.java | 31 +----------
.../subtask/processor/PipeProcessorSubtask.java | 15 ++++-
.../protocol/websocket/WebSocketConnector.java | 13 +++--
.../common/tsfile/PipeTsFileInsertionEvent.java | 65 +++++++++++++++++++---
.../processor/aggregate/AggregateProcessor.java | 23 +++++++-
.../downsampling/DownSamplingProcessor.java | 25 +++++++--
.../batch/SubscriptionPipeTsFileEventBatch.java | 31 ++++++-----
7 files changed, 135 insertions(+), 68 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index a78c4e2e4ef..e64248a5797 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.agent.task.connection;
-import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
@@ -36,13 +35,11 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDele
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
public class PipeEventCollector implements EventCollector {
@@ -144,32 +141,8 @@ public class PipeEventCollector implements EventCollector {
}
try {
- final Iterable<TabletInsertionEvent> iterable =
sourceEvent.toTabletInsertionEvents();
- final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
- while (iterator.hasNext()) {
- final TabletInsertionEvent parsedEvent = iterator.next();
- int retryCount = 0;
- while (true) {
- try {
- collectParsedRawTableEvent((PipeRawTabletInsertionEvent)
parsedEvent);
- break;
- } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
- if (retryCount++ % 100 == 0) {
- LOGGER.warn(
- "parseAndCollectEvent: failed to allocate memory for parsing
TsFile {}, retry count is {}, will keep retrying.",
- sourceEvent.getTsFile(),
- retryCount,
- e);
- } else if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "parseAndCollectEvent: failed to allocate memory for parsing
TsFile {}, retry count is {}, will keep retrying.",
- sourceEvent.getTsFile(),
- retryCount,
- e);
- }
- }
- }
- }
+ sourceEvent.consumeTabletInsertionEventsWithRetry(
+ this::collectParsedRawTableEvent,
"PipeEventCollector::parseAndCollectEvent");
} finally {
sourceEvent.close();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index 4b11ef97285..1f7262c0c16 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -149,9 +149,18 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
&& ((PipeTsFileInsertionEvent) event).shouldParse4Privilege()) {
try (final PipeTsFileInsertionEvent tsFileInsertionEvent =
(PipeTsFileInsertionEvent) event) {
- for (final TabletInsertionEvent tabletInsertionEvent :
- tsFileInsertionEvent.toTabletInsertionEvents()) {
- pipeProcessor.process(tabletInsertionEvent,
outputEventCollector);
+ final AtomicReference<Exception> ex = new AtomicReference<>();
+ tsFileInsertionEvent.consumeTabletInsertionEventsWithRetry(
+ event1 -> {
+ try {
+ pipeProcessor.process(event1, outputEventCollector);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ },
+ "PipeProcessorSubtask::executeOnce");
+ if (ex.get() != null) {
+ throw ex.get();
}
}
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index a9ed5cb46a7..57c51af161e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -141,11 +141,14 @@ public class WebSocketConnector implements PipeConnector {
}
try {
- for (TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
- // Skip report if any tablet events is added
- ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
- transfer(event);
- }
+ ((PipeTsFileInsertionEvent) tsFileInsertionEvent)
+ .consumeTabletInsertionEventsWithRetry(
+ event -> {
+ // Skip report if any tablet events is added
+ ((PipeTsFileInsertionEvent)
tsFileInsertionEvent).skipReportOnCommit();
+ transfer(event);
+ },
+ "WebSocketConnector::transfer");
} finally {
tsFileInsertionEvent.close();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 2c3cfd10cf2..55f40750662 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
@@ -55,11 +56,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT;
@@ -562,6 +565,49 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
/////////////////////////// TsFileInsertionEvent ///////////////////////////
+ @FunctionalInterface
+ public interface TabletInsertionEventConsumer {
+ void consume(final PipeRawTabletInsertionEvent event);
+ }
+
+ public void consumeTabletInsertionEventsWithRetry(
+ final TabletInsertionEventConsumer consumer, final String callerName)
throws PipeException {
+ final Iterable<TabletInsertionEvent> iterable = toTabletInsertionEvents();
+ final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+ int tabletEventCount = 0;
+ while (iterator.hasNext()) {
+ final TabletInsertionEvent parsedEvent = iterator.next();
+ tabletEventCount++;
+ int retryCount = 0;
+ while (true) {
+ // If failed due to insufficient memory, retry until success to avoid
race among multiple
+ // processor threads
+ try {
+ consumer.consume((PipeRawTabletInsertionEvent) parsedEvent);
+ break;
+ } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+ if (retryCount++ % 100 == 0) {
+ LOGGER.warn(
+ "{}: failed to allocate memory for parsing TsFile {}, tablet
event no. {}, retry count is {}, will keep retrying.",
+ callerName,
+ getTsFile(),
+ tabletEventCount,
+ retryCount,
+ e);
+ } else if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "{}: failed to allocate memory for parsing TsFile {}, tablet
event no. {}, retry count is {}, will keep retrying.",
+ callerName,
+ getTsFile(),
+ tabletEventCount,
+ retryCount,
+ e);
+ }
+ }
+ }
+ }
+ }
+
@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() throws
PipeException {
// 20 - 40 seconds for waiting
@@ -685,18 +731,19 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
}
public long count(final boolean skipReportOnCommit) throws IOException {
- long count = 0;
+ AtomicLong count = new AtomicLong();
if (shouldParseTime()) {
try {
- for (final TabletInsertionEvent event : toTabletInsertionEvents()) {
- final PipeRawTabletInsertionEvent rawEvent =
((PipeRawTabletInsertionEvent) event);
- count += rawEvent.count();
- if (skipReportOnCommit) {
- rawEvent.skipReportOnCommit();
- }
- }
- return count;
+ consumeTabletInsertionEventsWithRetry(
+ event -> {
+ count.addAndGet(event.count());
+ if (skipReportOnCommit) {
+ event.skipReportOnCommit();
+ }
+ },
+ "PipeTsFileInsertionEvent::count");
+ return count.get();
} finally {
close();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index ec1683358d4..1119deaf712 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -524,9 +524,26 @@ public class AggregateProcessor implements PipeProcessor {
final TsFileInsertionEvent tsFileInsertionEvent, final EventCollector
eventCollector)
throws Exception {
try {
- for (final TabletInsertionEvent tabletInsertionEvent :
- tsFileInsertionEvent.toTabletInsertionEvents()) {
- process(tabletInsertionEvent, eventCollector);
+ if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
+ final AtomicReference<Exception> ex = new AtomicReference<>();
+ ((PipeTsFileInsertionEvent) tsFileInsertionEvent)
+ .consumeTabletInsertionEventsWithRetry(
+ event -> {
+ try {
+ process(event, eventCollector);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ },
+ "AggregateProcessor::process");
+ if (ex.get() != null) {
+ throw ex.get();
+ }
+ } else {
+ for (final TabletInsertionEvent tabletInsertionEvent :
+ tsFileInsertionEvent.toTabletInsertionEvents()) {
+ process(tabletInsertionEvent, eventCollector);
+ }
}
} finally {
tsFileInsertionEvent.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
index fd631772b93..a8e0c270570 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.access.Row;
@@ -45,7 +46,6 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY;
public abstract class DownSamplingProcessor implements PipeProcessor {
-
protected long memoryLimitInBytes;
protected boolean shouldSplitFile;
@@ -149,9 +149,26 @@ public abstract class DownSamplingProcessor implements
PipeProcessor {
throws Exception {
if (shouldSplitFile) {
try {
- for (final TabletInsertionEvent tabletInsertionEvent :
- tsFileInsertionEvent.toTabletInsertionEvents()) {
- process(tabletInsertionEvent, eventCollector);
+ if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
+ final AtomicReference<Exception> ex = new AtomicReference<>();
+ ((PipeTsFileInsertionEvent) tsFileInsertionEvent)
+ .consumeTabletInsertionEventsWithRetry(
+ event -> {
+ try {
+ process(event, eventCollector);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ },
+ "DownSamplingProcessor::process");
+ if (ex.get() != null) {
+ throw ex.get();
+ }
+ } else {
+ for (final TabletInsertionEvent tabletInsertionEvent :
+ tsFileInsertionEvent.toTabletInsertionEvents()) {
+ process(tabletInsertionEvent, eventCollector);
+ }
}
} finally {
tsFileInsertionEvent.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index e6328a39ef4..d8c68d8ec2b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.subscription.event.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
-import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -96,20 +95,22 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
// TODO: parse tsfile event on the fly like
SubscriptionPipeTabletEventBatch
try {
- for (final TabletInsertionEvent parsedEvent :
event.toTabletInsertionEvents()) {
- if (!((PipeRawTabletInsertionEvent) parsedEvent)
- .increaseReferenceCount(this.getClass().getName())) {
- LOGGER.warn(
- "SubscriptionPipeTsFileEventBatch: Failed to increase the
reference count of event {}, skipping it.",
- ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
- } else {
- try {
- batch.onEvent(parsedEvent);
- } catch (final Exception ignored) {
- // no exceptions will be thrown
- }
- }
- }
+ ((PipeTsFileInsertionEvent) event)
+ .consumeTabletInsertionEventsWithRetry(
+ event1 -> {
+ if (!event1.increaseReferenceCount(this.getClass().getName()))
{
+ LOGGER.warn(
+ "SubscriptionPipeTsFileEventBatch: Failed to increase
the reference count of event {}, skipping it.",
+ event1.coreReportMessage());
+ } else {
+ try {
+ batch.onEvent(event1);
+ } catch (final Exception ignored) {
+ // no exceptions will be thrown
+ }
+ }
+ },
+ "SubscriptionPipeTsFileEventBatch::onTsFileInsertionEvent");
} finally {
try {
event.close();