This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b5626c8922 Pipe: filtered empty tsFiles which should not be parsed or reported (#12216)
8b5626c8922 is described below

commit 8b5626c8922f536dc9724293628a33ce1d3144fa
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 22 20:59:39 2024 +0800

    Pipe: filtered empty tsFiles which should not be parsed or reported (#12216)
---
 .../pipe/it/autocreate/IoTDBPipeProcessorIT.java   |  9 ++++--
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 34 +++++++++++++++-------
 2 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
index fe12fab2bf1..693af799453 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
@@ -70,14 +70,15 @@ public class IoTDBPipeProcessorIT extends AbstractPipeDualAutoIT {
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
-      // Test the mixture of historical and realtime data
+      // Test empty tsFile parsing
+      // Assert that an empty tsFile will not be parsed by the processor then block
+      // the subsequent data processing
       // Do not fail if the failure has nothing to do with pipe
       // Because the failures will randomly generate due to resource limitation
       if (!TestUtils.tryExecuteNonQueriesWithRetry(
           senderEnv,
           Arrays.asList(
-              "insert into root.vehicle.d0(time, s1) values (0, 1)",
-              "insert into root.vehicle.d0(time, s1) values (10000, 2)"))) {
+              "insert into root.vehicle.d0(time, s1) values (0, 1)", "delete from root.**"))) {
         return;
       }
 
@@ -110,6 +111,8 @@ public class IoTDBPipeProcessorIT extends AbstractPipeDualAutoIT {
       if (!TestUtils.tryExecuteNonQueriesWithRetry(
           senderEnv,
           Arrays.asList(
+              "insert into root.vehicle.d0(time, s1) values (0, 1)",
+              "insert into root.vehicle.d0(time, s1) values (10000, 2)",
               "insert into root.vehicle.d0(time, s1) values (19999, 3)",
               "insert into root.vehicle.d0(time, s1) values (20000, 4)",
               "insert into root.vehicle.d0(time, s1) values (20001, 5)",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 110414a64ba..5e42d7960b1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileInsertionEvent {
@@ -156,7 +157,12 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
   @Override
   public ProgressIndex getProgressIndex() {
     try {
-      waitForTsFileClose();
+      if (!waitForTsFileClose()) {
+        LOGGER.warn(
+            "Skipping temporary TsFile {}'s progressIndex, will report MinimumProgressIndex",
+            tsFile);
+        return MinimumProgressIndex.INSTANCE;
+      }
       return resource.getMaxProgressIndexAfterClose();
     } catch (InterruptedException e) {
       LOGGER.warn(
@@ -217,18 +223,13 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
 
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
-    return initDataContainer().toTabletInsertionEvents();
-  }
-
-  private TsFileInsertionDataContainer initDataContainer() {
     try {
-      if (dataContainer == null) {
-        waitForTsFileClose();
-        dataContainer =
-            new TsFileInsertionDataContainer(
-                tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this);
+      if (!waitForTsFileClose()) {
+        LOGGER.warn(
+            "Pipe skipping temporary TsFile's parsing which shouldn't be transferred: {}", tsFile);
+        return Collections.emptyList();
       }
-      return dataContainer;
+      return initDataContainer().toTabletInsertionEvents();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       close();
@@ -238,6 +239,17 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
               "Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath());
       LOGGER.warn(errorMsg, e);
       throw new PipeException(errorMsg);
+    }
+  }
+
+  private TsFileInsertionDataContainer initDataContainer() {
+    try {
+      if (dataContainer == null) {
+        dataContainer =
+            new TsFileInsertionDataContainer(
+                tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this);
+      }
+      return dataContainer;
     } catch (IOException e) {
       close();
 

Reply via email to