This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 283332e9f77b04cea74abcba0cda5d9e9773fd4d Author: Tan-JiaLiang <[email protected]> AuthorDate: Wed Aug 27 17:01:52 2025 +0800 [core] Support TopN pushdown to pick the DataSplits (#6143) --- .../bitmap/RangeBitmapIndexPushDownBenchmark.java | 4 +- .../java/org/apache/paimon/predicate/TopN.java | 9 + .../java/org/apache/paimon/utils/ListUtils.java | 3 +- .../rangebitmap/RangeBitmapFileIndexTest.java | 46 +--- .../apache/paimon/stats/SimpleStatsEvolution.java | 26 ++ .../java/org/apache/paimon/stats/StatsUtils.java | 51 ++++ .../paimon/table/AbstractFileStoreTable.java | 1 + .../paimon/table/FallbackReadFileStoreTable.java | 8 + .../paimon/table/source/AbstractDataTableScan.java | 2 +- .../org/apache/paimon/table/source/DataSplit.java | 40 +++ .../paimon/table/source/DataTableBatchScan.java | 40 ++- .../apache/paimon/table/source/InnerTableScan.java | 5 + .../paimon/table/source/ReadBuilderImpl.java | 3 + .../table/source/TopNDataSplitEvaluator.java | 267 ++++++++++++++++++++ .../paimon/table/system/ReadOptimizedTable.java | 1 + .../paimon/table/AppendOnlySimpleTableTest.java | 31 +-- .../paimon/table/PrimaryKeySimpleTableTest.java | 20 +- .../org/apache/paimon/table/source/SplitTest.java | 34 +++ .../apache/paimon/table/source/TableScanTest.java | 278 +++++++++++++++++++++ .../table/source/snapshot/ScannerTestBase.java | 14 ++ .../paimon/spark/ColumnPruningAndPushDown.scala | 8 +- .../spark/aggregate/AggregatePushDownUtils.scala | 32 +-- .../paimon/spark/sql/PaimonPushDownTestBase.scala | 63 +++-- 23 files changed, 855 insertions(+), 131 deletions(-) diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java index 77c88afe56..c286b8727d 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java @@ -32,7 +32,6 @@ import org.apache.paimon.options.Options; import org.apache.paimon.predicate.FieldRef; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; -import org.apache.paimon.predicate.SortValue; import org.apache.paimon.predicate.TopN; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; @@ -213,8 +212,7 @@ public class RangeBitmapIndexPushDownBenchmark { () -> { Table table = tables.get(name); FieldRef ref = new FieldRef(0, "k", DataTypes.INT()); - SortValue sort = new SortValue(ref, DESCENDING, NULLS_LAST); - TopN topN = new TopN(Collections.singletonList(sort), k); + TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k); List<Split> splits = table.newReadBuilder().newScan().plan().splits(); AtomicLong readCount = new AtomicLong(0); try { diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java index 2ecb38cce3..e66de2c560 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java @@ -18,9 +18,12 @@ package org.apache.paimon.predicate; +import org.apache.paimon.predicate.SortValue.NullOrdering; +import org.apache.paimon.predicate.SortValue.SortDirection; import org.apache.paimon.utils.Preconditions; 
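// Illustrative note, not part of the patch: the convenience constructor added
// below wraps a single sort column, so
//   new TopN(ref, DESCENDING, NULLS_LAST, k)
// is shorthand for the previous
//   new TopN(Collections.singletonList(new SortValue(ref, DESCENDING, NULLS_LAST)), k)
// which is exactly the rewrite applied to the benchmark and tests in this commit.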
import java.io.Serializable; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -37,6 +40,12 @@ public class TopN implements Serializable { this.limit = limit; } + public TopN(FieldRef ref, SortDirection direction, NullOrdering nullOrdering, int limit) { + SortValue order = new SortValue(ref, direction, nullOrdering); + this.orders = Collections.singletonList(order); + this.limit = limit; + } + public List<SortValue> orders() { return orders; } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java index e870b68e1d..a93f9d016c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java @@ -18,6 +18,7 @@ package org.apache.paimon.utils; +import java.util.Collection; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -32,7 +33,7 @@ public class ListUtils { return list.get(index); } - public static <T> boolean isNullOrEmpty(List<T> list) { + public static <T> boolean isNullOrEmpty(Collection<T> list) { return list == null || list.isEmpty(); } } diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java index 904f24826f..90656c2081 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java @@ -25,7 +25,6 @@ import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; import org.apache.paimon.fs.ByteArraySeekableStream; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.FieldRef; -import org.apache.paimon.predicate.SortValue; import org.apache.paimon.predicate.TopN; import org.apache.paimon.types.IntType; import org.apache.paimon.types.VarCharType; @@ -36,7 +35,6 @@ import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -283,11 +281,7 @@ public class RangeBitmapFileIndexTest { RoaringBitmap32 expected = new RoaringBitmap32(); // test NULL_LAST without found set - TopN topN = - new TopN( - Collections.singletonList( - new SortValue(fieldRef, ASCENDING, NULLS_LAST)), - k); + TopN topN = new TopN(fieldRef, ASCENDING, NULLS_LAST, k); pairs.stream() .sorted(nullLastCompactor) .limit(k) @@ -298,11 +292,7 @@ public class RangeBitmapFileIndexTest { // test NULL_LAST with found set expected.clear(); - topN = - new TopN( - Collections.singletonList( - new SortValue(fieldRef, ASCENDING, NULLS_LAST)), - k); + topN = new TopN(fieldRef, ASCENDING, NULLS_LAST, k); pairs.stream() .filter(pair -> foundSet.contains(pair.getKey())) .sorted(nullLastCompactor) @@ -317,11 +307,7 @@ public class RangeBitmapFileIndexTest { // test NULL_FIRST without found set expected.clear(); - topN = - new TopN( - Collections.singletonList( - new SortValue(fieldRef, ASCENDING, NULLS_FIRST)), - k); + topN = new TopN(fieldRef, ASCENDING, NULLS_FIRST, k); pairs.stream() .sorted(nullFirstCompactor) .limit(k) @@ -332,11 +318,7 @@ public class RangeBitmapFileIndexTest { // test NULL_FIRST with found set expected.clear(); - topN = - new TopN( - Collections.singletonList( - new SortValue(fieldRef, 
ASCENDING, NULLS_FIRST)), - k); + topN = new TopN(fieldRef, ASCENDING, NULLS_FIRST, k); pairs.stream() .filter(pair -> foundSet.contains(pair.getKey())) .sorted(nullFirstCompactor) @@ -406,38 +388,26 @@ public class RangeBitmapFileIndexTest { // test bottomK BitmapIndexResult result = new BitmapIndexResult(() -> RoaringBitmap32.bitmapOf(0, 3, 4, 5)); - TopN bottomNullFirst = - new TopN( - Collections.singletonList(new SortValue(fieldRef, ASCENDING, NULLS_FIRST)), - 3); + TopN bottomNullFirst = new TopN(fieldRef, ASCENDING, NULLS_FIRST, 3); assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullFirst, null)).get()) .isEqualTo(RoaringBitmap32.bitmapOf(0, 5, 6)); assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullFirst, result)).get()) .isEqualTo(RoaringBitmap32.bitmapOf(0, 3, 5)); - TopN bottomNullLast = - new TopN( - Collections.singletonList(new SortValue(fieldRef, ASCENDING, NULLS_LAST)), - 3); + TopN bottomNullLast = new TopN(fieldRef, ASCENDING, NULLS_LAST, 3); assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullLast, null)).get()) .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 2)); assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullLast, result)).get()) .isEqualTo(RoaringBitmap32.bitmapOf(0, 3, 4)); // test topK - TopN topNullFirst = - new TopN( - Collections.singletonList(new SortValue(fieldRef, DESCENDING, NULLS_FIRST)), - 3); + TopN topNullFirst = new TopN(fieldRef, DESCENDING, NULLS_FIRST, 3); assertThat(((BitmapIndexResult) reader.visitTopN(topNullFirst, null)).get()) .isEqualTo(RoaringBitmap32.bitmapOf(5, 6, 7)); assertThat(((BitmapIndexResult) reader.visitTopN(topNullFirst, result)).get()) .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 5)); - TopN topNullLast = - new TopN( - Collections.singletonList(new SortValue(fieldRef, DESCENDING, NULLS_LAST)), - 3); + TopN topNullLast = new TopN(fieldRef, DESCENDING, NULLS_LAST, 3); assertThat(((BitmapIndexResult) reader.visitTopN(topNullLast, null)).get()) .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 7)); assertThat(((BitmapIndexResult) reader.visitTopN(topNullLast, result)).get()) diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java index b1c7cfebee..ad9f4f4b22 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java @@ -88,6 +88,32 @@ public class SimpleStatsEvolution { return result; } + public InternalArray evolution( + InternalArray array, Long rowCount, @Nullable List<String> denseFields) { + InternalArray nullCounts = array; + + if (denseFields != null && denseFields.isEmpty()) { + // optimize for empty dense fields + nullCounts = emptyNullCounts; + } else if (denseFields != null) { + int[] denseIndexMapping = + indexMappings.computeIfAbsent( + denseFields, + k -> fieldNames.stream().mapToInt(denseFields::indexOf).toArray()); + nullCounts = ProjectedArray.from(denseIndexMapping).replaceArray(nullCounts); + } + + if (indexMapping != null) { + if (rowCount == null) { + throw new RuntimeException("Schema Evolution for stats needs row count."); + } + + nullCounts = new NullCountsEvoArray(indexMapping, nullCounts, rowCount); + } + + return nullCounts; + } + public Result evolution( SimpleStats stats, @Nullable Long rowCount, @Nullable List<String> denseFields) { InternalRow minValues = stats.minValues(); diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/StatsUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/stats/StatsUtils.java new file mode 100644 index 0000000000..ec52cda0d9 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/stats/StatsUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.stats; + +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TinyIntType; + +/** Utils for Stats. */ +public class StatsUtils { + + public static boolean minmaxAvailable(DataType type) { + // not push down complex type + // not push down Timestamp because INT96 sort order is undefined, + // Parquet doesn't return statistics for INT96 + // not push down Parquet Binary because min/max could be truncated + // (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary + // could be Spark StringType, BinaryType or DecimalType. + // not push down for ORC with same reason. 
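+ // Illustrative examples, not part of the patch: with this check,
+ // minmaxAvailable(DataTypes.INT()) is true, while VARCHAR/BINARY, DECIMAL
+ // and TIMESTAMP columns stay false for the Parquet/ORC reasons above.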
+ return type instanceof BooleanType + || type instanceof TinyIntType + || type instanceof SmallIntType + || type instanceof IntType + || type instanceof BigIntType + || type instanceof FloatType + || type instanceof DoubleType + || type instanceof DateType; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 6757c8ef38..db0ca5e5ff 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -268,6 +268,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable { public DataTableBatchScan newScan() { return new DataTableBatchScan( tableSchema, + schemaManager(), coreOptions(), newSnapshotReader(), catalogEnvironment.tableQueryAuth(coreOptions())); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index f53cfaa985..11bc764a57 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -34,6 +34,7 @@ import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.TopN; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataFilePlan; @@ -366,6 +367,13 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { return this; } + @Override + public InnerTableScan withTopN(TopN topN) { + mainScan.withTopN(topN); + fallbackScan.withTopN(topN); + return this; + } + @Override public InnerTableScan dropStats() { mainScan.dropStats(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index b019f5dbda..67acb44066 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -78,7 +78,7 @@ abstract class AbstractDataTableScan implements DataTableScan { private static final Logger LOG = LoggerFactory.getLogger(AbstractDataTableScan.class); - private final TableSchema schema; + protected final TableSchema schema; private final CoreOptions options; protected final SnapshotReader snapshotReader; private final TableQueryAuth queryAuth; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 1feb72afa3..276cf5fe50 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.source; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; @@ -34,6 +35,7 @@ import org.apache.paimon.predicate.CompareUtils; import org.apache.paimon.stats.SimpleStatsEvolution; import 
org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.FunctionWithIOException; import org.apache.paimon.utils.InternalRowUtils; import org.apache.paimon.utils.SerializationUtils; @@ -44,13 +46,16 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX; +import static org.apache.paimon.utils.ListUtils.isNullOrEmpty; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; @@ -153,6 +158,21 @@ public class DataSplit implements Split { return partialMergedRowCount(); } + public boolean statsAvailable(Set<String> columns) { + if (isNullOrEmpty(columns)) { + return false; + } + + return dataFiles.stream() + .map(DataFileMeta::valueStatsCols) + .allMatch( + valueStatsCols -> + // It means there are all column statistics when valueStatsCols == + // null + valueStatsCols == null + || new HashSet<>(valueStatsCols).containsAll(columns)); + } + public Object minValue(int fieldIndex, DataField dataField, SimpleStatsEvolutions evolutions) { Object minValue = null; for (DataFileMeta dataFile : dataFiles) { @@ -191,6 +211,26 @@ public class DataSplit implements Split { return maxValue; } + public Long nullCount(int fieldIndex, SimpleStatsEvolutions evolutions) { + Long sum = null; + for (DataFileMeta dataFile : dataFiles) { + SimpleStatsEvolution evolution = evolutions.getOrCreate(dataFile.schemaId()); + InternalArray nullCounts = + evolution.evolution( + dataFile.valueStats().nullCounts(), + dataFile.rowCount(), + dataFile.valueStatsCols()); + Long nullCount = + (Long) InternalRowUtils.get(nullCounts, fieldIndex, DataTypes.BIGINT()); + if (sum == null) { + sum = nullCount; + } else if (nullCount != null) { + sum += nullCount; + } + } + return sum; + } + /** * Obtain merged row count as much as possible. 
There are two scenarios where accurate row count * can be calculated: diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java index d7f02c06ad..ec8eb8ee0b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java @@ -21,6 +21,8 @@ package org.apache.paimon.table.source; import org.apache.paimon.CoreOptions; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.TopN; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.source.snapshot.SnapshotReader; @@ -37,16 +39,20 @@ public class DataTableBatchScan extends AbstractDataTableScan { private boolean hasNext; private Integer pushDownLimit; + private TopN topN; + + private final SchemaManager schemaManager; public DataTableBatchScan( TableSchema schema, + SchemaManager schemaManager, CoreOptions options, SnapshotReader snapshotReader, TableQueryAuth queryAuth) { super(schema, options, snapshotReader, queryAuth); this.hasNext = true; - + this.schemaManager = schemaManager; if (!schema.primaryKeys().isEmpty() && options.batchScanSkipLevel0()) { if (options.toConfiguration() .get(CoreOptions.BATCH_SCAN_MODE) @@ -71,6 +77,12 @@ public class DataTableBatchScan extends AbstractDataTableScan { return this; } + @Override + public InnerTableScan withTopN(TopN topN) { + this.topN = topN; + return this; + } + @Override public TableScan.Plan plan() { authQuery(); @@ -82,8 +94,9 @@ public class DataTableBatchScan extends AbstractDataTableScan { if (hasNext) { hasNext = false; StartingScanner.Result result = startingScanner.scan(snapshotReader); - StartingScanner.Result limitedResult = applyPushDownLimit(result); - return DataFilePlan.fromResult(limitedResult); + result = applyPushDownLimit(result); + result = applyPushDownTopN(result); + return DataFilePlan.fromResult(result); } else { throw new EndOfScanException(); } @@ -123,6 +136,27 @@ public class DataTableBatchScan extends AbstractDataTableScan { return result; } + private StartingScanner.Result applyPushDownTopN(StartingScanner.Result result) { + if (topN == null + || pushDownLimit != null + || !(result instanceof ScannedResult) + || !schema.primaryKeys().isEmpty() + || options().deletionVectorsEnabled()) { + return result; + } + + SnapshotReader.Plan plan = ((ScannedResult) result).plan(); + List<DataSplit> splits = plan.dataSplits(); + if (splits.isEmpty()) { + return result; + } + + TopNDataSplitEvaluator evaluator = new TopNDataSplitEvaluator(schema, schemaManager); + List<Split> topNSplits = new ArrayList<>(evaluator.evaluate(topN, splits)); + SnapshotReader.Plan newPlan = new PlanImpl(plan.watermark(), plan.snapshotId(), topNSplits); + return new ScannedResult(newPlan); + } + @Override public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) { snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index 89f15055d5..d0015445b3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.TopN; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Filter; @@ -77,6 +78,10 @@ public interface InnerTableScan extends TableScan { return this; } + default InnerTableScan withTopN(TopN topN) { + return this; + } + default InnerTableScan dropStats() { // do nothing, should implement this if need return this; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java index e57a7b6382..662ac19858 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java @@ -167,6 +167,9 @@ public class ReadBuilderImpl implements ReadBuilder { if (limit != null) { tableScan.withLimit(limit); } + if (topN != null) { + tableScan.withTopN(topN); + } return tableScan; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java new file mode 100644 index 0000000000..87c6b23cfd --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.predicate.CompareUtils; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.SortValue; +import org.apache.paimon.predicate.TopN; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.stats.SimpleStatsEvolutions; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.Pair; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_FIRST; +import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST; +import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING; +import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING; +import static org.apache.paimon.stats.StatsUtils.minmaxAvailable; + +/** Evaluate DataSplit TopN result. */ +public class TopNDataSplitEvaluator { + + private final Map<Long, TableSchema> tableSchemas; + private final TableSchema schema; + private final SchemaManager schemaManager; + + public TopNDataSplitEvaluator(TableSchema schema, SchemaManager schemaManager) { + this.tableSchemas = new HashMap<>(); + this.schema = schema; + this.schemaManager = schemaManager; + } + + public List<DataSplit> evaluate(TopN topN, List<DataSplit> splits) { + // todo: we can support all the sort columns. + List<SortValue> orders = topN.orders(); + if (orders.size() != 1) { + return splits; + } + + int limit = topN.limit(); + if (limit >= splits.size()) { + return splits; + } + + SortValue order = orders.get(0); + DataType type = order.field().type(); + if (!minmaxAvailable(type)) { + return splits; + } + + return getTopNSplits(order, limit, splits); + } + + private List<DataSplit> getTopNSplits(SortValue order, int limit, List<DataSplit> splits) { + FieldRef ref = order.field(); + SortValue.SortDirection direction = order.direction(); + SortValue.NullOrdering nullOrdering = order.nullOrdering(); + + int index = ref.index(); + DataField field = schema.fields().get(index); + SimpleStatsEvolutions evolutions = + new SimpleStatsEvolutions((id) -> scanTableSchema(id).fields(), schema.id()); + + // extract the stats + List<DataSplit> results = new ArrayList<>(); + List<Pair<Stats, DataSplit>> pairs = new ArrayList<>(); + for (DataSplit split : splits) { + if (!split.rawConvertible()) { + return splits; + } + + Set<String> cols = Collections.singleton(field.name()); + if (!split.statsAvailable(cols)) { + results.add(split); + continue; + } + + Object min = split.minValue(index, field, evolutions); + Object max = split.maxValue(index, field, evolutions); + Long nullCount = split.nullCount(index, evolutions); + Stats stats = new Stats(min, max, nullCount); + pairs.add(Pair.of(stats, split)); + } + + // pick the TopN splits + if (NULLS_FIRST.equals(nullOrdering)) { + results.addAll(pickNullFirstSplits(pairs, ref, direction, limit)); + } else if (NULLS_LAST.equals(nullOrdering)) { + results.addAll(pickNullLastSplits(pairs, ref, direction, limit)); + } else { + return splits; + } + + return results; + } + + private List<DataSplit> pickNullFirstSplits( + List<Pair<Stats, DataSplit>> pairs, + FieldRef field, + SortValue.SortDirection direction, + int limit) { + Comparator<Pair<Stats, 
DataSplit>> comparator; + if (ASCENDING.equals(direction)) { + comparator = + (x, y) -> { + Stats left = x.getKey(); + Stats right = y.getKey(); + int result = nullsFirst(left.nullCount, right.nullCount); + if (result == 0) { + result = asc(field, left.min, right.min); + } + return result; + }; + } else if (DESCENDING.equals(direction)) { + comparator = + (x, y) -> { + Stats left = x.getKey(); + Stats right = y.getKey(); + int result = nullsFirst(left.nullCount, right.nullCount); + if (result == 0) { + result = desc(field, left.max, right.max); + } + return result; + }; + } else { + return pairs.stream().map(Pair::getValue).collect(Collectors.toList()); + } + pairs.sort(comparator); + + long scanned = 0; + List<DataSplit> splits = new ArrayList<>(); + for (Pair<Stats, DataSplit> pair : pairs) { + Stats stats = pair.getKey(); + DataSplit split = pair.getValue(); + splits.add(split); + scanned += Math.max(stats.nullCount, 1); + if (scanned >= limit) { + break; + } + } + return splits; + } + + private List<DataSplit> pickNullLastSplits( + List<Pair<Stats, DataSplit>> pairs, + FieldRef field, + SortValue.SortDirection direction, + int limit) { + Comparator<Pair<Stats, DataSplit>> comparator; + if (ASCENDING.equals(direction)) { + comparator = + (x, y) -> { + Stats left = x.getKey(); + Stats right = y.getKey(); + int result = asc(field, left.min, right.min); + if (result == 0) { + result = nullsLast(left.nullCount, right.nullCount); + } + return result; + }; + } else if (DESCENDING.equals(direction)) { + comparator = + (x, y) -> { + Stats left = x.getKey(); + Stats right = y.getKey(); + int result = desc(field, left.max, right.max); + if (result == 0) { + result = nullsLast(left.nullCount, right.nullCount); + } + return result; + }; + } else { + return pairs.stream().map(Pair::getValue).collect(Collectors.toList()); + } + + return pairs.stream() + .sorted(comparator) + .map(Pair::getValue) + .limit(limit) + .collect(Collectors.toList()); + } + + private int nullsFirst(Long left, Long right) { + if (left == null) { + return -1; + } else if (right == null) { + return 1; + } else { + return -Long.compare(left, right); + } + } + + private int nullsLast(Long left, Long right) { + if (left == null) { + return -1; + } else if (right == null) { + return 1; + } else { + return Long.compare(left, right); + } + } + + private int asc(FieldRef field, Object left, Object right) { + if (left == null) { + return -1; + } else if (right == null) { + return 1; + } else { + return CompareUtils.compareLiteral(field.type(), left, right); + } + } + + private int desc(FieldRef field, Object left, Object right) { + if (left == null) { + return -1; + } else if (right == null) { + return 1; + } else { + return -CompareUtils.compareLiteral(field.type(), left, right); + } + } + + private TableSchema scanTableSchema(long id) { + return tableSchemas.computeIfAbsent( + id, key -> key == schema.id() ? schema : schemaManager.schema(id)); + } + + /** The DataSplit's stats. 
*/ + private static class Stats { + Object min; + Object max; + Long nullCount; + + public Stats(Object min, Object max, Long nullCount) { + this.min = min; + this.max = max; + this.nullCount = nullCount; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index 0094313238..87d79bc537 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -149,6 +149,7 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable { CoreOptions options = wrapped.coreOptions(); return new DataTableBatchScan( wrapped.schema(), + schemaManager(), options, newSnapshotReader(wrapped), wrapped.catalogEnvironment().tableQueryAuth(options)); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java index f8056cc492..cf246b13b5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java @@ -43,7 +43,6 @@ import org.apache.paimon.predicate.FieldRef; import org.apache.paimon.predicate.LeafPredicate; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; -import org.apache.paimon.predicate.SortValue; import org.apache.paimon.predicate.TopN; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; @@ -101,6 +100,8 @@ import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE; import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE; import static org.apache.paimon.CoreOptions.WRITE_ONLY; import static org.apache.paimon.io.DataFileTestUtils.row; +import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST; +import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -882,13 +883,9 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase { RoaringBitmap32 bitmap = new RoaringBitmap32(); expected.forEach(bitmap::add); DataField field = rowType.getField("price"); - SortValue sort = - new SortValue( - new FieldRef(field.id(), field.name(), field.type()), - SortValue.SortDirection.DESCENDING, - SortValue.NullOrdering.NULLS_LAST); - TopN topN = new TopN(Collections.singletonList(sort), k); - TableScan.Plan plan = table.newScan().plan(); + FieldRef ref = new FieldRef(field.id(), field.name(), field.type()); + TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k); + TableScan.Plan plan = table.newScan().withTopN(topN).plan(); RecordReader<InternalRow> reader = table.newRead().withTopN(topN).createReader(plan.splits()); AtomicInteger cnt = new AtomicInteger(0); @@ -906,13 +903,9 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase { // test TopK without index { DataField field = rowType.getField("id"); - SortValue sort = - new SortValue( - new FieldRef(field.id(), field.name(), field.type()), - SortValue.SortDirection.DESCENDING, - SortValue.NullOrdering.NULLS_LAST); - TopN topN = new TopN(Collections.singletonList(sort), k); - TableScan.Plan plan = table.newScan().plan(); + FieldRef ref = new FieldRef(field.id(), field.name(), field.type()); + TopN 
topN = new TopN(ref, DESCENDING, NULLS_LAST, k); + TableScan.Plan plan = table.newScan().withTopN(topN).plan(); RecordReader<InternalRow> reader = table.newRead().withTopN(topN).createReader(plan.splits()); AtomicInteger cnt = new AtomicInteger(0); @@ -933,12 +926,8 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase { .build(); table = createUnawareBucketFileStoreTable(rowType, configure); DataField field = rowType.getField("price"); - SortValue sort = - new SortValue( - new FieldRef(field.id(), field.name(), field.type()), - SortValue.SortDirection.DESCENDING, - SortValue.NullOrdering.NULLS_LAST); - TopN topN = new TopN(Collections.singletonList(sort), k); + FieldRef ref = new FieldRef(field.id(), field.name(), field.type()); + TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k); TableScan.Plan plan = table.newScan().plan(); RecordReader<InternalRow> reader = table.newRead().withTopN(topN).createReader(plan.splits()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java index d4f1484230..ec0675a53a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java @@ -44,7 +44,6 @@ import org.apache.paimon.postpone.PostponeBucketWriter; import org.apache.paimon.predicate.FieldRef; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; -import org.apache.paimon.predicate.SortValue; import org.apache.paimon.predicate.TopN; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; @@ -142,6 +141,9 @@ import static org.apache.paimon.Snapshot.CommitKind.COMPACT; import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString; import static org.apache.paimon.io.DataFileTestUtils.row; import static org.apache.paimon.predicate.PredicateBuilder.and; +import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST; +import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING; +import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -1256,12 +1258,8 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase { int k = new Random().nextInt(100); RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOfRange(min, min + k); DataField field = table.schema().nameToFieldMap().get(indexColumnName); - SortValue sort = - new SortValue( - new FieldRef(field.id(), field.name(), field.type()), - SortValue.SortDirection.ASCENDING, - SortValue.NullOrdering.NULLS_LAST); - TopN topN = new TopN(Collections.singletonList(sort), k); + FieldRef ref = new FieldRef(field.id(), field.name(), field.type()); + TopN topN = new TopN(ref, ASCENDING, NULLS_LAST, k); TableScan.Plan plan = table.newScan().plan(); RecordReader<InternalRow> reader = table.newRead().withTopN(topN).createReader(plan.splits()); @@ -1282,12 +1280,8 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase { int k = new Random().nextInt(100); RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOfRange(max - k, max); DataField field = table.schema().nameToFieldMap().get(indexColumnName); - SortValue sort = - new SortValue( - new FieldRef(field.id(), field.name(), field.type()), - SortValue.SortDirection.DESCENDING, - 
SortValue.NullOrdering.NULLS_LAST); - TopN topN = new TopN(Collections.singletonList(sort), k); + FieldRef ref = new FieldRef(field.id(), field.name(), field.type()); + TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k); TableScan.Plan plan = table.newScan().plan(); RecordReader<InternalRow> reader = table.newRead().withTopN(topN).createReader(plan.splits()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java index 6eb11583f4..4d6d842f75 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java @@ -160,6 +160,40 @@ public class SplitTest { assertThat(split2.maxValue(3, floatField, evolutions)).isEqualTo(null); } + @Test + public void testSplitNullCount() { + Map<Long, List<DataField>> schemas = new HashMap<>(); + DataField intField = new DataField(0, "c_int", new IntType()); + DataField longField = new DataField(1, "c_long", new BigIntType()); + schemas.put(1L, Arrays.asList(intField, longField)); + + // test common + BinaryRow min1 = newBinaryRow(new Object[] {10, 123L}); + BinaryRow max1 = newBinaryRow(new Object[] {99, 456L}); + SimpleStats valueStats1 = new SimpleStats(min1, max1, fromLongArray(new Long[] {5L, 1L})); + + BinaryRow min2 = newBinaryRow(new Object[] {5, 0L}); + BinaryRow max2 = newBinaryRow(new Object[] {90, 789L}); + SimpleStats valueStats2 = new SimpleStats(min2, max2, fromLongArray(new Long[] {3L, 2L})); + + DataFileMeta d1 = newDataFile(100, valueStats1, null); + DataFileMeta d2 = newDataFile(100, valueStats2, null); + DataSplit split1 = newDataSplit(true, Arrays.asList(d1, d2), null); + + SimpleStatsEvolutions evolutions = new SimpleStatsEvolutions(schemas::get, 1); + assertThat(split1.nullCount(0, evolutions)).isEqualTo(8); + assertThat(split1.nullCount(1, evolutions)).isEqualTo(3); + + // test schema evolution + DataField doubleField = new DataField(2, "c_double", new DoubleType()); + schemas.put(2L, Arrays.asList(intField, longField, doubleField)); + evolutions = new SimpleStatsEvolutions(schemas::get, 2); + + assertThat(split1.nullCount(0, evolutions)).isEqualTo(8); + assertThat(split1.nullCount(1, evolutions)).isEqualTo(3); + assertThat(split1.nullCount(2, evolutions)).isEqualTo(200); + } + @Test public void testSerializer() throws IOException { DataFileTestDataGenerator gen = DataFileTestDataGenerator.builder().build(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java index 5f565339f9..0fd381f918 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java @@ -18,16 +18,28 @@ package org.apache.paimon.table.source; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.TopN; +import org.apache.paimon.stats.SimpleStatsEvolutions; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.source.snapshot.ScannerTestBase; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.api.Test; import java.util.Arrays; +import 
java.util.Collections; +import java.util.List; +import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_FIRST; +import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST; +import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING; +import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -98,4 +110,270 @@ public class TableScanTest extends ScannerTestBase { write.close(); commit.close(); } + + @Test + public void testPushDownTopN() throws Exception { + createAppendOnlyTable(); + + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + write.write(rowData(1, 10, 100L)); + write.write(rowData(2, 20, 200L)); + write.write(rowData(3, 30, 300L)); + commit.commit(0, write.prepareCommit(true, 0)); + + write.write(rowData(4, 40, 400L)); + write.write(rowData(5, 50, 500L)); + commit.commit(1, write.prepareCommit(true, 1)); + + write.write(rowData(6, null, 600L)); + write.write(rowData(6, null, 600L)); + write.write(rowData(6, 60, 600L)); + write.write(rowData(7, null, 700L)); + write.write(rowData(7, null, 700L)); + write.write(rowData(7, 70, 700L)); + commit.commit(2, write.prepareCommit(true, 2)); + write.close(); + commit.close(); + + // disable stats + FileStoreTable disableStatsTable = + table.copy(Collections.singletonMap("metadata.stats-mode", "none")); + write = disableStatsTable.newWrite(commitUser); + commit = disableStatsTable.newCommit(commitUser); + + write.write(rowData(8, 80, 800L)); + write.write(rowData(9, 90, 900L)); + commit.commit(3, write.prepareCommit(true, 3)); + + // no TopN pushed down + TableScan.Plan plan1 = table.newScan().plan(); + assertThat(plan1.splits().size()).isEqualTo(9); + + DataField field = table.schema().fields().get(1); + FieldRef ref = new FieldRef(field.id(), field.name(), field.type()); + SimpleStatsEvolutions evolutions = + new SimpleStatsEvolutions( + (id) -> table.schemaManager().schema(id).fields(), table.schema().id()); + + // with bottom1 null first + TableScan.Plan plan2 = + table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_FIRST, 1)).plan(); + List<Split> splits2 = plan2.splits(); + assertThat(splits2.size()).isEqualTo(3); + assertThat(((DataSplit) splits2.get(0)).minValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits2.get(1)).minValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits2.get(2)).minValue(field.id(), field, evolutions)) + .isEqualTo(60); + assertThat(((DataSplit) splits2.get(2)).nullCount(field.id(), evolutions)).isEqualTo(2); + + // with bottom1 null last + TableScan.Plan plan3 = + table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_LAST, 1)).plan(); + List<Split> splits3 = plan3.splits(); + assertThat(splits3.size()).isEqualTo(3); + assertThat(((DataSplit) splits3.get(0)).minValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits3.get(1)).minValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits3.get(2)).minValue(field.id(), field, evolutions)) + .isEqualTo(10); + + // with bottom5 null first + TableScan.Plan plan4 = + table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_FIRST, 5)).plan(); + List<Split> splits4 = plan4.splits(); + assertThat(splits4.size()).isEqualTo(5); + assertThat(((DataSplit) 
splits4.get(0)).minValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits4.get(1)).minValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits4.get(2)).minValue(field.id(), field, evolutions)) + .isEqualTo(60); + assertThat(((DataSplit) splits4.get(2)).nullCount(field.id(), evolutions)).isEqualTo(2); + assertThat(((DataSplit) splits4.get(3)).minValue(field.id(), field, evolutions)) + .isEqualTo(70); + assertThat(((DataSplit) splits4.get(3)).nullCount(field.id(), evolutions)).isEqualTo(2); + assertThat(((DataSplit) splits4.get(4)).minValue(field.id(), field, evolutions)) + .isEqualTo(10); + + // with bottom5 null last + TableScan.Plan plan5 = + table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_LAST, 5)).plan(); + List<Split> splits5 = plan5.splits(); + assertThat(splits5.size()).isEqualTo(7); + assertThat(((DataSplit) splits5.get(0)).minValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits5.get(1)).minValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits5.get(2)).minValue(field.id(), field, evolutions)) + .isEqualTo(10); + assertThat(((DataSplit) splits5.get(3)).minValue(field.id(), field, evolutions)) + .isEqualTo(20); + assertThat(((DataSplit) splits5.get(4)).minValue(field.id(), field, evolutions)) + .isEqualTo(30); + assertThat(((DataSplit) splits5.get(5)).minValue(field.id(), field, evolutions)) + .isEqualTo(40); + assertThat(((DataSplit) splits5.get(6)).minValue(field.id(), field, evolutions)) + .isEqualTo(50); + + // with top1 null first + TableScan.Plan plan6 = + table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_FIRST, 1)).plan(); + List<Split> splits6 = plan6.splits(); + assertThat(splits6.size()).isEqualTo(3); + assertThat(((DataSplit) splits6.get(0)).maxValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits6.get(1)).maxValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits6.get(2)).maxValue(field.id(), field, evolutions)) + .isEqualTo(70); + assertThat(((DataSplit) splits6.get(2)).nullCount(field.id(), evolutions)).isEqualTo(2); + + // with top1 null last + TableScan.Plan plan7 = + table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 1)).plan(); + List<Split> splits7 = plan7.splits(); + assertThat(splits7.size()).isEqualTo(3); + assertThat(((DataSplit) splits7.get(0)).maxValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits7.get(1)).maxValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits7.get(2)).maxValue(field.id(), field, evolutions)) + .isEqualTo(70); + + // with top5 null first + TableScan.Plan plan8 = + table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_FIRST, 5)).plan(); + List<Split> splits8 = plan8.splits(); + assertThat(splits8.size()).isEqualTo(5); + assertThat(((DataSplit) splits8.get(0)).maxValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits8.get(1)).maxValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits8.get(2)).maxValue(field.id(), field, evolutions)) + .isEqualTo(70); + assertThat(((DataSplit) splits8.get(2)).nullCount(field.id(), evolutions)).isEqualTo(2); + assertThat(((DataSplit) splits8.get(3)).maxValue(field.id(), field, evolutions)) + .isEqualTo(60); + assertThat(((DataSplit) splits8.get(3)).nullCount(field.id(), evolutions)).isEqualTo(2); + assertThat(((DataSplit) 
splits8.get(4)).maxValue(field.id(), field, evolutions)) + .isEqualTo(50); + + // with top5 null last + TableScan.Plan plan9 = + table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 5)).plan(); + List<Split> splits9 = plan9.splits(); + assertThat(splits9.size()).isEqualTo(7); + assertThat(((DataSplit) splits9.get(0)).maxValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits9.get(1)).maxValue(field.id(), field, evolutions)) + .isEqualTo(null); + assertThat(((DataSplit) splits9.get(2)).maxValue(field.id(), field, evolutions)) + .isEqualTo(70); + assertThat(((DataSplit) splits9.get(3)).maxValue(field.id(), field, evolutions)) + .isEqualTo(60); + assertThat(((DataSplit) splits9.get(4)).maxValue(field.id(), field, evolutions)) + .isEqualTo(50); + assertThat(((DataSplit) splits9.get(5)).maxValue(field.id(), field, evolutions)) + .isEqualTo(40); + assertThat(((DataSplit) splits9.get(6)).maxValue(field.id(), field, evolutions)) + .isEqualTo(30); + + write.close(); + commit.close(); + } + + @Test + public void testPushDownTopNSchemaEvolution() throws Exception { + createAppendOnlyTable(); + + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + write.write(rowData(1, 10, 100L)); + write.write(rowData(2, 20, 200L)); + write.write(rowData(3, 30, 300L)); + commit.commit(0, write.prepareCommit(true, 0)); + write.close(); + commit.close(); + + // schema evolution + updateColumn("a", DataTypes.BIGINT()); + write = table.newWrite(commitUser); + commit = table.newCommit(commitUser); + write.write(rowData(4, 40L, 400L)); + write.write(rowData(5, 50L, 500L)); + write.write(rowData(6, 60L, 600L)); + commit.commit(1, write.prepareCommit(true, 1)); + write.close(); + commit.close(); + + DataField field = table.schema().fields().get(1); + FieldRef ref = new FieldRef(field.id(), field.name(), field.type()); + SimpleStatsEvolutions evolutions = + new SimpleStatsEvolutions( + (id) -> table.schemaManager().schema(id).fields(), table.schema().id()); + + TableScan.Plan plan1 = + table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 1)).plan(); + assertThat(plan1.splits().size()).isEqualTo(1); + assertThat(((DataSplit) plan1.splits().get(0)).maxValue(field.id(), field, evolutions)) + .isEqualTo(60L); + assertThat(((DataSplit) plan1.splits().get(0)).minValue(field.id(), field, evolutions)) + .isEqualTo(60L); + + TableScan.Plan plan2 = + table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_FIRST, 1)).plan(); + assertThat(plan2.splits().size()).isEqualTo(1); + assertThat(((DataSplit) plan2.splits().get(0)).maxValue(field.id(), field, evolutions)) + .isEqualTo(10L); + assertThat(((DataSplit) plan2.splits().get(0)).minValue(field.id(), field, evolutions)) + .isEqualTo(10L); + } + + @Test + public void testPushDownTopNOnlyNull() throws Exception { + createAppendOnlyTable(); + + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + write.write(rowData(1, null, 100L)); + write.write(rowData(2, null, 200L)); + write.write(rowData(3, null, 300L)); + commit.commit(0, write.prepareCommit(true, 0)); + write.close(); + commit.close(); + + DataField field = table.schema().fields().get(1); + FieldRef ref = new FieldRef(field.id(), field.name(), field.type()); + SimpleStatsEvolutions evolutions = + new SimpleStatsEvolutions( + (id) -> table.schemaManager().schema(id).fields(), table.schema().id()); + + // the min/max will be null, and the null count is not 
null. + TableScan.Plan plan1 = + table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 1)).plan(); + assertThat(plan1.splits().size()).isEqualTo(1); + assertThat(((DataSplit) plan1.splits().get(0)).nullCount(field.id(), evolutions)) + .isEqualTo(1); + assertThat(((DataSplit) plan1.splits().get(0)).minValue(field.id(), field, evolutions)) + .isNull(); + assertThat(((DataSplit) plan1.splits().get(0)).maxValue(field.id(), field, evolutions)) + .isNull(); + + TableScan.Plan plan2 = + table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_FIRST, 1)).plan(); + assertThat(plan2.splits().size()).isEqualTo(1); + assertThat(((DataSplit) plan2.splits().get(0)).nullCount(field.id(), evolutions)) + .isEqualTo(1); + assertThat(((DataSplit) plan2.splits().get(0)).minValue(field.id(), field, evolutions)) + .isNull(); + assertThat(((DataSplit) plan2.splits().get(0)).maxValue(field.id(), field, evolutions)) + .isNull(); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java index 2ed0d5c9b3..612b4ed49b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java @@ -32,6 +32,7 @@ import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.CatalogEnvironment; @@ -171,6 +172,19 @@ public abstract class ScannerTestBase { fileIO, tablePath, tableSchema, conf, CatalogEnvironment.empty()); } + protected void updateColumn(String columnName, DataType type) throws Exception { + TableSchema tableSchema = + table.schemaManager() + .commitChanges(SchemaChange.updateColumnType(columnName, type)); + table = + FileStoreTableFactory.create( + table.fileIO(), + table.location(), + tableSchema, + new Options(table.options()), + CatalogEnvironment.empty()); + } + protected List<Split> toSplits(List<DataSplit> dataSplits) { return new ArrayList<>(dataSplits); } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala index ce1fe2e25f..1e3bb846cf 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala @@ -73,7 +73,13 @@ trait ColumnPruningAndPushDown extends Scan with Logging { } pushDownLimit.foreach(_readBuilder.withLimit) pushDownTopN.foreach(_readBuilder.withTopN) - _readBuilder.dropStats() + + // when TopN is not empty, we need the stats to pick the TopN DataSplits + if (pushDownTopN.nonEmpty) { + _readBuilder + } else { + _readBuilder.dropStats() + } } final def metadataColumns: Seq[PaimonMetadataColumn] = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala index c6abec1acd..fd1ee59b5a 100644 --- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.aggregate +import org.apache.paimon.stats.StatsUtils.minmaxAvailable import org.apache.paimon.table.Table import org.apache.paimon.table.source.DataSplit import org.apache.paimon.types._ @@ -58,21 +59,12 @@ object AggregatePushDownUtils { val dataField = getDataFieldForCol(columnName) - dataField.`type`() match { - // not push down complex type - // not push down Timestamp because INT96 sort order is undefined, - // Parquet doesn't return statistics for INT96 - // not push down Parquet Binary because min/max could be truncated - // (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary - // could be Spark StringType, BinaryType or DecimalType. - // not push down for ORC with same reason. - case _: BooleanType | _: TinyIntType | _: SmallIntType | _: IntType | _: BigIntType | - _: FloatType | _: DoubleType | _: DateType => - minmaxColumns.add(columnName) - hasMinMax = true - true - case _ => - false + if (minmaxAvailable(dataField.`type`())) { + minmaxColumns.add(columnName) + hasMinMax = true + true + } else { + false } } @@ -95,15 +87,7 @@ object AggregatePushDownUtils { } if (hasMinMax) { - dataSplits.forall { - dataSplit => - dataSplit.dataFiles().asScala.forall { - dataFile => - // It means there are all column statistics when valueStatsCols == null - dataFile.valueStatsCols() == null || - minmaxColumns.forall(dataFile.valueStatsCols().contains) - } - } + dataSplits.forall(_.statsAvailable(minmaxColumns.toSet.asJava)) } else if (hasCount) { dataSplits.forall(_.mergedRowCountAvailable()) } else { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala index 1edc64e55b..27c8c00846 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala @@ -285,57 +285,68 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase { test("Paimon pushDown: topN for append-only tables") { assume(gteqSpark3_3) spark.sql(""" - |CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED BY (pt) + |CREATE TABLE T (pt INT, id INT, price BIGINT) PARTITIONED BY (pt) |TBLPROPERTIES ('file-index.range-bitmap.columns'='id') |""".stripMargin) Assertions.assertTrue(getScanBuilder().isInstanceOf[SupportsPushDownTopN]) spark.sql(""" |INSERT INTO T VALUES - |(1, "a1", "2023"), - |(1, "a1", "2023"), - |(3, "d1", "2023"), - |(4, "e1", "2023") + |(1, 10, 100L), + |(2, 20, 200L), + |(3, 30, 300L) |""".stripMargin) spark.sql(""" |INSERT INTO T VALUES - |(5, "a2", "2025"), - |(NULL, "b2", "2025"), - |(6, "c2", "2025"), - |(7, "d2", "2025"), - |(8, "e2", "2025") + |(4, 40, 400L), + |(5, 50, 500L) |""".stripMargin) spark.sql(""" |INSERT INTO T VALUES - |(5, "a3", "2023"), - |(9, "a3", "2023"), - |(2, "c3", "2025"), - |(NULL, "b2", "2025") + |(6, NULL, 600L), + |(6, NULL, 600L), + |(6, 60, 600L), + |(7, NULL, 700L), + |(7, NULL, 700L), + |(7, 70, 700L) + |""".stripMargin) + + // disable stats + spark.sql(""" + |ALTER TABLE T SET TBLPROPERTIES ('metadata.stats-mode' = 'none') + |""".stripMargin) + spark.sql(""" + 
|INSERT INTO T VALUES + |(8, 80, 800L), + |(9, 90, 900L) + |""".stripMargin) + spark.sql(""" + |ALTER TABLE T UNSET TBLPROPERTIES ('metadata.stats-mode') |""".stripMargin) // test ASC checkAnswer( - spark.sql("SELECT * FROM T ORDER BY id ASC NULLS LAST LIMIT 3"), - Row(1, "a1", "2023") :: Row(1, "a1", "2023") :: Row(2, "c3", "2025") :: Nil) + spark.sql("SELECT id FROM T ORDER BY id ASC NULLS LAST LIMIT 5"), + Row(10) :: Row(20) :: Row(30) :: Row(40) :: Row(50) :: Nil) checkAnswer( - spark.sql("SELECT * FROM T ORDER BY id ASC NULLS FIRST LIMIT 3"), - Row(null, "b2", "2025") :: Row(null, "b2", "2025") :: Row(1, "a1", "2023") :: Nil) + spark.sql("SELECT id FROM T ORDER BY id ASC NULLS FIRST LIMIT 5"), + Row(null) :: Row(null) :: Row(null) :: Row(null) :: Row(10) :: Nil) // test DESC checkAnswer( - spark.sql("SELECT * FROM T ORDER BY id DESC NULLS LAST LIMIT 3"), - Row(9, "a3", "2023") :: Row(8, "e2", "2025") :: Row(7, "d2", "2025") :: Nil) + spark.sql("SELECT id FROM T ORDER BY id DESC NULLS LAST LIMIT 5"), + Row(90) :: Row(80) :: Row(70) :: Row(60) :: Row(50) :: Nil) checkAnswer( - spark.sql("SELECT * FROM T ORDER BY id DESC NULLS FIRST LIMIT 3"), - Row(null, "b2", "2025") :: Row(null, "b2", "2025") :: Row(9, "a3", "2023") :: Nil) + spark.sql("SELECT id FROM T ORDER BY id DESC NULLS FIRST LIMIT 5"), + Row(null) :: Row(null) :: Row(null) :: Row(null) :: Row(90) :: Nil) // test with partition checkAnswer( - spark.sql("SELECT * FROM T WHERE pt='2023' ORDER BY id DESC LIMIT 3"), - Row(9, "a3", "2023") :: Row(5, "a3", "2023") :: Row(4, "e1", "2023") :: Nil) + spark.sql("SELECT id FROM T WHERE pt=6 ORDER BY id DESC LIMIT 5"), + Row(60) :: Row(null) :: Row(null) :: Nil) checkAnswer( - spark.sql("SELECT * FROM T WHERE pt='2025' ORDER BY id ASC LIMIT 3"), - Row(null, "b2", "2025") :: Row(null, "b2", "2025") :: Row(2, "c3", "2025") :: Nil) + spark.sql("SELECT id FROM T WHERE pt=6 ORDER BY id ASC LIMIT 3"), + Row(null) :: Row(null) :: Row(60) :: Nil) // test plan val df1 = spark.sql("SELECT * FROM T ORDER BY id DESC LIMIT 1")
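For readers skimming the patch, here is a minimal end-to-end sketch of the new pushdown path, distilled from the tests above. It assumes `table` is an append-only FileStoreTable whose field at index 1 is an INT column; FieldRef, the new TopN constructor, InnerTableScan.withTopN and the read-side TopN are all APIs added or exercised by this commit.

    import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
    import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;

    // Sort key taken from the table schema (id, name and type of the column).
    DataField field = table.schema().fields().get(1);
    FieldRef ref = new FieldRef(field.id(), field.name(), field.type());

    // New single-column convenience constructor: top 10 rows by this field.
    TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, 10);

    // The batch scan now consults file-level min/max/null-count stats and keeps
    // only the DataSplits that can still contain the top 10 rows; the read then
    // applies the same TopN within each surviving split.
    TableScan.Plan plan = table.newScan().withTopN(topN).plan();
    RecordReader<InternalRow> reader =
            table.newRead().withTopN(topN).createReader(plan.splits());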

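Internally, DataTableBatchScan.plan() hands the scanned splits to the new evaluator; the sketch below summarizes its back-off rules as they appear in this patch (the class and method names are real, the surrounding variables are assumed to be in scope).

    // Conditions under which applyPushDownTopN leaves the plan untouched:
    //   - a LIMIT was already pushed down, the table has primary keys,
    //     or deletion vectors are enabled;
    //   - more than one sort column, limit >= number of splits, or the sort
    //     column's type has no trustworthy min/max (StatsUtils.minmaxAvailable);
    //   - any split is not rawConvertible.
    TopNDataSplitEvaluator evaluator = new TopNDataSplitEvaluator(schema, schemaManager);
    List<DataSplit> picked = evaluator.evaluate(topN, splits);
    // Splits written without stats (e.g. metadata.stats-mode = none) are always
    // kept, so `picked` is a superset of the splits holding the true TopN rows;
    // correctness never depends on the pruning, only scan cost does.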