This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 55e4e74012 [core] Optimize drop partitions to avoid stack overflow
(#4663)
55e4e74012 is described below
commit 55e4e740128a10a9352be0af7906fb6c7d3bcda9
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 9 16:41:38 2024 +0800
[core] Optimize drop partitions to avoid stack overflow (#4663)
This closes #4663.
---
.../paimon/utils/InternalRowPartitionComputer.java | 18 ++++++++++++
.../paimon/operation/FileStoreCommitImpl.java | 32 ++++++++++++++--------
.../paimon/partition/PartitionPredicate.java | 15 ++++++++++
3 files changed, 54 insertions(+), 11 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
index 8de2720a26..6bb26d7613 100644
---
a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
@@ -21,6 +21,7 @@ package org.apache.paimon.utils;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.types.DataType;
@@ -32,6 +33,7 @@ import java.util.List;
import java.util.Map;
import static
org.apache.paimon.utils.InternalRowUtils.createNullCheckingFieldGetter;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.TypeUtils.castFromString;
/** PartitionComputer for {@link InternalRow}. */
@@ -102,6 +104,22 @@ public class InternalRowPartitionComputer {
return partValues;
}
+ /**
+ * Converts a partition spec of string values into a {@link GenericRow} whose fields are
+ * positioned according to the field order of {@code partType}.
+ *
+ * <p>The spec must contain exactly as many entries as {@code partType} has fields.
+ *
+ * @param spec partition field name mapped to its string value
+ * @param partType row type describing the partition fields
+ * @param defaultPartValue string that stands for a null partition value
+ * @return a row holding one converted value per spec entry
+ */
+ public static GenericRow convertSpecToInternalRow(
+ Map<String, String> spec, RowType partType, String
defaultPartValue) {
+ checkArgument(spec.size() == partType.getFieldCount());
+ GenericRow partRow = new GenericRow(spec.size());
+ List<String> fieldNames = partType.getFieldNames();
+ for (Map.Entry<String, String> entry : spec.entrySet()) {
+ // A value equal to defaultPartValue becomes null; any other value is cast from its
+ // string form to the type of the field named by the entry key.
+ Object value =
+ defaultPartValue.equals(entry.getValue())
+ ? null
+ : castFromString(
+ entry.getValue(),
partType.getField(entry.getKey()).type());
+ // Place the value at the field's index in partType, not in map iteration order.
+ partRow.setField(fieldNames.indexOf(entry.getKey()), value);
+ }
+ return partRow;
+ }
+
public static String partToSimpleString(
RowType partitionType, BinaryRow partition, String delimiter, int
maxLength) {
FieldGetter[] getters = partitionType.fieldGetters();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 43faadc4d8..547c6e29be 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -83,6 +83,7 @@ import static
org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.manifest.ManifestEntry.recordCount;
import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
+import static
org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
@@ -530,17 +531,26 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
partitions.stream().map(Objects::toString).collect(Collectors.joining(",")));
}
- // partitions may be partial partition fields, so here must to use
predicate way.
- Predicate predicate =
- partitions.stream()
- .map(
- partition ->
- createPartitionPredicate(
- partition, partitionType,
partitionDefaultName))
- .reduce(PredicateBuilder::or)
- .orElseThrow(() -> new RuntimeException("Failed to get
partition filter."));
- PartitionPredicate partitionFilter =
- PartitionPredicate.fromPredicate(partitionType, predicate);
+ boolean fullMode =
+ partitions.stream().allMatch(part -> part.size() ==
partitionType.getFieldCount());
+ PartitionPredicate partitionFilter;
+ if (fullMode) {
+ List<BinaryRow> binaryPartitions =
+ createBinaryPartitions(partitions, partitionType,
partitionDefaultName);
+ partitionFilter = PartitionPredicate.fromMultiple(partitionType,
binaryPartitions);
+ } else {
+ // partitions may be partial partition fields, so here must to use
predicate way.
+ Predicate predicate =
+ partitions.stream()
+ .map(
+ partition ->
+ createPartitionPredicate(
+ partition, partitionType,
partitionDefaultName))
+ .reduce(PredicateBuilder::or)
+ .orElseThrow(
+ () -> new RuntimeException("Failed to get
partition filter."));
+ partitionFilter = PartitionPredicate.fromPredicate(partitionType,
predicate);
+ }
tryOverwrite(
partitionFilter,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index 12ea884be1..1f6c2cfe45 100644
---
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -19,8 +19,10 @@
package org.apache.paimon.partition;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.format.SimpleColStats;
@@ -33,6 +35,7 @@ import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -40,6 +43,7 @@ import java.util.Map;
import java.util.Set;
import static
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
+import static
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -231,4 +235,15 @@ public interface PartitionPredicate {
.map(p -> createPartitionPredicate(p, rowType,
defaultPartValue))
.toArray(Predicate[]::new));
}
+
+ /**
+ * Converts each partition spec into a {@link BinaryRow}.
+ *
+ * <p>Every spec is first turned into a {@link GenericRow} by {@code
+ * convertSpecToInternalRow} and then serialized with an {@link InternalRowSerializer} built
+ * from {@code partitionType}.
+ *
+ * @param partitions partition specs, each mapping field name to string value
+ * @param partitionType row type describing the partition fields
+ * @param defaultPartValue string that stands for a null partition value
+ * @return one binary row per input spec, in input order
+ */
+ static List<BinaryRow> createBinaryPartitions(
+ List<Map<String, String>> partitions, RowType partitionType,
String defaultPartValue) {
+ InternalRowSerializer serializer = new
InternalRowSerializer(partitionType);
+ List<BinaryRow> result = new ArrayList<>();
+ for (Map<String, String> spec : partitions) {
+ GenericRow row = convertSpecToInternalRow(spec, partitionType,
defaultPartValue);
+ // NOTE(review): copy() presumably guards against the serializer reusing its output
+ // row across calls - confirm against InternalRowSerializer.
+ result.add(serializer.toBinaryRow(row).copy());
+ }
+ return result;
+ }
}