This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ef43c411e0 [core] Fix incremental query audit log table with delete (#5225)
ef43c411e0 is described below

commit ef43c411e08d087fa316099331def16101b9e98b
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Mar 7 11:43:04 2025 +0800

    [core] Fix incremental query audit log table with delete (#5225)
---
 .../source/splitread/IncrementalDiffSplitRead.java | 13 ++++++--
 .../spark/sql/TableValuedFunctionsTest.scala       | 35 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index 3b9f28d191..2827e450af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -191,6 +191,7 @@ public class IncrementalDiffSplitRead implements SplitRead<InternalRow> {
         @Nullable
         @Override
         public KeyValue getResult() {
+            KeyValue toReturn = null;
             if (kvs.size() == 1) {
                 KeyValue kv = kvs.get(0);
                 if (kv.level() == BEFORE_LEVEL) {
@@ -198,19 +199,25 @@ public class IncrementalDiffSplitRead implements SplitRead<InternalRow> {
                         return kv.replaceValueKind(RowKind.DELETE);
                     }
                 } else {
-                    return kv;
+                    toReturn = kv;
                 }
             } else if (kvs.size() == 2) {
                 KeyValue latest = kvs.get(1);
                 if (latest.level() == AFTER_LEVEL) {
-                    if (!valueEquals()) {
-                        return latest;
+                    // Return when the value or rowKind is different. Since before is always add, we
+                    // only need to check if after is not add.
+                    if (!valueEquals() || !latest.isAdd()) {
+                        toReturn = latest;
                     }
                 }
             } else {
                 throw new IllegalArgumentException("Illegal kv number: " + kvs.size());
             }
 
+            if (toReturn != null && (keepDelete || toReturn.isAdd())) {
+                return toReturn;
+            }
+
             return null;
         }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index 117a5af02d..4f0ca73071 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -285,6 +285,41 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
     }
   }
 
+  test("Table Valued Functions: incremental query with delete after compact") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id INT) USING paimon
+            |TBLPROPERTIES ('primary-key'='id', 'bucket' = '1', 'write-only' = 'true')
+            |""".stripMargin)
+
+      sql("INSERT INTO t VALUES 1")
+      sql("INSERT INTO t VALUES 2")
+      sql("CALL sys.create_tag('t', 'tag1')")
+
+      sql("CALL sys.compact(table => 'T')")
+      sql("DELETE FROM t WHERE id = 1")
+      sql("CALL sys.create_tag('t', 'tag2')")
+
+      //         tag1                    tag2
+      // l0      f(+I 1),f(+I 2)         f(-D 1)
+      // l1
+      // l2
+      // l3
+      // l4
+      // l5                              f(+I 1,2)
+      checkAnswer(
+        sql("SELECT level FROM `t$files` VERSION AS OF 'tag1' ORDER BY level"),
+        Seq(Row(0), Row(0)))
+      checkAnswer(
+        sql("SELECT level FROM `t$files` VERSION AS OF 'tag2' ORDER BY level"),
+        Seq(Row(0), Row(5)))
+
+      checkAnswer(
+        sql("SELECT * FROM paimon_incremental_query('`t$audit_log`', 'tag1', 'tag2') ORDER BY id"),
+        Seq(Row("-D", 1)))
+    }
+  }
+
   private def incrementalDF(tableIdent: String, start: Int, end: Int): DataFrame = {
     spark.read
       .format("paimon")

Reply via email to