This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 90014271c3 [core] Incremental delta scan should be in stream mode, not 
mergeing (#5361)
90014271c3 is described below

commit 90014271c3470c1dbfd567767843b99df717563a
Author: JackeyLee007 <[email protected]>
AuthorDate: Fri Mar 28 16:35:04 2025 +0800

    [core] Incremental delta scan should be in stream mode, not mergeing (#5361)
---
 .../snapshot/IncrementalDeltaStartingScanner.java  |  1 +
 .../apache/paimon/table/IncrementalTableTest.java  | 19 ++++++++++-------
 .../table/IncrementalTimeStampTableTest.java       | 24 ++++++++++++++++------
 .../IncrementalDeltaStartingScannerTest.java       |  7 ++++++-
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  4 +++-
 5 files changed, 40 insertions(+), 15 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
index df837bab0a..0cfd360433 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
@@ -131,6 +131,7 @@ public class IncrementalDeltaStartingScanner extends 
AbstractStartingScanner {
                                             .collect(Collectors.toList()))) {
                 DataSplit.Builder dataSplitBuilder =
                         DataSplit.builder()
+                                .isStreaming(true)
                                 .withSnapshot(endingSnapshotId)
                                 .withPartition(partition)
                                 .withBucket(bucket)
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
index 4810e9f4f5..d3d6aa6ad9 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -104,10 +104,13 @@ public class IncrementalTableTest extends TableTestBase {
         List<InternalRow> result = read(table, Pair.of(INCREMENTAL_BETWEEN, 
"2,5"));
         assertThat(result)
                 .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 3),
+                        GenericRow.of(1, 2, 3),
+                        GenericRow.of(2, 1, 3),
+                        GenericRow.of(2, 2, 1),
                         GenericRow.of(1, 1, 4),
                         GenericRow.of(1, 2, 4),
-                        GenericRow.of(2, 1, 4),
-                        GenericRow.of(2, 2, 1));
+                        GenericRow.of(2, 1, 4));
     }
 
     @Test
@@ -208,12 +211,14 @@ public class IncrementalTableTest extends TableTestBase {
         List<InternalRow> result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN, 
"1,3"));
         assertThat(result)
                 .containsExactlyInAnyOrder(
-                        GenericRow.of(fromString("+I"), 2, 1, 3),
-                        GenericRow.of(fromString("+I"), 2, 2, 1),
-                        GenericRow.of(fromString("+I"), 1, 1, 2),
-                        GenericRow.of(fromString("+I"), 1, 4, 1),
+                        GenericRow.of(fromString("-D"), 1, 1, 1),
                         GenericRow.of(fromString("-D"), 1, 2, 1),
-                        GenericRow.of(fromString("-D"), 1, 3, 1));
+                        GenericRow.of(fromString("+I"), 1, 4, 1),
+                        GenericRow.of(fromString("+I"), 2, 1, 2),
+                        GenericRow.of(fromString("-D"), 1, 3, 1),
+                        GenericRow.of(fromString("+I"), 1, 1, 2),
+                        GenericRow.of(fromString("+I"), 2, 1, 3),
+                        GenericRow.of(fromString("+I"), 2, 2, 1));
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
index 147021daf5..c6a8e2abf2 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
@@ -129,9 +129,12 @@ public class IncrementalTimeStampTableTest extends 
TableTestBase {
                                 String.format("%s,%s", timestampEarliest, 
timestampSnapshot2)));
         assertThat(result2)
                 .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 1),
+                        GenericRow.of(1, 2, 1),
+                        GenericRow.of(1, 3, 1),
+                        GenericRow.of(2, 1, 1),
                         GenericRow.of(1, 1, 2),
                         GenericRow.of(1, 2, 2),
-                        GenericRow.of(1, 3, 1),
                         GenericRow.of(1, 4, 1),
                         GenericRow.of(2, 1, 2));
         result2 =
@@ -144,9 +147,12 @@ public class IncrementalTimeStampTableTest extends 
TableTestBase {
                                         timestampEarliestString, 
timestampSnapshot2String)));
         assertThat(result2)
                 .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 1),
+                        GenericRow.of(1, 2, 1),
+                        GenericRow.of(1, 3, 1),
+                        GenericRow.of(2, 1, 1),
                         GenericRow.of(1, 1, 2),
                         GenericRow.of(1, 2, 2),
-                        GenericRow.of(1, 3, 1),
                         GenericRow.of(1, 4, 1),
                         GenericRow.of(2, 1, 2));
 
@@ -158,10 +164,13 @@ public class IncrementalTimeStampTableTest extends 
TableTestBase {
                                 String.format("%s,%s", timestampSnapshot2, 
timestampSnapshot4)));
         assertThat(result3)
                 .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 3),
+                        GenericRow.of(1, 2, 3),
+                        GenericRow.of(2, 1, 3),
+                        GenericRow.of(2, 2, 1),
                         GenericRow.of(1, 1, 4),
                         GenericRow.of(1, 2, 4),
-                        GenericRow.of(2, 1, 4),
-                        GenericRow.of(2, 2, 1));
+                        GenericRow.of(2, 1, 4));
         result3 =
                 read(
                         table,
@@ -172,10 +181,13 @@ public class IncrementalTimeStampTableTest extends 
TableTestBase {
                                         timestampSnapshot2String, 
timestampSnapshot4String)));
         assertThat(result3)
                 .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 3),
+                        GenericRow.of(1, 2, 3),
+                        GenericRow.of(2, 1, 3),
+                        GenericRow.of(2, 2, 1),
                         GenericRow.of(1, 1, 4),
                         GenericRow.of(1, 2, 4),
-                        GenericRow.of(2, 1, 4),
-                        GenericRow.of(2, 2, 1));
+                        GenericRow.of(2, 1, 4));
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
index b19d686577..5da9cbbf67 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
@@ -74,7 +74,12 @@ public class IncrementalDeltaStartingScannerTest extends 
ScannerTestBase {
         List<Split> splits = 
table.copy(dynamicOptions).newScan().plan().splits();
         assertThat(getResult(table.newRead(), splits))
                 .hasSameElementsAs(
-                        Arrays.asList("+I 2|20|200", "+I 1|10|100", "+I 
3|40|400", "+U 3|40|500"));
+                        Arrays.asList(
+                                "+I 2|20|200",
+                                "+I 1|10|100",
+                                "+I 3|40|400",
+                                "-U 3|40|400",
+                                "+U 3|40|500"));
 
         dynamicOptions.put(INCREMENTAL_BETWEEN_SCAN_MODE.key(), "delta");
         splits = table.copy(dynamicOptions).newScan().plan().splits();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 8d55c0e647..f1d8a44ca5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -771,6 +771,8 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
                 sql(
                         "SELECT * FROM `test_scan_mode$audit_log` "
                                 + "/*+ 
OPTIONS('incremental-between'='1,8','incremental-between-scan-mode'='delta') 
*/");
-        assertThat(result).containsExactlyInAnyOrder(Row.of("-D", 2, "B"), 
Row.of("+I", 3, "C"));
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("+I", 2, "B"), Row.of("-D", 2, "B"), 
Row.of("+I", 3, "C"));
     }
 }

Reply via email to