This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 00a36e35ba [core] fix the issue where streaming reading of overwrite
data would fail when retract type data appeared. (#4697)
00a36e35ba is described below
commit 00a36e35bafbe10893725418443ea8fa1cd85c30
Author: liming.1018 <[email protected]>
AuthorDate: Fri Dec 13 09:50:29 2024 +0800
[core] fix the issue where streaming reading of overwrite data would fail
when retract type data appeared. (#4697)
---
.../IncrementalChangelogReadProvider.java | 8 ++---
.../apache/paimon/flink/ReadWriteTableITCase.java | 37 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 4 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
index 308c09d142..eb41d02669 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
@@ -60,20 +60,20 @@ public class IncrementalChangelogReadProvider implements SplitReadProvider {
ConcatRecordReader.create(
() ->
new ReverseReader(
- read.createNoMergeReader(
+ read.createMergeReader(
split.partition(),
split.bucket(),
split.beforeFiles(),
split.beforeDeletionFiles()
.orElse(null),
- true)),
+ false)),
() ->
- read.createNoMergeReader(
+ read.createMergeReader(
split.partition(),
split.bucket(),
split.dataFiles(),
split.deletionFiles().orElse(null),
- true));
+ false));
return unwrap(reader);
};
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 10de1ae483..732e964542 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -801,6 +801,43 @@ public class ReadWriteTableITCase extends AbstractTestBase {
streamingItr.close();
}
+ @Test
+ public void testStreamingReadOverwriteWithDeleteRecords() throws Exception {
+ String table =
+ createTable(
+ Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
+ Collections.singletonList("currency"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ streamingReadOverwrite);
+
+ insertInto(
+ table,
+ "('US Dollar', 102, '2022-01-01')",
+ "('Yen', 1, '2022-01-02')",
+ "('Euro', 119, '2022-01-02')");
+
+ bEnv.executeSql(String.format("DELETE FROM %s WHERE currency = 'Euro'", table)).await();
+
+ checkFileStorePath(table, Collections.emptyList());
+
+ // test projection and filter
+ BlockingIterator<Row, Row> streamingItr =
+ testStreamingRead(
+ buildQuery(table, "currency, rate", "WHERE dt = '2022-01-02'"),
+ Collections.singletonList(changelogRow("+I", "Yen", 1L)));
+
+ insertOverwrite(table, "('US Dollar', 100, '2022-01-02')", "('Yen', 10, '2022-01-01')");
+
+ validateStreamingReadResult(
+ streamingItr,
+ Arrays.asList(
+ changelogRow("-D", "Yen", 1L), changelogRow("+I", "US Dollar", 100L)));
+ assertNoMoreRecords(streamingItr);
+
+ streamingItr.close();
+ }
+
@Test
public void testUnsupportStreamingReadOverwriteWithoutPk() {
assertThatThrownBy(