This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f5647255f4 [spark] Split pushedFilters into dataFilters and partitionFilters in ScanBuilder (#6817)
f5647255f4 is described below
commit f5647255f426e634a24aa0da24d59a7e36aec0f0
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Dec 16 15:09:14 2025 +0800
[spark] Split pushedFilters into dataFilters and partitionFilters in ScanBuilder (#6817)
---
.../apache/paimon/predicate/PredicateBuilder.java | 15 --
.../org/apache/paimon/predicate/SortValue.java | 17 ++
.../java/org/apache/paimon/predicate/TopN.java | 15 ++
.../paimon/operation/ManifestFileMerger.java | 4 +-
.../paimon/partition/PartitionPredicate.java | 211 ++++++++++++++++++---
.../PartitionValuesTimeExpireStrategy.java | 16 ++
.../paimon/table/format/FormatReadBuilder.java | 17 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 22 +--
.../paimon/table/format/FormatTableScanTest.java | 2 +-
.../paimon/spark/FormatTableScanBuilder.scala | 20 +-
...ePushDown.scala => PaimonBaseScanBuilder.scala} | 59 ++++--
.../paimon/spark/PaimonFormatTableScan.scala | 8 +-
.../scala/org/apache/paimon/spark/PaimonScan.scala | 12 +-
.../apache/paimon/spark/PaimonScanBuilder.scala | 13 +-
.../paimon/spark/PaimonSplitScanBuilder.scala | 29 ---
.../scala/org/apache/paimon/spark/PaimonScan.scala | 11 +-
.../paimon/spark/ColumnPruningAndPushDown.scala | 47 ++++-
.../paimon/spark/FormatTableScanBuilder.scala | 27 +--
.../apache/paimon/spark/PaimonBasePushDown.scala | 91 ---------
.../org/apache/paimon/spark/PaimonBaseScan.scala | 48 +----
.../paimon/spark/PaimonBaseScanBuilder.scala | 96 ++++++++--
.../org/apache/paimon/spark/PaimonBatch.scala | 1 -
.../paimon/spark/PaimonFormatTableBaseScan.scala | 17 +-
.../paimon/spark/PaimonFormatTableScan.scala | 8 +-
.../org/apache/paimon/spark/PaimonLocalScan.scala | 10 +-
.../scala/org/apache/paimon/spark/PaimonScan.scala | 25 +--
.../apache/paimon/spark/PaimonScanBuilder.scala | 43 ++---
.../org/apache/paimon/spark/PaimonSplitScan.scala | 24 ++-
.../org/apache/paimon/spark/PaimonStatistics.scala | 73 +++----
.../paimon/spark/sql/AnalyzeTableTestBase.scala | 24 +--
.../spark/sql/PaimonOptimizationTestBase.scala | 5 +
31 files changed, 537 insertions(+), 473 deletions(-)
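In short, the Spark ScanBuilder now keeps partition filters and data filters separate instead of one pushedFilters list. A minimal sketch of the new splitting helper, not part of the patch itself (rowType, partitionKeys and the commented-out scan calls are placeholders; the real call sites are in SnapshotReaderImpl and FormatReadBuilder below):

    import org.apache.paimon.partition.PartitionPredicate;
    import org.apache.paimon.predicate.Predicate;
    import org.apache.paimon.predicate.PredicateBuilder;
    import org.apache.paimon.types.RowType;
    import org.apache.paimon.utils.Pair;

    import java.util.List;
    import java.util.Optional;

    import static org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates;

    class SplitFiltersSketch {

        static void split(Predicate pushedFilter, RowType rowType, List<String> partitionKeys) {
            // Splits an AND-ed pushed filter into partition filters and remaining data filters.
            Pair<Optional<PartitionPredicate>, List<Predicate>> pair =
                    splitPartitionPredicatesAndDataPredicates(pushedFilter, rowType, partitionKeys);

            // Partition-only filters are collapsed into a single PartitionPredicate, if any exist.
            pair.getLeft().ifPresent(partitionFilter -> {
                // e.g. scan.withPartitionFilter(partitionFilter);
            });

            // Everything else stays as data filters and is AND-ed back together for the read.
            List<Predicate> dataFilters = pair.getRight();
            if (!dataFilters.isEmpty()) {
                Predicate dataFilter = PredicateBuilder.and(dataFilters);
                // e.g. scan.withFilter(dataFilter);
            }
        }
    }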
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index 935ecad303..c8050efa16 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -297,21 +297,6 @@ public class PredicateBuilder {
return result;
}
- public static Pair<List<Predicate>, List<Predicate>> splitAndByPartition(
- Predicate predicate, int[] fieldIdxToPartitionIdx) {
- List<Predicate> partitionFilters = new ArrayList<>();
- List<Predicate> nonPartitionFilters = new ArrayList<>();
- for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
- Optional<Predicate> mapped = transformFieldMapping(p, fieldIdxToPartitionIdx);
- if (mapped.isPresent()) {
- partitionFilters.add(mapped.get());
- } else {
- nonPartitionFilters.add(p);
- }
- }
- return Pair.of(partitionFilters, nonPartitionFilters);
- }
-
public static List<Predicate> splitOr(@Nullable Predicate predicate) {
if (predicate == null) {
return Collections.emptyList();
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/SortValue.java b/paimon-common/src/main/java/org/apache/paimon/predicate/SortValue.java
index 61d7913510..c78819b1c6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/SortValue.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/SortValue.java
@@ -21,6 +21,7 @@ package org.apache.paimon.predicate;
import org.apache.paimon.utils.Preconditions;
import java.io.Serializable;
+import java.util.Objects;
/** Represents a sort order. */
public class SortValue implements Serializable {
@@ -55,6 +56,22 @@ public class SortValue implements Serializable {
"%s %s %s", field.name(), direction.toString(),
nullOrdering.toString());
}
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SortValue sortValue = (SortValue) o;
+ return Objects.equals(field, sortValue.field)
+ && direction == sortValue.direction
+ && nullOrdering == sortValue.nullOrdering;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(field, direction, nullOrdering);
+ }
+
/** A null order used in sorting expressions. */
public enum NullOrdering {
NULLS_FIRST("NULLS FIRST"),
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
index 70b50c0c3a..20d34b86d2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
@@ -24,6 +24,7 @@ import org.apache.paimon.predicate.SortValue.SortDirection;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.paimon.utils.ListUtils.isNullOrEmpty;
@@ -62,4 +63,18 @@ public class TopN implements Serializable {
String sort =
orders.stream().map(SortValue::toString).collect(Collectors.joining(", "));
return String.format("Sort(%s), Limit(%s)", sort, limit);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TopN topN = (TopN) o;
+ return limit == topN.limit && Objects.equals(orders, topN.orders);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(orders, limit);
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
index 51c7081916..fd8321de04 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
@@ -200,13 +200,13 @@ public class ManifestFileMerger {
PartitionPredicate predicate;
if (deleteEntries.isEmpty()) {
- predicate = PartitionPredicate.alwaysFalse();
+ predicate = PartitionPredicate.ALWAYS_FALSE;
} else {
if (partitionType.getFieldCount() > 0) {
Set<BinaryRow> deletePartitions =
computeDeletePartitions(deleteEntries);
predicate = PartitionPredicate.fromMultiple(partitionType, deletePartitions);
} else {
- predicate = PartitionPredicate.alwaysTrue();
+ predicate = PartitionPredicate.ALWAYS_TRUE;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index cb4e952664..d997ad2db7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -30,6 +30,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.statistics.FullSimpleColStatsCollector;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
@@ -37,13 +38,18 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import static org.apache.paimon.predicate.PredicateBuilder.fieldIdxToPartitionIdx;
+import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -102,41 +108,53 @@ public interface PartitionPredicate extends Serializable {
new RowDataToObjectArrayConverter(partitionType), partitions);
}
- static PartitionPredicate alwaysFalse() {
- return new PartitionPredicate() {
- @Override
- public boolean test(BinaryRow part) {
- return false;
- }
+ /** Creates {@link PartitionPredicate} that combines multiple predicates using logical AND. */
+ @Nullable
+ static PartitionPredicate and(List<PartitionPredicate> predicates) {
+ if (predicates.isEmpty()) {
+ return null;
+ }
- @Override
- public boolean test(
- long rowCount,
- InternalRow minValues,
- InternalRow maxValues,
- InternalArray nullCounts) {
- return false;
- }
- };
+ if (predicates.size() == 1) {
+ return predicates.get(0);
+ }
+
+ return new AndPartitionPredicate(predicates);
}
- static PartitionPredicate alwaysTrue() {
- return new PartitionPredicate() {
- @Override
- public boolean test(BinaryRow part) {
- return true;
- }
+ PartitionPredicate ALWAYS_FALSE =
+ new PartitionPredicate() {
+ @Override
+ public boolean test(BinaryRow part) {
+ return false;
+ }
- @Override
- public boolean test(
- long rowCount,
- InternalRow minValues,
- InternalRow maxValues,
- InternalArray nullCounts) {
- return true;
- }
- };
- }
+ @Override
+ public boolean test(
+ long rowCount,
+ InternalRow minValues,
+ InternalRow maxValues,
+ InternalArray nullCounts) {
+ return false;
+ }
+ };
+
+ PartitionPredicate ALWAYS_TRUE =
+ new PartitionPredicate() {
+ @Override
+ public boolean test(BinaryRow part) {
+ return true;
+ }
+
+ @Override
+ public boolean test(
+ long rowCount,
+ InternalRow minValues,
+ InternalRow maxValues,
+ InternalArray nullCounts) {
+ return true;
+ }
+ };
/** A {@link PartitionPredicate} using {@link Predicate}. */
class DefaultPartitionPredicate implements PartitionPredicate {
@@ -166,6 +184,26 @@ public interface PartitionPredicate extends Serializable {
public Predicate predicate() {
return predicate;
}
+
+ @Override
+ public String toString() {
+ return predicate.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ DefaultPartitionPredicate that = (DefaultPartitionPredicate) o;
+ return Objects.equals(predicate, that.predicate);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(predicate);
+ }
}
/**
@@ -253,6 +291,82 @@ public interface PartitionPredicate extends Serializable {
public Set<BinaryRow> partitions() {
return partitions;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MultiplePartitionPredicate that = (MultiplePartitionPredicate) o;
+ return fieldNum == that.fieldNum
+ && Objects.equals(partitions, that.partitions)
+ && Objects.deepEquals(min, that.min)
+ && Objects.deepEquals(max, that.max);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partitions, fieldNum, Arrays.hashCode(min), Arrays.hashCode(max));
+ }
+ }
+
+ /** AND-combines multiple {@link PartitionPredicate}s. */
+ class AndPartitionPredicate implements PartitionPredicate {
+
+ private static final long serialVersionUID = 1L;
+
+ private final List<PartitionPredicate> predicates;
+
+ private AndPartitionPredicate(List<PartitionPredicate> predicates) {
+ checkArgument(!predicates.isEmpty());
+ this.predicates = Collections.unmodifiableList(new ArrayList<>(predicates));
+ }
+
+ @Override
+ public boolean test(BinaryRow partition) {
+ for (PartitionPredicate predicate : predicates) {
+ if (!predicate.test(partition)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean test(
+ long rowCount,
+ InternalRow minValues,
+ InternalRow maxValues,
+ InternalArray nullCounts) {
+ for (PartitionPredicate predicate : predicates) {
+ if (!predicate.test(rowCount, minValues, maxValues, nullCounts)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ if (predicates.size() == 1) {
+ return predicates.get(0).toString();
+ }
+ return "AND" + "(" + predicates + ")";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AndPartitionPredicate that = (AndPartitionPredicate) o;
+ return Objects.equals(predicates, that.predicates);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(predicates);
+ }
}
static Predicate createPartitionPredicate(RowType rowType, Map<String, Object> partition) {
@@ -340,4 +454,39 @@ public interface PartitionPredicate extends Serializable {
return fromMultiple(
partitionType, createBinaryPartitions(values, partitionType, defaultPartValue));
}
+
+ static Pair<Optional<PartitionPredicate>, List<Predicate>>
+ splitPartitionPredicatesAndDataPredicates(
+ Predicate dataPredicates, RowType tableType, List<String> partitionKeys) {
+ return splitPartitionPredicatesAndDataPredicates(
+ PredicateBuilder.splitAnd(dataPredicates), tableType, partitionKeys);
+ }
+
+ static Pair<Optional<PartitionPredicate>, List<Predicate>>
+ splitPartitionPredicatesAndDataPredicates(
+ List<Predicate> dataPredicates, RowType tableType, List<String> partitionKeys) {
+ if (partitionKeys.isEmpty()) {
+ return Pair.of(Optional.empty(), dataPredicates);
+ }
+
+ RowType partitionType = tableType.project(partitionKeys);
+ int[] partitionIdx = fieldIdxToPartitionIdx(tableType, partitionKeys);
+
+ List<Predicate> partitionFilters = new ArrayList<>();
+ List<Predicate> nonPartitionFilters = new ArrayList<>();
+ for (Predicate p : dataPredicates) {
+ Optional<Predicate> mapped = transformFieldMapping(p, partitionIdx);
+ if (mapped.isPresent()) {
+ partitionFilters.add(mapped.get());
+ } else {
+ nonPartitionFilters.add(p);
+ }
+ }
+ PartitionPredicate partitionPredicate =
+ partitionFilters.isEmpty()
+ ? null
+ : PartitionPredicate.fromPredicate(
+ partitionType, PredicateBuilder.and(partitionFilters));
+ return Pair.of(Optional.ofNullable(partitionPredicate), nonPartitionFilters);
+ }
}
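The alwaysTrue()/alwaysFalse() factories above become ALWAYS_TRUE/ALWAYS_FALSE constants, and partition predicates can now be AND-combined. A small sketch of the combinator's contract, not part of the patch itself (p1, p2 and the partition row are placeholders):

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.partition.PartitionPredicate;

    import java.util.Arrays;

    class AndCombinatorSketch {

        // PartitionPredicate.and returns null for an empty list, the sole element for a
        // single-element list, and an AndPartitionPredicate otherwise.
        static PartitionPredicate combine(PartitionPredicate p1, PartitionPredicate p2) {
            return PartitionPredicate.and(Arrays.asList(p1, p2));
        }

        static boolean matches(PartitionPredicate combined, BinaryRow partition) {
            // No filter pushed at all: treat as match-everything via the new constant.
            if (combined == null) {
                return PartitionPredicate.ALWAYS_TRUE.test(partition);
            }
            return combined.test(partition);
        }
    }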
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
index 6685a1d28c..94e26d6f37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
@@ -33,6 +33,7 @@ import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -117,5 +118,20 @@ public class PartitionValuesTimeExpireStrategy extends PartitionExpireStrategy {
InternalArray nullCounts) {
return true;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ PartitionValuesTimePredicate that = (PartitionValuesTimePredicate) o;
+ return Objects.equals(expireDateTime, that.expireDateTime);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(expireDateTime);
+ }
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 637fa070b5..ec344fb541 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -51,13 +51,13 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
+import static org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates;
import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
-import static org.apache.paimon.predicate.PredicateBuilder.fieldIdxToPartitionIdx;
-import static org.apache.paimon.predicate.PredicateBuilder.splitAndByPartition;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** {@link ReadBuilder} for {@link FormatTable}. */
@@ -147,13 +147,12 @@ public class FormatReadBuilder implements ReadBuilder {
public TableScan newScan() {
PartitionPredicate partitionFilter = this.partitionFilter;
if (partitionFilter == null && this.filter != null && !table.partitionKeys().isEmpty()) {
- int[] partitionIdx = fieldIdxToPartitionIdx(table.rowType(), table.partitionKeys());
- List<Predicate> partitionFilters = splitAndByPartition(filter, partitionIdx).getLeft();
- if (!partitionFilters.isEmpty()) {
- RowType partitionType = table.rowType().project(table.partitionKeys());
- partitionFilter =
- PartitionPredicate.fromPredicate(
- partitionType, PredicateBuilder.and(partitionFilters));
+ Optional<PartitionPredicate> partitionPredicateOpt =
+ splitPartitionPredicatesAndDataPredicates(
+ filter, table.rowType(), table.partitionKeys())
+ .getLeft();
+ if (partitionPredicateOpt.isPresent()) {
+ partitionFilter = partitionPredicateOpt.get();
}
}
return new FormatTableScan(table, partitionFilter, limit);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 63261f8c64..15f662d3ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -70,6 +70,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -78,7 +79,7 @@ import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
-import static org.apache.paimon.predicate.PredicateBuilder.splitAndByPartition;
+import static org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates;
/** Implementation of {@link SnapshotReader}. */
public class SnapshotReaderImpl implements SnapshotReader {
@@ -228,19 +229,14 @@ public class SnapshotReaderImpl implements SnapshotReader {
@Override
public SnapshotReader withFilter(Predicate predicate) {
- int[] fieldIdxToPartitionIdx =
- PredicateBuilder.fieldIdxToPartitionIdx(
- tableSchema.logicalRowType(), tableSchema.partitionKeys());
- Pair<List<Predicate>, List<Predicate>> partitionAndNonPartitionFilter =
- splitAndByPartition(predicate, fieldIdxToPartitionIdx);
- List<Predicate> partitionFilters = partitionAndNonPartitionFilter.getLeft();
- List<Predicate> nonPartitionFilters = partitionAndNonPartitionFilter.getRight();
- if (partitionFilters.size() > 0) {
- scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
+ Pair<Optional<PartitionPredicate>, List<Predicate>> pair =
+ splitPartitionPredicatesAndDataPredicates(
+ predicate, tableSchema.logicalRowType(), tableSchema.partitionKeys());
+ if (pair.getLeft().isPresent()) {
+ scan.withPartitionFilter(pair.getLeft().get());
}
-
- if (nonPartitionFilters.size() > 0) {
- nonPartitionFilterConsumer.accept(scan, PredicateBuilder.and(nonPartitionFilters));
+ if (!pair.getRight().isEmpty()) {
+ nonPartitionFilterConsumer.accept(scan, PredicateBuilder.and(pair.getRight()));
}
return this;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
index c2d45f1c62..1c9a3d6e7c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
@@ -115,7 +115,7 @@ public class FormatTableScanTest {
void testComputeScanPathAndLevelNoPartitionKeys() {
List<String> partitionKeys = Collections.emptyList();
RowType partitionType = RowType.of();
- PartitionPredicate partitionFilter = PartitionPredicate.alwaysTrue();
+ PartitionPredicate partitionFilter = PartitionPredicate.ALWAYS_TRUE;
Pair<Path, Integer> result =
FormatTableScan.computeScanPathAndLevel(
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
index 9be731924d..2ae94d65f8 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
@@ -19,25 +19,9 @@
package org.apache.paimon.spark
import org.apache.paimon.table.FormatTable
-import org.apache.paimon.types.RowType
-import org.apache.spark.sql.connector.read.{SupportsPushDownRequiredColumns, SupportsRuntimeFiltering}
-import org.apache.spark.sql.types.StructType
-
-import java.util.{List => JList}
-
-case class FormatTableScanBuilder(table: FormatTable)
- extends PaimonBasePushDown
- with SupportsPushDownRequiredColumns {
-
- override protected var partitionKeys: JList[String] = table.partitionKeys()
- override protected var rowType: RowType = table.rowType()
- protected var requiredSchema: StructType = SparkTypeUtils.fromPaimonRowType(rowType)
+case class FormatTableScanBuilder(table: FormatTable) extends PaimonBaseScanBuilder {
override def build(): PaimonFormatTableScan =
- PaimonFormatTableScan(table, requiredSchema, pushedPaimonPredicates, None)
-
- override def pruneColumns(requiredSchema: StructType): Unit = {
- this.requiredSchema = requiredSchema
- }
+ PaimonFormatTableScan(table, requiredSchema, pushedPartitionFilters, pushedDataFilters)
}
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
similarity index 51%
rename from paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
rename to paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index c99066343e..806ce90e28 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -18,60 +18,85 @@
package org.apache.paimon.spark
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
+import org.apache.paimon.table.Table
import org.apache.paimon.types.RowType
-import org.apache.spark.sql.connector.read.SupportsPushDownFilters
+import org.apache.spark.sql.connector.read.{SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
import java.util.{List => JList}
+import scala.collection.JavaConverters._
import scala.collection.mutable
-/** Base trait for Paimon scan filter push down. */
-trait PaimonBasePushDown extends SupportsPushDownFilters {
+/** Base Scan builder. */
+abstract class PaimonBaseScanBuilder
+ extends SupportsPushDownRequiredColumns
+ with SupportsPushDownFilters {
- protected var partitionKeys: JList[String]
- protected var rowType: RowType
+ val table: Table
+ val partitionKeys: JList[String] = table.partitionKeys()
+ val rowType: RowType = table.rowType()
private var pushedSparkFilters = Array.empty[Filter]
- protected var pushedPaimonPredicates: Array[Predicate] = Array.empty
- protected var reservedFilters: Array[Filter] = Array.empty
protected var hasPostScanPredicates = false
+ protected var pushedPartitionFilters: Array[PartitionPredicate] = Array.empty
+ protected var pushedDataFilters: Array[Predicate] = Array.empty
+
+ protected var requiredSchema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())
+
+ override def pruneColumns(requiredSchema: StructType): Unit = {
+ this.requiredSchema = requiredSchema
+ }
+
/**
* Pushes down filters, and returns filters that need to be evaluated after scanning. <p> Rows
* should be returned from the data source if and only if all the filters match. That is, filters
* must be interpreted as ANDed together.
*/
override def pushFilters(filters: Array[Filter]): Array[Filter] = {
- val pushable = mutable.ArrayBuffer.empty[(Filter, Predicate)]
+ val pushable = mutable.ArrayBuffer.empty[Filter]
+ val pushablePartitionDataFilters = mutable.ArrayBuffer.empty[Predicate]
+ val pushableDataFilters = mutable.ArrayBuffer.empty[Predicate]
val postScan = mutable.ArrayBuffer.empty[Filter]
- val reserved = mutable.ArrayBuffer.empty[Filter]
val converter = new SparkFilterConverter(rowType)
- val visitor = new PartitionPredicateVisitor(partitionKeys)
+ val partitionPredicateVisitor = new PartitionPredicateVisitor(partitionKeys)
filters.foreach {
filter =>
val predicate = converter.convertIgnoreFailure(filter)
if (predicate == null) {
postScan.append(filter)
} else {
- pushable.append((filter, predicate))
- if (predicate.visit(visitor)) {
- reserved.append(filter)
+ pushable.append(filter)
+ if (predicate.visit(partitionPredicateVisitor)) {
+ pushablePartitionDataFilters.append(predicate)
} else {
+ pushableDataFilters.append(predicate)
postScan.append(filter)
}
}
}
if (pushable.nonEmpty) {
- this.pushedSparkFilters = pushable.map(_._1).toArray
- this.pushedPaimonPredicates = pushable.map(_._2).toArray
+ this.pushedSparkFilters = pushable.toArray
+ }
+ if (pushablePartitionDataFilters.nonEmpty) {
+ val pair = splitPartitionPredicatesAndDataPredicates(
+ pushablePartitionDataFilters.asJava,
+ rowType,
+ partitionKeys)
+ assert(pair.getRight.isEmpty)
+ assert(pair.getLeft.isPresent)
+ this.pushedPartitionFilters = Array(pair.getLeft.get())
}
- if (reserved.nonEmpty) {
- this.reservedFilters = reserved.toArray
+ if (pushableDataFilters.nonEmpty) {
+ this.pushedDataFilters = pushableDataFilters.toArray
}
if (postScan.nonEmpty) {
this.hasPostScanPredicates = true
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
index a7bbe4e94e..e9734d238b 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark
+import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.Predicate
import org.apache.paimon.table.FormatTable
@@ -33,9 +34,10 @@ import scala.collection.JavaConverters._
case class PaimonFormatTableScan(
table: FormatTable,
requiredSchema: StructType,
- filters: Seq[Predicate],
- override val pushDownLimit: Option[Int])
- extends PaimonFormatTableBaseScan(table, requiredSchema, filters, pushDownLimit)
+ pushedPartitionFilters: Seq[PartitionPredicate],
+ pushedDataFilters: Seq[Predicate],
+ override val pushedLimit: Option[Int] = None)
+ extends PaimonFormatTableBaseScan
with SupportsRuntimeFiltering
with ScanHelper {
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index e8fe9a9c40..68b89d5ecc 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark
+import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.{Predicate, TopN}
import org.apache.paimon.table.InnerTable
@@ -32,13 +33,12 @@ import scala.collection.JavaConverters._
case class PaimonScan(
table: InnerTable,
requiredSchema: StructType,
- filters: Seq[Predicate],
- reservedFilters: Seq[Filter],
- override val pushDownLimit: Option[Int],
- // no usage, just for compile compatibility
- override val pushDownTopN: Option[TopN],
+ pushedPartitionFilters: Seq[PartitionPredicate],
+ pushedDataFilters: Seq[Predicate],
+ override val pushedLimit: Option[Int] = None,
+ override val pushedTopN: Option[TopN] = None,
bucketedScanDisabled: Boolean = true)
- extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, pushDownLimit)
+ extends PaimonBaseScan(table)
with SupportsRuntimeFiltering {
override def filterAttributes(): Array[NamedReference] = {
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 0ba6dca6c0..21ab46dabc 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -19,21 +19,12 @@
package org.apache.paimon.spark
import org.apache.paimon.table.InnerTable
-import org.apache.paimon.types.RowType
import org.apache.spark.sql.connector.read.Scan
-import org.apache.spark.sql.sources.Filter
-import java.util.{List => JList}
-
-class PaimonScanBuilder(table: InnerTable)
- extends PaimonBaseScanBuilder(table)
- with PaimonBasePushDown {
-
- override protected var partitionKeys: JList[String] = table.partitionKeys()
- override protected var rowType: RowType = table.rowType()
+class PaimonScanBuilder(val table: InnerTable) extends PaimonBaseScanBuilder {
override def build(): Scan = {
- PaimonScan(table, requiredSchema, pushedPaimonPredicates, reservedFilters, None, pushDownTopN)
+ PaimonScan(table, requiredSchema, pushedPartitionFilters, pushedDataFilters)
}
}
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonSplitScanBuilder.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonSplitScanBuilder.scala
deleted file mode 100644
index ede18b8cc9..0000000000
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonSplitScanBuilder.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark
-
-import org.apache.paimon.table.KnownSplitsTable
-
-import org.apache.spark.sql.connector.read.Scan
-
-class PaimonSplitScanBuilder(table: KnownSplitsTable) extends PaimonScanBuilder(table) {
- override def build(): Scan = {
- PaimonSplitScan(table, table.splits(), requiredSchema, pushedPaimonPredicates)
- }
-}
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 0fb515ac2c..7c0a4d0c17 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark
+import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.{Predicate, TopN}
import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
import org.apache.paimon.table.source.{DataSplit, Split}
@@ -34,12 +35,12 @@ import scala.collection.JavaConverters._
case class PaimonScan(
table: InnerTable,
requiredSchema: StructType,
- filters: Seq[Predicate],
- reservedFilters: Seq[Filter],
- override val pushDownLimit: Option[Int],
- override val pushDownTopN: Option[TopN],
+ pushedPartitionFilters: Seq[PartitionPredicate],
+ pushedDataFilters: Seq[Predicate],
+ override val pushedLimit: Option[Int],
+ override val pushedTopN: Option[TopN],
bucketedScanDisabled: Boolean = false)
- extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, pushDownLimit)
+ extends PaimonBaseScan(table)
with SupportsRuntimeFiltering
with SupportsReportPartitioning {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index 11f4d8ad4b..98daf2eaef 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -19,7 +19,8 @@
package org.apache.paimon.spark
import org.apache.paimon.CoreOptions
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder, TopN}
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.predicate.{Predicate, TopN}
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.schema.PaimonMetadataColumn._
import org.apache.paimon.table.{SpecialFields, Table}
@@ -30,12 +31,20 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.types.StructType
+import scala.collection.JavaConverters._
+
trait ColumnPruningAndPushDown extends Scan with Logging {
+
def table: Table
+
+ // Column pruning
def requiredSchema: StructType
- def filters: Seq[Predicate]
- def pushDownLimit: Option[Int] = None
- def pushDownTopN: Option[TopN] = None
+
+ // Push down
+ def pushedPartitionFilters: Seq[PartitionPredicate]
+ def pushedDataFilters: Seq[Predicate]
+ def pushedLimit: Option[Int] = None
+ def pushedTopN: Option[TopN] = None
val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
@@ -82,12 +91,14 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
lazy val readBuilder: ReadBuilder = {
val _readBuilder = table.newReadBuilder().withReadType(readTableRowType)
- if (filters.nonEmpty) {
- val pushedPredicate = PredicateBuilder.and(filters: _*)
- _readBuilder.withFilter(pushedPredicate)
+ if (pushedPartitionFilters.nonEmpty) {
+ _readBuilder.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.asJava))
+ }
+ if (pushedDataFilters.nonEmpty) {
+ _readBuilder.withFilter(pushedDataFilters.asJava)
}
- pushDownLimit.foreach(_readBuilder.withLimit)
- pushDownTopN.foreach(_readBuilder.withTopN)
+ pushedLimit.foreach(_readBuilder.withLimit)
+ pushedTopN.foreach(_readBuilder.withTopN)
_readBuilder.dropStats()
}
@@ -104,4 +115,22 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
}
_readSchema
}
+
+ override def description(): String = {
+ val pushedPartitionFiltersStr = if (pushedPartitionFilters.nonEmpty) {
+ ", PartitionFilters: [" + pushedPartitionFilters.mkString(",") + "]"
+ } else {
+ ""
+ }
+ val pushedDataFiltersStr = if (pushedDataFilters.nonEmpty) {
+ ", DataFilters: [" + pushedDataFilters.mkString(",") + "]"
+ } else {
+ ""
+ }
+ s"${getClass.getSimpleName}: [${table.name}]" +
+ pushedPartitionFiltersStr +
+ pushedDataFiltersStr +
+ pushedTopN.map(topN => s", TopN: [$topN]").getOrElse("") +
+ pushedLimit.map(limit => s", Limit: [$limit]").getOrElse("")
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
index df9921d563..b92476a085 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
@@ -19,27 +19,14 @@
package org.apache.paimon.spark
import org.apache.paimon.table.FormatTable
-import org.apache.paimon.types.RowType
-import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.types.StructType
-
-import java.util.{List => JList}
-
-case class FormatTableScanBuilder(table: FormatTable)
- extends ScanBuilder
- with PaimonBasePushDown
- with SupportsPushDownRequiredColumns {
-
- override protected var partitionKeys: JList[String] = table.partitionKeys()
- override protected var rowType: RowType = table.rowType()
-
- protected var requiredSchema: StructType = SparkTypeUtils.fromPaimonRowType(rowType)
+case class FormatTableScanBuilder(table: FormatTable) extends PaimonBaseScanBuilder {
override def build(): PaimonFormatTableScan =
- PaimonFormatTableScan(table, requiredSchema, pushedPaimonPredicates, pushDownLimit)
-
- override def pruneColumns(requiredSchema: StructType): Unit = {
- this.requiredSchema = requiredSchema
- }
+ PaimonFormatTableScan(
+ table,
+ requiredSchema,
+ pushedPartitionFilters,
+ pushedDataFilters,
+ pushedLimit)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
deleted file mode 100644
index 3f10202607..0000000000
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark
-
-import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
-import org.apache.paimon.types.RowType
-
-import org.apache.spark.sql.PaimonUtils
-import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
-import org.apache.spark.sql.connector.read.{SupportsPushDownLimit, SupportsPushDownV2Filters}
-import org.apache.spark.sql.sources.Filter
-
-import java.util.{List => JList}
-
-import scala.collection.mutable
-
-/** Base trait for Paimon scan push down. */
-trait PaimonBasePushDown extends SupportsPushDownV2Filters with SupportsPushDownLimit {
-
- protected var partitionKeys: JList[String]
- protected var rowType: RowType
-
- private var pushedSparkPredicates = Array.empty[SparkPredicate]
- protected var pushedPaimonPredicates: Array[Predicate] = Array.empty
- protected var reservedFilters: Array[Filter] = Array.empty
- protected var hasPostScanPredicates = false
- protected var pushDownLimit: Option[Int] = None
-
- override def pushPredicates(predicates: Array[SparkPredicate]): Array[SparkPredicate] = {
- val pushable = mutable.ArrayBuffer.empty[(SparkPredicate, Predicate)]
- val postScan = mutable.ArrayBuffer.empty[SparkPredicate]
- val reserved = mutable.ArrayBuffer.empty[Filter]
-
- val converter = SparkV2FilterConverter(rowType)
- val visitor = new PartitionPredicateVisitor(partitionKeys)
- predicates.foreach {
- predicate =>
- converter.convert(predicate) match {
- case Some(paimonPredicate) =>
- pushable.append((predicate, paimonPredicate))
- if (paimonPredicate.visit(visitor)) {
- // We need to filter the stats using filter instead of predicate.
- PaimonUtils.filterV2ToV1(predicate).map(reserved.append(_))
- } else {
- postScan.append(predicate)
- }
- case None =>
- postScan.append(predicate)
- }
- }
-
- if (pushable.nonEmpty) {
- this.pushedSparkPredicates = pushable.map(_._1).toArray
- this.pushedPaimonPredicates = pushable.map(_._2).toArray
- }
- if (reserved.nonEmpty) {
- this.reservedFilters = reserved.toArray
- }
- if (postScan.nonEmpty) {
- this.hasPostScanPredicates = true
- }
- postScan.toArray
- }
-
- override def pushedPredicates: Array[SparkPredicate] = {
- pushedSparkPredicates
- }
-
- override def pushLimit(limit: Int): Boolean = {
- // It is safe, since we will do nothing if it is the primary table and the split is not `rawConvertible`
- pushDownLimit = Some(limit)
- // just make the best effort to push down limit
- false
- }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 7ae11b7ce3..f39f209415 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -19,10 +19,8 @@
package org.apache.paimon.spark
import org.apache.paimon.annotation.VisibleForTesting
-import org.apache.paimon.predicate.Predicate
import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.spark.sources.PaimonMicroBatchStream
-import org.apache.paimon.spark.statistics.StatisticsHelper
import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.stats
import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable}
@@ -31,24 +29,17 @@ import org.apache.paimon.table.source.{InnerTableScan, Split}
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
-import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import java.util.Optional
import scala.collection.JavaConverters._
-abstract class PaimonBaseScan(
- table: InnerTable,
- requiredSchema: StructType,
- filters: Seq[Predicate],
- reservedFilters: Seq[Filter],
- pushDownLimit: Option[Int])
+abstract class PaimonBaseScan(table: InnerTable)
extends Scan
with SupportsReportStatistics
with ScanHelper
- with ColumnPruningAndPushDown
- with StatisticsHelper {
+ with ColumnPruningAndPushDown {
protected var inputPartitions: Seq[PaimonInputPartition] = _
@@ -59,8 +50,7 @@ abstract class PaimonBaseScan(
private lazy val paimonMetricsRegistry: SparkMetricRegistry = SparkMetricRegistry()
lazy val requiredStatsSchema: StructType = {
- val fieldNames =
- readTableRowType.getFields.asScala.map(_.name) ++ reservedFilters.flatMap(_.references)
+ val fieldNames = readTableRowType.getFields.asScala.map(_.name)
StructType(tableSchema.filter(field => fieldNames.contains(field.name)))
}
@@ -96,13 +86,7 @@ abstract class PaimonBaseScan(
}
override def estimateStatistics(): Statistics = {
- val stats = PaimonStatistics(this)
- // When using paimon stats, we need to perform additional FilterEstimation with reservedFilters on stats.
- if (stats.paimonStatsEnabled && reservedFilters.nonEmpty) {
- filterStatistics(stats, reservedFilters)
- } else {
- stats
- }
+ PaimonStatistics(this)
}
override def supportedCustomMetrics: Array[CustomMetric] = {
@@ -137,28 +121,4 @@ abstract class PaimonBaseScan(
case _ =>
}
}
-
- override def description(): String = {
- val pushedFiltersStr = if (filters.nonEmpty) {
- ", PushedFilters: [" + filters.mkString(",") + "]"
- } else {
- ""
- }
-
- val reservedFiltersStr = if (reservedFilters.nonEmpty) {
- ", ReservedFilters: [" + reservedFilters.mkString(",") + "]"
- } else {
- ""
- }
-
- val pushedTopNFilterStr = if (pushDownTopN.nonEmpty) {
- s", PushedTopNFilter: [${pushDownTopN.get.toString}]"
- } else {
- ""
- }
-
- s"PaimonScan: [${table.name}]" +
- pushedFiltersStr + reservedFiltersStr + pushedTopNFilterStr +
- pushDownLimit.map(limit => s", Limit: [$limit]").getOrElse("")
- }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index cacaf651fc..71ab544b71 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -18,24 +18,98 @@
package org.apache.paimon.spark
-import org.apache.paimon.predicate.{Predicate, TopN}
-import org.apache.paimon.table.InnerTable
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
+import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, TopN}
+import org.apache.paimon.table.Table
+import org.apache.paimon.types.RowType
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
+import org.apache.spark.sql.connector.read.{SupportsPushDownLimit, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
import org.apache.spark.sql.types.StructType
-abstract class PaimonBaseScanBuilder(table: InnerTable)
- extends ScanBuilder
- with SupportsPushDownRequiredColumns
- with Logging {
+import java.util.{List => JList}
- protected var requiredSchema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/** Base Scan builder. */
+abstract class PaimonBaseScanBuilder
+ extends SupportsPushDownRequiredColumns
+ with SupportsPushDownV2Filters
+ with SupportsPushDownLimit {
+
+ val table: Table
+ val partitionKeys: JList[String] = table.partitionKeys()
+ val rowType: RowType = table.rowType()
+
+ private var pushedSparkPredicates = Array.empty[SparkPredicate]
+ protected var hasPostScanPredicates = false
- protected var pushDownTopN: Option[TopN] = None
+ protected var pushedPartitionFilters: Array[PartitionPredicate] = Array.empty
+ protected var pushedDataFilters: Array[Predicate] = Array.empty
+ protected var pushedLimit: Option[Int] = None
+ protected var pushedTopN: Option[TopN] = None
+
+ protected var requiredSchema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())
override def pruneColumns(requiredSchema: StructType): Unit = {
this.requiredSchema = requiredSchema
}
+
+ override def pushPredicates(predicates: Array[SparkPredicate]): Array[SparkPredicate] = {
+ val pushable = mutable.ArrayBuffer.empty[SparkPredicate]
+ val pushablePartitionDataFilters = mutable.ArrayBuffer.empty[Predicate]
+ val pushableDataFilters = mutable.ArrayBuffer.empty[Predicate]
+ val postScan = mutable.ArrayBuffer.empty[SparkPredicate]
+
+ val converter = SparkV2FilterConverter(rowType)
+ val partitionPredicateVisitor = new PartitionPredicateVisitor(partitionKeys)
+ predicates.foreach {
+ predicate =>
+ converter.convert(predicate) match {
+ case Some(paimonPredicate) =>
+ pushable.append(predicate)
+ if (paimonPredicate.visit(partitionPredicateVisitor)) {
+ pushablePartitionDataFilters.append(paimonPredicate)
+ } else {
+ pushableDataFilters.append(paimonPredicate)
+ postScan.append(predicate)
+ }
+ case None =>
+ postScan.append(predicate)
+ }
+ }
+
+ if (pushable.nonEmpty) {
+ this.pushedSparkPredicates = pushable.toArray
+ }
+ if (pushablePartitionDataFilters.nonEmpty) {
+ val pair = splitPartitionPredicatesAndDataPredicates(
+ pushablePartitionDataFilters.asJava,
+ rowType,
+ partitionKeys)
+ assert(pair.getRight.isEmpty)
+ assert(pair.getLeft.isPresent)
+ this.pushedPartitionFilters = Array(pair.getLeft.get())
+ }
+ if (pushableDataFilters.nonEmpty) {
+ this.pushedDataFilters = pushableDataFilters.toArray
+ }
+ if (postScan.nonEmpty) {
+ this.hasPostScanPredicates = true
+ }
+ postScan.toArray
+ }
+
+ override def pushedPredicates: Array[SparkPredicate] = {
+ pushedSparkPredicates
+ }
+
+ override def pushLimit(limit: Int): Boolean = {
+ // It is safe, since we will do nothing if it is the primary table and the split is not `rawConvertible`
+ pushedLimit = Some(limit)
+ // just make the best effort to push down limit
+ false
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
index d9137a1250..1e8f415f30 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
@@ -38,5 +38,4 @@ case class PaimonBatch(
override def createReaderFactory(): PartitionReaderFactory =
PaimonPartitionReaderFactory(readBuilder, metadataColumns, blobAsDescriptor)
-
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
index a0dd474436..793e15540a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
@@ -18,22 +18,16 @@
package org.apache.paimon.spark
-import org.apache.paimon.predicate.Predicate
import org.apache.paimon.table.FormatTable
import org.apache.paimon.table.source.Split
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.{Batch, Statistics, SupportsReportStatistics}
-import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
/** Base Scan implementation for [[FormatTable]]. */
-abstract class PaimonFormatTableBaseScan(
- table: FormatTable,
- requiredSchema: StructType,
- filters: Seq[Predicate],
- pushDownLimit: Option[Int])
+abstract class PaimonFormatTableBaseScan
extends ColumnPruningAndPushDown
with SupportsReportStatistics
with ScanHelper {
@@ -82,13 +76,4 @@ abstract class PaimonFormatTableBaseScan(
PaimonResultedTableFilesTaskMetric(filesCount)
)
}
-
- override def description(): String = {
- val pushedFiltersStr = if (filters.nonEmpty) {
- ", PushedFilters: [" + filters.mkString(",") + "]"
- } else {
- ""
- }
- s"PaimonFormatTableScan: [${table.name}]" + pushedFiltersStr
- }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
index 8fb64999b9..aac086c2a7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark
+import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.Predicate
import org.apache.paimon.table.FormatTable
@@ -27,6 +28,7 @@ import org.apache.spark.sql.types.StructType
case class PaimonFormatTableScan(
table: FormatTable,
requiredSchema: StructType,
- filters: Seq[Predicate],
- override val pushDownLimit: Option[Int])
- extends PaimonFormatTableBaseScan(table, requiredSchema, filters, pushDownLimit) {}
+ pushedPartitionFilters: Seq[PartitionPredicate],
+ pushedDataFilters: Seq[Predicate],
+ override val pushedLimit: Option[Int])
+ extends PaimonFormatTableBaseScan {}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala
index 1f4e88e8d1..b4f8b3b785 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark
-import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.table.Table
import org.apache.spark.sql.catalyst.InternalRow
@@ -30,15 +30,15 @@ case class PaimonLocalScan(
rows: Array[InternalRow],
readSchema: StructType,
table: Table,
- filters: Array[Predicate])
+ pushedPartitionFilters: Seq[PartitionPredicate])
extends LocalScan {
override def description(): String = {
- val pushedFiltersStr = if (filters.nonEmpty) {
- ", PushedFilters: [" + filters.mkString(",") + "]"
+ val pushedPartitionFiltersStr = if (pushedPartitionFilters.nonEmpty) {
+ ", PartitionFilters: [" + pushedPartitionFilters.mkString(",") + "]"
} else {
""
}
- s"PaimonLocalScan: [${table.name}]" + pushedFiltersStr
+ s"PaimonLocalScan: [${table.name}]" + pushedPartitionFiltersStr
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 08f0029989..f8a6e89df8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -19,6 +19,7 @@
package org.apache.paimon.spark
import org.apache.paimon.CoreOptions.BucketFunctionType
+import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.{Predicate, TopN}
import org.apache.paimon.spark.commands.BucketExpression.quote
import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
@@ -29,7 +30,6 @@ import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
import org.apache.spark.sql.connector.read.{SupportsReportOrdering, SupportsReportPartitioning, SupportsRuntimeV2Filtering}
import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning}
-import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
@@ -37,19 +37,12 @@ import scala.collection.JavaConverters._
case class PaimonScan(
table: InnerTable,
requiredSchema: StructType,
- filters: Seq[Predicate],
- reservedFilters: Seq[Filter],
- override val pushDownLimit: Option[Int],
- override val pushDownTopN: Option[TopN],
+ pushedPartitionFilters: Seq[PartitionPredicate],
+ pushedDataFilters: Seq[Predicate],
+ override val pushedLimit: Option[Int],
+ override val pushedTopN: Option[TopN],
bucketedScanDisabled: Boolean = false)
- extends PaimonScanCommon(
- table,
- requiredSchema,
- filters,
- reservedFilters,
- pushDownLimit,
- pushDownTopN,
- bucketedScanDisabled)
+ extends PaimonScanCommon(table, requiredSchema, bucketedScanDisabled)
with SupportsRuntimeV2Filtering {
def disableBucketedScan(): PaimonScan = {
copy(bucketedScanDisabled = true)
@@ -87,12 +80,8 @@ case class PaimonScan(
abstract class PaimonScanCommon(
table: InnerTable,
requiredSchema: StructType,
- filters: Seq[Predicate],
- reservedFilters: Seq[Filter],
- override val pushDownLimit: Option[Int],
- override val pushDownTopN: Option[TopN],
bucketedScanDisabled: Boolean = false)
- extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, pushDownLimit)
+ extends PaimonBaseScan(table)
with SupportsReportPartitioning
with SupportsReportOrdering {
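The two new constructor fields carry the conjuncts that previously travelled in a single filters list. A self-contained sketch of the intended routing, using plain strings rather than Paimon's real predicate types (the Conjunct helper and the example query are hypothetical):

    // Hypothetical stand-in for a single pushed conjunct; not a Paimon type.
    final case class Conjunct(column: String, sql: String)

    // Conjuncts that reference only partition columns become partition filters,
    // everything else stays a data filter.
    def splitFilters(
        conjuncts: Seq[Conjunct],
        partitionKeys: Set[String]): (Seq[Conjunct], Seq[Conjunct]) =
      conjuncts.partition(c => partitionKeys.contains(c.column))

    // Example for WHERE pt = 'p1' AND id > 10 on a table partitioned by pt:
    //   splitFilters(Seq(Conjunct("pt", "pt = 'p1'"), Conjunct("id", "id > 10")), Set("pt"))
    //   == (Seq(Conjunct("pt", "pt = 'p1'")), Seq(Conjunct("id", "id > 10")))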
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 186cb116f5..c61ce42b80 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -18,33 +18,26 @@
package org.apache.paimon.spark
+import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate._
import org.apache.paimon.predicate.SortValue.{NullOrdering, SortDirection}
import org.apache.paimon.spark.aggregate.AggregatePushDownUtils.tryPushdownAggregation
import org.apache.paimon.table.{FileStoreTable, InnerTable}
-import org.apache.paimon.types.RowType
import org.apache.spark.sql.connector.expressions
import org.apache.spark.sql.connector.expressions.{NamedReference, SortOrder}
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
import org.apache.spark.sql.connector.read._
-import java.util.{List => JList}
-
import scala.collection.JavaConverters._
-class PaimonScanBuilder(table: InnerTable)
- extends PaimonBaseScanBuilder(table)
- with PaimonBasePushDown
+class PaimonScanBuilder(val table: InnerTable)
+ extends PaimonBaseScanBuilder
with SupportsPushDownAggregates
with SupportsPushDownTopN {
private var localScan: Option[Scan] = None
- override protected var partitionKeys: JList[String] = table.partitionKeys()
- override protected var rowType: RowType = table.rowType()
-
override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
if (hasPostScanPredicates) {
return false
@@ -86,7 +79,7 @@ class PaimonScanBuilder(table: InnerTable)
})
.toList
- pushDownTopN = Some(new TopN(sorts.asJava, limit))
+ pushedTopN = Some(new TopN(sorts.asJava, limit))
// just make the best effort to push down TopN
false
@@ -115,15 +108,15 @@ class PaimonScanBuilder(table: InnerTable)
}
val readBuilder = table.newReadBuilder
- if (pushedPaimonPredicates.nonEmpty) {
- val pushedPartitionPredicate = PredicateBuilder.and(pushedPaimonPredicates.toList.asJava)
- readBuilder.withFilter(pushedPartitionPredicate)
+ if (pushedPartitionFilters.nonEmpty) {
+ readBuilder.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.toList.asJava))
}
+ assert(pushedDataFilters.isEmpty)
tryPushdownAggregation(table.asInstanceOf[FileStoreTable], aggregation, readBuilder) match {
case Some(agg) =>
localScan = Some(
- PaimonLocalScan(agg.result(), agg.resultSchema(), table, pushedPaimonPredicates)
+ PaimonLocalScan(agg.result(), agg.resultSchema(), table, pushedPartitionFilters)
)
true
case None => false
@@ -131,16 +124,16 @@ class PaimonScanBuilder(table: InnerTable)
}
override def build(): Scan = {
- if (localScan.isDefined) {
- localScan.get
- } else {
- PaimonScan(
- table,
- requiredSchema,
- pushedPaimonPredicates,
- reservedFilters,
- pushDownLimit,
- pushDownTopN)
+ localScan match {
+ case Some(scan) => scan
+ case None =>
+ PaimonScan(
+ table,
+ requiredSchema,
+ pushedPartitionFilters,
+ pushedDataFilters,
+ pushedLimit,
+ pushedTopN)
}
}
}
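For the aggregate push-down path above, only the partition side of the split reaches the read builder; the data side must already be empty. A short Scala sketch of that contract, assuming the imports below and the ReadBuilder calls shown in the hunk (the helper name is hypothetical):

    import scala.collection.JavaConverters._

    import org.apache.paimon.partition.PartitionPredicate
    import org.apache.paimon.predicate.Predicate
    import org.apache.paimon.table.InnerTable

    // Sketch of the state handed to aggregate push-down: partition predicates are
    // AND-combined onto the read builder, and data filters are required to be absent.
    def prepareForAggregatePushDown(
        table: InnerTable,
        pushedPartitionFilters: Seq[PartitionPredicate],
        pushedDataFilters: Seq[Predicate]) = {
      assert(pushedDataFilters.isEmpty, "aggregate push-down is only attempted without remaining data filters")
      val readBuilder = table.newReadBuilder
      if (pushedPartitionFilters.nonEmpty) {
        readBuilder.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.toList.asJava))
      }
      readBuilder
    }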
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index 2fc8d9d5f8..f47081e5b2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark
-import org.apache.paimon.CoreOptions
+import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.Predicate
import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
import org.apache.paimon.table.source.{DataSplit, Split}
@@ -27,9 +27,15 @@ import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.{Batch, Scan}
import org.apache.spark.sql.types.StructType
-class PaimonSplitScanBuilder(table: KnownSplitsTable) extends PaimonScanBuilder(table) {
+class PaimonSplitScanBuilder(val table: KnownSplitsTable) extends PaimonBaseScanBuilder {
+
override def build(): Scan = {
- PaimonSplitScan(table, table.splits(), requiredSchema, pushedPaimonPredicates)
+ PaimonSplitScan(
+ table,
+ table.splits(),
+ requiredSchema,
+ pushedPartitionFilters,
+ pushedDataFilters)
}
}
@@ -38,7 +44,8 @@ case class PaimonSplitScan(
table: InnerTable,
dataSplits: Array[DataSplit],
requiredSchema: StructType,
- filters: Seq[Predicate])
+ pushedPartitionFilters: Seq[PartitionPredicate],
+ pushedDataFilters: Seq[Predicate])
extends ColumnPruningAndPushDown
with ScanHelper {
@@ -64,13 +71,4 @@ case class PaimonSplitScan(
PaimonResultedTableFilesTaskMetric(filesCount)
)
}
-
- override def description(): String = {
- val pushedFiltersStr = if (filters.nonEmpty) {
- ", PushedFilters: [" + filters.mkString(",") + "]"
- } else {
- ""
- }
- s"PaimonSplitScan: [${table.name}]" + pushedFiltersStr
- }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
index 8dd4649330..cd8dcd1165 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
@@ -18,9 +18,9 @@
package org.apache.paimon.spark
-import org.apache.paimon.spark.data.SparkInternalRow
+import org.apache.paimon.stats
import org.apache.paimon.stats.ColStats
-import org.apache.paimon.types.{DataField, DataType, RowType}
+import org.apache.paimon.types.{DataField, DataType}
import org.apache.spark.sql.PaimonUtils
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
@@ -34,62 +34,45 @@ import scala.collection.JavaConverters._
case class PaimonStatistics[T <: PaimonBaseScan](scan: T) extends Statistics {
- private lazy val rowCount: Long = scan.lazyInputPartitions.map(_.rowCount()).sum
+ import PaimonImplicits._
- private lazy val scannedTotalSize: Long = rowCount * scan.readSchema().defaultSize
+ private lazy val paimonStats: Option[stats.Statistics] = scan.statistics
- private lazy val paimonStats = if (scan.statistics.isPresent) scan.statistics.get() else null
+ private lazy val rowCount: Long = scan.lazyInputPartitions.map(_.rowCount()).sum
- lazy val paimonStatsEnabled: Boolean = {
- paimonStats != null &&
- paimonStats.mergedRecordSize().isPresent &&
- paimonStats.mergedRecordCount().isPresent
+ private lazy val scannedTotalSize: Long = {
+ val readSchemaSize =
+ SparkTypeUtils.toPaimonRowType(scan.readSchema()).getFields.asScala.map(getSizeForField).sum
+ val sizeInBytes = rowCount * readSchemaSize
+ // Avoid returning 0 bytes if there are some valid rows.
+ // Avoid returning a size in bytes smaller than the row count;
+ // note the compression ratio on disk is usually bigger than in memory.
+ Math.max(sizeInBytes, rowCount)
}
private def getSizeForField(field: DataField): Long = {
- Option(paimonStats.colStats().get(field.name()))
- .map(_.avgLen())
- .filter(_.isPresent)
- .map(_.getAsLong)
- .getOrElse(field.`type`().defaultSize().toLong)
- }
-
- private def getSizeForRow(schema: RowType): Long = {
- schema.getFields.asScala.map(field => getSizeForField(field)).sum
- }
-
- override def sizeInBytes(): OptionalLong = {
- if (!paimonStatsEnabled) {
- return OptionalLong.of(scannedTotalSize)
+ paimonStats match {
+ case Some(stats) =>
+ val colStat = stats.colStats().get(field.name())
+ if (colStat != null && colStat.avgLen().isPresent) {
+ colStat.avgLen().getAsLong
+ } else {
+ field.`type`().defaultSize().toLong
+ }
+ case _ =>
+ field.`type`().defaultSize().toLong
}
-
- val wholeSchemaSize = getSizeForRow(scan.tableRowType)
-
- val requiredDataSchemaSize =
- scan.readTableRowType.getFields.asScala.map(field => getSizeForField(field)).sum
- val requiredDataSizeInBytes =
- paimonStats.mergedRecordSize().getAsLong * (requiredDataSchemaSize.toDouble / wholeSchemaSize)
-
- val metadataSchemaSize =
- scan.metadataColumns.map(field => getSizeForField(field.toPaimonDataField)).sum
- val metadataSizeInBytes = paimonStats.mergedRecordCount().getAsLong * metadataSchemaSize
-
- val sizeInBytes = (requiredDataSizeInBytes + metadataSizeInBytes).toLong
- // Avoid return 0 bytes if there are some valid rows.
- // Avoid return too small size in bytes which may less than row count,
- // note the compression ratio on disk is usually bigger than memory.
- val normalized = Math.max(sizeInBytes, paimonStats.mergedRecordCount().getAsLong)
- OptionalLong.of(normalized)
}
- override def numRows(): OptionalLong =
- if (paimonStatsEnabled) paimonStats.mergedRecordCount() else OptionalLong.of(rowCount)
+ override def numRows(): OptionalLong = OptionalLong.of(rowCount)
+
+ override def sizeInBytes(): OptionalLong = OptionalLong.of(scannedTotalSize)
override def columnStats(): java.util.Map[NamedReference, ColumnStatistics] = {
val requiredFields = scan.requiredStatsSchema.fieldNames
val resultMap = new java.util.HashMap[NamedReference, ColumnStatistics]()
- if (paimonStatsEnabled) {
- val paimonColStats = paimonStats.colStats()
+ if (paimonStats.isDefined) {
+ val paimonColStats = paimonStats.get.colStats()
scan.tableRowType.getFields.asScala
.filter {
field => requiredFields.contains(field.name) && paimonColStats.containsKey(field.name())
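The new size estimate is simpler than the old paimonStats-gated path: per-field sizes come from column statistics when an average length is available, otherwise the type's default size, and the total is floored at the row count. A standalone sketch of that arithmetic (the function names and the field sizes in the example are invented for illustration):

    // Standalone sketch of the new size estimate; not the actual PaimonStatistics code.
    // avgLenFromStats is Some(average length in bytes) when ANALYZE produced column stats, else None.
    def fieldSize(avgLenFromStats: Option[Long], typeDefaultSize: Long): Long =
      avgLenFromStats.getOrElse(typeDefaultSize)

    def scannedTotalSize(rowCount: Long, perFieldSizes: Seq[Long]): Long = {
      val sizeInBytes = rowCount * perFieldSizes.sum
      // Floor at rowCount so a scan that returns rows never reports 0 (or implausibly few) bytes.
      math.max(sizeInBytes, rowCount)
    }

    // Example: 1000 rows over an INT with no stats (default 4 bytes) and a STRING with average length 12:
    //   scannedTotalSize(1000, Seq(fieldSize(None, 4), fieldSize(Some(12L), 20))) == 16000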
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
index 040c90170f..c6860779cd 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
@@ -439,21 +439,17 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
// paimon will reserve partition filter and not return it to spark, we need to ensure stats are filtered correctly.
// partition push down hit
var sql = "SELECT * FROM T WHERE pt < 1"
- Assertions.assertEquals(
- if (gteqSpark3_4) 0L else 4L,
- getScanStatistic(sql).rowCount.get.longValue)
+ Assertions.assertEquals(0L, getScanStatistic(sql).rowCount.get.longValue)
checkAnswer(spark.sql(sql), Nil)
// partition push down hit and select without it
sql = "SELECT id FROM T WHERE pt < 1"
- Assertions.assertEquals(
- if (gteqSpark3_4) 0L else 4L,
- getScanStatistic(sql).rowCount.get.longValue)
+ Assertions.assertEquals(0L, getScanStatistic(sql).rowCount.get.longValue)
checkAnswer(spark.sql(sql), Nil)
// partition push down not hit
sql = "SELECT * FROM T WHERE id < 1"
- Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue)
+ Assertions.assertEquals(0L, getScanStatistic(sql).rowCount.get.longValue)
checkAnswer(spark.sql(sql), Nil)
}
@@ -470,12 +466,17 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
sql("INSERT INTO T VALUES (1, 'a', '1'), (2, 'b', '1'), (3, 'c', '2'), (4, 'd', '3')")
sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
- // For col type such as char, varchar that don't have min and max, filter estimation on stats has no effect.
var sqlText = "SELECT * FROM T WHERE pt < '1'"
- Assertions.assertEquals(4L, getScanStatistic(sqlText).rowCount.get.longValue)
+ Assertions.assertEquals(
+ if (gteqSpark3_4 && partitionType == "char(10)") 4L else 0L,
+ getScanStatistic(sqlText).rowCount.get.longValue)
+ checkAnswer(sql(sqlText), Nil)
sqlText = "SELECT id FROM T WHERE pt < '1'"
- Assertions.assertEquals(4L, getScanStatistic(sqlText).rowCount.get.longValue)
+ Assertions.assertEquals(
+ if (gteqSpark3_4 && partitionType == "char(10)") 4L else 0L,
+ getScanStatistic(sqlText).rowCount.get.longValue)
+ checkAnswer(sql(sqlText), Nil)
}
})
}
@@ -525,7 +526,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
spark.sql("ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
val withColStat = checkStatistics()
- assert(withColStat == noColStat)
+ assert(withColStat != noColStat)
}
test("Query a non-existent catalog") {
@@ -553,5 +554,4 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
.get
relation.computeStats()
}
-
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index e55a610a02..d417b5f405 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -102,8 +102,13 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
spark.sql(s"CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED BY (pt)")
spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2')")
+ // data filter and partition filter
val sqlText = "SELECT * FROM T WHERE id = 1 AND pt = 'p1' LIMIT 1"
Assertions.assertEquals(getPaimonScan(sqlText), getPaimonScan(sqlText))
+
+ // topN
+ val sqlText2 = "SELECT id FROM T ORDER BY id ASC NULLS LAST LIMIT 5"
+ Assertions.assertEquals(getPaimonScan(sqlText2), getPaimonScan(sqlText2))
}
}