This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fd6786f72 [core] Fix Limit push down sort splits violates its general contract. (#4491)
fd6786f72 is described below

commit fd6786f7270bf45b099f5012dc17ad50b87f1c1a
Author: HunterXHunter <[email protected]>
AuthorDate: Sun Nov 10 23:17:04 2024 -0800

    [core] Fix Limit push down sort splits violates its general contract. (#4491)
---
 .../paimon/table/source/DataTableBatchScan.java    | 39 ++++-----------
 .../paimon/spark/sql/PaimonPushDownTest.scala      | 57 +++++++++++++++++++---
 2 files changed, 61 insertions(+), 35 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index d1d455040..d3e8a2adb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -100,40 +100,21 @@ public class DataTableBatchScan extends AbstractDataTableScan {
                 return result;
             }
 
-            // We first add the rawConvertible split to avoid merging, and if the row count
-            // is still less than limit number, then add split which is not rawConvertible.
-            splits.sort(
-                    (x, y) -> {
-                        if (x.rawConvertible() && y.rawConvertible()) {
-                            return 0;
-                        } else if (x.rawConvertible()) {
-                            return -1;
-                        } else {
-                            return 1;
-                        }
-                    });
-
-            // fast return if there is no rawConvertible split
-            if (!splits.get(0).rawConvertible()) {
-                return result;
-            }
-
             List<Split> limitedSplits = new ArrayList<>();
             for (DataSplit dataSplit : splits) {
-                long splitRowCount = getRowCountForSplit(dataSplit);
-                limitedSplits.add(dataSplit);
-                scannedRowCount += splitRowCount;
-                if (scannedRowCount >= pushDownLimit) {
-                    break;
+                if (dataSplit.rawConvertible()) {
+                    long splitRowCount = getRowCountForSplit(dataSplit);
+                    limitedSplits.add(dataSplit);
+                    scannedRowCount += splitRowCount;
+                    if (scannedRowCount >= pushDownLimit) {
+                        SnapshotReader.Plan newPlan =
+                                new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits);
+                        return new ScannedResult(newPlan);
+                    }
                 }
             }
-
-            SnapshotReader.Plan newPlan =
-                    new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits);
-            return new ScannedResult(newPlan);
-        } else {
-            return result;
         }
+        return result;
     }
 
     /**
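
For context on the failure being fixed: the removed comparator is not
antisymmetric. When neither split is rawConvertible it returns 1 for both
compare(x, y) and compare(y, x), and java.util.TimSort (which backs
List.sort) can detect the inconsistency and throw IllegalArgumentException:
"Comparison method violates its general contract!". Below is a minimal
standalone sketch — not Paimon code, with an invented Split stand-in — that
reproduces the violation and shows a contract-respecting ordering:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class ComparatorContractDemo {

        // Invented stand-in for DataSplit, keeping only the flag being sorted on.
        static final class Split {
            final boolean rawConvertible;

            Split(boolean rawConvertible) {
                this.rawConvertible = rawConvertible;
            }

            boolean rawConvertible() {
                return rawConvertible;
            }
        }

        public static void main(String[] args) {
            List<Split> splits = new ArrayList<>();
            for (int i = 0; i < 64; i++) {
                // A mix of rawConvertible and non-rawConvertible splits.
                splits.add(new Split(i % 3 == 0));
            }

            // The removed comparator: for two non-rawConvertible splits it
            // returns 1 both ways, breaking sgn(c(x, y)) == -sgn(c(y, x)).
            Comparator<Split> broken =
                    (x, y) -> {
                        if (x.rawConvertible() && y.rawConvertible()) {
                            return 0;
                        } else if (x.rawConvertible()) {
                            return -1;
                        } else {
                            return 1;
                        }
                    };

            // A contract-respecting equivalent: rawConvertible splits first.
            Comparator<Split> fixed =
                    (x, y) -> Boolean.compare(y.rawConvertible(), x.rawConvertible());

            splits.sort(fixed); // always safe
            // splits.sort(broken); // may throw IllegalArgumentException on
            // inputs large enough for TimSort to run merges
        }
    }

The commit sidesteps the problem rather than repairing the comparator: the new
loop skips non-rawConvertible splits directly, which also makes the old fast
return on splits.get(0).rawConvertible() unnecessary.
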
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
index 59968b555..ba314e3af 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
@@ -141,27 +141,72 @@ class PaimonPushDownTest extends PaimonSparkTestBase {
     val scanBuilder = getScanBuilder()
     Assertions.assertTrue(scanBuilder.isInstanceOf[SupportsPushDownLimit])
 
+    // Case 1: All dataSplits are rawConvertible.
    val dataSplitsWithoutLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
     Assertions.assertEquals(4, dataSplitsWithoutLimit.length)
+    // All dataSplits are rawConvertible.
+    dataSplitsWithoutLimit.foreach(
+      splits => {
+        Assertions.assertTrue(splits.asInstanceOf[DataSplit].rawConvertible())
+      })
 
-    // It still return false even it can push down limit.
+    // It still returns false even if it can push down the limit.
    Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
-
    val dataSplitsWithLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
     Assertions.assertEquals(1, dataSplitsWithLimit.length)
-
     Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
 
-    spark.sql("UPDATE T SET b = 'x' WHERE a = 1")
+    Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(2))
+    val dataSplitsWithLimit1 = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+    Assertions.assertEquals(2, dataSplitsWithLimit1.length)
+    Assertions.assertEquals(2, spark.sql("SELECT * FROM T LIMIT 2").count())
 
+    // Case 2: Update 2 rawConvertible dataSplits so that they become nonRawConvertible.
+    spark.sql("INSERT INTO T VALUES (1, 'a2', '11'), (2, 'b2', '22')")
     val scanBuilder2 = getScanBuilder()
    val dataSplitsWithoutLimit2 = scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
     Assertions.assertEquals(4, dataSplitsWithoutLimit2.length)
-
+    // Now we have 4 dataSplits: 2 are nonRawConvertible and 2 are rawConvertible.
+    Assertions.assertEquals(
+      2,
+      dataSplitsWithoutLimit2
+        .filter(
+          split => {
+            split.asInstanceOf[DataSplit].rawConvertible()
+          })
+        .length)
+
+    // Returns 2 dataSplits.
+    Assertions.assertFalse(scanBuilder2.asInstanceOf[SupportsPushDownLimit].pushLimit(2))
    val dataSplitsWithLimit2 = scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
-    Assertions.assertEquals(4, dataSplitsWithLimit2.length)
+    Assertions.assertEquals(2, dataSplitsWithLimit2.length)
+    Assertions.assertEquals(2, spark.sql("SELECT * FROM T LIMIT 2").count())
 
+    // 2 dataSplits cannot meet the limit requirement, so we need to scan all dataSplits.
+    Assertions.assertFalse(scanBuilder2.asInstanceOf[SupportsPushDownLimit].pushLimit(3))
+    val dataSplitsWithLimit22 = scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
+    // Need to scan all dataSplits.
+    Assertions.assertEquals(4, dataSplitsWithLimit22.length)
+    Assertions.assertEquals(3, spark.sql("SELECT * FROM T LIMIT 3").count())
+
+    // Case 3: Update the remaining 2 rawConvertible dataSplits so that all dataSplits are nonRawConvertible.
+    spark.sql("INSERT INTO T VALUES (3, 'c', '11'), (4, 'd', '22')")
+    val scanBuilder3 = getScanBuilder()
+    val dataSplitsWithoutLimit3 = scanBuilder3.build().asInstanceOf[PaimonScan].getOriginSplits
+    Assertions.assertEquals(4, dataSplitsWithoutLimit3.length)
+
+    // All dataSplits are nonRawConvertible.
+    dataSplitsWithoutLimit3.foreach(
+      splits => {
+        Assertions.assertFalse(splits.asInstanceOf[DataSplit].rawConvertible())
+      })
+
+    Assertions.assertFalse(scanBuilder3.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
+    val dataSplitsWithLimit3 = scanBuilder3.build().asInstanceOf[PaimonScan].getOriginSplits
+    // Need to scan all dataSplits.
+    Assertions.assertEquals(4, dataSplitsWithLimit3.length)
     Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
+
   }
 
   test("Paimon pushDown: runtime filter") {
