This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new f90351757526 [SPARK-55978][SQL][FOLLOWUP] Don't block V2 join pushdown
when pushed Sample has fraction=1 without replacement
f90351757526 is described below
commit f9035175752604244da834b173e01559f21285b8
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue May 26 02:25:30 2026 +0800
[SPARK-55978][SQL][FOLLOWUP] Don't block V2 join pushdown when pushed
Sample has fraction=1 without replacement
### What changes were proposed in this pull request?
Followup to https://github.com/apache/spark/pull/54972.
Narrow the `V2ScanRelationPushDown` join-pushdown guard so it only blocks
pushdown when at least one side has a pushed `Sample` that can actually change
the result set. A pushed sample is treated as a no-op when **both**
`withReplacement = false` **and** `upperBound - lowerBound >= 1.0` (fraction =
1). In that case dropping the sample inside the merged scan builder is safe.
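
The no-op condition above can be sketched as a standalone predicate. This is only an illustration, not the Spark code: `PushedSample` is a hypothetical stand-in that models just the three fields the guard reads, and `isNoOp` and `samplesAllowJoinPushdown` are names invented for this sketch.

```scala
object SampleGuardSketch {
  // Hypothetical stand-in for the pushed-sample info kept on the scan builder
  // holder; only the fields the guard reads are modeled.
  final case class PushedSample(
      lowerBound: Double,
      upperBound: Double,
      withReplacement: Boolean)

  // A pushed sample is a no-op only when it is without replacement AND its
  // bounds cover the whole range (fraction = 1).
  def isNoOp(s: PushedSample): Boolean =
    !s.withReplacement && s.upperBound - s.lowerBound >= 1.0

  // Option.forall is true for None, so a side with no pushed sample passes.
  def samplesAllowJoinPushdown(
      left: Option[PushedSample],
      right: Option[PushedSample]): Boolean =
    left.forall(isNoOp) && right.forall(isNoOp)
}
```

Using `forall` on the `Option` keeps the previous behavior for the common case: when neither side has a pushed sample, the check passes exactly as `isEmpty` did.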
### Why are the changes needed?
The guard added in SPARK-55978 exists because the merged scan builder for
`SupportsPushDownJoin` cannot carry a pushed sample and would silently discard
it. The hazard is *silent result change*. For a `Sample` without replacement at
fraction = 1, no rows are excluded, so dropping the sample changes nothing
observable. The current guard is therefore stricter than its rationale
requires, and unnecessarily skips join pushdown for queries that land at
`TABLESAMPLE SYSTEM (100 PERCENT)` (p [...]
Sampling with replacement is a different story: `SampleExec` uses
`PoissonSampler`, and even at rate 1.0 each input row can be emitted 0, 1, 2, …
times, so it is **not** a no-op. SQL `TABLESAMPLE` always sets
`withReplacement = false`, but
`DataFrame.sample(withReplacement = true, fraction = 1.0)` can be pushed to
DSv2, so the guard must keep blocking that case.
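
To see why rate 1.0 with replacement is not a no-op, here is a small self-contained sketch. It does not use Spark's `PoissonSampler`; it assumes a plain Knuth-style Poisson draw over `scala.util.Random`, and `PoissonSketch`, `poissonDraw`, and `sampleWithReplacement` are hypothetical names. At rate 1.0 the probability that a given row gets 0 copies is e^-1 (about 0.37), so some rows disappear while others are duplicated.

```scala
import scala.util.Random

object PoissonSketch {
  // Knuth's method: multiply uniform draws until the product falls to or
  // below e^(-rate); the number of extra multiplications is Poisson(rate).
  def poissonDraw(rate: Double, rng: Random): Int = {
    val limit = math.exp(-rate)
    var k = 0
    var p = rng.nextDouble()
    while (p > limit) {
      k += 1
      p *= rng.nextDouble()
    }
    k
  }

  // Emit each input row an independent Poisson(rate) number of times.
  def sampleWithReplacement[T](rows: Seq[T], rate: Double, rng: Random): Seq[T] =
    rows.flatMap(row => Seq.fill(poissonDraw(rate, rng))(row))

  def main(args: Array[String]): Unit = {
    val rows = 1 to 10000
    val out = sampleWithReplacement(rows, 1.0, new Random(42L))
    val dropped = rows.size - out.distinct.size
    println(s"input=${rows.size} output=${out.size} rowsDropped=$dropped")
  }
}
```

Discarding such a sample would therefore change both which rows reach the join and how many times each one appears.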
### Does this PR introduce _any_ user-facing change?
No change in query results. Queries with fraction < 1, or with
`withReplacement = true` at any fraction, plan exactly as before. For queries
where the pushed sample has `withReplacement = false` and fraction = 1, join
pushdown now proceeds: the result set is the same and the plan is faster.
### How was this patch tested?
- The existing `"join pushdown is skipped when a side has a pushed sample"`
test now uses `50 PERCENT` instead of `100 PERCENT`, so it keeps exercising the
(still-active) fraction < 1 branch of the guard.
- New `"100% SYSTEM sample does not block join pushdown"` test asserts the
new fraction = 1, without-replacement short-circuit.
- New `"with-replacement sample blocks join pushdown even at fraction 1"`
test uses the DataFrame API to assert that Poisson sampling at fraction 1 keeps
blocking join pushdown.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #56031 from cloud-fan/SPARK-55978-followup.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e1babc6425562b0f9ebf13e63bb540cf96eaef11)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../datasources/v2/V2ScanRelationPushDown.scala | 13 +++--
.../connector/DataSourceV2TableSampleSuite.scala | 58 +++++++++++++++++++++-
2 files changed, 66 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 60a2017e6947..a1c69847c509 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -151,11 +151,18 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
// Cross joins are not supported because they increase the amount of data.
condition.isDefined &&
- // Do not push down join if either side has a pushed sample, because
- // the merged scan builder would silently discard it.
+ // Do not push down join if either side has a pushed sample with
+ // fraction < 1, because the merged scan builder would silently
+ // discard it and change the result. At fraction = 1 without
+ // replacement the sample is a no-op on the result set, so dropping
+ // it is safe. With replacement (Poisson sampling), even fraction 1
+ // can emit each row 0, 1, 2, ... times, so it is not a no-op.
// TODO(SPARK-56504): Extend SupportsPushDownJoin to accept pushed
// samples so sources supporting both can handle the composition.
- leftHolder.pushedSample.isEmpty && rightHolder.pushedSample.isEmpty &&
+ leftHolder.pushedSample.forall(s =>
+ !s.withReplacement && s.upperBound - s.lowerBound >= 1.0) &&
+ rightHolder.pushedSample.forall(s =>
+ !s.withReplacement && s.upperBound - s.lowerBound >= 1.0) &&
lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
// Process left and right columns in original order
val (leftSideRequiredColumnsWithAliases, rightSideRequiredColumnsWithAliases) =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala
index 76ec2e588eae..164c098e95e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala
@@ -171,9 +171,11 @@ class DataSourceV2TableSampleSuite extends DatasourceV2SQLBase
val dfNoSample = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.id = $t2.id")
checkJoinPushed(dfNoSample)
- // With SYSTEM sample on one side: join pushdown should be skipped
+ // With a SYSTEM sample (fraction < 1) on one side: join pushdown
+ // should be skipped because the merged scan builder would silently
+ // discard the sample.
val dfWithSample = sql(
- s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (100 PERCENT) " +
+ s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (50 PERCENT) " +
s"JOIN $t2 ON $t1.id = $t2.id")
checkJoinNotPushed(dfWithSample)
// The sample should still be pushed down though
@@ -185,6 +187,58 @@ class DataSourceV2TableSampleSuite extends DatasourceV2SQLBase
}
}
+ test("SPARK-55978: 100% SYSTEM sample does not block join pushdown") {
+ val joinSampleCatalog = "testjoinsample100"
+ registerCatalog(joinSampleCatalog, classOf[InMemoryTableWithJoinAndSampleCatalog])
+ val t1 = s"$joinSampleCatalog.ns.t1"
+ val t2 = s"$joinSampleCatalog.ns.t2"
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING _")
+ sql(s"CREATE TABLE $t2 (id bigint, data string) USING _")
+ try {
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ sql(s"INSERT INTO $t2 VALUES (2, 'x'), (3, 'y'), (4, 'z')")
+ withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+ // At fraction = 1 the sample is a no-op on the result set, so
+ // dropping it inside the merged scan builder is safe. The guard
+ // in V2ScanRelationPushDown short-circuits and join pushdown
+ // proceeds.
+ val dfWithSample = sql(
+ s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (100 PERCENT) " +
+ s"JOIN $t2 ON $t1.id = $t2.id")
+ checkJoinPushed(dfWithSample)
+ }
+ } finally {
+ sql(s"DROP TABLE IF EXISTS $t1")
+ sql(s"DROP TABLE IF EXISTS $t2")
+ }
+ }
+
+ test("SPARK-55978: with-replacement sample blocks join pushdown even at fraction 1") {
+ val joinSampleCatalog = "testjoinsamplerepl"
+ registerCatalog(joinSampleCatalog, classOf[InMemoryTableWithJoinAndSampleCatalog])
+ val t1 = s"$joinSampleCatalog.ns.t1"
+ val t2 = s"$joinSampleCatalog.ns.t2"
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING _")
+ sql(s"CREATE TABLE $t2 (id bigint, data string) USING _")
+ try {
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ sql(s"INSERT INTO $t2 VALUES (2, 'x'), (3, 'y'), (4, 'z')")
+ withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+ // SQL TABLESAMPLE always sets withReplacement=false, so use the
+ // DataFrame API. Poisson sampling at fraction 1 still emits each
+ // input row 0, 1, 2, ... times, so the sample is not a no-op and
+ // join pushdown must remain blocked.
+ val df = spark.table(t1).sample(withReplacement = true, fraction = 1.0)
+ .join(spark.table(t2), "id")
+ checkJoinNotPushed(df)
+ checkSamplePushed(df, pushed = true)
+ }
+ } finally {
+ sql(s"DROP TABLE IF EXISTS $t1")
+ sql(s"DROP TABLE IF EXISTS $t2")
+ }
+ }
+
test("SPARK-55978: legacy connector with only 4-arg pushTableSample - BERNOULLI pushes down") {
val legacyCatalog = "testlegacysample"
registerCatalog(legacyCatalog, classOf[InMemoryTableWithLegacyTableSampleCatalog])