This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6646071840 [core] Support TopN pushdown in deletion-vector mode (#6097)
6646071840 is described below
commit 6646071840b53c822a88ef680ad5c0722f6f15b3
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Wed Aug 20 12:27:34 2025 +0800
[core] Support TopN pushdown in deletion-vector mode (#6097)
---
.../paimon/fileindex/bitmap/BitmapIndexResult.java | 17 ++++
.../org/apache/paimon/utils/RoaringBitmap32.java | 4 +
.../org/apache/paimon/AppendOnlyFileStore.java | 3 +-
.../java/org/apache/paimon/KeyValueFileStore.java | 3 +-
.../org/apache/paimon/io/FileIndexEvaluator.java | 100 +++++++++++++--------
.../apache/paimon/operation/RawFileSplitRead.java | 29 ++----
.../paimon/table/source/KeyValueTableRead.java | 12 +++
.../paimon/table/AppendOnlySimpleTableTest.java | 33 -------
.../paimon/table/PrimaryKeySimpleTableTest.java | 100 +++++++++++++++++++++
.../flink/source/TestChangelogDataReadWrite.java | 1 -
.../apache/paimon/spark/PaimonScanBuilder.scala | 13 +--
.../paimon/spark/sql/PaimonPushDownTestBase.scala | 85 ++++++++++++++++++
12 files changed, 291 insertions(+), 109 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
index a1fc6ca551..96e6fe8284 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
@@ -22,6 +22,7 @@ import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.utils.LazyField;
import org.apache.paimon.utils.RoaringBitmap32;
+import java.util.Objects;
import java.util.function.Supplier;
/** bitmap file index result. */
@@ -53,4 +54,20 @@ public class BitmapIndexResult extends
LazyField<RoaringBitmap32> implements Fil
}
return FileIndexResult.super.or(fileIndexResult);
}
+
+ public FileIndexResult andNot(RoaringBitmap32 deletion) {
+ return new BitmapIndexResult(() -> RoaringBitmap32.andNot(get(),
deletion));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BitmapIndexResult that = (BitmapIndexResult) o;
+ return Objects.equals(this.get(), that.get());
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index 8c3b2802ac..b37eb837cf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -175,6 +175,10 @@ public class RoaringBitmap32 {
return roaringBitmap32;
}
+ public static RoaringBitmap32 bitmapOfRange(long min, long max) {
+ return new RoaringBitmap32(RoaringBitmap.bitmapOfRange(min, max));
+ }
+
public static RoaringBitmap32 and(final RoaringBitmap32 x1, final
RoaringBitmap32 x2) {
return new RoaringBitmap32(RoaringBitmap.and(x1.roaringBitmap,
x2.roaringBitmap));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index d7fa9eb66b..0d9ad3b121 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -83,8 +83,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
FileFormatDiscover.of(options),
pathFactory(),
options.fileIndexReadEnabled(),
- options.rowTrackingEnabled(),
- options.deletionVectorsEnabled());
+ options.rowTrackingEnabled());
}
public DataEvolutionSplitRead newDataEvolutionRead() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 48d0bf872b..f7a743ace1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -132,8 +132,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
FileFormatDiscover.of(options),
pathFactory(),
options.fileIndexReadEnabled(),
- false,
- options.deletionVectorsEnabled());
+ false);
}
public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
index c71af3f2bc..bf04b453af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
@@ -18,19 +18,24 @@
package org.apache.paimon.io;
+import org.apache.paimon.deletionvectors.BitmapDeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.ListUtils;
+import org.apache.paimon.utils.RoaringBitmap32;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
/** Evaluate file index result. */
@@ -42,55 +47,76 @@ public class FileIndexEvaluator {
List<Predicate> dataFilter,
@Nullable TopN topN,
DataFilePathFactory dataFilePathFactory,
- DataFileMeta file)
+ DataFileMeta file,
+ @Nullable DeletionVector deletionVector)
throws IOException {
- FileIndexResult result = FileIndexResult.REMAIN;
if (ListUtils.isNullOrEmpty(dataFilter) && topN == null) {
- return result;
+ return FileIndexResult.REMAIN;
+ }
+
+ FileIndexResult selection =
+ new BitmapIndexResult(() -> RoaringBitmap32.bitmapOfRange(0,
file.rowCount()));
+ if (deletionVector instanceof BitmapDeletionVector) {
+ RoaringBitmap32 deletion = ((BitmapDeletionVector)
deletionVector).get();
+ selection = ((BitmapIndexResult) selection).andNot(deletion);
}
- FileIndexPredicate predicate = null;
- try {
- byte[] embeddedIndex = file.embeddedIndex();
- if (embeddedIndex != null) {
- predicate = new FileIndexPredicate(embeddedIndex,
dataSchema.logicalRowType());
- } else {
- List<String> indexFiles =
- file.extraFiles().stream()
- .filter(
- name ->
- name.endsWith(
-
DataFilePathFactory.INDEX_PATH_SUFFIX))
- .collect(Collectors.toList());
- if (indexFiles.isEmpty()) {
- return result;
+ try (FileIndexPredicate predicate =
+ createFileIndexPredicate(fileIO, dataSchema,
dataFilePathFactory, file)) {
+ FileIndexResult result = FileIndexResult.REMAIN;
+ if (predicate != null) {
+ if (!ListUtils.isNullOrEmpty(dataFilter)) {
+ Predicate filter =
PredicateBuilder.and(dataFilter.toArray(new Predicate[0]));
+ result = predicate.evaluate(filter);
+ result.and(selection);
+ } else if (topN != null) {
+ result = predicate.evaluateTopN(topN, selection);
}
- if (indexFiles.size() > 1) {
- throw new RuntimeException(
- "Found more than one index file for one data file:
"
- + String.join(" and ", indexFiles));
+
+ // if the result selects every position, or excludes nothing beyond the
+ // deleted rows, the bitmap brings no benefit, so just return REMAIN.
+ if (Objects.equals(result, selection)) {
+ result = FileIndexResult.REMAIN;
}
- predicate =
- new FileIndexPredicate(
-
dataFilePathFactory.toAlignedPath(indexFiles.get(0), file),
- fileIO,
- dataSchema.logicalRowType());
}
- // evaluate
- if (!ListUtils.isNullOrEmpty(dataFilter)) {
- result =
- predicate.evaluate(
- PredicateBuilder.and(dataFilter.toArray(new
Predicate[0])));
- } else if (topN != null) {
- result = predicate.evaluateTopN(topN, result);
+ if (!result.remain()) {
+ result = FileIndexResult.SKIP;
}
return result;
- } finally {
- if (predicate != null) {
- predicate.close();
+ }
+ }
+
+ private static FileIndexPredicate createFileIndexPredicate(
+ FileIO fileIO,
+ TableSchema dataSchema,
+ DataFilePathFactory dataFilePathFactory,
+ DataFileMeta file)
+ throws IOException {
+ FileIndexPredicate predicate;
+ byte[] embeddedIndex = file.embeddedIndex();
+ if (embeddedIndex != null) {
+ predicate = new FileIndexPredicate(embeddedIndex,
dataSchema.logicalRowType());
+ } else {
+ List<String> indexFiles =
+ file.extraFiles().stream()
+ .filter(name ->
name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+ .collect(Collectors.toList());
+ if (indexFiles.isEmpty()) {
+ return null;
+ }
+ if (indexFiles.size() > 1) {
+ throw new RuntimeException(
+ "Found more than one index file for one data file: "
+ + String.join(" and ", indexFiles));
}
+ predicate =
+ new FileIndexPredicate(
+
dataFilePathFactory.toAlignedPath(indexFiles.get(0), file),
+ fileIO,
+ dataSchema.logicalRowType());
}
+ return predicate;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 27211a26bd..1c1249978a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -21,7 +21,6 @@ package org.apache.paimon.operation;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
-import org.apache.paimon.deletionvectors.BitmapDeletionVector;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fileindex.FileIndexResult;
@@ -77,7 +76,6 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
private final TableSchema schema;
private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
- private final boolean deletionVectorsEnabled;
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
private final boolean fileIndexReadEnabled;
private final boolean rowTrackingEnabled;
@@ -94,14 +92,12 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
boolean fileIndexReadEnabled,
- boolean rowTrackingEnabled,
- boolean deletionVectorsEnabled) {
+ boolean rowTrackingEnabled) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schema = schema;
this.formatDiscover = formatDiscover;
this.pathFactory = pathFactory;
- this.deletionVectorsEnabled = deletionVectorsEnabled;
this.formatReaderMappings = new HashMap<>();
this.fileIndexReadEnabled = fileIndexReadEnabled;
this.rowTrackingEnabled = rowTrackingEnabled;
@@ -134,9 +130,7 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
@Override
public SplitRead<InternalRow> withTopN(@Nullable TopN topN) {
- if (!deletionVectorsEnabled) {
- this.topN = topN;
- }
+ this.topN = topN;
return this;
}
@@ -228,6 +222,7 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
IOExceptionSupplier<DeletionVector> dvFactory)
throws IOException {
FileIndexResult fileIndexResult = null;
+ DeletionVector deletionVector = dvFactory == null ? null :
dvFactory.get();
if (fileIndexReadEnabled) {
fileIndexResult =
FileIndexEvaluator.evaluate(
@@ -236,7 +231,8 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
formatReaderMapping.getDataFilters(),
formatReaderMapping.getTopN(),
dataFilePathFactory,
- file);
+ file,
+ deletionVector);
if (!fileIndexResult.remain()) {
return new EmptyFileRecordReader<>();
}
@@ -247,21 +243,6 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
selection = ((BitmapIndexResult) fileIndexResult).get();
}
- RoaringBitmap32 deletion = null;
- DeletionVector deletionVector = dvFactory == null ? null :
dvFactory.get();
- if (deletionVector instanceof BitmapDeletionVector) {
- deletion = ((BitmapDeletionVector) deletionVector).get();
- }
-
- if (selection != null) {
- if (deletion != null) {
- selection = RoaringBitmap32.andNot(selection, deletion);
- }
- if (selection.isEmpty()) {
- return new EmptyFileRecordReader<>();
- }
- }
-
FormatReaderContext formatReaderContext =
new FormatReaderContext(
fileIO, dataFilePathFactory.toPath(file),
file.fileSize(), selection);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index c4bede61d8..5df41399cb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -26,6 +26,7 @@ import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TopN;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import
org.apache.paimon.table.source.splitread.IncrementalChangelogReadProvider;
@@ -54,6 +55,7 @@ public final class KeyValueTableRead extends
AbstractDataTableRead {
private boolean forceKeepDelete = false;
private Predicate predicate = null;
private IOManager ioManager = null;
+ @Nullable private TopN topN = null;
public KeyValueTableRead(
Supplier<MergeFileSplitRead> mergeReadSupplier,
@@ -86,6 +88,9 @@ public final class KeyValueTableRead extends
AbstractDataTableRead {
if (readType != null) {
read = read.withReadType(readType);
}
+ if (topN != null) {
+ read = read.withTopN(topN);
+ }
read.withFilter(predicate).withIOManager(ioManager);
}
@@ -109,6 +114,13 @@ public final class KeyValueTableRead extends
AbstractDataTableRead {
return this;
}
+ @Override
+ public InnerTableRead withTopN(TopN topN) {
+ initialized().forEach(r -> r.withTopN(topN));
+ this.topN = topN;
+ return this;
+ }
+
@Override
public TableRead withIOManager(IOManager ioManager) {
initialized().forEach(r -> r.withIOManager(ioManager));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index c262f892a8..024e6320b8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -92,7 +92,6 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
-import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
@@ -945,38 +944,6 @@ public class AppendOnlySimpleTableTest extends
SimpleTableTestBase {
assertThat(cnt.get()).isEqualTo(rowCount);
reader.close();
}
-
- // test should not push topN with dv modes
- {
- table.schemaManager()
- .commitChanges(SchemaChange.updateColumnType("price",
DataTypes.INT()));
- rowType =
- RowType.builder()
- .field("id", DataTypes.STRING())
- .field("event", DataTypes.STRING())
- .field("price", DataTypes.INT())
- .build();
- Consumer<Options> newConfigure =
- options -> {
- configure.accept(options);
- options.set(DELETION_VECTORS_ENABLED, true);
- };
- table = createUnawareBucketFileStoreTable(rowType, newConfigure);
- DataField field = rowType.getField("price");
- SortValue sort =
- new SortValue(
- new FieldRef(field.id(), field.name(),
field.type()),
- SortValue.SortDirection.DESCENDING,
- SortValue.NullOrdering.NULLS_LAST);
- TopN topN = new TopN(Collections.singletonList(sort), k);
- TableScan.Plan plan = table.newScan().plan();
- RecordReader<InternalRow> reader =
- table.newRead().withTopN(topN).createReader(plan.splits());
- AtomicInteger cnt = new AtomicInteger(0);
- reader.forEachRemaining(row -> cnt.incrementAndGet());
- assertThat(cnt.get()).isEqualTo(rowCount);
- reader.close();
- }
}
@Test
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index c02d617dbe..1e4b8b185b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -41,8 +41,11 @@ import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
import org.apache.paimon.postpone.PostponeBucketWriter;
+import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.SortValue;
+import org.apache.paimon.predicate.TopN;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
@@ -72,12 +75,14 @@ import
org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.system.AuditLogTable;
import org.apache.paimon.table.system.FileMonitorTable;
import org.apache.paimon.table.system.ReadOptimizedTable;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.RoaringBitmap32;
import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -1203,6 +1208,101 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
}
}
+ @Test
+ public void testTopNPushDownInDeletionVectorMode() throws Exception {
+ String indexColumnName = "b";
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(BUCKET, 1);
+ conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
+ conf.set(DELETION_VECTORS_ENABLED, true);
+ conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288");
+
conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
+ conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT,
"300");
+ conf.set("file-index.range-bitmap.columns",
indexColumnName);
+ });
+
+ int rowCount = 1000000;
+ StreamTableWrite write =
+ table.newWrite(commitUser).withIOManager(new
IOManagerImpl(tempDir.toString()));
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // append
+ for (int i = 0; i < rowCount; i++) {
+ write.write(rowDataWithKind(RowKind.INSERT, 1, i, (long) i));
+ }
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ // delete [0, 20000)
+ int min = 20000;
+ for (int i = 0; i < 20000; i++) {
+ write.write(rowDataWithKind(RowKind.DELETE, 1, i, (long) i));
+ }
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ // delete [rowCount - 20000, rowCount)
+ int max = rowCount - 20000;
+ for (int i = rowCount - 20000; i < rowCount; i++) {
+ write.write(rowDataWithKind(RowKind.DELETE, 1, i, (long) i));
+ }
+ commit.commit(2, write.prepareCommit(true, 2));
+ write.close();
+ commit.close();
+
+ // test bottom k
+ {
+ int k = new Random().nextInt(100);
+ RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOfRange(min, min +
k);
+ DataField field =
table.schema().nameToFieldMap().get(indexColumnName);
+ SortValue sort =
+ new SortValue(
+ new FieldRef(field.id(), field.name(),
field.type()),
+ SortValue.SortDirection.ASCENDING,
+ SortValue.NullOrdering.NULLS_LAST);
+ TopN topN = new TopN(Collections.singletonList(sort), k);
+ TableScan.Plan plan = table.newScan().plan();
+ RecordReader<InternalRow> reader =
+ table.newRead().withTopN(topN).createReader(plan.splits());
+ AtomicInteger cnt = new AtomicInteger(0);
+ RoaringBitmap32 actual = new RoaringBitmap32();
+ reader.forEachRemaining(
+ row -> {
+ cnt.incrementAndGet();
+ actual.add((int) row.getLong(2));
+ });
+ assertThat(cnt.get()).isEqualTo(k);
+ assertThat(actual).isEqualTo(bitmap);
+ reader.close();
+ }
+
+ // test top k
+ {
+ int k = new Random().nextInt(100);
+ RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOfRange(max - k,
max);
+ DataField field =
table.schema().nameToFieldMap().get(indexColumnName);
+ SortValue sort =
+ new SortValue(
+ new FieldRef(field.id(), field.name(),
field.type()),
+ SortValue.SortDirection.DESCENDING,
+ SortValue.NullOrdering.NULLS_LAST);
+ TopN topN = new TopN(Collections.singletonList(sort), k);
+ TableScan.Plan plan = table.newScan().plan();
+ RecordReader<InternalRow> reader =
+ table.newRead().withTopN(topN).createReader(plan.splits());
+ AtomicInteger cnt = new AtomicInteger(0);
+ RoaringBitmap32 actual = new RoaringBitmap32();
+ reader.forEachRemaining(
+ row -> {
+ cnt.incrementAndGet();
+ actual.add((int) row.getLong(2));
+ });
+ assertThat(cnt.get()).isEqualTo(k);
+ assertThat(actual).isEqualTo(bitmap);
+ reader.close();
+ }
+ }
+
@Test
public void testWithShardFirstRow() throws Exception {
FileStoreTable table =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index cb6ef1c40b..9847276fe8 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -147,7 +147,6 @@ public class TestChangelogDataReadWrite {
FileFormatDiscover.of(options),
pathFactory,
options.fileIndexReadEnabled(),
- false,
false);
return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 729613f596..aa50f76043 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -18,7 +18,6 @@
package org.apache.paimon.spark
-import org.apache.paimon.CoreOptions
import org.apache.paimon.predicate._
import org.apache.paimon.predicate.SortValue.{NullOrdering, SortDirection}
import org.apache.paimon.spark.aggregate.{AggregatePushDownUtils,
LocalAggregator}
@@ -102,12 +101,7 @@ class PaimonScanBuilder(table: InnerTable)
return false
}
- if (!table.isInstanceOf[AppendOnlyFileStoreTable]) {
- return false
- }
-
- val coreOptions = CoreOptions.fromMap(table.options())
- if (coreOptions.deletionVectorsEnabled()) {
+ if (!table.isInstanceOf[FileStoreTable]) {
return false
}
@@ -115,7 +109,6 @@ class PaimonScanBuilder(table: InnerTable)
return false
}
- val order = orders(0)
val fieldName = orders.head.expression() match {
case nr: NamedReference => nr.fieldNames.mkString(".")
case _ => return false
@@ -129,13 +122,13 @@ class PaimonScanBuilder(table: InnerTable)
val field = rowType.getField(fieldName)
val ref = new FieldRef(field.id(), field.name(), field.`type`())
- val nullOrdering = order.nullOrdering() match {
+ val nullOrdering = orders.head.nullOrdering() match {
case expressions.NullOrdering.NULLS_LAST => NullOrdering.NULLS_LAST
case expressions.NullOrdering.NULLS_FIRST => NullOrdering.NULLS_FIRST
case _ => return false
}
- val direction = order.direction() match {
+ val direction = orders.head.direction() match {
case expressions.SortDirection.DESCENDING => SortDirection.DESCENDING
case expressions.SortDirection.ASCENDING => SortDirection.ASCENDING
case _ => return false
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index 35190bac51..9541db60e8 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -343,6 +343,91 @@ abstract class PaimonPushDownTestBase extends
PaimonSparkTestBase {
Assertions.assertTrue(qe1.optimizedPlan.containsPattern(LIMIT))
}
+ test("Paimon pushDown: topN for primary-key tables with deletion vector") {
+ assume(gteqSpark3_3)
+ withTable("dv_test") {
+ spark.sql("""
+ |CREATE TABLE dv_test (id INT, c1 INT, c2 STRING)
TBLPROPERTIES (
+ |'primary-key'='id',
+ |'deletion-vectors.enabled' = 'true',
+ |'file-index.range-bitmap.columns'='c1'
+ |)
+ |""".stripMargin)
+
+ spark.sql(
+ "insert into table dv_test values(1, 1, 'a'),(2, 2,'b'),(3, 3,
'c'),(4, 4, 'd'),(5, 5, 'e'),(6, NULL, 'f')")
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS FIRST LIMIT 3"),
+ Row(6, null, "f") :: Row(1, 1, "a") :: Row(2, 2, "b") :: Nil)
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS LAST LIMIT 3"),
+ Row(1, 1, "a") :: Row(2, 2, "b") :: Row(3, 3, "c") :: Nil)
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS FIRST LIMIT
3"),
+ Row(6, null, "f") :: Row(5, 5, "e") :: Row(4, 4, "d") :: Nil)
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS LAST LIMIT 3"),
+ Row(5, 5, "e") :: Row(4, 4, "d") :: Row(3, 3, "c") :: Nil)
+
+ spark.sql("delete from dv_test where id IN (1, 5)")
+
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS FIRST LIMIT 3"),
+ Row(6, null, "f") :: Row(2, 2, "b") :: Row(3, 3, "c") :: Nil)
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS LAST LIMIT 3"),
+ Row(2, 2, "b") :: Row(3, 3, "c") :: Row(4, 4, "d") :: Nil)
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS FIRST LIMIT
3"),
+ Row(6, null, "f") :: Row(4, 4, "d") :: Row(3, 3, "c") :: Nil)
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS LAST LIMIT 3"),
+ Row(4, 4, "d") :: Row(3, 3, "c") :: Row(2, 2, "b") :: Nil)
+ }
+ }
+
+ test("Paimon pushDown: topN for append-only tables with deletion vector") {
+ assume(gteqSpark3_3)
+ withTable("dv_test") {
+ spark.sql("""
+ |CREATE TABLE dv_test (c1 INT, c2 STRING) TBLPROPERTIES (
+ |'deletion-vectors.enabled' = 'true',
+ |'file-index.range-bitmap.columns'='c1'
+ |)
+ |""".stripMargin)
+
+ spark.sql(
+ "insert into table dv_test values(1, 'a'),(2, 'b'),(3, 'c'),(4,
'd'),(5, 'e'),(NULL, 'f')")
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS FIRST LIMIT 3"),
+ Row(null, "f") :: Row(1, "a") :: Row(2, "b") :: Nil)
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS LAST LIMIT 3"),
+ Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS FIRST LIMIT
3"),
+ Row(null, "f") :: Row(5, "e") :: Row(4, "d") :: Nil)
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS LAST LIMIT 3"),
+ Row(5, "e") :: Row(4, "d") :: Row(3, "c") :: Nil)
+
+ spark.sql("delete from dv_test where c1 IN (1, 5)")
+
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS FIRST LIMIT 3"),
+ Row(null, "f") :: Row(2, "b") :: Row(3, "c") :: Nil)
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 ASC NULLS LAST LIMIT 3"),
+ Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil)
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS FIRST LIMIT
3"),
+ Row(null, "f") :: Row(4, "d") :: Row(3, "c") :: Nil)
+ checkAnswer(
+ spark.sql("SELECT * FROM dv_test ORDER BY c1 DESC NULLS LAST LIMIT 3"),
+ Row(4, "d") :: Row(3, "c") :: Row(2, "b") :: Nil)
+ }
+ }
+
test(s"Paimon pushdown: parquet in-filter") {
withTable("T") {
spark.sql(s"""