This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 52e9d19a60 [core] Make PartitionPredicate Public
52e9d19a60 is described below
commit 52e9d19a603a6854e3f1d66f298c420e706e2f38
Author: JingsongLi <[email protected]>
AuthorDate: Mon Jul 14 14:30:56 2025 +0800
[core] Make PartitionPredicate Public
---
.../main/java/org/apache/paimon/types/RowType.java | 2 +-
.../paimon/partition/PartitionPredicate.java | 25 ++++++++++++++++++++--
.../paimon/table/source/ReadBuilderImpl.java | 4 ++--
.../apache/paimon/flink/action/CompactAction.java | 8 +++----
.../apache/paimon/flink/action/RescaleAction.java | 4 ++--
.../paimon/flink/action/SortCompactAction.java | 2 +-
6 files changed, 33 insertions(+), 12 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
index e886c97ca6..a7f867aeb3 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
@@ -323,7 +323,7 @@ public final class RowType extends DataType {
.copy(isNullable());
}
- public int[] projectNames(List<String> names) {
+ public int[] projectIndexes(List<String> names) {
List<String> fieldNames =
fields.stream().map(DataField::name).collect(Collectors.toList());
return names.stream().mapToInt(fieldNames::indexOf).toArray();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index 4a9b7d646a..f3d9e9030c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -48,11 +48,26 @@ import static
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecTo
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
-/** A special predicate to filter partition only, just like {@link Predicate}.
*/
+/**
+ * A special predicate to filter partition only, just like {@link Predicate}.
+ *
+ * @since 1.3.0
+ */
public interface PartitionPredicate extends Serializable {
- boolean test(BinaryRow part);
+ /**
+ * Test based on the specific partition.
+ *
+ * @return return true when hit, false when not hit.
+ */
+ boolean test(BinaryRow partition);
+ /**
+ * Test based on the statistical information to determine whether a hit is
possible.
+ *
+ * @return return true is likely to hit (there may also be false
positives), return false is
+ * absolutely not possible to hit.
+ */
boolean test(
long rowCount, InternalRow minValues, InternalRow maxValues,
InternalArray nullCounts);
@@ -69,11 +84,13 @@ public interface PartitionPredicate extends Serializable {
return new DefaultPartitionPredicate(predicate);
}
+ /** Create {@link PartitionPredicate} from multiple partitions. */
@Nullable
static PartitionPredicate fromMultiple(RowType partitionType,
List<BinaryRow> partitions) {
return fromMultiple(partitionType, new HashSet<>(partitions));
}
+ /** Create {@link PartitionPredicate} from multiple partitions. */
@Nullable
static PartitionPredicate fromMultiple(RowType partitionType,
Set<BinaryRow> partitions) {
if (partitionType.getFieldCount() == 0 || partitions.isEmpty()) {
@@ -123,6 +140,8 @@ public interface PartitionPredicate extends Serializable {
/** A {@link PartitionPredicate} using {@link Predicate}. */
class DefaultPartitionPredicate implements PartitionPredicate {
+ private static final long serialVersionUID = 1L;
+
private final Predicate predicate;
private DefaultPartitionPredicate(Predicate predicate) {
@@ -150,6 +169,8 @@ public interface PartitionPredicate extends Serializable {
*/
class MultiplePartitionPredicate implements PartitionPredicate {
+ private static final long serialVersionUID = 1L;
+
private final Set<BinaryRow> partitions;
private final int fieldNum;
private final Predicate[] min;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index f12c4fe4be..63be2e8765 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -103,8 +103,8 @@ public class ReadBuilderImpl implements ReadBuilder {
}
@Override
- public ReadBuilder withPartitionFilter(@Nullable PartitionPredicate
partitions) {
- this.partitionFilter = partitions;
+ public ReadBuilder withPartitionFilter(@Nullable PartitionPredicate
partitionPredicate) {
+ this.partitionFilter = partitionPredicate;
return this;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index b0368082aa..5528fb2ee5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -174,7 +174,7 @@ public class CompactAction extends TableActionBase {
new CompactorSourceBuilder(identifier.getFullName(), table);
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table,
fullCompaction);
- sourceBuilder.withPartitionPredicate(getPredicate());
+ sourceBuilder.withPartitionPredicate(getPartitionPredicate());
DataStreamSource<RowData> source =
sourceBuilder
.withEnv(env)
@@ -189,13 +189,13 @@ public class CompactAction extends TableActionBase {
throws Exception {
AppendTableCompactBuilder builder =
new AppendTableCompactBuilder(env, identifier.getFullName(),
table);
- builder.withPartitionPredicate(getPredicate());
+ builder.withPartitionPredicate(getPartitionPredicate());
builder.withContinuousMode(isStreaming);
builder.withPartitionIdleTime(partitionIdleTime);
builder.build();
}
- protected PartitionPredicate getPredicate() throws Exception {
+ protected PartitionPredicate getPartitionPredicate() throws Exception {
Preconditions.checkArgument(
partitions == null || whereSql == null,
"partitions and where cannot be used together.");
@@ -245,7 +245,7 @@ public class CompactAction extends TableActionBase {
predicate
.visit(
new PredicateProjectionConverter(
-
table.rowType().projectNames(table.partitionKeys())))
+
table.rowType().projectIndexes(table.partitionKeys())))
.orElseThrow(
() ->
new RuntimeException(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
index fb3d068ed1..3855a5115d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
@@ -95,7 +95,7 @@ public class RescaleAction extends TableActionBase {
String.valueOf(snapshot.id()));
fileStoreTable = fileStoreTable.copy(dynamicOptions);
- PartitionPredicate predicate =
+ PartitionPredicate partitionPredicate =
PartitionPredicate.fromMap(
fileStoreTable.schema().logicalPartitionType(),
partition,
@@ -109,7 +109,7 @@ public class RescaleAction extends TableActionBase {
scanParallelism == null
? currentBucketNum(snapshot)
: scanParallelism)
- .partitionPredicate(predicate)
+ .partitionPredicate(partitionPredicate)
.build();
Map<String, String> bucketOptions = new
HashMap<>(fileStoreTable.options());
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index 535cdfc7a9..69d07d238b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -91,7 +91,7 @@ public class SortCompactAction extends CompactAction {
identifier.getObjectName())
.asSummaryString());
- sourceBuilder.partitionPredicate(getPredicate());
+ sourceBuilder.partitionPredicate(getPartitionPredicate());
String scanParallelism =
tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
if (scanParallelism != null) {