This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a49a29939a [spark] fix audit_log streaming read for rowkind delete (#6021)
a49a29939a is described below
commit a49a29939ad9e18c16965b1e8ef59779ec8635a9
Author: Yu Gan <[email protected]>
AuthorDate: Mon Aug 4 16:27:57 2025 +0800
[spark] fix audit_log streaming read for rowkind delete (#6021)
---
.../commands/DeleteFromPaimonTableCommand.scala | 25 +++++--
.../spark/sql/AuditLogStreamingReadTest.scala | 83 ++++++++++++++++++++++
.../paimon/spark/sql/DeleteFromTableTestBase.scala | 32 +++++++++
3 files changed, 133 insertions(+), 7 deletions(-)
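
For context, the scenario this patch targets is a primary-key table whose merge engine removes records on delete, read as a changelog through the audit_log system table. A minimal sketch of that setup (it only restates what the new AuditLogStreamingReadTest below creates, and assumes a Spark session with a Paimon catalog configured):

    spark.sql(
      """
        |CREATE TABLE T (id INT, name STRING, age INT)
        |TBLPROPERTIES (
        |  'primary-key' = 'id',
        |  'merge-engine' = 'partial-update',
        |  'partial-update.remove-record-on-delete' = 'true',
        |  'changelog-producer' = 'lookup')
        |""".stripMargin)
    spark.sql("INSERT INTO T VALUES (1, 'a', NULL)")
    spark.sql("DELETE FROM T WHERE id = 1")
    // With this patch, a streaming read of T$audit_log observes the delete
    // as a '-D' row; see the assertions in the new test below.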
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index a1505593ef..dab18b444d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -18,14 +18,14 @@
package org.apache.paimon.spark.commands
+import org.apache.paimon.CoreOptions
import org.apache.paimon.CoreOptions.MergeEngine
-import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.options.Options
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
-import org.apache.paimon.spark.util.SQLHelper
import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
+import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.types.RowKind
import org.apache.paimon.utils.InternalRowPartitionComputer
@@ -37,8 +37,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.lit
-import java.util.UUID
-
import scala.collection.JavaConverters._
case class DeleteFromPaimonTableCommand(
@@ -109,8 +107,21 @@ case class DeleteFromPaimonTableCommand(
Seq.empty[Row]
}
- private def usePrimaryKeyDelete(): Boolean = {
- withPrimaryKeys && table.coreOptions().mergeEngine() == MergeEngine.DEDUPLICATE
+ /**
+ * Maintain alignment with
+ * org.apache.paimon.flink.sink.SupportsRowLevelOperationFlinkTableSink#validateDeletable
+ * @return true if the delete can go through the primary-key (row-kind) write path
+ */
+ private def usePrimaryKeyDelete(): Boolean = withPrimaryKeys && {
+ val options = Options.fromMap(table.options())
+ table.coreOptions().mergeEngine() match {
+ case MergeEngine.DEDUPLICATE => true
+ case MergeEngine.PARTIAL_UPDATE =>
+ options.get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE) ||
+ options.get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP) != null
+ case MergeEngine.AGGREGATE => options.get(CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE)
+ case _ => false
+ }
}
private def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
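
With the broadened check above, these merge engines now take the primary-key path, performPrimaryKeyDelete (whose signature closes this hunk): the matching rows are written back tagged with a delete row kind rather than rewriting data files. A rough sketch of that idea only, with the writer/commit plumbing elided; the table name and condition are illustrative:

    // Sketch: tag the rows selected by the DELETE condition with RowKind.DELETE
    // via the reserved ROW_KIND_COL column, then hand them to the Paimon writer
    // (writer construction and commit are not shown).
    import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
    import org.apache.paimon.types.RowKind
    import org.apache.spark.sql.functions.lit

    val toDelete = spark.table("T")
      .where("id = 1") // the DELETE condition
      .withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
    // writer.write(toDelete) ... commit -- elided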
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AuditLogStreamingReadTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AuditLogStreamingReadTest.scala
new file mode 100644
index 0000000000..b98147c598
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AuditLogStreamingReadTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
+
+class AuditLogStreamingReadTest extends PaimonSparkTestBase {
+
+ test(s"test delete with primary key, partial-update, remove-record-on-delete, lookup") {
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, name STRING, age INT)
+ |TBLPROPERTIES (
+ | 'primary-key' = 'id', 'bucket' = '4',
+ | 'merge-engine' = 'partial-update',
+ | 'partial-update.remove-record-on-delete' = 'true',
+ | 'changelog-producer' = 'lookup')
+ |""".stripMargin)
+
+ withTempDir {
+ checkpointDir =>
+ {
+ val readStream = spark.readStream
+ .format("paimon")
+ .table("`T$audit_log`")
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .start()
+ val currentResult = () => spark.sql("SELECT * FROM mem_table")
+
+ try {
+ // insert
+ spark.sql("INSERT INTO T VALUES (1, 'a', NULL)")
+ spark.sql("INSERT INTO T VALUES (2, 'b', NULL)")
+ // update
+ spark.sql("INSERT INTO T VALUES (1, NULL, 16)")
+
+ assertThat(spark.sql("SELECT * FROM T").collectAsList().toString)
+ .isEqualTo("[[2,b,null], [1,a,16]]")
+
+ // delete
+ spark.sql("DELETE FROM T WHERE id = 1")
+
+ assertThat(spark.sql("SELECT * FROM T").collectAsList().toString)
+ .isEqualTo("[[2,b,null]]")
+
+ readStream.processAllAvailable()
+ checkAnswer(
+ currentResult(),
+ Row("+I", 1, "a", null) ::
+ Row("+I", 2, "b", null) ::
+ Row("-U", 1, "a", null) ::
+ Row("+U", 1, "a", 16) ::
+ Row("-D", 1, "a", 16) :: Nil
+ )
+ } finally {
+ readStream.stop()
+ }
+ }
+ }
+ }
+}
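
For reference, the T$audit_log system table read in this test exposes the change type as a string rowkind column in front of the table's own columns, which is what the Row("+I", ...) / Row("-D", ...) assertions check. A minimal batch-query sketch against the same table (assumes the T created above and a configured Paimon catalog):

    spark.sql("SELECT rowkind, id, name, age FROM `T$audit_log`").show()
    // rowkind is one of '+I', '-U', '+U', '-D', matching the values asserted above.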
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index 43fe3d71b2..6b38c1f5f0 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -403,4 +403,36 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
val paths2 = spark.sql("SELECT __paimon_file_path FROM T").collect()
assert(paths2.length == 0)
}
+
+ CoreOptions.MergeEngine.values().foreach {
+ mergeEngine =>
+ {
+ test(s"test delete with lookup, $mergeEngine") {
+
+ val otherOptions = mergeEngine match {
+ case MergeEngine.PARTIAL_UPDATE => "'partial-update.remove-record-on-delete' = 'true',"
+ case MergeEngine.AGGREGATE => "'aggregation.remove-record-on-delete' = 'true',"
+ case _ => ""
+ }
+
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, name STRING, age INT)
+ |TBLPROPERTIES (
+ | 'changelog-producer' = 'lookup',
+ | $otherOptions
+ | 'primary-key' = 'id',
+ | 'merge-engine' = '$mergeEngine')
+ |""".stripMargin)
+ // insert
+ spark.sql("INSERT INTO T VALUES (1, 'a', NULL)")
+ spark.sql("INSERT INTO T VALUES (2, 'b', NULL)")
+ // update
+ spark.sql("INSERT INTO T VALUES (1, NULL, 16)")
+ // delete
+ spark.sql("DELETE FROM T WHERE id = 1")
+ assertThat(spark.sql("SELECT * FROM T").collectAsList().toString)
+ .isEqualTo("[[2,b,null]]")
+ }
+ }
+ }
}
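
For one iteration of the loop above (MergeEngine.PARTIAL_UPDATE), the interpolated DDL comes out roughly as below, which is why otherOptions carries its own trailing comma; the exact 'merge-engine' literal depends on MergeEngine.toString:

    CREATE TABLE T (id INT, name STRING, age INT)
    TBLPROPERTIES (
      'changelog-producer' = 'lookup',
      'partial-update.remove-record-on-delete' = 'true',
      'primary-key' = 'id',
      'merge-engine' = 'partial-update')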