This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e1babc642556 [SPARK-55978][SQL][FOLLOWUP] Don't block V2 join pushdown when pushed Sample has fraction=1 without replacement
e1babc642556 is described below

commit e1babc6425562b0f9ebf13e63bb540cf96eaef11
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue May 26 02:25:30 2026 +0800

    [SPARK-55978][SQL][FOLLOWUP] Don't block V2 join pushdown when pushed Sample has fraction=1 without replacement
    
    ### What changes were proposed in this pull request?
    
    Followup to https://github.com/apache/spark/pull/54972.
    
    Narrow the `V2ScanRelationPushDown` join-pushdown guard so it only blocks pushdown when at least one side has a pushed `Sample` that can actually change the result set. A pushed sample is treated as a no-op when **both** `withReplacement = false` **and** `upperBound - lowerBound >= 1.0` (fraction = 1). In that case dropping the sample inside the merged scan builder is safe.
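
The narrowed guard can be read as a predicate over an optional pushed sample on each side of the join. The following is a minimal standalone sketch, not the code in `V2ScanRelationPushDown`: `PushedSample`, `SampleGuardSketch`, `isNoOp`, and `allowsJoinPushdown` are hypothetical names used for illustration, and only the fields `withReplacement`, `lowerBound`, and `upperBound` come from the patch.

```scala
// Hypothetical stand-in for the pushed sample recorded on a scan builder holder.
final case class PushedSample(
    lowerBound: Double,
    upperBound: Double,
    withReplacement: Boolean)

object SampleGuardSketch {
  // A pushed sample is a no-op only without replacement and at fraction >= 1.
  def isNoOp(s: PushedSample): Boolean =
    !s.withReplacement && s.upperBound - s.lowerBound >= 1.0

  // Option.forall returns true for None, so a side without a pushed sample
  // never blocks join pushdown.
  def allowsJoinPushdown(
      left: Option[PushedSample],
      right: Option[PushedSample]): Boolean =
    left.forall(isNoOp) && right.forall(isNoOp)
}
```

Under these assumptions, a side with no sample or with a 100% sample without replacement lets pushdown proceed, while a 50% sample or any with-replacement sample keeps it blocked.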
    
    ### Why are the changes needed?
    
    The guard added in SPARK-55978 exists because the merged scan builder for `SupportsPushDownJoin` cannot carry a pushed sample and would silently discard it. The hazard is *silent result change*. For a `Sample` without replacement at fraction = 1, no rows are excluded, so dropping the sample changes nothing observable. The current guard is therefore stricter than its rationale requires, and unnecessarily skips join pushdown for queries that land at `TABLESAMPLE SYSTEM (100 PERCENT)` (p [...]
    
    With replacement is a different story: `SampleExec` uses `PoissonSampler`, and even at rate 1.0 each input row can be emitted 0, 1, 2, … times, so it is **not** a no-op. SQL `TABLESAMPLE` always sets `withReplacement = false`, but `DataFrame.sample(withReplacement = true, fraction = 1.0)` can be pushed to DSv2, so the guard must keep blocking that case.
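
To illustrate why a with-replacement sample at rate 1.0 is not a no-op, the sketch below draws an independent Poisson(1.0) count for each input row, which is the per-row behavior described above. It is an assumption-laden illustration: it uses Knuth's multiplication method on top of `scala.util.Random` rather than Spark's `PoissonSampler`, and `PoissonSketch`, `poissonCount`, and `sampleCounts` are hypothetical names.

```scala
import scala.util.Random

object PoissonSketch {
  // Knuth's method: count how many uniform draws can be multiplied together
  // before the running product drops to e^(-rate) or below.
  def poissonCount(rate: Double, rng: Random): Int = {
    val limit = math.exp(-rate)
    var product = rng.nextDouble()
    var count = 0
    while (product > limit) {
      count += 1
      product *= rng.nextDouble()
    }
    count
  }

  // Number of times each of `rows` input rows would be emitted.
  def sampleCounts(rows: Int, rate: Double, seed: Long): Seq[Int] = {
    val rng = new Random(seed)
    Seq.fill(rows)(poissonCount(rate, rng))
  }
}
```

Over many rows, some counts come out as 0 and some as 2 or more, so the output multiset differs from the input even though the expected number of rows is unchanged.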
    
    ### Does this PR introduce _any_ user-facing change?
    
    No behavior change for queries with fraction < 1, or for `withReplacement = true` at any fraction. For queries where the pushed sample has `withReplacement = false` and fraction = 1, join pushdown now proceeds, giving the same result set with a faster plan.
    
    ### How was this patch tested?
    
    - Existing `"join pushdown is skipped when a side has a pushed sample"` test moved from `100 PERCENT` to `50 PERCENT` so it keeps exercising the (still-active) fraction < 1 branch of the guard.
    - New `"100% SYSTEM sample does not block join pushdown"` test asserts the new fraction = 1, without-replacement short-circuit.
    - New `"with-replacement sample blocks join pushdown even at fraction 1"` test uses the DataFrame API to assert that Poisson sampling at fraction 1 keeps blocking join pushdown.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #56031 from cloud-fan/SPARK-55978-followup.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../datasources/v2/V2ScanRelationPushDown.scala    | 13 +++--
 .../connector/DataSourceV2TableSampleSuite.scala   | 58 +++++++++++++++++++++-
 2 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 60a2017e6947..a1c69847c509 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -151,11 +151,18 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
         rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
         // Cross joins are not supported because they increase the amount of data.
         condition.isDefined &&
-        // Do not push down join if either side has a pushed sample, because
-        // the merged scan builder would silently discard it.
+        // Do not push down join if either side has a pushed sample with
+        // fraction < 1, because the merged scan builder would silently
+        // discard it and change the result. At fraction = 1 without
+        // replacement the sample is a no-op on the result set, so dropping
+        // it is safe. With replacement (Poisson sampling), even fraction 1
+        // can emit each row 0, 1, 2, ... times, so it is not a no-op.
         // TODO(SPARK-56504): Extend SupportsPushDownJoin to accept pushed
         //   samples so sources supporting both can handle the composition.
-        leftHolder.pushedSample.isEmpty && rightHolder.pushedSample.isEmpty &&
+        leftHolder.pushedSample.forall(s =>
+          !s.withReplacement && s.upperBound - s.lowerBound >= 1.0) &&
+        rightHolder.pushedSample.forall(s =>
+          !s.withReplacement && s.upperBound - s.lowerBound >= 1.0) &&
         lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
       // Process left and right columns in original order
       val (leftSideRequiredColumnsWithAliases, rightSideRequiredColumnsWithAliases) =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala
index 76ec2e588eae..164c098e95e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala
@@ -171,9 +171,11 @@ class DataSourceV2TableSampleSuite extends DatasourceV2SQLBase
         val dfNoSample = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.id = $t2.id")
         checkJoinPushed(dfNoSample)
 
-        // With SYSTEM sample on one side: join pushdown should be skipped
+        // With a SYSTEM sample (fraction < 1) on one side: join pushdown
+        // should be skipped because the merged scan builder would silently
+        // discard the sample.
         val dfWithSample = sql(
-          s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (100 PERCENT) " +
+          s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (50 PERCENT) " +
           s"JOIN $t2 ON $t1.id = $t2.id")
         checkJoinNotPushed(dfWithSample)
         // The sample should still be pushed down though
@@ -185,6 +187,58 @@ class DataSourceV2TableSampleSuite extends DatasourceV2SQLBase
     }
   }
 
+  test("SPARK-55978: 100% SYSTEM sample does not block join pushdown") {
+    val joinSampleCatalog = "testjoinsample100"
+    registerCatalog(joinSampleCatalog, classOf[InMemoryTableWithJoinAndSampleCatalog])
+    val t1 = s"$joinSampleCatalog.ns.t1"
+    val t2 = s"$joinSampleCatalog.ns.t2"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING _")
+    sql(s"CREATE TABLE $t2 (id bigint, data string) USING _")
+    try {
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+      sql(s"INSERT INTO $t2 VALUES (2, 'x'), (3, 'y'), (4, 'z')")
+      withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+        // At fraction = 1 the sample is a no-op on the result set, so
+        // dropping it inside the merged scan builder is safe. The guard
+        // in V2ScanRelationPushDown short-circuits and join pushdown
+        // proceeds.
+        val dfWithSample = sql(
+          s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (100 PERCENT) " +
+          s"JOIN $t2 ON $t1.id = $t2.id")
+        checkJoinPushed(dfWithSample)
+      }
+    } finally {
+      sql(s"DROP TABLE IF EXISTS $t1")
+      sql(s"DROP TABLE IF EXISTS $t2")
+    }
+  }
+
+  test("SPARK-55978: with-replacement sample blocks join pushdown even at fraction 1") {
+    val joinSampleCatalog = "testjoinsamplerepl"
+    registerCatalog(joinSampleCatalog, classOf[InMemoryTableWithJoinAndSampleCatalog])
+    val t1 = s"$joinSampleCatalog.ns.t1"
+    val t2 = s"$joinSampleCatalog.ns.t2"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING _")
+    sql(s"CREATE TABLE $t2 (id bigint, data string) USING _")
+    try {
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+      sql(s"INSERT INTO $t2 VALUES (2, 'x'), (3, 'y'), (4, 'z')")
+      withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+        // SQL TABLESAMPLE always sets withReplacement=false, so use the
+        // DataFrame API. Poisson sampling at fraction 1 still emits each
+        // input row 0, 1, 2, ... times, so the sample is not a no-op and
+        // join pushdown must remain blocked.
+        val df = spark.table(t1).sample(withReplacement = true, fraction = 1.0)
+          .join(spark.table(t2), "id")
+        checkJoinNotPushed(df)
+        checkSamplePushed(df, pushed = true)
+      }
+    } finally {
+      sql(s"DROP TABLE IF EXISTS $t1")
+      sql(s"DROP TABLE IF EXISTS $t2")
+    }
+  }
+
   test("SPARK-55978: legacy connector with only 4-arg pushTableSample - BERNOULLI pushes down") {
     val legacyCatalog = "testlegacysample"
     registerCatalog(legacyCatalog, classOf[InMemoryTableWithLegacyTableSampleCatalog])

