This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2597463dbf [core] Support limit pushdown to the DataFile (#6105)
2597463dbf is described below
commit 2597463dbf010bb140daf0ecb5e332c89245c1ce
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Wed Aug 20 17:44:09 2025 +0800
[core] Support limit pushdown to the DataFile (#6105)
---
.../bitmap/RangeBitmapIndexPushDownBenchmark.java | 36 ++++++++++++++
.../paimon/fileindex/bitmap/BitmapIndexResult.java | 4 ++
.../org/apache/paimon/io/FileIndexEvaluator.java | 21 ++++++---
.../paimon/io/KeyValueFileReaderFactory.java | 2 +-
.../paimon/operation/DataEvolutionSplitRead.java | 1 +
.../apache/paimon/operation/RawFileSplitRead.java | 11 ++++-
.../org/apache/paimon/operation/SplitRead.java | 4 ++
.../paimon/table/source/AppendTableRead.java | 9 ++++
.../apache/paimon/table/source/InnerTableRead.java | 4 ++
.../paimon/table/source/KeyValueTableRead.java | 11 +++++
.../paimon/table/source/ReadBuilderImpl.java | 3 ++
.../apache/paimon/utils/FormatReaderMapping.java | 18 +++++--
.../paimon/table/AppendOnlySimpleTableTest.java | 54 +++++++++++++++++++++
.../paimon/table/PrimaryKeySimpleTableTest.java | 55 ++++++++++++++++++++++
.../paimon/utils/FormatReaderMappingTest.java | 1 +
.../paimon/spark/sql/PaimonPushDownTestBase.scala | 1 +
16 files changed, 223 insertions(+), 12 deletions(-)
diff --git
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
index fa0600911a..77c88afe56 100644
---
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
+++
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
@@ -55,6 +55,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -95,6 +96,41 @@ public class RangeBitmapIndexPushDownBenchmark {
}
}
+ @Test
+ public void testLimitPushDown() throws Exception {
+ Random random = new Random();
+ for (int bound : BOUNDS) {
+ Table table = prepareData(bound, parquet(), "parquet_" + bound);
+ Benchmark benchmark =
+ new Benchmark("limit", ROW_COUNT)
+ .setNumWarmupIters(1)
+ .setOutputPerIteration(false);
+ int limit = random.nextInt(Math.min(bound, 1000));
+ benchmark.addCase(
+ bound + "-" + limit,
+ 1,
+ () -> {
+ List<Split> splits =
+
table.newReadBuilder().withLimit(limit).newScan().plan().splits();
+ AtomicLong readCount = new AtomicLong(0);
+ try {
+ for (Split split : splits) {
+ RecordReader<InternalRow> reader =
+ table.newReadBuilder()
+ .withLimit(limit)
+ .newRead()
+ .createReader(split);
+ reader.forEachRemaining(row ->
readCount.incrementAndGet());
+ reader.close();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ benchmark.run();
+ }
+ }
+
private Options parquet() {
Options options = new Options();
options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
index 96e6fe8284..fd4991bdf7 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java
@@ -59,6 +59,10 @@ public class BitmapIndexResult extends
LazyField<RoaringBitmap32> implements Fil
return new BitmapIndexResult(() -> RoaringBitmap32.andNot(get(),
deletion));
}
+ public FileIndexResult limit(int limit) {
+ return new BitmapIndexResult(() -> get().limit(limit));
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
index 457212ddcb..1b4f3d1776 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
@@ -47,26 +47,33 @@ public class FileIndexEvaluator {
TableSchema dataSchema,
List<Predicate> dataFilter,
@Nullable TopN topN,
+ @Nullable Integer limit,
DataFilePathFactory dataFilePathFactory,
DataFileMeta file,
@Nullable DeletionVector deletionVector)
throws IOException {
- if (isNullOrEmpty(dataFilter) && topN == null) {
+ if (isNullOrEmpty(dataFilter) && topN == null && limit == null) {
return FileIndexResult.REMAIN;
}
+ FileIndexResult selection =
+ new BitmapIndexResult(() -> RoaringBitmap32.bitmapOfRange(0,
file.rowCount()));
+ if (deletionVector instanceof BitmapDeletionVector) {
+ RoaringBitmap32 deletion = ((BitmapDeletionVector)
deletionVector).get();
+ selection = ((BitmapIndexResult) selection).andNot(deletion);
+ }
+
+        // For now, limit cannot work with other predicates.
+ if (isNullOrEmpty(dataFilter) && topN == null && limit != null) {
+ return ((BitmapIndexResult) selection).limit(limit);
+ }
+
try (FileIndexPredicate predicate =
createFileIndexPredicate(fileIO, dataSchema,
dataFilePathFactory, file)) {
if (predicate == null) {
return FileIndexResult.REMAIN;
}
- FileIndexResult selection =
- new BitmapIndexResult(() ->
RoaringBitmap32.bitmapOfRange(0, file.rowCount()));
- if (deletionVector instanceof BitmapDeletionVector) {
- RoaringBitmap32 deletion = ((BitmapDeletionVector)
deletionVector).get();
- selection = ((BitmapIndexResult) selection).andNot(deletion);
- }
FileIndexResult result;
if (!isNullOrEmpty(dataFilter)) {
Predicate filter = PredicateBuilder.and(dataFilter.toArray(new
Predicate[0]));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 376c45b5dc..0b28bc0b4e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -272,7 +272,7 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
finalReadKeyType,
readValueType,
new FormatReaderMapping.Builder(
- formatDiscover, readTableFields, fieldsExtractor,
filters, null),
+ formatDiscover, readTableFields, fieldsExtractor,
filters, null, null),
pathFactory.createDataFilePathFactory(partition, bucket),
options.fileReaderAsyncThreshold().getBytes(),
partition,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index acd0b39871..46bfb1a68b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -126,6 +126,7 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
readRowType.getFields(),
schema ->
rowTypeWithRowLineage(schema.logicalRowType(), true).getFields(),
null,
+ null,
null);
List<List<DataFileMeta>> splitByRowId =
DataEvolutionSplitGenerator.split(files);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 1c1249978a..610a0c0a2e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -83,6 +83,7 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
private RowType readRowType;
@Nullable private List<Predicate> filters;
@Nullable private TopN topN;
+ @Nullable private Integer limit;
public RawFileSplitRead(
FileIO fileIO,
@@ -134,6 +135,12 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
return this;
}
+ @Override
+ public SplitRead<InternalRow> withLimit(@Nullable Integer limit) {
+ this.limit = limit;
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(DataSplit split) throws
IOException {
if (!split.beforeFiles().isEmpty()) {
@@ -172,7 +179,8 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
return schema.fields();
},
filters,
- topN);
+ topN,
+ limit);
for (DataFileMeta file : files) {
suppliers.add(
@@ -230,6 +238,7 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
formatReaderMapping.getDataSchema(),
formatReaderMapping.getDataFilters(),
formatReaderMapping.getTopN(),
+ formatReaderMapping.getLimit(),
dataFilePathFactory,
file,
deletionVector);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
index e4a8ab5f3b..d646375ef6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
@@ -49,6 +49,10 @@ public interface SplitRead<T> {
return this;
}
+ default SplitRead<T> withLimit(@Nullable Integer limit) {
+ return this;
+ }
+
/** Create a {@link RecordReader} from split. */
RecordReader<T> createReader(DataSplit split) throws IOException;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index cdb66483f5..8feb177db5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -47,6 +47,7 @@ public final class AppendTableRead extends
AbstractDataTableRead {
@Nullable private RowType readType = null;
private Predicate predicate = null;
private TopN topN = null;
+ private Integer limit = null;
public AppendTableRead(
List<Function<SplitReadConfig, SplitReadProvider>>
providerFactories,
@@ -74,6 +75,7 @@ public final class AppendTableRead extends
AbstractDataTableRead {
}
read.withFilter(predicate);
read.withTopN(topN);
+ read.withLimit(limit);
}
@Override
@@ -96,6 +98,13 @@ public final class AppendTableRead extends
AbstractDataTableRead {
return this;
}
+ @Override
+ public InnerTableRead withLimit(int limit) {
+ initialized().forEach(r -> r.withLimit(limit));
+ this.limit = limit;
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> reader(Split split) throws IOException {
DataSplit dataSplit = (DataSplit) split;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index 1e34e911ae..b4da78ef6f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -55,6 +55,10 @@ public interface InnerTableRead extends TableRead {
return this;
}
+ default InnerTableRead withLimit(int limit) {
+ return this;
+ }
+
default InnerTableRead forceKeepDelete() {
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index 5df41399cb..ed67e73380 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -56,6 +56,7 @@ public final class KeyValueTableRead extends
AbstractDataTableRead {
private Predicate predicate = null;
private IOManager ioManager = null;
@Nullable private TopN topN = null;
+ @Nullable private Integer limit = null;
public KeyValueTableRead(
Supplier<MergeFileSplitRead> mergeReadSupplier,
@@ -91,6 +92,9 @@ public final class KeyValueTableRead extends
AbstractDataTableRead {
if (topN != null) {
read = read.withTopN(topN);
}
+ if (limit != null) {
+ read = read.withLimit(limit);
+ }
read.withFilter(predicate).withIOManager(ioManager);
}
@@ -121,6 +125,13 @@ public final class KeyValueTableRead extends
AbstractDataTableRead {
return this;
}
+ @Override
+ public InnerTableRead withLimit(int limit) {
+ initialized().forEach(r -> r.withLimit(limit));
+ this.limit = limit;
+ return this;
+ }
+
@Override
public TableRead withIOManager(IOManager ioManager) {
initialized().forEach(r -> r.withIOManager(ioManager));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 5d529aa41d..e57a7b6382 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -214,6 +214,9 @@ public class ReadBuilderImpl implements ReadBuilder {
if (topN != null) {
read.withTopN(topN);
}
+ if (limit != null) {
+ read.withLimit(limit);
+ }
return read;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 4f867b5ac1..590abe1f7c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -64,6 +64,7 @@ public class FormatReaderMapping {
private final List<Predicate> dataFilters;
private final Map<String, Integer> systemFields;
@Nullable private final TopN topN;
+ @Nullable private final Integer limit;
public FormatReaderMapping(
@Nullable int[] indexMapping,
@@ -74,7 +75,8 @@ public class FormatReaderMapping {
TableSchema dataSchema,
List<Predicate> dataFilters,
Map<String, Integer> systemFields,
- @Nullable TopN topN) {
+ @Nullable TopN topN,
+ @Nullable Integer limit) {
this.indexMapping = combine(indexMapping, trimmedKeyMapping);
this.castMapping = castMapping;
this.readerFactory = readerFactory;
@@ -83,6 +85,7 @@ public class FormatReaderMapping {
this.dataFilters = dataFilters;
this.systemFields = systemFields;
this.topN = topN;
+ this.limit = limit;
}
private int[] combine(@Nullable int[] indexMapping, @Nullable int[]
trimmedKeyMapping) {
@@ -141,6 +144,11 @@ public class FormatReaderMapping {
return topN;
}
+ @Nullable
+ public Integer getLimit() {
+ return limit;
+ }
+
/** Builder for {@link FormatReaderMapping}. */
public static class Builder {
@@ -149,18 +157,21 @@ public class FormatReaderMapping {
private final Function<TableSchema, List<DataField>> fieldsExtractor;
@Nullable private final List<Predicate> filters;
@Nullable private final TopN topN;
+ @Nullable private final Integer limit;
public Builder(
FileFormatDiscover formatDiscover,
List<DataField> readFields,
Function<TableSchema, List<DataField>> fieldsExtractor,
@Nullable List<Predicate> filters,
- @Nullable TopN topN) {
+ @Nullable TopN topN,
+ @Nullable Integer limit) {
this.formatDiscover = formatDiscover;
this.readFields = readFields;
this.fieldsExtractor = fieldsExtractor;
this.filters = filters;
this.topN = topN;
+ this.limit = limit;
}
/**
@@ -223,7 +234,8 @@ public class FormatReaderMapping {
dataSchema,
readFilters,
systemFields,
- evolutionTopN(tableSchema, dataSchema));
+ evolutionTopN(tableSchema, dataSchema),
+ limit);
}
@Nullable
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 024e6320b8..f8056cc492 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.FieldRef;
@@ -68,6 +69,7 @@ import org.apache.paimon.utils.RoaringBitmap32;
import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.commons.math3.random.RandomDataGenerator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -96,6 +98,7 @@ import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
+import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
import static org.apache.paimon.CoreOptions.WRITE_ONLY;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -946,6 +949,57 @@ public class AppendOnlySimpleTableTest extends
SimpleTableTestBase {
}
}
+ @Test
+ public void testLimitPushDown() throws Exception {
+ RowType rowType = RowType.builder().field("id",
DataTypes.INT()).build();
+ Consumer<Options> configure =
+ options -> {
+ options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
+ options.set(WRITE_ONLY, true);
+ options.set(SOURCE_SPLIT_TARGET_SIZE,
MemorySize.ofBytes(1));
+ options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
+
options.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
+ options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT,
"300");
+ };
+ // in unaware-bucket mode, we split files into splits all the time
+ FileStoreTable table = createUnawareBucketFileStoreTable(rowType,
configure);
+
+ int rowCount = 10000;
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ for (int i = 0; i < rowCount; i++) {
+ write.write(GenericRow.of(i));
+ }
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ for (int i = 0; i < rowCount; i++) {
+ write.write(GenericRow.of(i));
+ }
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ for (int i = 0; i < rowCount; i++) {
+ write.write(GenericRow.of(i));
+ }
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ write.close();
+ commit.close();
+
+ // test limit push down
+ {
+ int limit = new RandomDataGenerator().nextInt(1, 1000);
+ TableScan.Plan plan = table.newScan().withLimit(limit).plan();
+ assertThat(plan.splits()).hasSize(1);
+
+ RecordReader<InternalRow> reader =
+
table.newRead().withLimit(limit).createReader(plan.splits());
+ AtomicInteger cnt = new AtomicInteger(0);
+ reader.forEachRemaining(row -> cnt.incrementAndGet());
+ assertThat(cnt.get()).isEqualTo(limit);
+ reader.close();
+ }
+ }
+
@Test
public void testWithShardAppendTable() throws Exception {
FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET,
-1));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 1e4b8b185b..d4f1484230 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -86,6 +86,7 @@ import org.apache.paimon.utils.RoaringBitmap32;
import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.commons.math3.random.RandomDataGenerator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -1303,6 +1304,60 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
}
}
+ @Test
+ public void testLimitPushDownInDeletionVectorMode() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(BUCKET, 2);
+ conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
+ conf.set(DELETION_VECTORS_ENABLED, true);
+ conf.set(SOURCE_SPLIT_TARGET_SIZE,
MemorySize.ofBytes(1));
+ conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288");
+
conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
+ conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT,
"300");
+ });
+
+ int rowCount = 10000;
+ StreamTableWrite write =
+ table.newWrite(commitUser).withIOManager(new
IOManagerImpl(tempDir.toString()));
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // append
+ for (int i = 0; i < rowCount; i++) {
+ write.write(rowDataWithKind(RowKind.INSERT, 1, i, (long) i));
+ }
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ // delete [0, 2000]
+ for (int i = 0; i < 2000; i++) {
+ write.write(rowDataWithKind(RowKind.DELETE, 1, i, (long) i));
+ }
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ // delete (rowCount - 2000, rowCount)
+ for (int i = rowCount - 2000; i < rowCount; i++) {
+ write.write(rowDataWithKind(RowKind.DELETE, 1, i, (long) i));
+ }
+ commit.commit(2, write.prepareCommit(true, 2));
+ write.close();
+ commit.close();
+
+ // test limit push down
+ {
+ int limit = new RandomDataGenerator().nextInt(1, 1000);
+ TableScan.Plan plan = table.newScan().withLimit(limit).plan();
+ assertThat(plan.splits()).hasSize(1);
+
+ RecordReader<InternalRow> reader =
+
table.newRead().withLimit(limit).createReader(plan.splits());
+ AtomicInteger cnt = new AtomicInteger(0);
+ reader.forEachRemaining(row -> cnt.incrementAndGet());
+ assertThat(cnt.get()).isEqualTo(limit);
+ reader.close();
+ }
+ }
+
@Test
public void testWithShardFirstRow() throws Exception {
FileStoreTable table =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
index af7fba5401..7b2ad9898d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
@@ -136,6 +136,7 @@ public class FormatReaderMappingTest {
null,
null,
Collections.emptyMap(),
+ null,
null);
Assertions.assertThat(formatReaderMapping.getIndexMapping())
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index 9541db60e8..1edc64e55b 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -224,6 +224,7 @@ abstract class PaimonPushDownTestBase extends
PaimonSparkTestBase {
sql("INSERT INTO T SELECT id FROM range (1, 50000)")
sql("DELETE FROM T WHERE id % 13 = 0")
+ Assertions.assertEquals(100, spark.sql("SELECT * FROM T LIMIT
100").count())
val withoutLimit =
getScanBuilder().build().asInstanceOf[PaimonScan].getOriginSplits
assert(withoutLimit.length == 10)