This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e2d02d26c8 [core] Support TopN pushdown to pick the DataSplits (#6143)
e2d02d26c8 is described below
commit e2d02d26c84abddd2053282fe49cf3bdfeaac8da
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Wed Aug 27 17:01:52 2025 +0800
[core] Support TopN pushdown to pick the DataSplits (#6143)
---
.../bitmap/RangeBitmapIndexPushDownBenchmark.java | 4 +-
.../java/org/apache/paimon/predicate/TopN.java | 9 +
.../java/org/apache/paimon/utils/ListUtils.java | 3 +-
.../rangebitmap/RangeBitmapFileIndexTest.java | 46 +---
.../apache/paimon/stats/SimpleStatsEvolution.java | 26 ++
.../java/org/apache/paimon/stats/StatsUtils.java | 51 ++++
.../paimon/table/AbstractFileStoreTable.java | 1 +
.../paimon/table/FallbackReadFileStoreTable.java | 8 +
.../paimon/table/source/AbstractDataTableScan.java | 2 +-
.../org/apache/paimon/table/source/DataSplit.java | 40 +++
.../paimon/table/source/DataTableBatchScan.java | 40 ++-
.../apache/paimon/table/source/InnerTableScan.java | 5 +
.../paimon/table/source/ReadBuilderImpl.java | 3 +
.../table/source/TopNDataSplitEvaluator.java | 267 ++++++++++++++++++++
.../paimon/table/system/ReadOptimizedTable.java | 1 +
.../paimon/table/AppendOnlySimpleTableTest.java | 31 +--
.../paimon/table/PrimaryKeySimpleTableTest.java | 20 +-
.../org/apache/paimon/table/source/SplitTest.java | 34 +++
.../apache/paimon/table/source/TableScanTest.java | 278 +++++++++++++++++++++
.../table/source/snapshot/ScannerTestBase.java | 14 ++
.../paimon/spark/ColumnPruningAndPushDown.scala | 8 +-
.../spark/aggregate/AggregatePushDownUtils.scala | 32 +--
.../paimon/spark/sql/PaimonPushDownTestBase.scala | 63 +++--
23 files changed, 855 insertions(+), 131 deletions(-)
diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
index 77c88afe56..c286b8727d 100644
--- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
@@ -32,7 +32,6 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.SortValue;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
@@ -213,8 +212,7 @@ public class RangeBitmapIndexPushDownBenchmark {
() -> {
Table table = tables.get(name);
FieldRef ref = new FieldRef(0, "k", DataTypes.INT());
-                    SortValue sort = new SortValue(ref, DESCENDING, NULLS_LAST);
-                    TopN topN = new TopN(Collections.singletonList(sort), k);
+ TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k);
List<Split> splits =
table.newReadBuilder().newScan().plan().splits();
AtomicLong readCount = new AtomicLong(0);
try {
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
index 2ecb38cce3..e66de2c560 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
@@ -18,9 +18,12 @@
package org.apache.paimon.predicate;
+import org.apache.paimon.predicate.SortValue.NullOrdering;
+import org.apache.paimon.predicate.SortValue.SortDirection;
import org.apache.paimon.utils.Preconditions;
import java.io.Serializable;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -37,6 +40,12 @@ public class TopN implements Serializable {
this.limit = limit;
}
+    public TopN(FieldRef ref, SortDirection direction, NullOrdering nullOrdering, int limit) {
+ SortValue order = new SortValue(ref, direction, nullOrdering);
+ this.orders = Collections.singletonList(order);
+ this.limit = limit;
+ }
+
public List<SortValue> orders() {
return orders;
}
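
For context, the new constructor is shorthand for a single-column sort; the
existing list-based constructor stays for multi-column orders. A minimal usage
sketch (the field id, name and type here are placeholders, not from the patch):

    import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
    import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;

    FieldRef ref = new FieldRef(0, "price", DataTypes.BIGINT());
    TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, 10); // top 10 by price, nulls sorted last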
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
index e870b68e1d..a93f9d016c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
@@ -18,6 +18,7 @@
package org.apache.paimon.utils;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@@ -32,7 +33,7 @@ public class ListUtils {
return list.get(index);
}
- public static <T> boolean isNullOrEmpty(List<T> list) {
+ public static <T> boolean isNullOrEmpty(Collection<T> list) {
return list == null || list.isEmpty();
}
}
diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java
index 904f24826f..90656c2081 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java
@@ -25,7 +25,6 @@ import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
import org.apache.paimon.fs.ByteArraySeekableStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.FieldRef;
-import org.apache.paimon.predicate.SortValue;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.VarCharType;
@@ -36,7 +35,6 @@ import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@@ -283,11 +281,7 @@ public class RangeBitmapFileIndexTest {
RoaringBitmap32 expected = new RoaringBitmap32();
// test NULL_LAST without found set
- TopN topN =
- new TopN(
- Collections.singletonList(
-                                new SortValue(fieldRef, ASCENDING, NULLS_LAST)),
- k);
+ TopN topN = new TopN(fieldRef, ASCENDING, NULLS_LAST, k);
pairs.stream()
.sorted(nullLastCompactor)
.limit(k)
@@ -298,11 +292,7 @@ public class RangeBitmapFileIndexTest {
// test NULL_LAST with found set
expected.clear();
- topN =
- new TopN(
- Collections.singletonList(
-                                new SortValue(fieldRef, ASCENDING, NULLS_LAST)),
- k);
+ topN = new TopN(fieldRef, ASCENDING, NULLS_LAST, k);
pairs.stream()
.filter(pair -> foundSet.contains(pair.getKey()))
.sorted(nullLastCompactor)
@@ -317,11 +307,7 @@ public class RangeBitmapFileIndexTest {
// test NULL_FIRST without found set
expected.clear();
- topN =
- new TopN(
- Collections.singletonList(
-                                new SortValue(fieldRef, ASCENDING, NULLS_FIRST)),
- k);
+ topN = new TopN(fieldRef, ASCENDING, NULLS_FIRST, k);
pairs.stream()
.sorted(nullFirstCompactor)
.limit(k)
@@ -332,11 +318,7 @@ public class RangeBitmapFileIndexTest {
// test NULL_FIRST with found set
expected.clear();
- topN =
- new TopN(
- Collections.singletonList(
-                                new SortValue(fieldRef, ASCENDING, NULLS_FIRST)),
- k);
+ topN = new TopN(fieldRef, ASCENDING, NULLS_FIRST, k);
pairs.stream()
.filter(pair -> foundSet.contains(pair.getKey()))
.sorted(nullFirstCompactor)
@@ -406,38 +388,26 @@ public class RangeBitmapFileIndexTest {
// test bottomK
        BitmapIndexResult result =
                new BitmapIndexResult(() -> RoaringBitmap32.bitmapOf(0, 3, 4, 5));
-        TopN bottomNullFirst =
-                new TopN(
-                        Collections.singletonList(new SortValue(fieldRef, ASCENDING, NULLS_FIRST)),
-                        3);
+        TopN bottomNullFirst = new TopN(fieldRef, ASCENDING, NULLS_FIRST, 3);
        assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullFirst, null)).get())
                .isEqualTo(RoaringBitmap32.bitmapOf(0, 5, 6));
        assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullFirst, result)).get())
                .isEqualTo(RoaringBitmap32.bitmapOf(0, 3, 5));
-        TopN bottomNullLast =
-                new TopN(
-                        Collections.singletonList(new SortValue(fieldRef, ASCENDING, NULLS_LAST)),
-                        3);
+        TopN bottomNullLast = new TopN(fieldRef, ASCENDING, NULLS_LAST, 3);
        assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullLast, null)).get())
                .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 2));
        assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullLast, result)).get())
                .isEqualTo(RoaringBitmap32.bitmapOf(0, 3, 4));
        // test topK
-        TopN topNullFirst =
-                new TopN(
-                        Collections.singletonList(new SortValue(fieldRef, DESCENDING, NULLS_FIRST)),
-                        3);
+        TopN topNullFirst = new TopN(fieldRef, DESCENDING, NULLS_FIRST, 3);
        assertThat(((BitmapIndexResult) reader.visitTopN(topNullFirst, null)).get())
                .isEqualTo(RoaringBitmap32.bitmapOf(5, 6, 7));
        assertThat(((BitmapIndexResult) reader.visitTopN(topNullFirst, result)).get())
                .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 5));
-        TopN topNullLast =
-                new TopN(
-                        Collections.singletonList(new SortValue(fieldRef, DESCENDING, NULLS_LAST)),
-                        3);
+        TopN topNullLast = new TopN(fieldRef, DESCENDING, NULLS_LAST, 3);
        assertThat(((BitmapIndexResult) reader.visitTopN(topNullLast, null)).get())
                .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 7));
        assertThat(((BitmapIndexResult) reader.visitTopN(topNullLast, result)).get())
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
index b1c7cfebee..ad9f4f4b22 100644
--- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
@@ -88,6 +88,32 @@ public class SimpleStatsEvolution {
return result;
}
+    public InternalArray evolution(
+            InternalArray array, Long rowCount, @Nullable List<String> denseFields) {
+ InternalArray nullCounts = array;
+
+ if (denseFields != null && denseFields.isEmpty()) {
+ // optimize for empty dense fields
+ nullCounts = emptyNullCounts;
+ } else if (denseFields != null) {
+            int[] denseIndexMapping =
+                    indexMappings.computeIfAbsent(
+                            denseFields,
+                            k -> fieldNames.stream().mapToInt(denseFields::indexOf).toArray());
+            nullCounts = ProjectedArray.from(denseIndexMapping).replaceArray(nullCounts);
+ }
+
+ if (indexMapping != null) {
+ if (rowCount == null) {
+                throw new RuntimeException("Schema Evolution for stats needs row count.");
+ }
+
+            nullCounts = new NullCountsEvoArray(indexMapping, nullCounts, rowCount);
+ }
+
+ return nullCounts;
+ }
+
    public Result evolution(
            SimpleStats stats, @Nullable Long rowCount, @Nullable List<String> denseFields) {
InternalRow minValues = stats.minValues();
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/StatsUtils.java b/paimon-core/src/main/java/org/apache/paimon/stats/StatsUtils.java
new file mode 100644
index 0000000000..ec52cda0d9
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/StatsUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.stats;
+
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TinyIntType;
+
+/** Utils for Stats. */
+public class StatsUtils {
+
+ public static boolean minmaxAvailable(DataType type) {
+ // not push down complex type
+ // not push down Timestamp because INT96 sort order is undefined,
+ // Parquet doesn't return statistics for INT96
+ // not push down Parquet Binary because min/max could be truncated
+ // (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary
+ // could be Spark StringType, BinaryType or DecimalType.
+ // not push down for ORC with same reason.
+ return type instanceof BooleanType
+ || type instanceof TinyIntType
+ || type instanceof SmallIntType
+ || type instanceof IntType
+ || type instanceof BigIntType
+ || type instanceof FloatType
+ || type instanceof DoubleType
+ || type instanceof DateType;
+ }
+}
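
To illustrate the helper's contract (types built with the standard DataTypes
factories; the timestamp and string cases follow the comment above):

    StatsUtils.minmaxAvailable(DataTypes.INT());        // true: fixed-width numeric
    StatsUtils.minmaxAvailable(DataTypes.TIMESTAMP(3)); // false: INT96 sort order is undefined
    StatsUtils.minmaxAvailable(DataTypes.STRING());     // false: Binary min/max may be truncated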
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 6757c8ef38..db0ca5e5ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -268,6 +268,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
public DataTableBatchScan newScan() {
return new DataTableBatchScan(
tableSchema,
+ schemaManager(),
coreOptions(),
newSnapshotReader(),
catalogEnvironment.tableQueryAuth(coreOptions()));
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index f53cfaa985..11bc764a57 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TopN;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataFilePlan;
@@ -366,6 +367,13 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
return this;
}
+ @Override
+ public InnerTableScan withTopN(TopN topN) {
+ mainScan.withTopN(topN);
+ fallbackScan.withTopN(topN);
+ return this;
+ }
+
@Override
public InnerTableScan dropStats() {
mainScan.dropStats();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index b019f5dbda..67acb44066 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -78,7 +78,7 @@ abstract class AbstractDataTableScan implements DataTableScan {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractDataTableScan.class);
- private final TableSchema schema;
+ protected final TableSchema schema;
private final CoreOptions options;
protected final SnapshotReader snapshotReader;
private final TableQueryAuth queryAuth;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 1feb72afa3..276cf5fe50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMeta08Serializer;
@@ -34,6 +35,7 @@ import org.apache.paimon.predicate.CompareUtils;
import org.apache.paimon.stats.SimpleStatsEvolution;
import org.apache.paimon.stats.SimpleStatsEvolutions;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.FunctionWithIOException;
import org.apache.paimon.utils.InternalRowUtils;
import org.apache.paimon.utils.SerializationUtils;
@@ -44,13 +46,16 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
+import static org.apache.paimon.utils.ListUtils.isNullOrEmpty;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -153,6 +158,21 @@ public class DataSplit implements Split {
return partialMergedRowCount();
}
+ public boolean statsAvailable(Set<String> columns) {
+ if (isNullOrEmpty(columns)) {
+ return false;
+ }
+
+ return dataFiles.stream()
+ .map(DataFileMeta::valueStatsCols)
+                .allMatch(
+                        valueStatsCols ->
+                                // It means there are all column statistics when valueStatsCols ==
+                                // null
+                                valueStatsCols == null
+                                        || new HashSet<>(valueStatsCols).containsAll(columns));
+ }
+
    public Object minValue(int fieldIndex, DataField dataField, SimpleStatsEvolutions evolutions) {
Object minValue = null;
for (DataFileMeta dataFile : dataFiles) {
@@ -191,6 +211,26 @@ public class DataSplit implements Split {
return maxValue;
}
+ public Long nullCount(int fieldIndex, SimpleStatsEvolutions evolutions) {
+ Long sum = null;
+ for (DataFileMeta dataFile : dataFiles) {
+            SimpleStatsEvolution evolution = evolutions.getOrCreate(dataFile.schemaId());
+ InternalArray nullCounts =
+ evolution.evolution(
+ dataFile.valueStats().nullCounts(),
+ dataFile.rowCount(),
+ dataFile.valueStatsCols());
+            Long nullCount =
+                    (Long) InternalRowUtils.get(nullCounts, fieldIndex, DataTypes.BIGINT());
+ if (sum == null) {
+ sum = nullCount;
+ } else if (nullCount != null) {
+ sum += nullCount;
+ }
+ }
+ return sum;
+ }
+
    /**
     * Obtain merged row count as much as possible. There are two scenarios where accurate row count
     * can be calculated:
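
Taken together, the new accessors give the planner a cheap gate: check
statsAvailable first, then read the per-split aggregates. A sketch of that
pattern, mirroring TopNDataSplitEvaluator below (split, fieldIndex, field and
evolutions stand in for real plan-time objects):

    Set<String> cols = Collections.singleton(field.name());
    if (split.statsAvailable(cols)) {
        Object min = split.minValue(fieldIndex, field, evolutions);
        Object max = split.maxValue(fieldIndex, field, evolutions);
        Long nulls = split.nullCount(fieldIndex, evolutions);
        // min, max and nulls may still be null, e.g. when every row of a file is null
    }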
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index d7f02c06ad..ec8eb8ee0b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
@@ -37,16 +39,20 @@ public class DataTableBatchScan extends AbstractDataTableScan {
private boolean hasNext;
private Integer pushDownLimit;
+ private TopN topN;
+
+ private final SchemaManager schemaManager;
public DataTableBatchScan(
TableSchema schema,
+ SchemaManager schemaManager,
CoreOptions options,
SnapshotReader snapshotReader,
TableQueryAuth queryAuth) {
super(schema, options, snapshotReader, queryAuth);
this.hasNext = true;
-
+ this.schemaManager = schemaManager;
if (!schema.primaryKeys().isEmpty() && options.batchScanSkipLevel0()) {
if (options.toConfiguration()
.get(CoreOptions.BATCH_SCAN_MODE)
@@ -71,6 +77,12 @@ public class DataTableBatchScan extends AbstractDataTableScan {
return this;
}
+ @Override
+ public InnerTableScan withTopN(TopN topN) {
+ this.topN = topN;
+ return this;
+ }
+
@Override
public TableScan.Plan plan() {
authQuery();
@@ -82,8 +94,9 @@ public class DataTableBatchScan extends AbstractDataTableScan {
if (hasNext) {
hasNext = false;
            StartingScanner.Result result = startingScanner.scan(snapshotReader);
- StartingScanner.Result limitedResult = applyPushDownLimit(result);
- return DataFilePlan.fromResult(limitedResult);
+ result = applyPushDownLimit(result);
+ result = applyPushDownTopN(result);
+ return DataFilePlan.fromResult(result);
} else {
throw new EndOfScanException();
}
@@ -123,6 +136,27 @@ public class DataTableBatchScan extends AbstractDataTableScan {
return result;
}
+    private StartingScanner.Result applyPushDownTopN(StartingScanner.Result result) {
+ if (topN == null
+ || pushDownLimit != null
+ || !(result instanceof ScannedResult)
+ || !schema.primaryKeys().isEmpty()
+ || options().deletionVectorsEnabled()) {
+ return result;
+ }
+
+ SnapshotReader.Plan plan = ((ScannedResult) result).plan();
+ List<DataSplit> splits = plan.dataSplits();
+ if (splits.isEmpty()) {
+ return result;
+ }
+
+        TopNDataSplitEvaluator evaluator = new TopNDataSplitEvaluator(schema, schemaManager);
+        List<Split> topNSplits = new ArrayList<>(evaluator.evaluate(topN, splits));
+        SnapshotReader.Plan newPlan = new PlanImpl(plan.watermark(), plan.snapshotId(), topNSplits);
+ return new ScannedResult(newPlan);
+ }
+
@Override
    public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 89f15055d5..d0015445b3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TopN;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
@@ -77,6 +78,10 @@ public interface InnerTableScan extends TableScan {
return this;
}
+ default InnerTableScan withTopN(TopN topN) {
+ return this;
+ }
+
default InnerTableScan dropStats() {
// do nothing, should implement this if need
return this;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index e57a7b6382..662ac19858 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -167,6 +167,9 @@ public class ReadBuilderImpl implements ReadBuilder {
if (limit != null) {
tableScan.withLimit(limit);
}
+ if (topN != null) {
+ tableScan.withTopN(topN);
+ }
return tableScan;
}
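
With this wiring, a TopN set on the builder now reaches the scan (the split
pruning added here) in addition to the existing read path (file-index
pruning). A usage sketch in the style of the tests below:

    TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, 5);
    ReadBuilder builder = table.newReadBuilder().withTopN(topN);
    List<Split> splits = builder.newScan().plan().splits(); // only candidate splits survive
    RecordReader<InternalRow> reader = builder.newRead().createReader(splits);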
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java
new file mode 100644
index 0000000000..87c6b23cfd
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.predicate.CompareUtils;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.SortValue;
+import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStatsEvolutions;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_FIRST;
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
+import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
+import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
+import static org.apache.paimon.stats.StatsUtils.minmaxAvailable;
+
+/** Evaluate DataSplit TopN result. */
+public class TopNDataSplitEvaluator {
+
+ private final Map<Long, TableSchema> tableSchemas;
+ private final TableSchema schema;
+ private final SchemaManager schemaManager;
+
+    public TopNDataSplitEvaluator(TableSchema schema, SchemaManager schemaManager) {
+ this.tableSchemas = new HashMap<>();
+ this.schema = schema;
+ this.schemaManager = schemaManager;
+ }
+
+ public List<DataSplit> evaluate(TopN topN, List<DataSplit> splits) {
+ // todo: we can support all the sort columns.
+ List<SortValue> orders = topN.orders();
+ if (orders.size() != 1) {
+ return splits;
+ }
+
+ int limit = topN.limit();
+ if (limit >= splits.size()) {
+ return splits;
+ }
+
+ SortValue order = orders.get(0);
+ DataType type = order.field().type();
+ if (!minmaxAvailable(type)) {
+ return splits;
+ }
+
+ return getTopNSplits(order, limit, splits);
+ }
+
+    private List<DataSplit> getTopNSplits(SortValue order, int limit, List<DataSplit> splits) {
+ FieldRef ref = order.field();
+ SortValue.SortDirection direction = order.direction();
+ SortValue.NullOrdering nullOrdering = order.nullOrdering();
+
+ int index = ref.index();
+ DataField field = schema.fields().get(index);
+        SimpleStatsEvolutions evolutions =
+                new SimpleStatsEvolutions((id) -> scanTableSchema(id).fields(), schema.id());
+
+ // extract the stats
+ List<DataSplit> results = new ArrayList<>();
+ List<Pair<Stats, DataSplit>> pairs = new ArrayList<>();
+ for (DataSplit split : splits) {
+ if (!split.rawConvertible()) {
+ return splits;
+ }
+
+ Set<String> cols = Collections.singleton(field.name());
+ if (!split.statsAvailable(cols)) {
+ results.add(split);
+ continue;
+ }
+
+ Object min = split.minValue(index, field, evolutions);
+ Object max = split.maxValue(index, field, evolutions);
+ Long nullCount = split.nullCount(index, evolutions);
+ Stats stats = new Stats(min, max, nullCount);
+ pairs.add(Pair.of(stats, split));
+ }
+
+ // pick the TopN splits
+ if (NULLS_FIRST.equals(nullOrdering)) {
+ results.addAll(pickNullFirstSplits(pairs, ref, direction, limit));
+ } else if (NULLS_LAST.equals(nullOrdering)) {
+ results.addAll(pickNullLastSplits(pairs, ref, direction, limit));
+ } else {
+ return splits;
+ }
+
+ return results;
+ }
+
+ private List<DataSplit> pickNullFirstSplits(
+ List<Pair<Stats, DataSplit>> pairs,
+ FieldRef field,
+ SortValue.SortDirection direction,
+ int limit) {
+ Comparator<Pair<Stats, DataSplit>> comparator;
+ if (ASCENDING.equals(direction)) {
+ comparator =
+ (x, y) -> {
+ Stats left = x.getKey();
+ Stats right = y.getKey();
+                        int result = nullsFirst(left.nullCount, right.nullCount);
+ if (result == 0) {
+ result = asc(field, left.min, right.min);
+ }
+ return result;
+ };
+ } else if (DESCENDING.equals(direction)) {
+ comparator =
+ (x, y) -> {
+ Stats left = x.getKey();
+ Stats right = y.getKey();
+                        int result = nullsFirst(left.nullCount, right.nullCount);
+ if (result == 0) {
+ result = desc(field, left.max, right.max);
+ }
+ return result;
+ };
+ } else {
+            return pairs.stream().map(Pair::getValue).collect(Collectors.toList());
+ }
+ pairs.sort(comparator);
+
+ long scanned = 0;
+ List<DataSplit> splits = new ArrayList<>();
+ for (Pair<Stats, DataSplit> pair : pairs) {
+ Stats stats = pair.getKey();
+ DataSplit split = pair.getValue();
+ splits.add(split);
+ scanned += Math.max(stats.nullCount, 1);
+ if (scanned >= limit) {
+ break;
+ }
+ }
+ return splits;
+ }
+
+ private List<DataSplit> pickNullLastSplits(
+ List<Pair<Stats, DataSplit>> pairs,
+ FieldRef field,
+ SortValue.SortDirection direction,
+ int limit) {
+ Comparator<Pair<Stats, DataSplit>> comparator;
+ if (ASCENDING.equals(direction)) {
+ comparator =
+ (x, y) -> {
+ Stats left = x.getKey();
+ Stats right = y.getKey();
+ int result = asc(field, left.min, right.min);
+ if (result == 0) {
+                            result = nullsLast(left.nullCount, right.nullCount);
+ }
+ return result;
+ };
+ } else if (DESCENDING.equals(direction)) {
+ comparator =
+ (x, y) -> {
+ Stats left = x.getKey();
+ Stats right = y.getKey();
+ int result = desc(field, left.max, right.max);
+ if (result == 0) {
+                            result = nullsLast(left.nullCount, right.nullCount);
+ }
+ return result;
+ };
+ } else {
+            return pairs.stream().map(Pair::getValue).collect(Collectors.toList());
+ }
+
+ return pairs.stream()
+ .sorted(comparator)
+ .map(Pair::getValue)
+ .limit(limit)
+ .collect(Collectors.toList());
+ }
+
+ private int nullsFirst(Long left, Long right) {
+ if (left == null) {
+ return -1;
+ } else if (right == null) {
+ return 1;
+ } else {
+ return -Long.compare(left, right);
+ }
+ }
+
+ private int nullsLast(Long left, Long right) {
+ if (left == null) {
+ return -1;
+ } else if (right == null) {
+ return 1;
+ } else {
+ return Long.compare(left, right);
+ }
+ }
+
+ private int asc(FieldRef field, Object left, Object right) {
+ if (left == null) {
+ return -1;
+ } else if (right == null) {
+ return 1;
+ } else {
+ return CompareUtils.compareLiteral(field.type(), left, right);
+ }
+ }
+
+ private int desc(FieldRef field, Object left, Object right) {
+ if (left == null) {
+ return -1;
+ } else if (right == null) {
+ return 1;
+ } else {
+ return -CompareUtils.compareLiteral(field.type(), left, right);
+ }
+ }
+
+ private TableSchema scanTableSchema(long id) {
+        return tableSchemas.computeIfAbsent(
+                id, key -> key == schema.id() ? schema : schemaManager.schema(id));
+ }
+
+ /** The DataSplit's stats. */
+ private static class Stats {
+ Object min;
+ Object max;
+ Long nullCount;
+
+ public Stats(Object min, Object max, Long nullCount) {
+ this.min = min;
+ this.max = max;
+ this.nullCount = nullCount;
+ }
+ }
+}
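
One subtlety worth a worked example: in the NULLS_FIRST branch the evaluator
keeps whole splits until a lower bound on scanned rows reaches the limit, and
Math.max(nullCount, 1) ensures every kept split counts for at least one row.
A standalone sketch of that accounting (the per-split null counts are made up):

    long scanned = 0;
    int limit = 3;
    for (long nullCount : new long[] {2L, 2L, 0L}) { // splits already sorted nulls-first
        scanned += Math.max(nullCount, 1);
        if (scanned >= limit) {
            break; // the first two splits already cover at least `limit` candidate rows
        }
    }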
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 0094313238..87d79bc537 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -149,6 +149,7 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
CoreOptions options = wrapped.coreOptions();
return new DataTableBatchScan(
wrapped.schema(),
+ schemaManager(),
options,
newSnapshotReader(wrapped),
wrapped.catalogEnvironment().tableQueryAuth(options));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index f8056cc492..cf246b13b5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -43,7 +43,6 @@ import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.SortValue;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
@@ -101,6 +100,8 @@ import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
import static org.apache.paimon.CoreOptions.WRITE_ONLY;
import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
+import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -882,13 +883,9 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
RoaringBitmap32 bitmap = new RoaringBitmap32();
expected.forEach(bitmap::add);
DataField field = rowType.getField("price");
-            SortValue sort =
-                    new SortValue(
-                            new FieldRef(field.id(), field.name(), field.type()),
-                            SortValue.SortDirection.DESCENDING,
-                            SortValue.NullOrdering.NULLS_LAST);
-            TopN topN = new TopN(Collections.singletonList(sort), k);
-            TableScan.Plan plan = table.newScan().plan();
+            FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+ TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k);
+ TableScan.Plan plan = table.newScan().withTopN(topN).plan();
RecordReader<InternalRow> reader =
table.newRead().withTopN(topN).createReader(plan.splits());
AtomicInteger cnt = new AtomicInteger(0);
@@ -906,13 +903,9 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
// test TopK without index
{
DataField field = rowType.getField("id");
-            SortValue sort =
-                    new SortValue(
-                            new FieldRef(field.id(), field.name(), field.type()),
-                            SortValue.SortDirection.DESCENDING,
-                            SortValue.NullOrdering.NULLS_LAST);
-            TopN topN = new TopN(Collections.singletonList(sort), k);
-            TableScan.Plan plan = table.newScan().plan();
+            FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+ TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k);
+ TableScan.Plan plan = table.newScan().withTopN(topN).plan();
RecordReader<InternalRow> reader =
table.newRead().withTopN(topN).createReader(plan.splits());
AtomicInteger cnt = new AtomicInteger(0);
@@ -933,12 +926,8 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
.build();
table = createUnawareBucketFileStoreTable(rowType, configure);
DataField field = rowType.getField("price");
-            SortValue sort =
-                    new SortValue(
-                            new FieldRef(field.id(), field.name(), field.type()),
-                            SortValue.SortDirection.DESCENDING,
-                            SortValue.NullOrdering.NULLS_LAST);
-            TopN topN = new TopN(Collections.singletonList(sort), k);
+            FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+ TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k);
TableScan.Plan plan = table.newScan().plan();
RecordReader<InternalRow> reader =
table.newRead().withTopN(topN).createReader(plan.splits());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index d4f1484230..ec0675a53a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -44,7 +44,6 @@ import org.apache.paimon.postpone.PostponeBucketWriter;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.SortValue;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
@@ -142,6 +141,9 @@ import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.apache.paimon.predicate.PredicateBuilder.and;
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
+import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
+import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -1256,12 +1258,8 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
int k = new Random().nextInt(100);
            RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOfRange(min, min + k);
            DataField field = table.schema().nameToFieldMap().get(indexColumnName);
-            SortValue sort =
-                    new SortValue(
-                            new FieldRef(field.id(), field.name(), field.type()),
-                            SortValue.SortDirection.ASCENDING,
-                            SortValue.NullOrdering.NULLS_LAST);
-            TopN topN = new TopN(Collections.singletonList(sort), k);
+            FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+ TopN topN = new TopN(ref, ASCENDING, NULLS_LAST, k);
TableScan.Plan plan = table.newScan().plan();
RecordReader<InternalRow> reader =
table.newRead().withTopN(topN).createReader(plan.splits());
@@ -1282,12 +1280,8 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
int k = new Random().nextInt(100);
            RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOfRange(max - k, max);
            DataField field = table.schema().nameToFieldMap().get(indexColumnName);
-            SortValue sort =
-                    new SortValue(
-                            new FieldRef(field.id(), field.name(), field.type()),
-                            SortValue.SortDirection.DESCENDING,
-                            SortValue.NullOrdering.NULLS_LAST);
-            TopN topN = new TopN(Collections.singletonList(sort), k);
+            FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+ TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k);
TableScan.Plan plan = table.newScan().plan();
RecordReader<InternalRow> reader =
table.newRead().withTopN(topN).createReader(plan.splits());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index 6eb11583f4..4d6d842f75 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -160,6 +160,40 @@ public class SplitTest {
assertThat(split2.maxValue(3, floatField, evolutions)).isEqualTo(null);
}
+ @Test
+ public void testSplitNullCount() {
+ Map<Long, List<DataField>> schemas = new HashMap<>();
+ DataField intField = new DataField(0, "c_int", new IntType());
+ DataField longField = new DataField(1, "c_long", new BigIntType());
+ schemas.put(1L, Arrays.asList(intField, longField));
+
+ // test common
+ BinaryRow min1 = newBinaryRow(new Object[] {10, 123L});
+ BinaryRow max1 = newBinaryRow(new Object[] {99, 456L});
+        SimpleStats valueStats1 = new SimpleStats(min1, max1, fromLongArray(new Long[] {5L, 1L}));
+
+ BinaryRow min2 = newBinaryRow(new Object[] {5, 0L});
+ BinaryRow max2 = newBinaryRow(new Object[] {90, 789L});
+        SimpleStats valueStats2 = new SimpleStats(min2, max2, fromLongArray(new Long[] {3L, 2L}));
+
+ DataFileMeta d1 = newDataFile(100, valueStats1, null);
+ DataFileMeta d2 = newDataFile(100, valueStats2, null);
+ DataSplit split1 = newDataSplit(true, Arrays.asList(d1, d2), null);
+
+        SimpleStatsEvolutions evolutions = new SimpleStatsEvolutions(schemas::get, 1);
+ assertThat(split1.nullCount(0, evolutions)).isEqualTo(8);
+ assertThat(split1.nullCount(1, evolutions)).isEqualTo(3);
+
+ // test schema evolution
+ DataField doubleField = new DataField(2, "c_double", new DoubleType());
+ schemas.put(2L, Arrays.asList(intField, longField, doubleField));
+ evolutions = new SimpleStatsEvolutions(schemas::get, 2);
+
+ assertThat(split1.nullCount(0, evolutions)).isEqualTo(8);
+ assertThat(split1.nullCount(1, evolutions)).isEqualTo(3);
+ assertThat(split1.nullCount(2, evolutions)).isEqualTo(200);
+ }
+
@Test
public void testSerializer() throws IOException {
        DataFileTestDataGenerator gen = DataFileTestDataGenerator.builder().build();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
index 5f565339f9..0fd381f918 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
@@ -18,16 +18,28 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.stats.SimpleStatsEvolutions;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.snapshot.ScannerTestBase;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.SnapshotManager;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_FIRST;
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
+import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
+import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -98,4 +110,270 @@ public class TableScanTest extends ScannerTestBase {
write.close();
commit.close();
}
+
+ @Test
+ public void testPushDownTopN() throws Exception {
+ createAppendOnlyTable();
+
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(2, 20, 200L));
+ write.write(rowData(3, 30, 300L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(4, 40, 400L));
+ write.write(rowData(5, 50, 500L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ write.write(rowData(6, null, 600L));
+ write.write(rowData(6, null, 600L));
+ write.write(rowData(6, 60, 600L));
+ write.write(rowData(7, null, 700L));
+ write.write(rowData(7, null, 700L));
+ write.write(rowData(7, 70, 700L));
+ commit.commit(2, write.prepareCommit(true, 2));
+ write.close();
+ commit.close();
+
+ // disable stats
+ FileStoreTable disableStatsTable =
+                table.copy(Collections.singletonMap("metadata.stats-mode", "none"));
+ write = disableStatsTable.newWrite(commitUser);
+ commit = disableStatsTable.newCommit(commitUser);
+
+ write.write(rowData(8, 80, 800L));
+ write.write(rowData(9, 90, 900L));
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ // no TopN pushed down
+ TableScan.Plan plan1 = table.newScan().plan();
+ assertThat(plan1.splits().size()).isEqualTo(9);
+
+ DataField field = table.schema().fields().get(1);
+ FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+        SimpleStatsEvolutions evolutions =
+                new SimpleStatsEvolutions(
+                        (id) -> table.schemaManager().schema(id).fields(), table.schema().id());
+
+        // with bottom1 null first
+        TableScan.Plan plan2 =
+                table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_FIRST, 1)).plan();
+        List<Split> splits2 = plan2.splits();
+        assertThat(splits2.size()).isEqualTo(3);
+        assertThat(((DataSplit) splits2.get(0)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits2.get(1)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits2.get(2)).minValue(field.id(), field, evolutions))
+                .isEqualTo(60);
+        assertThat(((DataSplit) splits2.get(2)).nullCount(field.id(), evolutions)).isEqualTo(2);
+
+        // with bottom1 null last
+        TableScan.Plan plan3 =
+                table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_LAST, 1)).plan();
+        List<Split> splits3 = plan3.splits();
+        assertThat(splits3.size()).isEqualTo(3);
+        assertThat(((DataSplit) splits3.get(0)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits3.get(1)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits3.get(2)).minValue(field.id(), field, evolutions))
+                .isEqualTo(10);
+
+        // with bottom5 null first
+        TableScan.Plan plan4 =
+                table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_FIRST, 5)).plan();
+        List<Split> splits4 = plan4.splits();
+        assertThat(splits4.size()).isEqualTo(5);
+        assertThat(((DataSplit) splits4.get(0)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits4.get(1)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits4.get(2)).minValue(field.id(), field, evolutions))
+                .isEqualTo(60);
+        assertThat(((DataSplit) splits4.get(2)).nullCount(field.id(), evolutions)).isEqualTo(2);
+        assertThat(((DataSplit) splits4.get(3)).minValue(field.id(), field, evolutions))
+                .isEqualTo(70);
+        assertThat(((DataSplit) splits4.get(3)).nullCount(field.id(), evolutions)).isEqualTo(2);
+        assertThat(((DataSplit) splits4.get(4)).minValue(field.id(), field, evolutions))
+                .isEqualTo(10);
+
+        // with bottom5 null last
+        TableScan.Plan plan5 =
+                table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_LAST, 5)).plan();
+        List<Split> splits5 = plan5.splits();
+        assertThat(splits5.size()).isEqualTo(7);
+        assertThat(((DataSplit) splits5.get(0)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits5.get(1)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits5.get(2)).minValue(field.id(), field, evolutions))
+                .isEqualTo(10);
+        assertThat(((DataSplit) splits5.get(3)).minValue(field.id(), field, evolutions))
+                .isEqualTo(20);
+        assertThat(((DataSplit) splits5.get(4)).minValue(field.id(), field, evolutions))
+                .isEqualTo(30);
+        assertThat(((DataSplit) splits5.get(5)).minValue(field.id(), field, evolutions))
+                .isEqualTo(40);
+        assertThat(((DataSplit) splits5.get(6)).minValue(field.id(), field, evolutions))
+                .isEqualTo(50);
+
+        // with top1 null first
+        TableScan.Plan plan6 =
+                table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_FIRST, 1)).plan();
+        List<Split> splits6 = plan6.splits();
+        assertThat(splits6.size()).isEqualTo(3);
+        assertThat(((DataSplit) splits6.get(0)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits6.get(1)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits6.get(2)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(70);
+        assertThat(((DataSplit) splits6.get(2)).nullCount(field.id(), evolutions)).isEqualTo(2);
+
+        // with top1 null last
+        TableScan.Plan plan7 =
+                table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 1)).plan();
+        List<Split> splits7 = plan7.splits();
+        assertThat(splits7.size()).isEqualTo(3);
+        assertThat(((DataSplit) splits7.get(0)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits7.get(1)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits7.get(2)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(70);
+
+        // with top5 null first
+        TableScan.Plan plan8 =
+                table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_FIRST, 5)).plan();
+        List<Split> splits8 = plan8.splits();
+        assertThat(splits8.size()).isEqualTo(5);
+        assertThat(((DataSplit) splits8.get(0)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits8.get(1)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits8.get(2)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(70);
+        assertThat(((DataSplit) splits8.get(2)).nullCount(field.id(), evolutions)).isEqualTo(2);
+        assertThat(((DataSplit) splits8.get(3)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(60);
+        assertThat(((DataSplit) splits8.get(3)).nullCount(field.id(), evolutions)).isEqualTo(2);
+        assertThat(((DataSplit) splits8.get(4)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(50);
+
+        // with top5 null last
+        TableScan.Plan plan9 =
+                table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 5)).plan();
+        List<Split> splits9 = plan9.splits();
+        assertThat(splits9.size()).isEqualTo(7);
+        assertThat(((DataSplit) splits9.get(0)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits9.get(1)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits9.get(2)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(70);
+        assertThat(((DataSplit) splits9.get(3)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(60);
+        assertThat(((DataSplit) splits9.get(4)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(50);
+        assertThat(((DataSplit) splits9.get(5)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(40);
+        assertThat(((DataSplit) splits9.get(6)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(30);
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testPushDownTopNSchemaEvolution() throws Exception {
+ createAppendOnlyTable();
+
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(2, 20, 200L));
+ write.write(rowData(3, 30, 300L));
+ commit.commit(0, write.prepareCommit(true, 0));
+ write.close();
+ commit.close();
+
+ // schema evolution
+ updateColumn("a", DataTypes.BIGINT());
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+ write.write(rowData(4, 40L, 400L));
+ write.write(rowData(5, 50L, 500L));
+ write.write(rowData(6, 60L, 600L));
+ commit.commit(1, write.prepareCommit(true, 1));
+ write.close();
+ commit.close();
+
+ DataField field = table.schema().fields().get(1);
+ FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+        SimpleStatsEvolutions evolutions =
+                new SimpleStatsEvolutions(
+                        (id) -> table.schemaManager().schema(id).fields(), table.schema().id());
+
+        TableScan.Plan plan1 =
+                table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 1)).plan();
+        assertThat(plan1.splits().size()).isEqualTo(1);
+        assertThat(((DataSplit) plan1.splits().get(0)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(60L);
+        assertThat(((DataSplit) plan1.splits().get(0)).minValue(field.id(), field, evolutions))
+                .isEqualTo(60L);
+
+        TableScan.Plan plan2 =
+                table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_FIRST, 1)).plan();
+        assertThat(plan2.splits().size()).isEqualTo(1);
+        assertThat(((DataSplit) plan2.splits().get(0)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(10L);
+        assertThat(((DataSplit) plan2.splits().get(0)).minValue(field.id(), field, evolutions))
+                .isEqualTo(10L);
+ }
+
+ @Test
+ public void testPushDownTopNOnlyNull() throws Exception {
+ createAppendOnlyTable();
+
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, null, 100L));
+ write.write(rowData(2, null, 200L));
+ write.write(rowData(3, null, 300L));
+ commit.commit(0, write.prepareCommit(true, 0));
+ write.close();
+ commit.close();
+
+ DataField field = table.schema().fields().get(1);
+ FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+        SimpleStatsEvolutions evolutions =
+                new SimpleStatsEvolutions(
+                        (id) -> table.schemaManager().schema(id).fields(), table.schema().id());
+
+        // the min/max will be null, and the null count is not null.
+        TableScan.Plan plan1 =
+                table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 1)).plan();
+        assertThat(plan1.splits().size()).isEqualTo(1);
+        assertThat(((DataSplit) plan1.splits().get(0)).nullCount(field.id(), evolutions))
+                .isEqualTo(1);
+        assertThat(((DataSplit) plan1.splits().get(0)).minValue(field.id(), field, evolutions))
+                .isNull();
+        assertThat(((DataSplit) plan1.splits().get(0)).maxValue(field.id(), field, evolutions))
+                .isNull();
+
+        TableScan.Plan plan2 =
+                table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_FIRST, 1)).plan();
+        assertThat(plan2.splits().size()).isEqualTo(1);
+        assertThat(((DataSplit) plan2.splits().get(0)).nullCount(field.id(), evolutions))
+                .isEqualTo(1);
+        assertThat(((DataSplit) plan2.splits().get(0)).minValue(field.id(), field, evolutions))
+                .isNull();
+        assertThat(((DataSplit) plan2.splits().get(0)).maxValue(field.id(), field, evolutions))
+                .isNull();
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
index 2ed0d5c9b3..612b4ed49b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
@@ -32,6 +32,7 @@ import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
@@ -171,6 +172,19 @@ public abstract class ScannerTestBase {
                fileIO, tablePath, tableSchema, conf, CatalogEnvironment.empty());
}
+    protected void updateColumn(String columnName, DataType type) throws Exception {
+ TableSchema tableSchema =
+ table.schemaManager()
+                        .commitChanges(SchemaChange.updateColumnType(columnName, type));
+ table =
+ FileStoreTableFactory.create(
+ table.fileIO(),
+ table.location(),
+ tableSchema,
+ new Options(table.options()),
+ CatalogEnvironment.empty());
+ }
+
protected List<Split> toSplits(List<DataSplit> dataSplits) {
return new ArrayList<>(dataSplits);
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index ce1fe2e25f..1e3bb846cf 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -73,7 +73,13 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
}
pushDownLimit.foreach(_readBuilder.withLimit)
pushDownTopN.foreach(_readBuilder.withTopN)
- _readBuilder.dropStats()
+
+ // when TopN is not empty, we need the stats to pick the TopN DataSplits
+ if (pushDownTopN.nonEmpty) {
+ _readBuilder
+ } else {
+ _readBuilder.dropStats()
+ }
}
final def metadataColumns: Seq[PaimonMetadataColumn] = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
index c6abec1acd..fd1ee59b5a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.aggregate
+import org.apache.paimon.stats.StatsUtils.minmaxAvailable
import org.apache.paimon.table.Table
import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.types._
@@ -58,21 +59,12 @@ object AggregatePushDownUtils {
val dataField = getDataFieldForCol(columnName)
- dataField.`type`() match {
- // not push down complex type
- // not push down Timestamp because INT96 sort order is undefined,
- // Parquet doesn't return statistics for INT96
- // not push down Parquet Binary because min/max could be truncated
- // (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary
- // could be Spark StringType, BinaryType or DecimalType.
- // not push down for ORC with same reason.
-      case _: BooleanType | _: TinyIntType | _: SmallIntType | _: IntType | _: BigIntType |
- _: FloatType | _: DoubleType | _: DateType =>
- minmaxColumns.add(columnName)
- hasMinMax = true
- true
- case _ =>
- false
+ if (minmaxAvailable(dataField.`type`())) {
+ minmaxColumns.add(columnName)
+ hasMinMax = true
+ true
+ } else {
+ false
}
}
@@ -95,15 +87,7 @@ object AggregatePushDownUtils {
}
if (hasMinMax) {
- dataSplits.forall {
- dataSplit =>
- dataSplit.dataFiles().asScala.forall {
- dataFile =>
-              // It means there are all column statistics when valueStatsCols == null
- dataFile.valueStatsCols() == null ||
- minmaxColumns.forall(dataFile.valueStatsCols().contains)
- }
- }
+ dataSplits.forall(_.statsAvailable(minmaxColumns.toSet.asJava))
} else if (hasCount) {
dataSplits.forall(_.mergedRowCountAvailable())
} else {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index 1edc64e55b..27c8c00846 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -285,57 +285,68 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
test("Paimon pushDown: topN for append-only tables") {
assume(gteqSpark3_3)
spark.sql("""
-                |CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED BY (pt)
+                |CREATE TABLE T (pt INT, id INT, price BIGINT) PARTITIONED BY (pt)
|TBLPROPERTIES ('file-index.range-bitmap.columns'='id')
|""".stripMargin)
Assertions.assertTrue(getScanBuilder().isInstanceOf[SupportsPushDownTopN])
spark.sql("""
|INSERT INTO T VALUES
- |(1, "a1", "2023"),
- |(1, "a1", "2023"),
- |(3, "d1", "2023"),
- |(4, "e1", "2023")
+ |(1, 10, 100L),
+ |(2, 20, 200L),
+ |(3, 30, 300L)
|""".stripMargin)
spark.sql("""
|INSERT INTO T VALUES
- |(5, "a2", "2025"),
- |(NULL, "b2", "2025"),
- |(6, "c2", "2025"),
- |(7, "d2", "2025"),
- |(8, "e2", "2025")
+ |(4, 40, 400L),
+ |(5, 50, 500L)
|""".stripMargin)
spark.sql("""
|INSERT INTO T VALUES
- |(5, "a3", "2023"),
- |(9, "a3", "2023"),
- |(2, "c3", "2025"),
- |(NULL, "b2", "2025")
+ |(6, NULL, 600L),
+ |(6, NULL, 600L),
+ |(6, 60, 600L),
+ |(7, NULL, 700L),
+ |(7, NULL, 700L),
+ |(7, 70, 700L)
+ |""".stripMargin)
+
+ // disable stats
+ spark.sql("""
+                |ALTER TABLE T SET TBLPROPERTIES ('metadata.stats-mode' = 'none')
+ |""".stripMargin)
+ spark.sql("""
+ |INSERT INTO T VALUES
+ |(8, 80, 800L),
+ |(9, 90, 900L)
+ |""".stripMargin)
+ spark.sql("""
+ |ALTER TABLE T UNSET TBLPROPERTIES ('metadata.stats-mode')
|""".stripMargin)
// test ASC
checkAnswer(
- spark.sql("SELECT * FROM T ORDER BY id ASC NULLS LAST LIMIT 3"),
-      Row(1, "a1", "2023") :: Row(1, "a1", "2023") :: Row(2, "c3", "2025") :: Nil)
+ spark.sql("SELECT id FROM T ORDER BY id ASC NULLS LAST LIMIT 5"),
+ Row(10) :: Row(20) :: Row(30) :: Row(40) :: Row(50) :: Nil)
checkAnswer(
- spark.sql("SELECT * FROM T ORDER BY id ASC NULLS FIRST LIMIT 3"),
-      Row(null, "b2", "2025") :: Row(null, "b2", "2025") :: Row(1, "a1", "2023") :: Nil)
+ spark.sql("SELECT id FROM T ORDER BY id ASC NULLS FIRST LIMIT 5"),
+ Row(null) :: Row(null) :: Row(null) :: Row(null) :: Row(10) :: Nil)
// test DESC
checkAnswer(
- spark.sql("SELECT * FROM T ORDER BY id DESC NULLS LAST LIMIT 3"),
-      Row(9, "a3", "2023") :: Row(8, "e2", "2025") :: Row(7, "d2", "2025") :: Nil)
+ spark.sql("SELECT id FROM T ORDER BY id DESC NULLS LAST LIMIT 5"),
+ Row(90) :: Row(80) :: Row(70) :: Row(60) :: Row(50) :: Nil)
checkAnswer(
- spark.sql("SELECT * FROM T ORDER BY id DESC NULLS FIRST LIMIT 3"),
-      Row(null, "b2", "2025") :: Row(null, "b2", "2025") :: Row(9, "a3", "2023") :: Nil)
+ spark.sql("SELECT id FROM T ORDER BY id DESC NULLS FIRST LIMIT 5"),
+ Row(null) :: Row(null) :: Row(null) :: Row(null) :: Row(90) :: Nil)
// test with partition
checkAnswer(
- spark.sql("SELECT * FROM T WHERE pt='2023' ORDER BY id DESC LIMIT 3"),
-      Row(9, "a3", "2023") :: Row(5, "a3", "2023") :: Row(4, "e1", "2023") :: Nil)
+ spark.sql("SELECT id FROM T WHERE pt=6 ORDER BY id DESC LIMIT 5"),
+ Row(60) :: Row(null) :: Row(null) :: Nil)
checkAnswer(
- spark.sql("SELECT * FROM T WHERE pt='2025' ORDER BY id ASC LIMIT 3"),
-      Row(null, "b2", "2025") :: Row(null, "b2", "2025") :: Row(2, "c3", "2025") :: Nil)
+ spark.sql("SELECT id FROM T WHERE pt=6 ORDER BY id ASC LIMIT 3"),
+ Row(null) :: Row(null) :: Row(60) :: Nil)
// test plan
val df1 = spark.sql("SELECT * FROM T ORDER BY id DESC LIMIT 1")