This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new eaa5bcb012e Fix pipe permission retry and table parser progress 
(#17844)
eaa5bcb012e is described below

commit eaa5bcb012e48525424dcba0140b293747e7f564
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 4 18:23:43 2026 +0800

    Fix pipe permission retry and table parser progress (#17844)
---
 .../table/TsFileInsertionEventTableParser.java     | 197 +++++++++++----------
 .../sink/protocol/writeback/WriteBackSink.java     |  21 ++-
 .../pipe/event/TsFileInsertionEventParserTest.java |  88 +++++++++
 3 files changed, 213 insertions(+), 93 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index edaaa3aa06d..8ecdcc0cec5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -148,39 +148,24 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
               new Iterator<TabletInsertionEvent>() {
 
                 private TsFileInsertionEventTableParserTabletIterator 
tabletIterator;
+                private PipeRawTabletInsertionEvent nextEvent;
+                private Tablet bufferedTablet;
+                private boolean iterationClosed = false;
 
                 @Override
                 public boolean hasNext() {
                   try {
-                    if (tabletIterator == null) {
-                      tabletIterator =
-                          new TsFileInsertionEventTableParserTabletIterator(
-                              tsFileSequenceReader,
-                              entry ->
-                                  (Objects.isNull(tablePattern)
-                                          || 
tablePattern.matchesTable(entry.getKey()))
-                                      && hasTablePrivilege(entry.getKey()),
-                              allocatedMemoryBlockForTablet,
-                              allocatedMemoryBlockForBatchData,
-                              allocatedMemoryBlockForChunk,
-                              allocatedMemoryBlockForChunkMeta,
-                              allocatedMemoryBlockForTableSchemas,
-                              currentModifications,
-                              startTime,
-                              endTime);
+                    if (nextEvent != null) {
+                      return true;
                     }
-                    final boolean hasNext = tabletIterator.hasNext();
-                    if (hasNext && !parseStartTimeRecorded) {
-                      // Record start time on first hasNext() that returns true
-                      recordParseStartTime();
-                    } else if (!hasNext && parseStartTimeRecorded && 
!parseEndTimeRecorded) {
-                      // Record end time on last hasNext() that returns false
-                      recordParseEndTime();
-                      close();
-                    } else if (!hasNext) {
-                      close();
+
+                    final Tablet tablet = pollNextNonEmptyTablet();
+                    if (tablet == null) {
+                      return false;
                     }
-                    return hasNext;
+
+                    nextEvent = buildTabletInsertionEvent(tablet, 
!prepareNextNonEmptyTablet());
+                    return true;
                   } catch (Exception e) {
                     close();
                     throw new PipeException(
@@ -211,74 +196,108 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
                   return false;
                 }
 
+                private Tablet pollNextNonEmptyTablet() throws Exception {
+                  if (!prepareNextNonEmptyTablet()) {
+                    return null;
+                  }
+
+                  final Tablet tablet = bufferedTablet;
+                  bufferedTablet = null;
+                  return tablet;
+                }
+
+                private boolean prepareNextNonEmptyTablet() throws Exception {
+                  if (bufferedTablet != null) {
+                    return true;
+                  }
+                  if (iterationClosed) {
+                    return false;
+                  }
+
+                  if (tabletIterator == null) {
+                    tabletIterator =
+                        new TsFileInsertionEventTableParserTabletIterator(
+                            tsFileSequenceReader,
+                            entry ->
+                                (Objects.isNull(tablePattern)
+                                        || 
tablePattern.matchesTable(entry.getKey()))
+                                    && hasTablePrivilege(entry.getKey()),
+                            allocatedMemoryBlockForTablet,
+                            allocatedMemoryBlockForBatchData,
+                            allocatedMemoryBlockForChunk,
+                            allocatedMemoryBlockForChunkMeta,
+                            allocatedMemoryBlockForTableSchemas,
+                            currentModifications,
+                            startTime,
+                            endTime);
+                  }
+
+                  while (tabletIterator.hasNext()) {
+                    if (!parseStartTimeRecorded) {
+                      recordParseStartTime();
+                    }
+
+                    final Tablet tablet = tabletIterator.next();
+                    recordTabletMetrics(tablet);
+                    if (!PipeRawTabletInsertionEvent.isTabletEmpty(tablet)) {
+                      bufferedTablet = tablet;
+                      return true;
+                    }
+                  }
+
+                  closeIteration();
+                  return false;
+                }
+
+                private void closeIteration() {
+                  if (iterationClosed) {
+                    return;
+                  }
+
+                  if (parseStartTimeRecorded && !parseEndTimeRecorded) {
+                    recordParseEndTime();
+                  }
+                  close();
+                  iterationClosed = true;
+                }
+
+                private PipeRawTabletInsertionEvent buildTabletInsertionEvent(
+                    final Tablet tablet, final boolean needToReport) {
+                  return sourceEvent == null
+                      ? new PipeRawTabletInsertionEvent(
+                          Boolean.TRUE,
+                          null,
+                          null,
+                          null,
+                          tablet,
+                          true,
+                          null,
+                          0,
+                          pipeTaskMeta,
+                          sourceEvent,
+                          needToReport)
+                      : new PipeRawTabletInsertionEvent(
+                          Boolean.TRUE,
+                          sourceEvent.getSourceDatabaseNameFromDataRegion(),
+                          sourceEvent.getRawTableModelDataBase(),
+                          sourceEvent.getRawTreeModelDataBase(),
+                          tablet,
+                          true,
+                          sourceEvent.getPipeName(),
+                          sourceEvent.getCreationTime(),
+                          pipeTaskMeta,
+                          sourceEvent,
+                          needToReport);
+                }
+
                 @Override
                 public TabletInsertionEvent next() {
                   if (!hasNext()) {
-                    close();
                     throw new NoSuchElementException();
                   }
 
-                  final Tablet tablet = tabletIterator.next();
-                  // Record tablet metrics
-                  recordTabletMetrics(tablet);
-
-                  final TabletInsertionEvent next;
-                  if (!hasNext()) {
-                    next =
-                        sourceEvent == null
-                            ? new PipeRawTabletInsertionEvent(
-                                Boolean.TRUE,
-                                null,
-                                null,
-                                null,
-                                tablet,
-                                true,
-                                null,
-                                0,
-                                pipeTaskMeta,
-                                sourceEvent,
-                                true)
-                            : new PipeRawTabletInsertionEvent(
-                                Boolean.TRUE,
-                                
sourceEvent.getSourceDatabaseNameFromDataRegion(),
-                                sourceEvent.getRawTableModelDataBase(),
-                                sourceEvent.getRawTreeModelDataBase(),
-                                tablet,
-                                true,
-                                sourceEvent.getPipeName(),
-                                sourceEvent.getCreationTime(),
-                                pipeTaskMeta,
-                                sourceEvent,
-                                true);
-                    close();
-                  } else {
-                    next =
-                        sourceEvent == null
-                            ? new PipeRawTabletInsertionEvent(
-                                Boolean.TRUE,
-                                null,
-                                null,
-                                null,
-                                tablet,
-                                true,
-                                null,
-                                0,
-                                pipeTaskMeta,
-                                sourceEvent,
-                                false)
-                            : new PipeRawTabletInsertionEvent(
-                                Boolean.TRUE,
-                                
sourceEvent.getSourceDatabaseNameFromDataRegion(),
-                                sourceEvent.getRawTableModelDataBase(),
-                                sourceEvent.getRawTreeModelDataBase(),
-                                tablet,
-                                true,
-                                sourceEvent.getPipeName(),
-                                sourceEvent.getCreationTime(),
-                                pipeTaskMeta,
-                                sourceEvent,
-                                false);
-                  }
+                  final TabletInsertionEvent next = nextEvent;
+                  nextEvent = null;
                   return next;
                 }
               };
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 581792475c0..9bec114ede1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.audit.UserEntity;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.queryengine.common.SqlDialect;
 import org.apache.iotdb.commons.utils.StatusUtils;
@@ -284,7 +285,8 @@ public class WriteBackSink implements PipeConnector {
         && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
         && !(skipIfNoPrivileges
             && status.getCode() == 
TSStatusCode.NO_PERMISSION.getStatusCode())) {
-      throw new PipeException(
+      throwWriteBackExceptionIfNecessary(
+          status,
           String.format(
               "Write back PipeInsertNodeTabletInsertionEvent %s error, result 
status %s",
               pipeInsertNodeTabletInsertionEvent, status));
@@ -328,7 +330,8 @@ public class WriteBackSink implements PipeConnector {
         && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
         && !(skipIfNoPrivileges
             && status.getCode() == 
TSStatusCode.NO_PERMISSION.getStatusCode())) {
-      throw new PipeException(
+      throwWriteBackExceptionIfNecessary(
+          status,
           String.format(
               "Write back PipeRawTabletInsertionEvent %s error, result status 
%s",
               pipeRawTabletInsertionEvent, status));
@@ -373,13 +376,23 @@ public class WriteBackSink implements PipeConnector {
         && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
         && !(skipIfNoPrivileges
             && status.getCode() == 
TSStatusCode.NO_PERMISSION.getStatusCode())) {
-      throw new PipeException(
+      throwWriteBackExceptionIfNecessary(
+          status,
           String.format(
               "Write back PipeStatementInsertionEvent %s error, result status 
%s",
               pipeStatementInsertionEvent, status));
     }
   }
 
+  private static void throwWriteBackExceptionIfNecessary(
+      final TSStatus status, final String exceptionMessage) {
+    if (status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode()) {
+      throw new 
PipeRuntimeSinkNonReportTimeConfigurableException(exceptionMessage, 
Long.MAX_VALUE);
+    }
+
+    throw new PipeException(exceptionMessage);
+  }
+
   @Override
   public void close() throws Exception {
     if (session != null) {
@@ -410,7 +423,7 @@ public class WriteBackSink implements PipeConnector {
           .status;
     } catch (final AccessDeniedException e) {
       if (!skipIfNoPrivileges) {
-        throw e;
+        throw new 
PipeRuntimeSinkNonReportTimeConfigurableException(e.getMessage(), 
Long.MAX_VALUE);
       }
       LOGGER.debug(
           DataNodePipeMessages.EXECUTE_STATEMENT_TO_DATABASE_SKIP_BECAUSE_NO,
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 84569bf586c..50109b935c3 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -525,6 +525,77 @@ public class TsFileInsertionEventParserTest {
     }
   }
 
+  @Test
+  public void testTableParserWithTablePatternReportsLastNonEmptyTablet() 
throws Exception {
+    final int originalPipeDataStructureTabletRowSize =
+        PipeConfig.getInstance().getPipeDataStructureTabletRowSize();
+    
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(2);
+
+    try {
+      alignedTsFile = new File("table-parser-table-pattern.tsfile");
+      if (alignedTsFile.exists()) {
+        Assert.assertTrue(alignedTsFile.delete());
+      }
+
+      final List<IMeasurementSchema> schemaList =
+          Arrays.asList(
+              new MeasurementSchema("tag0", TSDataType.STRING),
+              new MeasurementSchema("s0", TSDataType.INT64));
+      final List<String> columnNameList = Arrays.asList("tag0", "s0");
+      final List<TSDataType> dataTypeList = Arrays.asList(TSDataType.STRING, 
TSDataType.INT64);
+      final List<ColumnCategory> columnCategoryList =
+          Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD);
+
+      try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+        writer.registerTableSchema(new TableSchema("test", schemaList, 
columnCategoryList));
+        writer.registerTableSchema(new TableSchema("test1", schemaList, 
columnCategoryList));
+        writer.writeTable(
+            generateSimpleTableTablet(
+                "test", columnNameList, dataTypeList, columnCategoryList, 
"ignored", 0, 2));
+        writer.writeTable(
+            generateSimpleTableTablet(
+                "test1", columnNameList, dataTypeList, columnCategoryList, 
"matched", 3, 4));
+        writer.writeTable(
+            generateSimpleTableTablet(
+                "test1", columnNameList, dataTypeList, columnCategoryList, 
"unmatched", 2, 10));
+      }
+
+      try (final TsFileInsertionEventTableParser parser =
+          new TsFileInsertionEventTableParser(
+              alignedTsFile,
+              new TablePattern(true, null, "test1"),
+              3,
+              5,
+              null,
+              null,
+              null,
+              false)) {
+        final Iterator<TabletInsertionEvent> iterator = 
parser.toTabletInsertionEvents().iterator();
+        int rowCount = 0;
+        PipeRawTabletInsertionEvent lastEvent = null;
+        while (iterator.hasNext()) {
+          final PipeRawTabletInsertionEvent event = 
(PipeRawTabletInsertionEvent) iterator.next();
+          final Tablet tablet = event.convertToTablet();
+          Assert.assertEquals("test1", tablet.getTableName());
+          
Assert.assertFalse(PipeRawTabletInsertionEvent.isTabletEmpty(tablet));
+          rowCount += tablet.getRowSize();
+          if (lastEvent != null) {
+            Assert.assertFalse(lastEvent.isNeedToReport());
+          }
+          lastEvent = event;
+        }
+
+        Assert.assertEquals(2, rowCount);
+        Assert.assertNotNull(lastEvent);
+        Assert.assertTrue(lastEvent.isNeedToReport());
+      }
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          
.setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize);
+    }
+  }
+
   @Test
   public void manualTestScanParserSplitPerformance() throws Exception {
     Assume.assumeTrue(
@@ -1343,6 +1414,23 @@ public class TsFileInsertionEventParserTest {
     }
   }
 
+  private Tablet generateSimpleTableTablet(
+      final String tableName,
+      final List<String> columnNameList,
+      final List<TSDataType> dataTypeList,
+      final List<ColumnCategory> columnCategoryList,
+      final String tagValue,
+      final long... timestamps) {
+    final Tablet tablet =
+        new Tablet(tableName, columnNameList, dataTypeList, 
columnCategoryList, timestamps.length);
+    for (int rowIndex = 0; rowIndex < timestamps.length; ++rowIndex) {
+      tablet.addTimestamp(rowIndex, timestamps[rowIndex]);
+      tablet.addValue(rowIndex, 0, tagValue);
+      tablet.addValue(rowIndex, 1, (long) rowIndex);
+    }
+    return tablet;
+  }
+
   private void generateLargeTableTsFile(
       final File tsFile,
       final int tableCount,

Reply via email to