This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1c438ee9d5 [core] Fix FileMonitorTable with Deletion Vectors Enabled (#5406)
1c438ee9d5 is described below
commit 1c438ee9d591fc7b3d12831fe593dfe3984ede78
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 7 10:17:29 2025 +0800
[core] Fix FileMonitorTable with Deletion Vectors Enabled (#5406)
---
.../apache/paimon/table/source/DataTableStreamScan.java | 16 +++++++++++-----
.../apache/paimon/table/PrimaryKeySimpleTableTest.java | 12 +++++++++---
2 files changed, 20 insertions(+), 8 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index f3581b114c..60253d49fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.StreamScanMode;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.lookup.LookupStrategy;
@@ -48,6 +49,7 @@ import javax.annotation.Nullable;
import java.util.List;
import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
+import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
/** {@link StreamTableScan} implementation for streaming planning. */
public class DataTableStreamScan extends AbstractDataTableScan implements
StreamDataTableScan {
@@ -55,6 +57,7 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
private static final Logger LOG =
LoggerFactory.getLogger(DataTableStreamScan.class);
private final CoreOptions options;
+ private final StreamScanMode scanMode;
private final SnapshotManager snapshotManager;
private final boolean supportStreamingReadOverwrite;
private final DefaultValueAssigner defaultValueAssigner;
@@ -80,6 +83,7 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
DefaultValueAssigner defaultValueAssigner) {
super(options, snapshotReader);
this.options = options;
+ this.scanMode =
options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
this.snapshotManager = snapshotManager;
this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
this.defaultValueAssigner = defaultValueAssigner;
@@ -139,7 +143,9 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
private Plan tryFirstPlan() {
StartingScanner.Result result;
- if (options.needLookup()) {
+ if (scanMode == FILE_MONITOR) {
+ result = startingScanner.scan(snapshotReader);
+ } else if (options.needLookup()) {
result = startingScanner.scan(snapshotReader.withLevelFilter(level
-> level > 0));
snapshotReader.withLevelFilter(Filter.alwaysTrue());
} else if (options.changelogProducer().equals(FULL_COMPACTION)) {
@@ -157,7 +163,9 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
currentWatermark = scannedResult.currentWatermark();
long currentSnapshotId = scannedResult.currentSnapshotId();
LookupStrategy lookupStrategy = options.lookupStrategy();
- if (!lookupStrategy.produceChangelog &&
lookupStrategy.deletionVector) {
+ if (scanMode == FILE_MONITOR) {
+ nextSnapshotId = currentSnapshotId + 1;
+ } else if (!lookupStrategy.produceChangelog &&
lookupStrategy.deletionVector) {
// For DELETION_VECTOR_ONLY mode, we need to return the
remaining data from level 0
// in the subsequent plan.
nextSnapshotId = currentSnapshotId;
@@ -250,9 +258,7 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
}
protected FollowUpScanner createFollowUpScanner() {
- CoreOptions.StreamScanMode type =
- options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
- switch (type) {
+ switch (scanMode) {
case COMPACT_BUCKET_TABLE:
return new DeltaFollowUpScanner();
case FILE_MONITOR:
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index d80b8debdd..cbf7df0692 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -1673,19 +1673,23 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
.containsExactly("1|10|200|binary|varbinary|mapKey:mapVal|multiset");
}
- @Test
- public void testInnerStreamScanMode() throws Exception {
- FileStoreTable table = createFileStoreTable();
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testFileMonitorTableScan(boolean dvEnabled) throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(options ->
options.set(DELETION_VECTORS_ENABLED, dvEnabled));
FileMonitorTable monitorTable = new FileMonitorTable(table);
ReadBuilder readBuilder = monitorTable.newReadBuilder();
StreamTableScan scan = readBuilder.newStreamScan();
TableRead read = readBuilder.newRead();
+ IOManager ioManager = IOManager.create(tempDir.toString());
// 1. first write
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
BatchTableWrite write = writeBuilder.newWrite();
+ write.withIOManager(ioManager);
BatchTableCommit commit = writeBuilder.newCommit();
write.write(rowData(1, 10, 100L));
@@ -1707,6 +1711,7 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
commit.close();
writeBuilder = table.newBatchWriteBuilder();
write = writeBuilder.newWrite();
+ write.withIOManager(ioManager);
commit = writeBuilder.newCommit();
write.write(rowData(1, 10, 100L));
write.write(rowData(1, 11, 101L));
@@ -1736,6 +1741,7 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
commit.close();
writeBuilder = table.newBatchWriteBuilder().withOverwrite();
write = writeBuilder.newWrite();
+ write.withIOManager(ioManager);
commit = writeBuilder.newCommit();
write.write(rowData(1, 10, 100L));
write.write(rowData(1, 11, 101L));