This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c438ee9d5 [core] Fix FileMonitorTable with Deletion Vectors Enabled 
(#5406)
1c438ee9d5 is described below

commit 1c438ee9d591fc7b3d12831fe593dfe3984ede78
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 7 10:17:29 2025 +0800

    [core] Fix FileMonitorTable with Deletion Vectors Enabled (#5406)
---
 .../apache/paimon/table/source/DataTableStreamScan.java  | 16 +++++++++++-----
 .../apache/paimon/table/PrimaryKeySimpleTableTest.java   | 12 +++++++++---
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index f3581b114c..60253d49fb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.StreamScanMode;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.Consumer;
 import org.apache.paimon.lookup.LookupStrategy;
@@ -48,6 +49,7 @@ import javax.annotation.Nullable;
 import java.util.List;
 
 import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
+import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
 
 /** {@link StreamTableScan} implementation for streaming planning. */
 public class DataTableStreamScan extends AbstractDataTableScan implements 
StreamDataTableScan {
@@ -55,6 +57,7 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
     private static final Logger LOG = 
LoggerFactory.getLogger(DataTableStreamScan.class);
 
     private final CoreOptions options;
+    private final StreamScanMode scanMode;
     private final SnapshotManager snapshotManager;
     private final boolean supportStreamingReadOverwrite;
     private final DefaultValueAssigner defaultValueAssigner;
@@ -80,6 +83,7 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
             DefaultValueAssigner defaultValueAssigner) {
         super(options, snapshotReader);
         this.options = options;
+        this.scanMode = 
options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
         this.snapshotManager = snapshotManager;
         this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
         this.defaultValueAssigner = defaultValueAssigner;
@@ -139,7 +143,9 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
 
     private Plan tryFirstPlan() {
         StartingScanner.Result result;
-        if (options.needLookup()) {
+        if (scanMode == FILE_MONITOR) {
+            result = startingScanner.scan(snapshotReader);
+        } else if (options.needLookup()) {
             result = startingScanner.scan(snapshotReader.withLevelFilter(level 
-> level > 0));
             snapshotReader.withLevelFilter(Filter.alwaysTrue());
         } else if (options.changelogProducer().equals(FULL_COMPACTION)) {
@@ -157,7 +163,9 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
             currentWatermark = scannedResult.currentWatermark();
             long currentSnapshotId = scannedResult.currentSnapshotId();
             LookupStrategy lookupStrategy = options.lookupStrategy();
-            if (!lookupStrategy.produceChangelog && 
lookupStrategy.deletionVector) {
+            if (scanMode == FILE_MONITOR) {
+                nextSnapshotId = currentSnapshotId + 1;
+            } else if (!lookupStrategy.produceChangelog && 
lookupStrategy.deletionVector) {
                 // For DELETION_VECTOR_ONLY mode, we need to return the 
remaining data from level 0
                 // in the subsequent plan.
                 nextSnapshotId = currentSnapshotId;
@@ -250,9 +258,7 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
     }
 
     protected FollowUpScanner createFollowUpScanner() {
-        CoreOptions.StreamScanMode type =
-                options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
-        switch (type) {
+        switch (scanMode) {
             case COMPACT_BUCKET_TABLE:
                 return new DeltaFollowUpScanner();
             case FILE_MONITOR:
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index d80b8debdd..cbf7df0692 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -1673,19 +1673,23 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
                 
.containsExactly("1|10|200|binary|varbinary|mapKey:mapVal|multiset");
     }
 
-    @Test
-    public void testInnerStreamScanMode() throws Exception {
-        FileStoreTable table = createFileStoreTable();
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testFileMonitorTableScan(boolean dvEnabled) throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(options -> 
options.set(DELETION_VECTORS_ENABLED, dvEnabled));
 
         FileMonitorTable monitorTable = new FileMonitorTable(table);
         ReadBuilder readBuilder = monitorTable.newReadBuilder();
         StreamTableScan scan = readBuilder.newStreamScan();
         TableRead read = readBuilder.newRead();
+        IOManager ioManager = IOManager.create(tempDir.toString());
 
         // 1. first write
 
         BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
         BatchTableWrite write = writeBuilder.newWrite();
+        write.withIOManager(ioManager);
         BatchTableCommit commit = writeBuilder.newCommit();
 
         write.write(rowData(1, 10, 100L));
@@ -1707,6 +1711,7 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
         commit.close();
         writeBuilder = table.newBatchWriteBuilder();
         write = writeBuilder.newWrite();
+        write.withIOManager(ioManager);
         commit = writeBuilder.newCommit();
         write.write(rowData(1, 10, 100L));
         write.write(rowData(1, 11, 101L));
@@ -1736,6 +1741,7 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
         commit.close();
         writeBuilder = table.newBatchWriteBuilder().withOverwrite();
         write = writeBuilder.newWrite();
+        write.withIOManager(ioManager);
         commit = writeBuilder.newCommit();
         write.write(rowData(1, 10, 100L));
         write.write(rowData(1, 11, 101L));

Reply via email to