This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c28e50f7afa Pipe: Add retry when TsFile parsing failed to avoid race among processor threads (follow up #15624) (#15644)
c28e50f7afa is described below

commit c28e50f7afadc0de48bb7e7cb19a4be9398979a3
Author: Zikun Ma <[email protected]>
AuthorDate: Fri Jun 6 10:29:21 2025 +0800

    Pipe: Add retry when TsFile parsing failed to avoid race among processor threads (follow up #15624) (#15644)
    
    * Pipe: Add retry when TsFile parsing failed to avoid race among processor threads
    
    * refactor
    
    * refactor
    
    * refactor
---
 .../agent/task/connection/PipeEventCollector.java  | 31 +----------
 .../subtask/processor/PipeProcessorSubtask.java    | 15 ++++-
 .../protocol/websocket/WebSocketConnector.java     | 13 +++--
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 65 +++++++++++++++++++---
 .../processor/aggregate/AggregateProcessor.java    | 23 +++++++-
 .../downsampling/DownSamplingProcessor.java        | 25 +++++++--
 .../batch/SubscriptionPipeTsFileEventBatch.java    | 31 ++++++-----
 7 files changed, 135 insertions(+), 68 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index a78c4e2e4ef..e64248a5797 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.pipe.agent.task.connection;
 
-import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
@@ -36,13 +35,11 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDele
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class PipeEventCollector implements EventCollector {
@@ -144,32 +141,8 @@ public class PipeEventCollector implements EventCollector {
     }
 
     try {
-      final Iterable<TabletInsertionEvent> iterable = 
sourceEvent.toTabletInsertionEvents();
-      final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
-      while (iterator.hasNext()) {
-        final TabletInsertionEvent parsedEvent = iterator.next();
-        int retryCount = 0;
-        while (true) {
-          try {
-            collectParsedRawTableEvent((PipeRawTabletInsertionEvent) 
parsedEvent);
-            break;
-          } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
-            if (retryCount++ % 100 == 0) {
-              LOGGER.warn(
-                  "parseAndCollectEvent: failed to allocate memory for parsing 
TsFile {}, retry count is {}, will keep retrying.",
-                  sourceEvent.getTsFile(),
-                  retryCount,
-                  e);
-            } else if (LOGGER.isDebugEnabled()) {
-              LOGGER.debug(
-                  "parseAndCollectEvent: failed to allocate memory for parsing 
TsFile {}, retry count is {}, will keep retrying.",
-                  sourceEvent.getTsFile(),
-                  retryCount,
-                  e);
-            }
-          }
-        }
-      }
+      sourceEvent.consumeTabletInsertionEventsWithRetry(
+          this::collectParsedRawTableEvent, 
"PipeEventCollector::parseAndCollectEvent");
     } finally {
       sourceEvent.close();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index 4b11ef97285..1f7262c0c16 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -149,9 +149,18 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
               && ((PipeTsFileInsertionEvent) event).shouldParse4Privilege()) {
             try (final PipeTsFileInsertionEvent tsFileInsertionEvent =
                 (PipeTsFileInsertionEvent) event) {
-              for (final TabletInsertionEvent tabletInsertionEvent :
-                  tsFileInsertionEvent.toTabletInsertionEvents()) {
-                pipeProcessor.process(tabletInsertionEvent, 
outputEventCollector);
+              final AtomicReference<Exception> ex = new AtomicReference<>();
+              tsFileInsertionEvent.consumeTabletInsertionEventsWithRetry(
+                  event1 -> {
+                    try {
+                      pipeProcessor.process(event1, outputEventCollector);
+                    } catch (Exception e) {
+                      ex.set(e);
+                    }
+                  },
+                  "PipeProcessorSubtask::executeOnce");
+              if (ex.get() != null) {
+                throw ex.get();
               }
             }
           } else {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index a9ed5cb46a7..57c51af161e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -141,11 +141,14 @@ public class WebSocketConnector implements PipeConnector {
     }
 
     try {
-      for (TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
-        // Skip report if any tablet events is added
-        ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
-        transfer(event);
-      }
+      ((PipeTsFileInsertionEvent) tsFileInsertionEvent)
+          .consumeTabletInsertionEventsWithRetry(
+              event -> {
+                // Skip report if any tablet events is added
+                ((PipeTsFileInsertionEvent) 
tsFileInsertionEvent).skipReportOnCommit();
+                transfer(event);
+              },
+              "WebSocketConnector::transfer");
     } finally {
       tsFileInsertionEvent.close();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 2c3cfd10cf2..55f40750662 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
@@ -55,11 +56,13 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT;
@@ -562,6 +565,49 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
 
   /////////////////////////// TsFileInsertionEvent ///////////////////////////
 
+  @FunctionalInterface
+  public interface TabletInsertionEventConsumer {
+    void consume(final PipeRawTabletInsertionEvent event);
+  }
+
+  public void consumeTabletInsertionEventsWithRetry(
+      final TabletInsertionEventConsumer consumer, final String callerName) 
throws PipeException {
+    final Iterable<TabletInsertionEvent> iterable = toTabletInsertionEvents();
+    final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
+    int tabletEventCount = 0;
+    while (iterator.hasNext()) {
+      final TabletInsertionEvent parsedEvent = iterator.next();
+      tabletEventCount++;
+      int retryCount = 0;
+      while (true) {
+        // If failed due do insufficient memory, retry until success to avoid 
race among multiple
+        // processor threads
+        try {
+          consumer.consume((PipeRawTabletInsertionEvent) parsedEvent);
+          break;
+        } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+          if (retryCount++ % 100 == 0) {
+            LOGGER.warn(
+                "{}: failed to allocate memory for parsing TsFile {}, tablet 
event no. {}, retry count is {}, will keep retrying.",
+                callerName,
+                getTsFile(),
+                tabletEventCount,
+                retryCount,
+                e);
+          } else if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(
+                "{}: failed to allocate memory for parsing TsFile {}, tablet 
event no. {}, retry count is {}, will keep retrying.",
+                callerName,
+                getTsFile(),
+                tabletEventCount,
+                retryCount,
+                e);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() throws 
PipeException {
     // 20 - 40 seconds for waiting
@@ -685,18 +731,19 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
   }
 
   public long count(final boolean skipReportOnCommit) throws IOException {
-    long count = 0;
+    AtomicLong count = new AtomicLong();
 
     if (shouldParseTime()) {
       try {
-        for (final TabletInsertionEvent event : toTabletInsertionEvents()) {
-          final PipeRawTabletInsertionEvent rawEvent = 
((PipeRawTabletInsertionEvent) event);
-          count += rawEvent.count();
-          if (skipReportOnCommit) {
-            rawEvent.skipReportOnCommit();
-          }
-        }
-        return count;
+        consumeTabletInsertionEventsWithRetry(
+            event -> {
+              count.addAndGet(event.count());
+              if (skipReportOnCommit) {
+                event.skipReportOnCommit();
+              }
+            },
+            "PipeTsFileInsertionEvent::count");
+        return count.get();
       } finally {
         close();
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index ec1683358d4..1119deaf712 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -524,9 +524,26 @@ public class AggregateProcessor implements PipeProcessor {
       final TsFileInsertionEvent tsFileInsertionEvent, final EventCollector 
eventCollector)
       throws Exception {
     try {
-      for (final TabletInsertionEvent tabletInsertionEvent :
-          tsFileInsertionEvent.toTabletInsertionEvents()) {
-        process(tabletInsertionEvent, eventCollector);
+      if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
+        final AtomicReference<Exception> ex = new AtomicReference<>();
+        ((PipeTsFileInsertionEvent) tsFileInsertionEvent)
+            .consumeTabletInsertionEventsWithRetry(
+                event -> {
+                  try {
+                    process(event, eventCollector);
+                  } catch (Exception e) {
+                    ex.set(e);
+                  }
+                },
+                "AggregateProcessor::process");
+        if (ex.get() != null) {
+          throw ex.get();
+        }
+      } else {
+        for (final TabletInsertionEvent tabletInsertionEvent :
+            tsFileInsertionEvent.toTabletInsertionEvents()) {
+          process(tabletInsertionEvent, eventCollector);
+        }
       }
     } finally {
       tsFileInsertionEvent.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
index fd631772b93..a8e0c270570 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.access.Row;
@@ -45,7 +46,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY;
 
 public abstract class DownSamplingProcessor implements PipeProcessor {
-
   protected long memoryLimitInBytes;
 
   protected boolean shouldSplitFile;
@@ -149,9 +149,26 @@ public abstract class DownSamplingProcessor implements 
PipeProcessor {
       throws Exception {
     if (shouldSplitFile) {
       try {
-        for (final TabletInsertionEvent tabletInsertionEvent :
-            tsFileInsertionEvent.toTabletInsertionEvents()) {
-          process(tabletInsertionEvent, eventCollector);
+        if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
+          final AtomicReference<Exception> ex = new AtomicReference<>();
+          ((PipeTsFileInsertionEvent) tsFileInsertionEvent)
+              .consumeTabletInsertionEventsWithRetry(
+                  event -> {
+                    try {
+                      process(event, eventCollector);
+                    } catch (Exception e) {
+                      ex.set(e);
+                    }
+                  },
+                  "DownSamplingProcessor::process");
+          if (ex.get() != null) {
+            throw ex.get();
+          }
+        } else {
+          for (final TabletInsertionEvent tabletInsertionEvent :
+              tsFileInsertionEvent.toTabletInsertionEvents()) {
+            process(tabletInsertionEvent, eventCollector);
+          }
         }
       } finally {
         tsFileInsertionEvent.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index e6328a39ef4..d8c68d8ec2b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.subscription.event.batch;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
-import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -96,20 +95,22 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
   protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
     // TODO: parse tsfile event on the fly like 
SubscriptionPipeTabletEventBatch
     try {
-      for (final TabletInsertionEvent parsedEvent : 
event.toTabletInsertionEvents()) {
-        if (!((PipeRawTabletInsertionEvent) parsedEvent)
-            .increaseReferenceCount(this.getClass().getName())) {
-          LOGGER.warn(
-              "SubscriptionPipeTsFileEventBatch: Failed to increase the 
reference count of event {}, skipping it.",
-              ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
-        } else {
-          try {
-            batch.onEvent(parsedEvent);
-          } catch (final Exception ignored) {
-            // no exceptions will be thrown
-          }
-        }
-      }
+      ((PipeTsFileInsertionEvent) event)
+          .consumeTabletInsertionEventsWithRetry(
+              event1 -> {
+                if (!event1.increaseReferenceCount(this.getClass().getName())) 
{
+                  LOGGER.warn(
+                      "SubscriptionPipeTsFileEventBatch: Failed to increase 
the reference count of event {}, skipping it.",
+                      event1.coreReportMessage());
+                } else {
+                  try {
+                    batch.onEvent(event1);
+                  } catch (final Exception ignored) {
+                    // no exceptions will be thrown
+                  }
+                }
+              },
+              "SubscriptionPipeTsFileEventBatch::onTsFileInsertionEvent");
     } finally {
       try {
         event.close();

Reply via email to