This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 90014271c3 [core] Incremental delta scan should be in stream mode, not
mergeing (#5361)
90014271c3 is described below
commit 90014271c3470c1dbfd567767843b99df717563a
Author: JackeyLee007 <[email protected]>
AuthorDate: Fri Mar 28 16:35:04 2025 +0800
[core] Incremental delta scan should be in stream mode, not mergeing (#5361)
---
.../snapshot/IncrementalDeltaStartingScanner.java | 1 +
.../apache/paimon/table/IncrementalTableTest.java | 19 ++++++++++-------
.../table/IncrementalTimeStampTableTest.java | 24 ++++++++++++++++------
.../IncrementalDeltaStartingScannerTest.java | 7 ++++++-
.../apache/paimon/flink/BatchFileStoreITCase.java | 4 +++-
5 files changed, 40 insertions(+), 15 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
index df837bab0a..0cfd360433 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
@@ -131,6 +131,7 @@ public class IncrementalDeltaStartingScanner extends
AbstractStartingScanner {
.collect(Collectors.toList()))) {
DataSplit.Builder dataSplitBuilder =
DataSplit.builder()
+ .isStreaming(true)
.withSnapshot(endingSnapshotId)
.withPartition(partition)
.withBucket(bucket)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
index 4810e9f4f5..d3d6aa6ad9 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -104,10 +104,13 @@ public class IncrementalTableTest extends TableTestBase {
List<InternalRow> result = read(table, Pair.of(INCREMENTAL_BETWEEN,
"2,5"));
assertThat(result)
.containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, 3),
+ GenericRow.of(1, 2, 3),
+ GenericRow.of(2, 1, 3),
+ GenericRow.of(2, 2, 1),
GenericRow.of(1, 1, 4),
GenericRow.of(1, 2, 4),
- GenericRow.of(2, 1, 4),
- GenericRow.of(2, 2, 1));
+ GenericRow.of(2, 1, 4));
}
@Test
@@ -208,12 +211,14 @@ public class IncrementalTableTest extends TableTestBase {
List<InternalRow> result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN,
"1,3"));
assertThat(result)
.containsExactlyInAnyOrder(
- GenericRow.of(fromString("+I"), 2, 1, 3),
- GenericRow.of(fromString("+I"), 2, 2, 1),
- GenericRow.of(fromString("+I"), 1, 1, 2),
- GenericRow.of(fromString("+I"), 1, 4, 1),
+ GenericRow.of(fromString("-D"), 1, 1, 1),
GenericRow.of(fromString("-D"), 1, 2, 1),
- GenericRow.of(fromString("-D"), 1, 3, 1));
+ GenericRow.of(fromString("+I"), 1, 4, 1),
+ GenericRow.of(fromString("+I"), 2, 1, 2),
+ GenericRow.of(fromString("-D"), 1, 3, 1),
+ GenericRow.of(fromString("+I"), 1, 1, 2),
+ GenericRow.of(fromString("+I"), 2, 1, 3),
+ GenericRow.of(fromString("+I"), 2, 2, 1));
}
@Test
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
index 147021daf5..c6a8e2abf2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
@@ -129,9 +129,12 @@ public class IncrementalTimeStampTableTest extends
TableTestBase {
String.format("%s,%s", timestampEarliest,
timestampSnapshot2)));
assertThat(result2)
.containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, 1),
+ GenericRow.of(1, 2, 1),
+ GenericRow.of(1, 3, 1),
+ GenericRow.of(2, 1, 1),
GenericRow.of(1, 1, 2),
GenericRow.of(1, 2, 2),
- GenericRow.of(1, 3, 1),
GenericRow.of(1, 4, 1),
GenericRow.of(2, 1, 2));
result2 =
@@ -144,9 +147,12 @@ public class IncrementalTimeStampTableTest extends
TableTestBase {
timestampEarliestString,
timestampSnapshot2String)));
assertThat(result2)
.containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, 1),
+ GenericRow.of(1, 2, 1),
+ GenericRow.of(1, 3, 1),
+ GenericRow.of(2, 1, 1),
GenericRow.of(1, 1, 2),
GenericRow.of(1, 2, 2),
- GenericRow.of(1, 3, 1),
GenericRow.of(1, 4, 1),
GenericRow.of(2, 1, 2));
@@ -158,10 +164,13 @@ public class IncrementalTimeStampTableTest extends
TableTestBase {
String.format("%s,%s", timestampSnapshot2,
timestampSnapshot4)));
assertThat(result3)
.containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, 3),
+ GenericRow.of(1, 2, 3),
+ GenericRow.of(2, 1, 3),
+ GenericRow.of(2, 2, 1),
GenericRow.of(1, 1, 4),
GenericRow.of(1, 2, 4),
- GenericRow.of(2, 1, 4),
- GenericRow.of(2, 2, 1));
+ GenericRow.of(2, 1, 4));
result3 =
read(
table,
@@ -172,10 +181,13 @@ public class IncrementalTimeStampTableTest extends
TableTestBase {
timestampSnapshot2String,
timestampSnapshot4String)));
assertThat(result3)
.containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, 3),
+ GenericRow.of(1, 2, 3),
+ GenericRow.of(2, 1, 3),
+ GenericRow.of(2, 2, 1),
GenericRow.of(1, 1, 4),
GenericRow.of(1, 2, 4),
- GenericRow.of(2, 1, 4),
- GenericRow.of(2, 2, 1));
+ GenericRow.of(2, 1, 4));
}
@Test
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
index b19d686577..5da9cbbf67 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
@@ -74,7 +74,12 @@ public class IncrementalDeltaStartingScannerTest extends
ScannerTestBase {
List<Split> splits =
table.copy(dynamicOptions).newScan().plan().splits();
assertThat(getResult(table.newRead(), splits))
.hasSameElementsAs(
- Arrays.asList("+I 2|20|200", "+I 1|10|100", "+I
3|40|400", "+U 3|40|500"));
+ Arrays.asList(
+ "+I 2|20|200",
+ "+I 1|10|100",
+ "+I 3|40|400",
+ "-U 3|40|400",
+ "+U 3|40|500"));
dynamicOptions.put(INCREMENTAL_BETWEEN_SCAN_MODE.key(), "delta");
splits = table.copy(dynamicOptions).newScan().plan().splits();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 8d55c0e647..f1d8a44ca5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -771,6 +771,8 @@ public class BatchFileStoreITCase extends CatalogITCaseBase
{
sql(
"SELECT * FROM `test_scan_mode$audit_log` "
+ "/*+
OPTIONS('incremental-between'='1,8','incremental-between-scan-mode'='delta')
*/");
- assertThat(result).containsExactlyInAnyOrder(Row.of("-D", 2, "B"),
Row.of("+I", 3, "C"));
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("+I", 2, "B"), Row.of("-D", 2, "B"),
Row.of("+I", 3, "C"));
}
}