This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 69df0c4313 [core][flink] Streaming first plan: refactor level0
filtering to avoid incorrect full-mode results (#5866)
69df0c4313 is described below
commit 69df0c4313bf1619577ba5e2ae59bee7c4adca07
Author: yuzelin <[email protected]>
AuthorDate: Thu Jul 17 13:14:34 2025 +0800
[core][flink] Streaming first plan: refactor level0 filtering to avoid
incorrect full-mode results (#5866)
---
.../paimon/table/source/DataTableStreamScan.java | 16 ++---
.../apache/paimon/flink/DeletionVectorITCase.java | 75 ++++++++++++++++------
2 files changed, 61 insertions(+), 30 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index a6ab782dba..25563ded2b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.StreamScanMode;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.Consumer;
-import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
@@ -50,6 +49,7 @@ import javax.annotation.Nullable;
import java.util.List;
import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
+import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
/** {@link StreamTableScan} implementation for streaming planning. */
@@ -157,7 +157,8 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
StartingScanner.Result result;
if (scanMode == FILE_MONITOR) {
result = startingScanner.scan(snapshotReader);
- } else if (options.needLookup()) {
+ } else if (options.changelogProducer().equals(LOOKUP)) {
+ // level0 data will be compacted to produce changelog in the future
result = startingScanner.scan(snapshotReader.withLevelFilter(level
-> level > 0));
snapshotReader.withLevelFilter(Filter.alwaysTrue());
} else if (options.changelogProducer().equals(FULL_COMPACTION)) {
@@ -174,16 +175,7 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
ScannedResult scannedResult = (ScannedResult) result;
currentWatermark = scannedResult.currentWatermark();
long currentSnapshotId = scannedResult.currentSnapshotId();
- LookupStrategy lookupStrategy = options.lookupStrategy();
- if (scanMode == FILE_MONITOR) {
- nextSnapshotId = currentSnapshotId + 1;
- } else if (!lookupStrategy.produceChangelog &&
lookupStrategy.deletionVector) {
- // For DELETION_VECTOR_ONLY mode, we need to return the
remaining data from level 0
- // in the subsequent plan.
- nextSnapshotId = currentSnapshotId;
- } else {
- nextSnapshotId = currentSnapshotId + 1;
- }
+ nextSnapshotId = currentSnapshotId + 1;
isFullPhaseEnd =
boundedChecker.shouldEndInput(snapshotManager.snapshot(currentSnapshotId));
LOG.debug(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index a5b1795037..34200cbc41 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -70,14 +70,14 @@ public class DeletionVectorITCase extends CatalogITCaseBase
{
try (BlockingIterator<Row, Row> iter =
streamSqlBlockIter(
"SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
- assertThat(iter.collect(8))
+
+ // the first two values will be merged
+ assertThat(iter.collect(6))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, "111111111"),
- Row.ofKind(RowKind.INSERT, 2, "2"),
- Row.ofKind(RowKind.INSERT, 3, "3"),
- Row.ofKind(RowKind.INSERT, 4, "4"),
Row.ofKind(RowKind.INSERT, 2, "2_1"),
Row.ofKind(RowKind.INSERT, 3, "3_1"),
+ Row.ofKind(RowKind.INSERT, 4, "4"),
Row.ofKind(RowKind.INSERT, 2, "2_2"),
Row.ofKind(RowKind.INSERT, 4, "4_1"));
}
@@ -118,20 +118,34 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
try (BlockingIterator<Row, Row> iter =
streamSqlBlockIter(
"SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
- assertThat(iter.collect(12))
- .containsExactlyInAnyOrder(
- Row.ofKind(RowKind.INSERT, 1, "111111111"),
- Row.ofKind(RowKind.INSERT, 2, "2"),
- Row.ofKind(RowKind.INSERT, 3, "3"),
- Row.ofKind(RowKind.INSERT, 4, "4"),
- Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2"),
- Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1"),
- Row.ofKind(RowKind.UPDATE_BEFORE, 3, "3"),
- Row.ofKind(RowKind.UPDATE_AFTER, 3, "3_1"),
- Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
- Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
- Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
- Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
+ if (changelogProducer.equals("none")) {
+ // the first two values will be merged
+ assertThat(iter.collect(8))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, 1, "111111111"),
+ Row.ofKind(RowKind.INSERT, 2, "2_1"),
+ Row.ofKind(RowKind.INSERT, 3, "3_1"),
+ Row.ofKind(RowKind.INSERT, 4, "4"),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
+ } else {
+ assertThat(iter.collect(12))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, 1, "111111111"),
+ Row.ofKind(RowKind.INSERT, 2, "2"),
+ Row.ofKind(RowKind.INSERT, 3, "3"),
+ Row.ofKind(RowKind.INSERT, 4, "4"),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1"),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 3, "3"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 3, "3_1"),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
+ }
}
// test read from COMPACT snapshot
@@ -380,4 +394,29 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
sql("ALTER TABLE TT SET('deletion-vectors.enabled' = 'false')");
assertThat(sql("SELECT * FROM TT").size()).isEqualTo(5);
}
+
+ // No compaction to verify that level0 data can be read at full phase
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testStreamingReadFullWithoutCompact(boolean isPk) throws
Exception {
+ if (isPk) {
+ sql(
+ "CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT) "
+ + "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = 'none', 'write-only' = 'true')");
+ } else {
+ sql(
+ "CREATE TABLE T (a INT, b INT) WITH
('deletion-vectors.enabled' = 'true', 'write-only' = 'true')");
+ }
+
+ sql("INSERT INTO T VALUES (1, 1)");
+ sql("INSERT INTO T VALUES (2, 2)");
+ sql("INSERT INTO T VALUES (3, 3)");
+
+ try (BlockingIterator<Row, Row> iter =
+ streamSqlBlockIter(
+ "SELECT * FROM T /*+ OPTIONS('scan.mode' =
'from-snapshot-full', 'scan.snapshot-id' = '2') */")) {
+ assertThat(iter.collect(3))
+ .containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2),
Row.of(3, 3));
+ }
+ }
}