This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 5b19b57c6c  [core] Introduce PrimaryKeyTableUtils.validatePKUpsertDeletable to reuse codes
5b19b57c6c is described below
commit 5b19b57c6cf153874e91593a807bf8370a7922f8
Author: JingsongLi <[email protected]>
AuthorDate: Mon Aug 4 16:48:34 2025 +0800
    [core] Introduce PrimaryKeyTableUtils.validatePKUpsertDeletable to reuse codes
---
.../apache/paimon/table/PrimaryKeyTableUtils.java | 54 ++++++++++++++++++++++
.../SupportsRowLevelOperationFlinkTableSink.java | 54 ++--------------------
.../commands/DeleteFromPaimonTableCommand.scala | 26 ++++-------
3 files changed, 65 insertions(+), 69 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index a47e44718e..de320d78f9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -34,6 +34,11 @@ import org.apache.paimon.types.RowType;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE;
+import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
+import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP;
+import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_ID_START;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
@@ -94,4 +99,53 @@ public class PrimaryKeyTableUtils {
return schema.fields();
}
}
+
+    /**
+     * This method checks if a table is properly configured to handle delete operations by primary
+     * key upsert. It checks primary key and merge-engine.
+     */
+    public static void validatePKUpsertDeletable(Table table) {
+        if (table.primaryKeys().isEmpty()) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "table '%s' can not support delete, because there is no primary key.",
+                            table.getClass().getName()));
+        }
+
+        Options options = Options.fromMap(table.options());
+        CoreOptions.MergeEngine mergeEngine = options.get(MERGE_ENGINE);
+
+        switch (mergeEngine) {
+            case DEDUPLICATE:
+                return;
+            case PARTIAL_UPDATE:
+                if (options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE)
+                        || options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP) != null) {
+                    return;
+                } else {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Merge engine %s doesn't support batch delete by default. To support batch delete, "
+                                            + "please set %s to true when there is no %s or set %s.",
+                                    mergeEngine,
+                                    PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE.key(),
+                                    SEQUENCE_GROUP,
+                                    PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP));
+                }
+            case AGGREGATE:
+                if (options.get(AGGREGATION_REMOVE_RECORD_ON_DELETE)) {
+                    return;
+                } else {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Merge engine %s doesn't support batch delete by default. To support batch delete, "
+                                            + "please set %s to true.",
+                                    mergeEngine, AGGREGATION_REMOVE_RECORD_ON_DELETE.key()));
+                }
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Merge engine %s can not support batch delete.", mergeEngine));
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 6bdd17055c..fc4357907d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -53,13 +53,10 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.paimon.CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
-import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
-import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP;
-import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
+import static org.apache.paimon.table.PrimaryKeyTableUtils.validatePKUpsertDeletable;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Flink table sink that supports row level update and delete. */
@@ -135,7 +132,7 @@ public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTable
     @Override
     public RowLevelDeleteInfo applyRowLevelDelete(
             @Nullable RowLevelModificationScanContext rowLevelModificationScanContext) {
-        validateDeletable();
+        validatePKUpsertDeletable(table);
         return new RowLevelDeleteInfo() {};
     }
@@ -143,7 +140,7 @@ public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTable
     @Override
     public boolean applyDeleteFilters(List<ResolvedExpression> list) {
-        validateDeletable();
+        validatePKUpsertDeletable(table);
         List<Predicate> predicates = new ArrayList<>();
         RowType rowType = LogicalTypeConversion.toLogicalType(table.rowType());
         for (ResolvedExpression filter : list) {
@@ -174,51 +171,6 @@ public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTable
         }
     }
 
-    private void validateDeletable() {
-        if (table.primaryKeys().isEmpty()) {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "table '%s' can not support delete, because there is no primary key.",
-                            table.getClass().getName()));
-        }
-
-        Options options = Options.fromMap(table.options());
-        MergeEngine mergeEngine = options.get(MERGE_ENGINE);
-
-        switch (mergeEngine) {
-            case DEDUPLICATE:
-                return;
-            case PARTIAL_UPDATE:
-                if (options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE)
-                        || options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP) != null) {
-                    return;
-                } else {
-                    throw new UnsupportedOperationException(
-                            String.format(
-                                    "Merge engine %s doesn't support batch delete by default. To support batch delete, "
-                                            + "please set %s to true when there is no %s or set %s.",
-                                    mergeEngine,
-                                    PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE.key(),
-                                    SEQUENCE_GROUP,
-                                    PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP));
-                }
-            case AGGREGATE:
-                if (options.get(AGGREGATION_REMOVE_RECORD_ON_DELETE)) {
-                    return;
-                } else {
-                    throw new UnsupportedOperationException(
-                            String.format(
-                                    "Merge engine %s doesn't support batch delete by default. To support batch delete, "
-                                            + "please set %s to true.",
-                                    mergeEngine, AGGREGATION_REMOVE_RECORD_ON_DELETE.key()));
-                }
-            default:
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Merge engine %s can not support batch delete.", mergeEngine));
-        }
-    }
-
     private boolean canPushDownDeleteFilter() {
         CoreOptions options = CoreOptions.fromMap(table.options());
         return (deletePredicate == null || deleteIsDropPartition())
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index dab18b444d..55a387dd0f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -18,13 +18,11 @@
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.CoreOptions.MergeEngine
-import org.apache.paimon.options.Options
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.PrimaryKeyTableUtils.validatePKUpsertDeletable
 import org.apache.paimon.table.sink.CommitMessage
 import org.apache.paimon.types.RowKind
 import org.apache.paimon.utils.InternalRowPartitionComputer
@@ -95,7 +93,7 @@ case class DeleteFromPaimonTableCommand(
           dvSafeWriter.commit(Seq.empty)
         }
       } else {
-        val commitMessages = if (usePrimaryKeyDelete()) {
+        val commitMessages = if (usePKUpsertDelete()) {
           performPrimaryKeyDelete(sparkSession)
         } else {
           performNonPrimaryKeyDelete(sparkSession)
@@ -107,20 +105,12 @@ case class DeleteFromPaimonTableCommand(
     Seq.empty[Row]
   }
 
-  /**
-   * Maintain alignment with
-   * org.apache.paimon.flink.sink.SupportsRowLevelOperationFlinkTableSink#validateDeletable
-   * @return
-   */
-  private def usePrimaryKeyDelete(): Boolean = withPrimaryKeys && {
-    val options = Options.fromMap(table.options())
-    table.coreOptions().mergeEngine() match {
-      case MergeEngine.DEDUPLICATE => true
-      case MergeEngine.PARTIAL_UPDATE =>
-        options.get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE) ||
-          options.get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP) != null
-      case MergeEngine.AGGREGATE => options.get(CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE)
-      case _ => false
+  private def usePKUpsertDelete(): Boolean = {
+    try {
+      validatePKUpsertDeletable(table)
+      true
+    } catch {
+      case _: UnsupportedOperationException => false
     }
   }