This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d4edc22aae [core] Fix incremental query with delete after minor compact (#5115)
d4edc22aae is described below
commit d4edc22aaed6c029312c17aeec5d7e7b44593443
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Feb 20 10:18:27 2025 +0800
[core] Fix incremental query with delete after minor compact (#5115)
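Before this change, IncrementalDiffSplitRead hardcoded false for the merge reader's keep-delete flag, so a -D record written after a minor compaction could be dropped when computing the incremental diff between two tags. The fix passes the split read's forceKeepDelete flag through instead; a new Spark test covers the delete-after-minor-compact scenario.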
---
.../source/splitread/IncrementalDiffSplitRead.java | 2 +-
.../spark/sql/TableValuedFunctionsTest.scala | 40 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index 2e236b1dff..3b9f28d191 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -98,7 +98,7 @@ public class IncrementalDiffSplitRead implements SplitRead<InternalRow> {
split.bucket(),
split.dataFiles(),
split.deletionFiles().orElse(null),
- false),
+ forceKeepDelete),
mergeRead.keyComparator(),
mergeRead.createUdsComparator(),
mergeRead.mergeSorter(),
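For illustration, here is a minimal, self-contained Scala sketch (hypothetical types, not Paimon's actual API) of why the keep-delete flag matters: if the merge step filters out delete records, the resulting diff loses the deletion entirely.

    // Hypothetical model of merge-read semantics; illustration only.
    object KeepDeleteSketch {
      sealed trait RowKind
      case object Insert extends RowKind
      case object Delete extends RowKind
      final case class Record(key: Int, kind: RowKind)

      // Latest record per key wins; optionally drop delete records afterwards.
      def merge(records: Seq[Record], keepDelete: Boolean): Map[Int, RowKind] = {
        val latest = records.groupBy(_.key).map { case (k, rs) => k -> rs.last.kind }
        if (keepDelete) latest else latest.filter { case (_, kind) => kind == Insert }
      }

      def main(args: Array[String]): Unit = {
        // Key 999 is inserted, then deleted.
        val after = Seq(Record(999, Insert), Record(999, Delete))
        // keepDelete = false: the -D row vanishes, so a diff misses the deletion.
        assert(merge(after, keepDelete = false).get(999).isEmpty)
        // keepDelete = true: the diff can surface key 999 as deleted (-D 999).
        assert(merge(after, keepDelete = true).get(999).contains(Delete))
      }
    }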
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index addf846100..117a5af02d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -245,6 +245,46 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
}
}
+ test("Table Valued Functions: incremental query with delete after minor
compact") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id INT) USING paimon
+ |TBLPROPERTIES ('primary-key'='id', 'bucket' = '1', 'write-only' = 'true')
+ |""".stripMargin)
+
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id FROM range (1,
100001)")
+ sql("CALL sys.compact(table => 'T')")
+ sql("INSERT INTO t VALUES 100001")
+ sql("INSERT INTO t VALUES 100002")
+ sql("CALL sys.create_tag('t', 'tag1')")
+
+ sql(
+ "CALL sys.compact(table => 'T', compact_strategy => 'minor', options
=> 'num-sorted-run.compaction-trigger=2')")
+ sql("DELETE FROM t WHERE id = 999")
+ sql("CALL sys.create_tag('t', 'tag2')")
+
+ //       tag1                          tag2
+ // l0    f(+I 100001),f(+I 100002)     f(-D 999)
+ // l1
+ // l2
+ // l3
+ // l4                                  f(+I 100001,100002)
+ // l5    f(+I 1-100000)                f(+I 1-100000)
+ checkAnswer(
+ sql("SELECT level FROM `t$files` VERSION AS OF 'tag1' ORDER BY level"),
+ Seq(Row(0), Row(0), Row(5)))
+ checkAnswer(
+ sql("SELECT level FROM `t$files` VERSION AS OF 'tag2' ORDER BY level"),
+ Seq(Row(0), Row(4), Row(5)))
+
+ // before files: f(+I 100001), f(+I 100002)
+ // after files: f(-D 999), f(+I 100001,100002)
+ checkAnswer(
+ sql("SELECT * FROM paimon_incremental_query('`t$audit_log`', 'tag1',
'tag2') ORDER BY id"),
+ Seq(Row("-D", 999)))
+ }
+ }
+
private def incrementalDF(tableIdent: String, start: Int, end: Int): DataFrame = {
spark.read
.format("paimon")