This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b19b57c6c [core] Introduce PrimaryKeyTableUtils.validatePKUpsertDeletable to reuse codes
5b19b57c6c is described below

commit 5b19b57c6cf153874e91593a807bf8370a7922f8
Author: JingsongLi <[email protected]>
AuthorDate: Mon Aug 4 16:48:34 2025 +0800

    [core] Introduce PrimaryKeyTableUtils.validatePKUpsertDeletable to reuse codes
---
 .../apache/paimon/table/PrimaryKeyTableUtils.java  | 54 ++++++++++++++++++++++
 .../SupportsRowLevelOperationFlinkTableSink.java   | 54 ++--------------------
 .../commands/DeleteFromPaimonTableCommand.scala    | 26 ++++-------
 3 files changed, 65 insertions(+), 69 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index a47e44718e..de320d78f9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -34,6 +34,11 @@ import org.apache.paimon.types.RowType;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE;
+import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
+import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP;
+import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
 import static org.apache.paimon.table.SpecialFields.KEY_FIELD_ID_START;
 import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
 
@@ -94,4 +99,53 @@ public class PrimaryKeyTableUtils {
             return schema.fields();
         }
     }
+
+    /**
+     * Validates that the table can handle delete operations by primary key upsert: it must have
+     * primary keys and a delete-capable merge-engine; throws UnsupportedOperationException if not.
+     */
+    public static void validatePKUpsertDeletable(Table table) {
+        if (table.primaryKeys().isEmpty()) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "table '%s' can not support delete, because there is no primary key.",
+                            table.getClass().getName()));
+        }
+
+        Options options = Options.fromMap(table.options());
+        CoreOptions.MergeEngine mergeEngine = options.get(MERGE_ENGINE);
+
+        switch (mergeEngine) {
+            case DEDUPLICATE:
+                return;
+            case PARTIAL_UPDATE:
+                if (options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE)
+                        || options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP) != null) {
+                    return;
+                } else {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Merge engine %s doesn't support batch delete by default. To support batch delete, "
+                                            + "please set %s to true when there is no %s or set %s.",
+                                    mergeEngine,
+                                    PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE.key(),
+                                    SEQUENCE_GROUP,
+                                    PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP.key()));
+                }
+            case AGGREGATE:
+                if (options.get(AGGREGATION_REMOVE_RECORD_ON_DELETE)) {
+                    return;
+                } else {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Merge engine %s doesn't support batch delete by default. To support batch delete, "
+                                            + "please set %s to true.",
+                                    mergeEngine, AGGREGATION_REMOVE_RECORD_ON_DELETE.key()));
+                }
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Merge engine %s can not support batch delete.", mergeEngine));
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 6bdd17055c..fc4357907d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -53,13 +53,10 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
-import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
-import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP;
-import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
+import static org.apache.paimon.table.PrimaryKeyTableUtils.validatePKUpsertDeletable;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Flink table sink that supports row level update and delete. */
@@ -135,7 +132,7 @@ public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTable
     @Override
     public RowLevelDeleteInfo applyRowLevelDelete(
             @Nullable RowLevelModificationScanContext rowLevelModificationScanContext) {
-        validateDeletable();
+        validatePKUpsertDeletable(table);
         return new RowLevelDeleteInfo() {};
     }
 
@@ -143,7 +140,7 @@ public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTable
 
     @Override
     public boolean applyDeleteFilters(List<ResolvedExpression> list) {
-        validateDeletable();
+        validatePKUpsertDeletable(table);
         List<Predicate> predicates = new ArrayList<>();
         RowType rowType = LogicalTypeConversion.toLogicalType(table.rowType());
         for (ResolvedExpression filter : list) {
@@ -174,51 +171,6 @@ public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTable
         }
     }
 
-    private void validateDeletable() {
-        if (table.primaryKeys().isEmpty()) {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "table '%s' can not support delete, because there is no primary key.",
-                            table.getClass().getName()));
-        }
-
-        Options options = Options.fromMap(table.options());
-        MergeEngine mergeEngine = options.get(MERGE_ENGINE);
-
-        switch (mergeEngine) {
-            case DEDUPLICATE:
-                return;
-            case PARTIAL_UPDATE:
-                if (options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE)
-                        || options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP) != null) {
-                    return;
-                } else {
-                    throw new UnsupportedOperationException(
-                            String.format(
-                                    "Merge engine %s doesn't support batch delete by default. To support batch delete, "
-                                            + "please set %s to true when there is no %s or set %s.",
-                                    mergeEngine,
-                                    PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE.key(),
-                                    SEQUENCE_GROUP,
-                                    PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP));
-                }
-            case AGGREGATE:
-                if (options.get(AGGREGATION_REMOVE_RECORD_ON_DELETE)) {
-                    return;
-                } else {
-                    throw new UnsupportedOperationException(
-                            String.format(
-                                    "Merge engine %s doesn't support batch delete by default. To support batch delete, "
-                                            + "please set %s to true.",
-                                    mergeEngine, AGGREGATION_REMOVE_RECORD_ON_DELETE.key()));
-                }
-            default:
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Merge engine %s can not support batch delete.", mergeEngine));
-        }
-    }
-
     private boolean canPushDownDeleteFilter() {
         CoreOptions options = CoreOptions.fromMap(table.options());
         return (deletePredicate == null || deleteIsDropPartition())
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index dab18b444d..55a387dd0f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -18,13 +18,11 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.CoreOptions.MergeEngine
-import org.apache.paimon.options.Options
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.PrimaryKeyTableUtils.validatePKUpsertDeletable
 import org.apache.paimon.table.sink.CommitMessage
 import org.apache.paimon.types.RowKind
 import org.apache.paimon.utils.InternalRowPartitionComputer
@@ -95,7 +93,7 @@ case class DeleteFromPaimonTableCommand(
           dvSafeWriter.commit(Seq.empty)
         }
       } else {
-        val commitMessages = if (usePrimaryKeyDelete()) {
+        val commitMessages = if (usePKUpsertDelete()) {
           performPrimaryKeyDelete(sparkSession)
         } else {
           performNonPrimaryKeyDelete(sparkSession)
@@ -107,20 +105,12 @@ case class DeleteFromPaimonTableCommand(
     Seq.empty[Row]
   }
 
-  /**
-   * Maintain alignment with
-   * org.apache.paimon.flink.sink.SupportsRowLevelOperationFlinkTableSink#validateDeletable
-   * @return
-   */
-  private def usePrimaryKeyDelete(): Boolean = withPrimaryKeys && {
-    val options = Options.fromMap(table.options())
-    table.coreOptions().mergeEngine() match {
-      case MergeEngine.DEDUPLICATE => true
-      case MergeEngine.PARTIAL_UPDATE =>
-        options.get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE) ||
-        options.get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP) != null
-      case MergeEngine.AGGREGATE => options.get(CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE)
-      case _ => false
+  private def usePKUpsertDelete(): Boolean = {
+    try {
+      validatePKUpsertDeletable(table)
+      true
+    } catch {
+      case _: UnsupportedOperationException => false
     }
   }
 

Reply via email to