This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 214a27ffeb [flink] Cast Predicate to PartitionPredicate to avoid stack overflow (#5880)
214a27ffeb is described below
commit 214a27ffeba03a1a37aae364466c17855dbd239e
Author: YeJunHao <[email protected]>
AuthorDate: Mon Jul 14 14:19:01 2025 +0800
[flink] Cast Predicate to PartitionPredicate to avoid stack overflow (#5880)
---
.../main/java/org/apache/paimon/types/RowType.java | 5 ++
.../paimon/append/AppendCompactCoordinator.java | 16 +++--
.../paimon/partition/PartitionPredicate.java | 15 ++++-
.../paimon/table/FallbackReadFileStoreTable.java | 8 +++
.../paimon/table/source/AbstractDataTableScan.java | 7 ++
.../apache/paimon/table/source/InnerTableScan.java | 5 ++
.../apache/paimon/table/source/ReadBuilder.java | 4 ++
.../paimon/table/source/ReadBuilderImpl.java | 30 ++++++++-
.../table/source/snapshot/SnapshotReader.java | 3 +
.../table/source/snapshot/SnapshotReaderImpl.java | 13 +++-
.../apache/paimon/table/system/AuditLogTable.java | 13 ++++
.../apache/paimon/flink/action/CompactAction.java | 55 ++++++++++++----
.../apache/paimon/flink/action/RescaleAction.java | 19 ++----
.../paimon/flink/action/SortCompactAction.java | 2 +-
.../flink/compact/AppendTableCompactBuilder.java | 8 +--
.../flink/source/AppendTableCompactSource.java | 17 +++--
.../paimon/flink/source/BaseDataTableSource.java | 6 +-
.../flink/source/CompactorSourceBuilder.java | 13 ++--
.../paimon/flink/source/FlinkSourceBuilder.java | 15 ++++-
.../paimon/flink/source/FlinkTableSource.java | 41 +++++++-----
.../flink/source/LogHybridSourceFactory.java | 10 ++-
.../paimon/flink/source/SystemTableSource.java | 5 +-
.../paimon/flink/action/CompactActionITCase.java | 53 +++++++++++++++-
.../paimon/flink/sink/CompactorSinkITCase.java | 25 ++++----
.../paimon/flink/source/CompactorSourceITCase.java | 14 ++--
.../flink/source/FileStoreSourceMetricsTest.java | 2 +-
.../paimon/spark/procedure/CompactProcedure.java | 26 +++++---
.../paimon/spark/sql/DeletionVectorTest.scala | 74 ++++++++++++++++++++++
28 files changed, 394 insertions(+), 110 deletions(-)
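Context for the change: compaction previously pushed its multi-partition filter down as a single row-level Predicate, one OR per selected partition. Since PredicateBuilder combines predicates into nested compound nodes, the resulting tree's depth grows with the partition count, and the recursive predicate visitors can then exhaust the stack (the new testLotsOfPartitionsCompact below drives 1000 partitions through this path). The fix is to pass a PartitionPredicate, which tests BinaryRow partition values directly. A self-contained sketch of the failure mode in plain Java, not the Paimon API (all names hypothetical):

    public class DeepOrTreeDemo {
        interface Pred {}

        static final class Leaf implements Pred {}

        static final class Or implements Pred {
            final Pred left, right;

            Or(Pred left, Pred right) {
                this.left = left;
                this.right = right;
            }
        }

        // Any tree-walking visitor needs one stack frame per nesting level.
        static int depth(Pred p) {
            if (p instanceof Or) {
                Or o = (Or) p;
                return 1 + Math.max(depth(o.left), depth(o.right));
            }
            return 1;
        }

        public static void main(String[] args) {
            Pred p = new Leaf();
            for (int i = 0; i < 100_000; i++) {
                p = new Or(p, new Leaf()); // roughly one sub-predicate per partition
            }
            System.out.println(depth(p)); // StackOverflowError once depth exceeds the thread stack
        }
    }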
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
index e93ef23136..e886c97ca6 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
@@ -323,6 +323,11 @@ public final class RowType extends DataType {
.copy(isNullable());
}
+ public int[] projectNames(List<String> names) {
+ List<String> fieldNames = fields.stream().map(DataField::name).collect(Collectors.toList());
+ return names.stream().mapToInt(fieldNames::indexOf).toArray();
+ }
+
public RowType project(String... names) {
return project(Arrays.asList(names));
}
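The new projectNames helper maps field names to their positional indices via List#indexOf, so a name missing from the schema yields -1. CompactAction uses it below to project a row predicate onto the partition columns. A hedged usage sketch with a hypothetical schema (assuming the RowType.of(DataType[], String[]) factory and DataTypes):

    // Hypothetical schema: (f0 INT, dt STRING, hh STRING), partitioned by (dt, hh).
    RowType rowType =
            RowType.of(
                    new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()},
                    new String[] {"f0", "dt", "hh"});
    int[] indices = rowType.projectNames(Arrays.asList("dt", "hh")); // -> [1, 2]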
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java
index 1fead5e612..07374831e7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java
@@ -28,7 +28,7 @@ import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.EndOfScanException;
@@ -90,7 +90,9 @@ public class AppendCompactCoordinator {
}
public AppendCompactCoordinator(
- FileStoreTable table, boolean isStreaming, @Nullable Predicate filter) {
+ FileStoreTable table,
+ boolean isStreaming,
+ @Nullable PartitionPredicate partitionPredicate) {
checkArgument(table.primaryKeys().isEmpty());
this.snapshotManager = table.snapshotManager();
CoreOptions options = table.coreOptions();
@@ -103,7 +105,7 @@ public class AppendCompactCoordinator {
options.deletionVectorsEnabled()
? new DvMaintainerCache(table.store().newIndexFileHandler())
: null;
- this.filesIterator = new FilesIterator(table, isStreaming, filter);
+ this.filesIterator = new FilesIterator(table, isStreaming, partitionPredicate);
}
public List<AppendCompactTask> run() {
@@ -389,10 +391,12 @@ public class AppendCompactCoordinator {
@Nullable private Iterator<ManifestEntry> currentIterator;
public FilesIterator(
- FileStoreTable table, boolean isStreaming, @Nullable Predicate filter) {
+ FileStoreTable table,
+ boolean isStreaming,
+ @Nullable PartitionPredicate partitionPredicate) {
this.snapshotReader = table.newSnapshotReader();
- if (filter != null) {
- snapshotReader.withFilter(filter);
+ if (partitionPredicate != null) {
+ snapshotReader.withPartitionFilter(partitionPredicate);
}
// drop stats to reduce memory
if (table.coreOptions().manifestDeleteFileDropStats()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index bfc241d62f..4a9b7d646a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -35,6 +35,7 @@ import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -48,7 +49,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** A special predicate to filter partition only, just like {@link Predicate}. */
-public interface PartitionPredicate {
+public interface PartitionPredicate extends Serializable {
boolean test(BinaryRow part);
@@ -282,4 +283,16 @@ public interface PartitionPredicate {
}
return result;
}
+
+ static PartitionPredicate fromMap(
+ RowType partitionType, Map<String, String> values, String defaultPartValue) {
+ return fromPredicate(
+ partitionType, createPartitionPredicate(values, partitionType, defaultPartValue));
+ }
+
+ static PartitionPredicate fromMaps(
+ RowType partitionType, List<Map<String, String>> values, String defaultPartValue) {
+ return fromMultiple(
+ partitionType, createBinaryPartitions(values, partitionType, defaultPartValue));
+ }
}
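Both new factories are thin wrappers over the existing createPartitionPredicate/createBinaryPartitions utilities, returning the now-Serializable PartitionPredicate. A hedged usage sketch (the partition column, values, and default-partition name below are made up for illustration):

    RowType partitionType =
            RowType.of(new DataType[] {DataTypes.STRING()}, new String[] {"dt"});
    Map<String, String> spec = Collections.singletonMap("dt", "20240101");

    // One (possibly partial) partition spec -> predicate-backed filter.
    PartitionPredicate single =
            PartitionPredicate.fromMap(partitionType, spec, "__DEFAULT_PARTITION__");

    // Several fully-specified partitions -> BinaryRow-set filter, no OR tree involved.
    PartitionPredicate multi =
            PartitionPredicate.fromMaps(
                    partitionType, Collections.singletonList(spec), "__DEFAULT_PARTITION__");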
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 2f8b2a1f0f..c438b93fe6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
@@ -337,6 +338,13 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
return this;
}
+ @Override
+ public InnerTableScan withPartitionFilter(PartitionPredicate partitionPredicate) {
+ mainScan.withPartitionFilter(partitionPredicate);
+ fallbackScan.withPartitionFilter(partitionPredicate);
+ return this;
+ }
+
@Override
public FallbackReadScan withBucketFilter(Filter<Integer> bucketFilter) {
mainScan.withBucketFilter(bucketFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 7f8714522d..76b348691e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
@@ -138,6 +139,12 @@ abstract class AbstractDataTableScan implements DataTableScan {
return this;
}
+ @Override
+ public AbstractDataTableScan withPartitionFilter(PartitionPredicate partitionPredicate) {
+ snapshotReader.withPartitionFilter(partitionPredicate);
+ return this;
+ }
+
@Override
public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
snapshotReader.withLevelFilter(levelFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 647520a3bc..89f15055d5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
@@ -54,6 +55,10 @@ public interface InnerTableScan extends TableScan {
return this;
}
+ default InnerTableScan withPartitionFilter(PartitionPredicate partitionPredicate) {
+ return this;
+ }
+
default InnerTableScan withBucket(int bucket) {
return this;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index 212d08d907..1f3d52b63c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.Table;
@@ -100,6 +101,9 @@ public interface ReadBuilder extends Serializable {
/** Push partition filter. */
ReadBuilder withPartitionFilter(Map<String, String> partitionSpec);
+ /** Push partition filters. */
+ ReadBuilder withPartitionFilter(PartitionPredicate partitionPredicate);
+
ReadBuilder withBucket(int bucket);
/**
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 386425ce79..f12c4fe4be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -18,6 +18,8 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.InnerTable;
@@ -29,6 +31,8 @@ import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
+import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
import static org.apache.paimon.utils.Preconditions.checkState;
/** Implementation for {@link ReadBuilder}. */
@@ -37,6 +41,8 @@ public class ReadBuilderImpl implements ReadBuilder {
private static final long serialVersionUID = 1L;
private final InnerTable table;
+ private final RowType partitionType;
+ private final String defaultPartitionName;
private Predicate filter;
@@ -45,7 +51,7 @@ public class ReadBuilderImpl implements ReadBuilder {
private Integer shardIndexOfThisSubtask;
private Integer shardNumberOfParallelSubtasks;
- private Map<String, String> partitionSpec;
+ private @Nullable PartitionPredicate partitionFilter;
private @Nullable Integer specifiedBucket = null;
private Filter<Integer> bucketFilter;
@@ -56,6 +62,8 @@ public class ReadBuilderImpl implements ReadBuilder {
public ReadBuilderImpl(InnerTable table) {
this.table = table;
+ this.partitionType = table.rowType().project(table.partitionKeys());
+ this.defaultPartitionName = new CoreOptions(table.options()).partitionDefaultName();
}
@Override
@@ -84,7 +92,19 @@ public class ReadBuilderImpl implements ReadBuilder {
@Override
public ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {
- this.partitionSpec = partitionSpec;
+ if (partitionSpec != null) {
+ this.partitionFilter =
+ fromPredicate(
+ partitionType,
+ createPartitionPredicate(
+ partitionSpec, partitionType, defaultPartitionName));
+ }
+ return this;
+ }
+
+ @Override
+ public ReadBuilder withPartitionFilter(@Nullable PartitionPredicate partitions) {
+ this.partitionFilter = partitions;
return this;
}
@@ -154,7 +174,10 @@ public class ReadBuilderImpl implements ReadBuilder {
}
private InnerTableScan configureScan(InnerTableScan scan) {
- scan.withFilter(filter).withReadType(readType).withPartitionFilter(partitionSpec);
+ // `filter` may contain partition-related predicates, but `partitionFilter` will
+ // overwrite them if it is not null. So we must avoid putting one part of the
+ // partition filter in `filter` and another part in `partitionFilter`.
+ scan.withFilter(filter).withReadType(readType).withPartitionFilter(partitionFilter);
checkState(
bucketFilter == null || shardIndexOfThisSubtask == null,
"Bucket filter and shard configuration cannot be used
together. "
@@ -200,6 +223,7 @@ public class ReadBuilderImpl implements ReadBuilder {
ReadBuilderImpl that = (ReadBuilderImpl) o;
return Objects.equals(table.name(), that.table.name())
&& Objects.equals(filter, that.filter)
+ && Objects.equals(partitionFilter, that.partitionFilter)
&& Objects.equals(readType, that.readType);
}
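With the new overload, callers can hand a ready-made PartitionPredicate straight to the ReadBuilder instead of a Map spec. A hedged end-to-end sketch (table stands for an already-loaded org.apache.paimon.table.Table; the spec and default-partition name are hypothetical):

    RowType partitionType = table.rowType().project(table.partitionKeys());
    Map<String, String> partSpec = Collections.singletonMap("dt", "20240101");
    ReadBuilder readBuilder =
            table.newReadBuilder()
                    .withPartitionFilter(
                            PartitionPredicate.fromMap(
                                    partitionType, partSpec, "__DEFAULT_PARTITION__"));
    List<Split> splits = readBuilder.newScan().plan().splits();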
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 12600b7ddc..79a1c94c32 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -27,6 +27,7 @@ import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.ManifestsReader;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
@@ -76,6 +77,8 @@ public interface SnapshotReader {
SnapshotReader withPartitionFilter(List<BinaryRow> partitions);
+ SnapshotReader withPartitionFilter(PartitionPredicate partitionPredicate);
+
SnapshotReader withPartitionsFilter(List<Map<String, String>> partitions);
SnapshotReader withMode(ScanMode scanMode);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 5855e88ab1..8239bcabba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -37,6 +37,7 @@ import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.metrics.ScanMetrics;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.TableSchema;
@@ -198,9 +199,19 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+ @Override
+ public SnapshotReader withPartitionFilter(PartitionPredicate partitionPredicate) {
+ if (partitionPredicate != null) {
+ scan.withPartitionFilter(partitionPredicate);
+ }
+ return this;
+ }
+
@Override
public SnapshotReader withPartitionsFilter(List<Map<String, String>> partitions) {
- scan.withPartitionsFilter(partitions);
+ if (partitions != null) {
+ scan.withPartitionsFilter(partitions);
+ }
return this;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 3325c7ce11..b9afacec2b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.ManifestsReader;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -328,6 +329,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
return this;
}
+ @Override
+ public SnapshotReader withPartitionFilter(PartitionPredicate partitionPredicate) {
+ wrapped.withPartitionFilter(partitionPredicate);
+ return this;
+ }
+
@Override
public SnapshotReader withPartitionsFilter(List<Map<String, String>> partitions) {
wrapped.withPartitionsFilter(partitions);
@@ -485,6 +492,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
return this;
}
+ @Override
+ public InnerTableScan withPartitionFilter(PartitionPredicate partitionPredicate) {
+ batchScan.withPartitionFilter(partitionPredicate);
+ return this;
+ }
+
@Override
public InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
batchScan.withBucketFilter(bucketFilter);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 93cfc4b0d3..b0368082aa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -36,11 +36,14 @@ import org.apache.paimon.flink.sink.RowDataChannelComputer;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.PredicateProjectionConverter;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
@@ -66,6 +69,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
/** Table compact action for Flink. */
@@ -191,24 +195,38 @@ public class CompactAction extends TableActionBase {
builder.build();
}
- protected Predicate getPredicate() throws Exception {
+ protected PartitionPredicate getPredicate() throws Exception {
Preconditions.checkArgument(
partitions == null || whereSql == null,
"partitions and where cannot be used together.");
Predicate predicate = null;
+ RowType partitionType = table.rowType().project(table.partitionKeys());
+ String partitionDefaultName = ((FileStoreTable) table).coreOptions().partitionDefaultName();
if (partitions != null) {
- predicate =
- PredicateBuilder.or(
- partitions.stream()
- .map(
- p ->
- createPartitionPredicate(
- p,
- table.rowType(),
- ((FileStoreTable) table)
- .coreOptions()
- .partitionDefaultName()))
- .toArray(Predicate[]::new));
+ boolean fullMode =
+ partitions.stream()
+ .allMatch(part -> part.size() == partitionType.getFieldCount());
+ PartitionPredicate partitionFilter;
+ if (fullMode) {
+ List<BinaryRow> binaryPartitions =
+ createBinaryPartitions(partitions, partitionType, partitionDefaultName);
+ return PartitionPredicate.fromMultiple(partitionType, binaryPartitions);
+ } else {
+ // partitions may specify only part of the partition fields, so we must use
+ // the predicate-based approach here.
+ predicate =
+ partitions.stream()
+ .map(
+ partition ->
+ createPartitionPredicate(
+ partition,
+ table.rowType(),
+ partitionDefaultName))
+ .reduce(PredicateBuilder::or)
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Failed to get partition filter."));
+ }
} else if (whereSql != null) {
SimpleSqlPredicateConvertor simpleSqlPredicateConvertor =
new SimpleSqlPredicateConvertor(table.rowType());
@@ -223,9 +241,18 @@ public class CompactAction extends TableActionBase {
Preconditions.checkArgument(
predicate.visit(partitionPredicateVisitor),
"Only partition key can be specialized in compaction
action.");
+ predicate =
+ predicate
+ .visit(
+ new PredicateProjectionConverter(
+
table.rowType().projectNames(table.partitionKeys())))
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Failed to convert
partition predicate."));
}
- return predicate;
+ return PartitionPredicate.fromPredicate(partitionType, predicate);
}
private boolean buildForPostponeBucketCompaction(
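To make the two branches above concrete with a hypothetical layout: on a table partitioned by (dt, hh), passing --partition dt=20240101,hh=00 specifies every partition field ("full mode"), so the filter becomes a plain set of BinaryRow values that is tested directly, with no predicate tree no matter how many partitions are listed. Passing only --partition dt=20240101 is partial, so the filter stays a Predicate, which is then projected onto the partition row type via projectNames and wrapped by PartitionPredicate.fromPredicate.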
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
index 7e83abc3df..fb3d068ed1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
@@ -24,11 +24,8 @@ import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -98,14 +95,12 @@ public class RescaleAction extends TableActionBase {
String.valueOf(snapshot.id()));
fileStoreTable = fileStoreTable.copy(dynamicOptions);
- RowType partitionType = fileStoreTable.schema().logicalPartitionType();
- Predicate partitionPredicate =
- PartitionPredicate.createPartitionPredicate(
- partitionType,
- InternalRowPartitionComputer.convertSpecToInternal(
- partition,
- partitionType,
- fileStoreTable.coreOptions().partitionDefaultName()));
+ PartitionPredicate predicate =
+ PartitionPredicate.fromMap(
+ fileStoreTable.schema().logicalPartitionType(),
+ partition,
+ fileStoreTable.coreOptions().partitionDefaultName());
+
DataStream<RowData> source =
new FlinkSourceBuilder(fileStoreTable)
.env(env)
@@ -114,7 +109,7 @@ public class RescaleAction extends TableActionBase {
scanParallelism == null
? currentBucketNum(snapshot)
: scanParallelism)
- .predicate(partitionPredicate)
+ .partitionPredicate(predicate)
.build();
Map<String, String> bucketOptions = new HashMap<>(fileStoreTable.options());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index 2724a5da3a..535cdfc7a9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -91,7 +91,7 @@ public class SortCompactAction extends CompactAction {
identifier.getObjectName())
.asSummaryString());
- sourceBuilder.predicate(getPredicate());
+ sourceBuilder.partitionPredicate(getPredicate());
String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
if (scanParallelism != null) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java
index 007e625afd..8cd9bca856 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java
@@ -25,7 +25,7 @@ import org.apache.paimon.flink.sink.AppendTableCompactSink;
import org.apache.paimon.flink.source.AppendTableCompactSource;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
@@ -62,7 +62,7 @@ public class AppendTableCompactBuilder {
private boolean isContinuous = false;
- @Nullable private Predicate partitionPredicate;
+ @Nullable private PartitionPredicate partitionPredicate;
@Nullable private Duration partitionIdleTime = null;
public AppendTableCompactBuilder(
@@ -76,8 +76,8 @@ public class AppendTableCompactBuilder {
this.isContinuous = isContinuous;
}
- public void withPartitionPredicate(Predicate predicate) {
- this.partitionPredicate = predicate;
+ public void withPartitionPredicate(PartitionPredicate partitionPredicate) {
+ this.partitionPredicate = partitionPredicate;
}
public void withPartitionIdleTime(@Nullable Duration partitionIdleTime) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendTableCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendTableCompactSource.java
index 7c707e850f..dae05ff60f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendTableCompactSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendTableCompactSource.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.append.AppendCompactCoordinator;
import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
@@ -58,17 +58,17 @@ public class AppendTableCompactSource extends AbstractNonCoordinatedSource<Appen
private final FileStoreTable table;
private final boolean streaming;
private final long scanInterval;
- private final Predicate filter;
+ private final PartitionPredicate partitionFilter;
public AppendTableCompactSource(
FileStoreTable table,
boolean isStreaming,
long scanInterval,
- @Nullable Predicate filter) {
+ @Nullable PartitionPredicate partitionFilter) {
this.table = table;
this.streaming = isStreaming;
this.scanInterval = scanInterval;
- this.filter = filter;
+ this.partitionFilter = partitionFilter;
}
@Override
@@ -82,7 +82,7 @@ public class AppendTableCompactSource extends AbstractNonCoordinatedSource<Appen
Preconditions.checkArgument(
readerContext.currentParallelism() == 1,
"Compaction Operator parallelism in paimon MUST be one.");
- return new CompactSourceReader(table, streaming, filter, scanInterval);
+ return new CompactSourceReader(table, streaming, partitionFilter, scanInterval);
}
/** BucketUnawareCompactSourceReader. */
@@ -92,9 +92,12 @@ public class AppendTableCompactSource extends AbstractNonCoordinatedSource<Appen
private final long scanInterval;
public CompactSourceReader(
- FileStoreTable table, boolean streaming, Predicate filter, long scanInterval) {
+ FileStoreTable table,
+ boolean streaming,
+ PartitionPredicate partitions,
+ long scanInterval) {
this.scanInterval = scanInterval;
- compactionCoordinator = new AppendCompactCoordinator(table, streaming, filter);
+ compactionCoordinator = new AppendCompactCoordinator(table, streaming, partitions);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 66d903c72b..73ccbf7fb1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -218,7 +218,8 @@ public abstract class BaseDataTableSource extends FlinkTableSource
.sourceBounded(!unbounded)
.logSourceProvider(logSourceProvider)
.projection(projectFields)
- .predicate(getPredicateWithScanPartitions())
+ .predicate(predicate)
+ .partitionPredicate(partitionPredicate)
.limit(limit)
.watermarkStrategy(watermarkStrategy)
.dynamicPartitionFilteringFields(dynamicPartitionFilteringFields());
@@ -373,7 +374,8 @@ public abstract class BaseDataTableSource extends FlinkTableSource
table.newReadBuilder()
.dropStats()
.withProjection(new int[0])
- .withFilter(getPredicateWithScanPartitions())
+ .withFilter(predicate)
+ .withPartitionFilter(partitionPredicate)
.newScan()
.plan()
.splits();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 47e1997fb6..ada68c05c0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -24,7 +24,7 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.system.CompactBucketsTable;
@@ -62,7 +62,7 @@ public class CompactorSourceBuilder {
private boolean isContinuous = false;
private StreamExecutionEnvironment env;
- @Nullable private Predicate partitionPredicate = null;
+ @Nullable private PartitionPredicate partitionPredicate = null;
@Nullable private Duration partitionIdleTime = null;
public CompactorSourceBuilder(String tableIdentifier, FileStoreTable table) {
@@ -89,8 +89,10 @@ public class CompactorSourceBuilder {
compactBucketsTable =
compactBucketsTable.copy(
isContinuous ? streamingCompactOptions() : batchCompactOptions());
- ReadBuilder readBuilder =
- compactBucketsTable.newReadBuilder().withFilter(partitionPredicate);
+ ReadBuilder readBuilder = compactBucketsTable.newReadBuilder();
+ if (partitionPredicate != null) {
+ readBuilder.withPartitionFilter(partitionPredicate);
+ }
if (CoreOptions.fromMap(table.options()).manifestDeleteFileDropStats()) {
readBuilder = readBuilder.dropStats();
}
@@ -174,7 +176,8 @@ public class CompactorSourceBuilder {
};
}
- public CompactorSourceBuilder withPartitionPredicate(@Nullable Predicate partitionPredicate) {
+ public CompactorSourceBuilder withPartitionPredicate(
+ @Nullable PartitionPredicate partitionPredicate) {
this.partitionPredicate = partitionPredicate;
return this;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index b5ef9c469f..5f9f01f1cc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -31,6 +31,7 @@ import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
import org.apache.paimon.flink.source.operator.MonitorSource;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -90,6 +91,7 @@ public class FlinkSourceBuilder {
private StreamExecutionEnvironment env;
@Nullable private int[][] projectedFields;
@Nullable private Predicate predicate;
+ @Nullable private PartitionPredicate partitionPredicate;
@Nullable private LogSourceProvider logSourceProvider;
@Nullable private Integer parallelism;
@Nullable private Long limit;
@@ -154,6 +156,11 @@ public class FlinkSourceBuilder {
return this;
}
+ public FlinkSourceBuilder partitionPredicate(PartitionPredicate partitionPredicate) {
+ this.partitionPredicate = partitionPredicate;
+ return this;
+ }
+
public FlinkSourceBuilder limit(@Nullable Long limit) {
this.limit = limit;
return this;
@@ -197,7 +204,12 @@ public class FlinkSourceBuilder {
if (readType != null) {
readBuilder.withReadType(readType);
}
- readBuilder.withFilter(predicate);
+ if (predicate != null) {
+ readBuilder.withFilter(predicate);
+ }
+ if (partitionPredicate != null) {
+ readBuilder.withPartitionFilter(partitionPredicate);
+ }
if (limit != null) {
readBuilder.withLimit(limit.intValue());
}
@@ -340,6 +352,7 @@ public class FlinkSourceBuilder {
table,
projectedRowType(),
predicate,
+ partitionPredicate,
outerProject()))
.addSource(
new LogHybridSourceFactory(logSourceProvider),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 5af489ed5c..621ceb7357 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -71,6 +71,12 @@ public abstract class FlinkTableSource
protected final Options options;
@Nullable protected Predicate predicate;
+ /**
+ * This field is only used for normal sources (not lookup sources). Specified partitions in lookup
+ * sources are handled in {@link org.apache.paimon.flink.lookup.PartitionLoader}.
+ */
+ @Nullable protected PartitionPredicate partitionPredicate;
+
@Nullable protected int[][] projectFields;
@Nullable protected Long limit;
protected SplitStatistics splitStatistics;
@@ -86,6 +92,7 @@ public abstract class FlinkTableSource
@Nullable Long limit) {
this.table = table;
this.options = Options.fromMap(table.options());
+ this.partitionPredicate = getPartitionPredicateWithOptions();
this.predicate = predicate;
this.projectFields = projectFields;
@@ -130,32 +137,30 @@ public abstract class FlinkTableSource
* This method is only used for normal sources (not lookup sources). Specified partitions in
* lookup sources are handled in {@link org.apache.paimon.flink.lookup.PartitionLoader}.
*/
- protected Predicate getPredicateWithScanPartitions() {
+ private PartitionPredicate getPartitionPredicateWithOptions() {
if (options.contains(FlinkConnectorOptions.SCAN_PARTITIONS)) {
- Predicate partitionPredicate;
+ PartitionPredicate partitionPredicate;
try {
partitionPredicate =
- PartitionPredicate.createPartitionPredicate(
- ParameterUtils.getPartitions(
- options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
- .split(";")),
- table.rowType(),
- options.get(CoreOptions.PARTITION_DEFAULT_NAME));
+ PartitionPredicate.fromPredicate(
+ table.rowType().project(table.partitionKeys()),
+ PartitionPredicate.createPartitionPredicate(
+ ParameterUtils.getPartitions(
+ options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
+ .split(";")),
+ table.rowType(),
+ options.get(CoreOptions.PARTITION_DEFAULT_NAME)));
+ return partitionPredicate;
} catch (IllegalArgumentException e) {
// In older versions of Flink, however, lookup sources will first be treated as
// normal sources. So this method will also be visited by lookup tables, whose
// option value might be max_pt() or max_two_pt(). In this case we ignore the
// filters.
- return predicate;
+ return null;
}
- if (predicate == null) {
- return partitionPredicate;
- } else {
- return PredicateBuilder.and(predicate, partitionPredicate);
- }
} else {
- return predicate;
+ return null;
}
}
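For reference, the option parsed here is FlinkConnectorOptions.SCAN_PARTITIONS, a ';'-separated list of partition specs; it now becomes a PartitionPredicate scoped to the partition row type instead of being AND-ed into the row predicate as before. A hedged sketch of selecting two date partitions through dynamic options (the dt values are made up; this assumes Table#copy(Map) dynamic options):

    Map<String, String> dynamicOptions = new HashMap<>();
    dynamicOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "dt=20240101;dt=20240102");
    Table scoped = table.copy(dynamicOptions); // the source then builds the PartitionPredicate from this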
@@ -222,7 +227,8 @@ public abstract class FlinkTableSource
List<PartitionEntry> partitionEntries =
table.newReadBuilder()
.dropStats()
- .withFilter(getPredicateWithScanPartitions())
+ .withFilter(predicate)
+ .withPartitionFilter(partitionPredicate)
.newScan()
.listPartitionEntries();
long totalSize = 0;
@@ -238,7 +244,8 @@ public abstract class FlinkTableSource
List<Split> splits =
table.newReadBuilder()
.dropStats()
- .withFilter(getPredicateWithScanPartitions())
+ .withFilter(predicate)
+ .withPartitionFilter(partitionPredicate)
.withProjection(new int[0])
.newScan()
.plan()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
index afdcae2fff..d3dd232f12 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
@@ -74,6 +75,7 @@ public class LogHybridSourceFactory
Table table,
@Nullable RowType readType,
@Nullable Predicate predicate,
+ @Nullable PartitionPredicate partitionPredicate,
@Nullable NestedProjectedRowData rowData) {
if (!(table instanceof DataTable)) {
throw new UnsupportedOperationException(
@@ -88,9 +90,15 @@ public class LogHybridSourceFactory
if (readType != null) {
readBuilder.withReadType(readType);
}
+ if (predicate != null) {
+ readBuilder.withFilter(predicate);
+ }
+ if (partitionPredicate != null) {
+ readBuilder.withPartitionFilter(partitionPredicate);
+ }
return new FlinkHybridFirstSource(
- readBuilder.withFilter(predicate),
+ readBuilder,
dataTable.snapshotManager(),
dataTable.coreOptions().toConfiguration(),
rowData);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index e9331414f5..05563fdeac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -92,7 +92,10 @@ public class SystemTableSource extends FlinkTableSource {
if (readType != null) {
readBuilder.withReadType(readType);
}
- readBuilder.withFilter(getPredicateWithScanPartitions());
+ if (predicate != null) {
+ readBuilder.withFilter(predicate);
+ }
+ readBuilder.withPartitionFilter(partitionPredicate);
if (unbounded && table instanceof DataTable) {
source =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 419eb6a521..edc8b469e7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -358,6 +358,45 @@ public class CompactActionITCase extends CompactActionITCaseBase {
checkFileAndRowSize(table, 3L, 0L, 1, 6);
}
+ @Test
+ public void testLotsOfPartitionsCompact() throws Exception {
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+ tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+
+ FileStoreTable table =
+ prepareTable(
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ tableOptions);
+
+ // base records
+ writeData(
+ rowData(1, 100, 15, BinaryString.fromString("20221208")),
+ rowData(1, 100, 16, BinaryString.fromString("20221208")),
+ rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+ writeData(
+ rowData(1, 100, 15, BinaryString.fromString("20221208")),
+ rowData(1, 100, 16, BinaryString.fromString("20221208")),
+ rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+ checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+ List<String> partitions = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ partitions.add("--partition");
+ partitions.add("k=" + i);
+ }
+
+ // the specified partitions deliberately include many that do not exist in the table
+ runActionForUnawareTable(false, partitions);
+
+ // first compaction, snapshot will be 3.
+ checkFileAndRowSize(table, 3L, 0L, 1, 6);
+ }
+
@Test
public void testTableConf() throws Exception {
prepareTable(
@@ -440,14 +479,20 @@ public class CompactActionITCase extends CompactActionITCaseBase {
}
private void runAction(boolean isStreaming) throws Exception {
- runAction(isStreaming, false);
+ runAction(isStreaming, false, Collections.emptyList());
+ }
+
+ private void runActionForUnawareTable(boolean isStreaming, List<String> extra)
+ throws Exception {
+ runAction(isStreaming, true, extra);
}
private void runActionForUnawareTable(boolean isStreaming) throws Exception {
- runAction(isStreaming, true);
+ runAction(isStreaming, true, Collections.emptyList());
}
- private void runAction(boolean isStreaming, boolean unawareBucket) throws Exception {
+ private void runAction(boolean isStreaming, boolean unawareBucket, List<String> extra)
+ throws Exception {
StreamExecutionEnvironment env;
if (isStreaming) {
env = streamExecutionEnvironmentBuilder().streamingMode().build();
@@ -486,6 +531,8 @@ public class CompactActionITCase extends CompactActionITCaseBase {
}
}
+ baseArgs.addAll(extra);
+
CompactAction action = createAction(CompactAction.class, baseArgs.toArray(new String[0]));
action.withStreamExecutionEnvironment(env).build();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 0e85f559d9..39664c3aa3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -32,7 +32,7 @@ import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -70,7 +70,6 @@ import java.util.Map;
import java.util.Random;
import java.util.UUID;
-import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
import static org.assertj.core.api.Assertions.assertThat;
@@ -122,16 +121,15 @@ public class CompactorSinkITCase extends AbstractTestBase {
StreamExecutionEnvironment env =
streamExecutionEnvironmentBuilder().batchMode().build();
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(tablePath.toString(), table);
- Predicate predicate =
- createPartitionPredicate(
- getSpecifiedPartitions(),
- table.rowType(),
- table.coreOptions().partitionDefaultName());
DataStreamSource<RowData> source =
sourceBuilder
.withEnv(env)
.withContinuousMode(false)
- .withPartitionPredicate(predicate)
+ .withPartitionPredicate(
+ PartitionPredicate.fromMaps(
+ table.schema().logicalPartitionType(),
+ getSpecifiedPartitions(),
+ table.coreOptions().partitionDefaultName()))
.build();
new CompactorSinkBuilder(table, true).withInput(source).build();
env.execute();
@@ -162,16 +160,15 @@ public class CompactorSinkITCase extends AbstractTestBase {
streamExecutionEnvironmentBuilder().streamingMode().build();
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(tablePath.toString(), table);
- Predicate predicate =
- createPartitionPredicate(
- getSpecifiedPartitions(),
- table.rowType(),
- table.coreOptions().partitionDefaultName());
DataStreamSource<RowData> source =
sourceBuilder
.withEnv(env)
.withContinuousMode(false)
- .withPartitionPredicate(predicate)
+ .withPartitionPredicate(
+ PartitionPredicate.fromMaps(
+ table.schema().logicalPartitionType(),
+ getSpecifiedPartitions(),
+ table.coreOptions().partitionDefaultName()))
.build();
Integer sinkParalellism = new Random().nextInt(100) + 1;
new CompactorSinkBuilder(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index 9afd04c8d0..d5224670d4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -27,7 +27,7 @@ import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMetaSerializer;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -60,7 +60,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
import static org.assertj.core.api.Assertions.assertThat;
@@ -264,16 +263,15 @@ public class CompactorSourceITCase extends AbstractTestBase {
StreamExecutionEnvironment env =
streamExecutionEnvironmentBuilder().streamingMode().build();
- Predicate partitionPredicate =
- createPartitionPredicate(
- specifiedPartitions,
- table.rowType(),
- table.coreOptions().partitionDefaultName());
DataStreamSource<RowData> compactorSource =
new CompactorSourceBuilder("test", table)
.withContinuousMode(isStreaming)
.withEnv(env)
- .withPartitionPredicate(partitionPredicate)
+ .withPartitionPredicate(
+ PartitionPredicate.fromMaps(
+ table.schema().logicalPartitionType(),
+ specifiedPartitions,
+ table.coreOptions().partitionDefaultName()))
.build();
CloseableIterator<RowData> it = compactorSource.executeAndCollect();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
index 24f35cdfdb..89ac2c947a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
@@ -129,7 +129,7 @@ public class FileStoreSourceMetricsTest {
public void logHybridFileStoreSourceScanMetricsTest() throws Exception {
writeOnce();
FlinkSource logHybridFileStoreSource =
- LogHybridSourceFactory.buildHybridFirstSource(table, null, null, null);
+ LogHybridSourceFactory.buildHybridFirstSource(table, null, null, null, null);
logHybridFileStoreSource.restoreEnumerator(context, null);
assertThat(TestingMetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue())
.isEqualTo(1L);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 196f682cd5..8458941b91 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.spark.PaimonSplitScan;
import org.apache.paimon.spark.SparkUtils;
@@ -46,6 +47,7 @@ import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.ProcedureUtils;
@@ -237,25 +239,33 @@ public class CompactProcedure extends BaseProcedure {
BucketMode bucketMode = table.bucketMode();
OrderType orderType = OrderType.of(sortType);
boolean fullCompact = compactStrategy.equalsIgnoreCase(FULL);
+ RowType partitionType = table.schema().logicalPartitionType();
Predicate filter =
condition == null
? null
: ExpressionUtils.convertConditionToPaimonPredicate(
condition,
((LogicalPlan) relation).output(),
- table.rowType(),
+ partitionType,
false)
.getOrElse(null);
+ PartitionPredicate partitionPredicate =
+ PartitionPredicate.fromPredicate(partitionType, filter);
if (orderType.equals(OrderType.NONE)) {
JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
switch (bucketMode) {
case HASH_FIXED:
case HASH_DYNAMIC:
compactAwareBucketTable(
- table, fullCompact, filter, partitionIdleTime, javaSparkContext);
+ table,
+ fullCompact,
+ partitionPredicate,
+ partitionIdleTime,
+ javaSparkContext);
break;
case BUCKET_UNAWARE:
- compactUnAwareBucketTable(table, filter, partitionIdleTime, javaSparkContext);
+ compactUnAwareBucketTable(
+ table, partitionPredicate, partitionIdleTime, javaSparkContext);
break;
default:
throw new UnsupportedOperationException(
@@ -279,12 +289,12 @@ public class CompactProcedure extends BaseProcedure {
private void compactAwareBucketTable(
FileStoreTable table,
boolean fullCompact,
- @Nullable Predicate filter,
+ @Nullable PartitionPredicate partitionPredicate,
@Nullable Duration partitionIdleTime,
JavaSparkContext javaSparkContext) {
SnapshotReader snapshotReader = table.newSnapshotReader();
- if (filter != null) {
- snapshotReader.withFilter(filter);
+ if (partitionPredicate != null) {
+ snapshotReader.withPartitionFilter(partitionPredicate);
}
Set<BinaryRow> partitionToBeCompacted =
getHistoryPartition(snapshotReader, partitionIdleTime);
@@ -358,12 +368,12 @@ public class CompactProcedure extends BaseProcedure {
private void compactUnAwareBucketTable(
FileStoreTable table,
- @Nullable Predicate filter,
+ @Nullable PartitionPredicate partitionPredicate,
@Nullable Duration partitionIdleTime,
JavaSparkContext javaSparkContext) {
List<AppendCompactTask> compactionTasks;
try {
- compactionTasks = new AppendCompactCoordinator(table, false, filter).run();
+ compactionTasks = new AppendCompactCoordinator(table, false, partitionPredicate).run();
} catch (EndOfScanException e) {
compactionTasks = new ArrayList<>();
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index f85213da7c..ffc31bb46f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -443,6 +443,80 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe
}
}
+ test(s"Paimon DeletionVector: delete for append partitioned table with
bucket = 1") {
+ val bucket = -1
+ withTable("T") {
+ val bucketKey = if (bucket > 1) {
+ ", 'bucket-key' = 'id'"
+ } else {
+ ""
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE T (id INT, name STRING, pt STRING)
+ |PARTITIONED BY(pt)
+ |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'deletion-vectors.bitmap64' = '${Random
+ .nextBoolean()}', 'bucket' = '$bucket' $bucketKey)
+ |""".stripMargin)
+
+ val table = loadTable("T")
+ val dvMaintainerFactory =
+ new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+
+ def getDeletionVectors(ptValues: Seq[String]): Map[String, DeletionVector] = {
+ getLatestDeletionVectors(table, dvMaintainerFactory, ptValues.map(BinaryRow.singleColumn))
+ }
+
+ spark.sql(
+ "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c',
'2025'), (4, 'd', '2025')")
+ val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
+ Assertions.assertEquals(0, deletionVectors1.size)
+
+ val cond1 = "id = 2"
+ val rowMetaInfo1 = getFilePathAndRowIndex(cond1)
+ runAndCheckSplitScan(s"DELETE FROM T WHERE $cond1")
+ checkAnswer(
+ spark.sql(s"SELECT * from T ORDER BY id"),
+ Row(1, "a", "2024") :: Row(3, "c", "2025") :: Row(4, "d", "2025") :: Nil)
+ val deletionVectors2 = getDeletionVectors(Seq("2024", "2025"))
+ Assertions.assertEquals(1, deletionVectors2.size)
+ deletionVectors2
+ .foreach {
+ case (filePath, dv) =>
+ rowMetaInfo1(filePath).foreach(index => Assertions.assertTrue(dv.isDeleted(index)))
+ }
+
+ val cond2 = "id = 3"
+ val rowMetaInfo2 = rowMetaInfo1 ++ getFilePathAndRowIndex(cond2)
+ runAndCheckSplitScan(s"DELETE FROM T WHERE $cond2")
+ checkAnswer(
+ spark.sql(s"SELECT * from T ORDER BY id"),
+ Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
+ val deletionVectors3 = getDeletionVectors(Seq("2024"))
+ Assertions.assertTrue(deletionVectors2 == deletionVectors3)
+ val deletionVectors4 = getDeletionVectors(Seq("2024", "2025"))
+ deletionVectors4
+ .foreach {
+ case (filePath, dv) =>
+ rowMetaInfo2(filePath).foreach(index => Assertions.assertTrue(dv.isDeleted(index)))
+ }
+
+ spark.sql("""CALL sys.compact(table => 'T', partitions => "pt =
'2024'")""")
+ Assertions.assertTrue(getDeletionVectors(Seq("2024")).isEmpty)
+ Assertions.assertTrue(getDeletionVectors(Seq("2025")).nonEmpty)
+ checkAnswer(
+ spark.sql(s"SELECT * from T ORDER BY id"),
+ Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
+
+ spark.sql("""CALL sys.compact(table => 'T', where => "pt = '2025'")""")
+ Assertions.assertTrue(getDeletionVectors(Seq("2025")).isEmpty)
+ Assertions.assertTrue(getDeletionVectors(Seq("2025")).isEmpty)
+ checkAnswer(
+ spark.sql(s"SELECT * from T ORDER BY id"),
+ Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
+ }
+ }
+
test("Paimon deletionVector: deletion vector write verification") {
withTable("T") {
spark.sql(s"""