This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 55e4e74012 [core] Optimize drop partitions to avoid stack overflow (#4663)
55e4e74012 is described below

commit 55e4e740128a10a9352be0af7906fb6c7d3bcda9
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 9 16:41:38 2024 +0800

    [core] Optimize drop partitions to avoid stack overflow (#4663)
    
    This closes #4663.
---
 .../paimon/utils/InternalRowPartitionComputer.java | 18 ++++++++++++
 .../paimon/operation/FileStoreCommitImpl.java      | 32 ++++++++++++++--------
 .../paimon/partition/PartitionPredicate.java       | 15 ++++++++++
 3 files changed, 54 insertions(+), 11 deletions(-)
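
A note on the failure mode this patch targets, with a standalone sketch; it is not Paimon code and the numbers are assumptions. Reducing one predicate per partition with PredicateBuilder::or yields a nested OR tree whose depth grows with the number of dropped partitions, and evaluating such a tree recursively needs roughly one stack frame per level, so a large enough drop can end in StackOverflowError. Matching against an explicit set of partition keys stays flat.

    // Standalone Java sketch: a left-deep OR chain recurses once per level,
    // a set lookup does not.
    import java.util.HashSet;
    import java.util.Set;
    import java.util.function.IntPredicate;

    public class DeepOrSketch {
        public static void main(String[] args) {
            int n = 200_000; // "partitions" to drop in one call (assumed)

            // Old shape: reduce-style chaining, one nested predicate per key.
            IntPredicate deepOr = x -> x == 0;
            for (int i = 1; i < n; i++) {
                int v = i;
                IntPredicate prev = deepOr;
                deepOr = x -> prev.test(x) || x == v; // one extra frame per level
            }
            try {
                deepOr.test(n - 1); // recurses ~n frames deep
            } catch (StackOverflowError e) {
                System.out.println("deep OR chain overflowed the stack");
            }

            // New shape: an exact set of keys, tested without recursion.
            Set<Integer> keys = new HashSet<>();
            for (int i = 0; i < n; i++) {
                keys.add(i);
            }
            System.out.println("set lookup: " + keys.contains(n - 1));
        }
    }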

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
index 8de2720a26..6bb26d7613 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
@@ -21,6 +21,7 @@ package org.apache.paimon.utils;
 import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.InternalRow.FieldGetter;
 import org.apache.paimon.types.DataType;
@@ -32,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.utils.InternalRowUtils.createNullCheckingFieldGetter;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.TypeUtils.castFromString;
 
 /** PartitionComputer for {@link InternalRow}. */
@@ -102,6 +104,22 @@ public class InternalRowPartitionComputer {
         return partValues;
     }
 
+    public static GenericRow convertSpecToInternalRow(
+            Map<String, String> spec, RowType partType, String defaultPartValue) {
+        checkArgument(spec.size() == partType.getFieldCount());
+        GenericRow partRow = new GenericRow(spec.size());
+        List<String> fieldNames = partType.getFieldNames();
+        for (Map.Entry<String, String> entry : spec.entrySet()) {
+            Object value =
+                    defaultPartValue.equals(entry.getValue())
+                            ? null
+                            : castFromString(
+                                    entry.getValue(), partType.getField(entry.getKey()).type());
+            partRow.setField(fieldNames.indexOf(entry.getKey()), value);
+        }
+        return partRow;
+    }
+
     public static String partToSimpleString(
             RowType partitionType, BinaryRow partition, String delimiter, int maxLength) {
         FieldGetter[] getters = partitionType.fieldGetters();
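
A hedged usage sketch for the new convertSpecToInternalRow helper above. The partition layout, the spec values, and the "__DEFAULT_PARTITION__" default name are illustrative assumptions; only the helper's signature comes from this patch, and the usual RowType.of / DataTypes helpers are assumed.

    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.util.LinkedHashMap;
    import java.util.Map;

    import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow;

    public class ConvertSpecExample {
        public static void main(String[] args) {
            // Assumed partition type: (dt STRING, hr INT).
            RowType partType =
                    RowType.of(
                            new DataType[] {DataTypes.STRING(), DataTypes.INT()},
                            new String[] {"dt", "hr"});

            Map<String, String> spec = new LinkedHashMap<>();
            spec.put("hr", "__DEFAULT_PARTITION__"); // the default value becomes null
            spec.put("dt", "2024-12-09");

            GenericRow row = convertSpecToInternalRow(spec, partType, "__DEFAULT_PARTITION__");
            // Fields are placed by name, so the row follows the partition type's
            // order (dt, hr) rather than the spec's iteration order.
            System.out.println(row);
        }
    }
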
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 43faadc4d8..547c6e29be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -83,6 +83,7 @@ import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.apache.paimon.manifest.ManifestEntry.recordCount;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
+import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
@@ -530,17 +531,26 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     partitions.stream().map(Objects::toString).collect(Collectors.joining(",")));
         }
 
-        // partitions may be partial partition fields, so here must to use predicate way.
-        Predicate predicate =
-                partitions.stream()
-                        .map(
-                                partition ->
-                                        createPartitionPredicate(
-                                                partition, partitionType, partitionDefaultName))
-                        .reduce(PredicateBuilder::or)
-                        .orElseThrow(() -> new RuntimeException("Failed to get partition filter."));
-        PartitionPredicate partitionFilter =
-                PartitionPredicate.fromPredicate(partitionType, predicate);
+        boolean fullMode =
+                partitions.stream().allMatch(part -> part.size() == partitionType.getFieldCount());
+        PartitionPredicate partitionFilter;
+        if (fullMode) {
+            List<BinaryRow> binaryPartitions =
+                    createBinaryPartitions(partitions, partitionType, partitionDefaultName);
+            partitionFilter = PartitionPredicate.fromMultiple(partitionType, binaryPartitions);
+        } else {
+            // partitions may be partial partition fields, so here must to use predicate way.
+            Predicate predicate =
+                    partitions.stream()
+                            .map(
+                                    partition ->
+                                            createPartitionPredicate(
+                                                    partition, partitionType, partitionDefaultName))
+                            .reduce(PredicateBuilder::or)
+                            .orElseThrow(
+                                    () -> new RuntimeException("Failed to get partition filter."));
+            partitionFilter = PartitionPredicate.fromPredicate(partitionType, predicate);
+        }
 
         tryOverwrite(
                 partitionFilter,
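
For the dispatch added above, a small hedged illustration (field names and values are assumptions): a spec counts as "full" only when it names every partition field, and a single partial spec in the batch sends the whole call down the predicate fallback.

    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class FullModeCheck {
        public static void main(String[] args) {
            RowType partitionType =
                    RowType.of(
                            new DataType[] {DataTypes.STRING(), DataTypes.INT()},
                            new String[] {"dt", "hr"});

            Map<String, String> full = new LinkedHashMap<>();
            full.put("dt", "2024-12-09");
            full.put("hr", "10");
            Map<String, String> partial = Collections.singletonMap("dt", "2024-12-09");

            List<Map<String, String>> partitions = Arrays.asList(full, partial);
            boolean fullMode =
                    partitions.stream()
                            .allMatch(part -> part.size() == partitionType.getFieldCount());
            System.out.println(fullMode); // false: the partial spec forces the predicate path
        }
    }
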
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index 12ea884be1..1f6c2cfe45 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -19,8 +19,10 @@
 package org.apache.paimon.partition;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.data.serializer.InternalSerializers;
 import org.apache.paimon.data.serializer.Serializer;
 import org.apache.paimon.format.SimpleColStats;
@@ -33,6 +35,7 @@ import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -40,6 +43,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
+import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
@@ -231,4 +235,15 @@ public interface PartitionPredicate {
                         .map(p -> createPartitionPredicate(p, rowType, defaultPartValue))
                         .toArray(Predicate[]::new));
     }
+
+    static List<BinaryRow> createBinaryPartitions(
+            List<Map<String, String>> partitions, RowType partitionType, String defaultPartValue) {
+        InternalRowSerializer serializer = new InternalRowSerializer(partitionType);
+        List<BinaryRow> result = new ArrayList<>();
+        for (Map<String, String> spec : partitions) {
+            GenericRow row = convertSpecToInternalRow(spec, partitionType, defaultPartValue);
+            result.add(serializer.toBinaryRow(row).copy());
+        }
+        return result;
+    }
 }
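
A hedged end-to-end sketch of the new createBinaryPartitions helper together with fromMultiple, mirroring the commit path. The partition layout, the values, and the "__DEFAULT_PARTITION__" default name are assumptions; copy() matters because the serializer presumably reuses its internal binary row across calls.

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.partition.PartitionPredicate;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;

    public class DropPartitionsSketch {
        public static void main(String[] args) {
            RowType partitionType =
                    RowType.of(
                            new DataType[] {DataTypes.STRING(), DataTypes.INT()},
                            new String[] {"dt", "hr"});

            Map<String, String> p1 = new LinkedHashMap<>();
            p1.put("dt", "2024-12-08");
            p1.put("hr", "23");
            Map<String, String> p2 = new LinkedHashMap<>();
            p2.put("dt", "2024-12-09");
            p2.put("hr", "0");

            // Full specs become exact BinaryRow keys...
            List<BinaryRow> keys =
                    createBinaryPartitions(
                            Arrays.asList(p1, p2), partitionType, "__DEFAULT_PARTITION__");

            // ...and the commit builds a flat multi-partition predicate instead of
            // a nested OR tree of per-partition predicates.
            PartitionPredicate filter = PartitionPredicate.fromMultiple(partitionType, keys);
            System.out.println(filter.test(keys.get(0))); // expected: true
        }
    }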
