This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 186efa0207 [core] Fix paimon_incremental_query with limit push down (#7269)
186efa0207 is described below
commit 186efa0207459728d4f6137f2ec924d728f22704
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Feb 12 06:35:16 2026 +0800
[core] Fix paimon_incremental_query with limit push down (#7269)
---
.../paimon/table/source/DataTableBatchScan.java | 17 +++++++++++++++--
.../paimon/spark/sql/TableValuedFunctionsTest.scala | 21 +++++++++++++++++++++
2 files changed, 36 insertions(+), 2 deletions(-)
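Context for the diff below: both pushdown paths previously fetched splits via plan.dataSplits(), which assumes every split in the plan is a DataSplit. Plans produced by paimon_incremental_query can contain other Split implementations (the new comments name IncrementalSplit), so the patch first verifies the element types and only then downcasts the list. A minimal sketch of that guard-then-cast pattern, with Split, DataSplit, and IncrementalSplit stubbed out as simplified stand-ins for the real Paimon interfaces:

import java.util.List;
import java.util.Optional;

public class GuardThenCastSketch {
    // Simplified stand-ins for org.apache.paimon.table.source.Split and
    // its implementations; the real types carry file and row metadata.
    interface Split {}

    static class DataSplit implements Split {}

    static class IncrementalSplit implements Split {}

    // Mirrors the patch: downcast the whole list only when every element
    // is a DataSplit; an empty Optional tells the caller to skip the
    // pushdown and return the plan unchanged.
    static Optional<List<DataSplit>> asDataSplits(List<Split> planSplits) {
        if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
            return Optional.empty();
        }
        @SuppressWarnings("unchecked")
        List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
        return Optional.of(splits);
    }
}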
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 457229d7ad..19f55ae3c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -137,7 +137,14 @@ public class DataTableBatchScan extends AbstractDataTableScan {
long scannedRowCount = 0;
SnapshotReader.Plan plan = ((ScannedResult) result).plan();
- List<DataSplit> splits = plan.dataSplits();
+ List<Split> planSplits = plan.splits();
+ // Limit pushdown only supports DataSplit. Skip for IncrementalSplit.
+ if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
+ return Optional.of(result);
+ }
+ @SuppressWarnings("unchecked")
+ List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
+
LOG.info("Applying limit pushdown. Original splits count: {}",
splits.size());
if (splits.isEmpty()) {
return Optional.of(result);
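A note on the unchecked cast added above (the same idiom reappears in the second hunk below): Java forbids a direct cast between List<Split> and List<DataSplit> because the two parameterized types are provably distinct, so the patch routes the cast through the wildcard type List<?>. A tiny self-contained illustration:

import java.util.List;

public class DoubleCastSketch {
    interface Split {}

    static class DataSplit implements Split {}

    static List<DataSplit> downcast(List<Split> planSplits) {
        // A direct cast does not compile: List<Split> and List<DataSplit>
        // are provably distinct parameterized types.
        // List<DataSplit> bad = (List<DataSplit>) planSplits;

        // Casting through the wildcard List<?> compiles with an "unchecked"
        // warning, which the instanceof guard in the patch justifies.
        @SuppressWarnings("unchecked")
        List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
        return splits;
    }
}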
@@ -193,7 +200,13 @@ public class DataTableBatchScan extends AbstractDataTableScan {
}
SnapshotReader.Plan plan = ((ScannedResult) result).plan();
- List<DataSplit> splits = plan.dataSplits();
+ List<Split> planSplits = plan.splits();
+ // TopN pushdown only supports DataSplit. Skip for IncrementalSplit.
+ if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
+ return Optional.of(result);
+ }
+ @SuppressWarnings("unchecked")
+ List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
if (splits.isEmpty()) {
return Optional.of(result);
}
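Both hunks only guard the entry to the pushdown; the actual truncation lives in the surrounding method, which the hunks mostly elide (the scannedRowCount accumulator in the first hunk's context hints at its shape). As a rough sketch under stated assumptions, where rowCount() is a hypothetical accessor not quoted from this diff and the real method must also rebuild the plan around the surviving splits:

import java.util.ArrayList;
import java.util.List;

public class LimitTruncationSketch {
    // Hypothetical stand-in: rowCount() is assumed for this sketch and is
    // not quoted from the diff above.
    interface DataSplit {
        long rowCount();
    }

    // Illustration only: keep splits until the accumulated row count
    // covers the pushed-down limit, then drop the rest.
    static List<DataSplit> truncateByLimit(List<DataSplit> splits, long limit) {
        List<DataSplit> limited = new ArrayList<>();
        long scannedRowCount = 0;
        for (DataSplit split : splits) {
            limited.add(split);
            scannedRowCount += split.rowCount();
            if (scannedRowCount >= limit) {
                break;
            }
        }
        return limited;
    }
}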
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index b3012e2f90..68f97743d3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -344,6 +344,27 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
}
}
+ test("incremental query by tag with LIMIT") {
+ sql("use paimon")
+ withTable("t") {
+ spark.sql("""
+ |CREATE TABLE t (a INT, b INT, c STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key'='a,b', 'bucket' = '2')
+ |PARTITIONED BY (a)
+ |""".stripMargin)
+ spark.sql("INSERT INTO t VALUES (1, 1, '1'), (2, 2, '2')")
+ sql("CALL sys.create_tag('t', 'tag1')")
+ spark.sql("INSERT INTO t VALUES (1, 3, '3'), (2, 4, '4')")
+ sql("CALL sys.create_tag('t', 'tag2')")
+
+ checkAnswer(
+ spark.sql(
+ "SELECT * FROM paimon_incremental_query('t', 'tag1', 'tag2') ORDER
BY a, b LIMIT 5"),
+ Seq(Row(1, 3, "3"), Row(2, 4, "4")))
+ }
+ }
+
private def incrementalDF(tableIdent: String, start: Int, end: Int): DataFrame = {
spark.read
.format("paimon")