This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch no-listen-sealed
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 02be9628d37914fd0d59aecffa8b5ee482d8a7e1
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 20:30:00 2026 +0800

    no-listen-sealed
---
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 58 +++++++++++++++++++++-
 .../pipe/event/PipeTsFileInsertionEventTest.java   | 46 +++++++++++++++--
 2 files changed, 98 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 02ffcfcb2b4..081bd4fd702 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -91,6 +91,9 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
   protected final boolean isGeneratedByPipe;
   protected final boolean isGeneratedByIoTConsensusV2;
   protected final boolean isGeneratedByHistoricalExtractor;
+  // Realtime TsFile events are created after TsFileProcessor#endFile(), so the file is already
+  // immutable even if TsFileResource status is still UNCLOSED.
+  private final boolean isTsFileSealed;
   private final AtomicBoolean isClosed;
   private final AtomicReference<TsFileInsertionEventParser> eventParser;
 
@@ -130,7 +133,8 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
         null,
         true,
         Long.MIN_VALUE,
-        Long.MAX_VALUE);
+        Long.MAX_VALUE,
+        true);
   }
 
   public PipeTsFileInsertionEvent(
@@ -153,6 +157,50 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
       final boolean skipIfNoPrivileges,
       final long startTime,
       final long endTime) {
+    this(
+        isTableModelEvent,
+        databaseNameFromDataRegion,
+        resource,
+        tsFile,
+        isWithMod,
+        isLoaded,
+        isGeneratedByHistoricalExtractor,
+        tableNames,
+        pipeName,
+        creationTime,
+        pipeTaskMeta,
+        treePattern,
+        tablePattern,
+        userId,
+        userName,
+        cliHostname,
+        skipIfNoPrivileges,
+        startTime,
+        endTime,
+        false);
+  }
+
+  private PipeTsFileInsertionEvent(
+      final Boolean isTableModelEvent,
+      final String databaseNameFromDataRegion,
+      final TsFileResource resource,
+      final File tsFile,
+      final boolean isWithMod,
+      final boolean isLoaded,
+      final boolean isGeneratedByHistoricalExtractor,
+      final Set<String> tableNames,
+      final String pipeName,
+      final long creationTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
+      final String userId,
+      final String userName,
+      final String cliHostname,
+      final boolean skipIfNoPrivileges,
+      final long startTime,
+      final long endTime,
+      final boolean isTsFileSealed) {
     super(
         pipeName,
         creationTime,
@@ -186,6 +234,7 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
     this.isGeneratedByPipe = resource.isGeneratedByPipe();
     this.isGeneratedByIoTConsensusV2 = resource.isGeneratedByIoTConsensusV2();
     this.isGeneratedByHistoricalExtractor = isGeneratedByHistoricalExtractor;
+    this.isTsFileSealed = isTsFileSealed;
     this.tableNames = tableNames;
 
     isClosed = new AtomicBoolean(resource.isClosed());
@@ -242,6 +291,10 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
       return true;
     }
 
+    if (isTsFileSealed) {
+      return !resource.isEmpty();
+    }
+
     if (!isClosed.get()) {
       isClosed.set(resource.isClosed());
 
@@ -452,7 +505,8 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
             cliHostname,
             skipIfNoPrivileges,
             startTime,
-            endTime)
+            endTime,
+            isTsFileSealed)
         .bindTsFileDedupScopeID(tsFileDedupScopeID);
   }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
index 338a851cc90..3d57e632baa 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
@@ -242,18 +242,56 @@ public class PipeTsFileInsertionEventTest {
     }
   }
 
+  @Test(timeout = 5000)
+  public void testRealtimeEventCanSkipWaitingForClosedStatusAfterTsFileSealed() throws Exception {
+    final File tempDir = Files.createTempDirectory("pipeTsFileSealed").toFile();
+
+    try {
+      final TsFileResource resource =
+          createNonEmptyTsFileResource(tempDir, "realtime.tsfile", 1L, 1);
+      Assert.assertFalse(resource.isClosed());
+      Assert.assertFalse(resource.isEmpty());
+
+      final PipeTsFileInsertionEvent sourceEvent =
+          new PipeTsFileInsertionEvent(false, "root.db", resource, false);
+      Assert.assertTrue(sourceEvent.waitForTsFileClose());
+
+      final PipeTsFileInsertionEvent copiedEvent =
+          sourceEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+              "pipe", 1L, null, null, null, null, null, null, true, Long.MIN_VALUE, Long.MAX_VALUE);
+      Assert.assertTrue(copiedEvent.waitForTsFileClose());
+
+      copiedEvent.close();
+      sourceEvent.close();
+    } finally {
+      FileUtils.deleteFileOrDirectory(tempDir);
+    }
+  }
+
   private TsFileResource createSpyTsFileResource(
       final File tempDir, final String fileName, final long flushOrderId, final int dataRegionId)
       throws IOException {
+    final TsFileResource resource =
+        createNonEmptyTsFileResource(tempDir, fileName, flushOrderId, dataRegionId);
+    final TsFileResource spyResource = Mockito.spy(resource);
+    Mockito.doReturn(String.valueOf(dataRegionId)).when(spyResource).getDataRegionId();
+    return spyResource;
+  }
+
+  private TsFileResource createNonEmptyTsFileResource(
+      final File tempDir, final String fileName, final long flushOrderId, final int dataRegionId)
+      throws IOException {
     final File file = new File(tempDir, fileName);
     Assert.assertTrue(file.createNewFile());
 
     final TsFileResource resource = new TsFileResource(file);
     resource.updateProgressIndex(new SimpleProgressIndex(1, flushOrderId));
-
-    final TsFileResource spyResource = Mockito.spy(resource);
-    Mockito.doReturn(String.valueOf(dataRegionId)).when(spyResource).getDataRegionId();
-    return spyResource;
+    final ITimeIndex timeIndex = new ArrayDeviceTimeIndex();
+    final IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create("root.db.d" + dataRegionId);
+    timeIndex.putStartTime(deviceID, 1);
+    timeIndex.putEndTime(deviceID, 1);
+    resource.setTimeIndex(timeIndex);
+    return resource;
   }
 
   static class TestAccessControl implements AccessControl {

Reply via email to