cloud-fan commented on code in PR #56092:
URL: https://github.com/apache/spark/pull/56092#discussion_r3299488638
##########
connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala:
##########
@@ -789,6 +802,27 @@ private[v2] trait V2JDBCTest
checkSamplePushed(df8, false)
checkFilterPushed(df8)
assert(df8.collect().length < 10)
+
+ // SYSTEM sampling pushdown
+ if (supportsTableSampleSystem) {
+ val df9 = sql(s"SELECT * FROM $catalogName.new_table $tableOptions " +
+ "TABLESAMPLE SYSTEM (50 PERCENT)")
+ checkSamplePushed(df9)
+ if (partitioningEnabled) {
+ multiplePartitionAdditionalCheck(df1, partitionInfo)
+ }
+ assert(df6.collect().length <= 10)
Review Comment:
`df6` is bounded by `LIMIT 2` (line 781), so `df6.collect().length <= 10` is
trivially true and never exercises the SYSTEM-sampled row count. The two `df6`
references in the new SYSTEM block (this line and line 824) look like
copy-paste from the BERNOULLI cases — they should reference `df9` and `df10`
respectively. With PostgreSQL's TABLESAMPLE SYSTEM on a 10-row, single-block
table at 50%, the result will commonly be all 10 rows or 0 rows, so consider
asserting something stronger than `<= 10`. For example, also verify, by
inspecting the collected rows of `df10`, that column pruning was applied.
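A toy model shows why `<= 10` says little here. PostgreSQL's SYSTEM method samples at the block level: each block is kept or dropped as a whole with the requested probability. The following is a simplified Scala model of that behavior, not PostgreSQL's implementation. `SystemSampleModel` and `sampledRowCount` are hypothetical names, and the block size is an assumed parameter.

```scala
import scala.util.Random

// Hypothetical toy model of block-level (SYSTEM-style) sampling. It is not
// PostgreSQL's implementation; it only mimics "keep or drop whole blocks".
object SystemSampleModel {

  /** Rows returned when `totalRows` rows are packed into blocks of
    * `rowsPerBlock` rows and each block is kept with probability `fraction`. */
  def sampledRowCount(
      totalRows: Int,
      rowsPerBlock: Int,
      fraction: Double,
      rng: Random): Int = {
    require(rowsPerBlock > 0, "rowsPerBlock must be positive")
    val blocks = (0 until totalRows).grouped(rowsPerBlock).toVector
    // Each block is an all-or-nothing unit: either every row in it survives or none do.
    blocks.filter(_ => rng.nextDouble() < fraction).map(_.size).sum
  }
}
```

With 10 rows and a block size of at least 10, every call returns either 0 or 10. A `<= 10` bound therefore cannot distinguish a pushed-down SYSTEM sample from an unsampled scan.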
##########
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala:
##########
@@ -868,11 +868,26 @@ abstract class JdbcDialect extends Serializable with Logging {
*/
def supportsOffset: Boolean = false
+ @deprecated("Use compileTableSample instead", "4.2.0")
def supportsTableSample: Boolean = false
+ @deprecated("Use compileTableSample instead", "4.2.0")
def getTableSample(sample: TableSampleInfo): String =
throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3183")
+ /**
+ * Compile a [[org.apache.spark.sql.execution.datasources.v2.TableSampleInfo]] into a
+ * SQL `TABLESAMPLE` clause, or return [[scala.None]] if the dialect cannot represent
+ * the requested sampling semantics (e.g. sampling with replacement).
+ *
+ * The default implementation delegates to [[getTableSample]] when [[supportsTableSample]]
+ * is true, and returns [[scala.None]] otherwise.
+ */
+ @Since("4.2.0")
+ def compileTableSample(sample: TableSampleInfo): Option[String] = {
+ if (supportsTableSample) Some(getTableSample(sample)) else None
+ }
Review Comment:
The default delegation to `getTableSample(sample)` ignores
`sample.sampleMethod` and `sample.withReplacement`. A third-party dialect
compiled against 3.5.x that opted in via the now-deprecated
`supportsTableSample = true` / `getTableSample(...)` pair was written under the
old contract (BERNOULLI only, `withReplacement` accepted without being checked).
With the PR's change, `JDBCScanBuilder.pushTableSample` no longer gates on
`dialect.supportsTableSample` and always calls `compileTableSample` with the
real `sampleMethod` / `withReplacement`. As a result, such a dialect can be
asked to compile a SYSTEM or with-replacement sample, and it will silently
return its old BERNOULLI-style clause. Spark then believes the sample was
pushed down with the requested semantics, while the connector actually runs
different ones. This is the exact correctness bug the PR cites as motivation,
just shifted onto legacy third-party dialects.
Suggested fix, which gates the legacy path on the original contract:
```suggestion
def compileTableSample(sample: TableSampleInfo): Option[String] = {
if (supportsTableSample &&
!sample.withReplacement &&
sample.sampleMethod == SampleMethod.Bernoulli) {
Some(getTableSample(sample))
} else {
None
}
}
```
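To make the failure mode concrete, here is a self-contained sketch of the suggested gating. `SampleMethod`, `TableSampleInfo`, `SketchDialect`, and `LegacyDialect` are simplified, hypothetical stand-ins (the real Spark types live in other packages and differ in detail). Only the shape of the `compileTableSample` guard mirrors the suggestion.

```scala
// Hypothetical, simplified stand-ins for Spark's sampling types.
sealed trait SampleMethod
object SampleMethod {
  case object Bernoulli extends SampleMethod
  case object System extends SampleMethod
}

final case class TableSampleInfo(
    lowerBound: Double,
    upperBound: Double,
    withReplacement: Boolean,
    seed: Long,
    sampleMethod: SampleMethod)

// Hypothetical dialect base class with the legacy hooks plus the gated default.
abstract class SketchDialect {
  def supportsTableSample: Boolean = false

  // Spark throws SparkUnsupportedOperationException here; a plain JDK
  // exception is used to keep the sketch self-contained.
  def getTableSample(sample: TableSampleInfo): String =
    throw new UnsupportedOperationException("table sample not supported")

  // Use the legacy hook only for the contract it was written for:
  // BERNOULLI sampling without replacement.
  def compileTableSample(sample: TableSampleInfo): Option[String] =
    if (supportsTableSample &&
        !sample.withReplacement &&
        sample.sampleMethod == SampleMethod.Bernoulli) {
      Some(getTableSample(sample))
    } else {
      None
    }
}

// A hypothetical third-party dialect written against the old BERNOULLI-only contract.
class LegacyDialect extends SketchDialect {
  override def supportsTableSample: Boolean = true
  override def getTableSample(sample: TableSampleInfo): String =
    s"TABLESAMPLE BERNOULLI (${(sample.upperBound - sample.lowerBound) * 100})"
}
```

With the guard in place, the legacy dialect still pushes down the BERNOULLI case it understands, and a SYSTEM or with-replacement request yields `None`, so Spark keeps the sample instead of pushing down the wrong semantics.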
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala:
##########
@@ -32,6 +32,8 @@ import org.apache.spark.sql.connector.catalog.functions._
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
import org.apache.spark.sql.connector.expressions.{BucketTransform, Cast => V2Cast, Expression => V2Expression, FieldReference, GeneralScalarExpression, IdentityTransform, Literal => V2Literal, NamedReference, NamedTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder, SortValue, Transform}
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue}
+import org.apache.spark.sql.connector.read.{SampleMethod => V2SampleMethod}
+import org.apache.spark.sql.catalyst.plans.logical.{SampleMethod}
Review Comment:
Two nits on these new imports:
- The `catalyst.plans.logical` block is out of order — it should be folded
into the existing import on line 29 (`{LocalRelation, LogicalPlan,
SampleMethod}`), rather than appearing below `connector.read`.
- The braces around a single import (`{SampleMethod}`) are unnecessary.
##########
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala:
##########
@@ -868,11 +868,26 @@ abstract class JdbcDialect extends Serializable with Logging {
*/
def supportsOffset: Boolean = false
+ @deprecated("Use compileTableSample instead", "4.2.0")
def supportsTableSample: Boolean = false
+ @deprecated("Use compileTableSample instead", "4.2.0")
def getTableSample(sample: TableSampleInfo): String =
throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3183")
+ /**
+ * Compile a [[org.apache.spark.sql.execution.datasources.v2.TableSampleInfo]] into a
+ * SQL `TABLESAMPLE` clause, or return [[scala.None]] if the dialect cannot represent
Review Comment:
Typo: double space in "if the dialect"; collapse it to a single space.
##########
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcSQLQueryBuilder.scala:
##########
@@ -170,10 +169,10 @@ class JdbcSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions) {
}
/**
- * Constructs the table sample clause that following dialect's SQL syntax.
+ * Sets a pre-compiled table sample clause directly.
*/
- def withTableSample(sample: TableSampleInfo): JdbcSQLQueryBuilder = {
- tableSampleClause = dialect.getTableSample(sample)
+ def withTableSampleClause(clause: String): JdbcSQLQueryBuilder = {
Review Comment:
Regarding your self-comment on MiMa: `JdbcSQLQueryBuilder` is `@since 3.5.0`
and is the documented extension point (Oracle / MySQL subclass it via
`getJdbcSQLQueryBuilder`, and third parties can too), so removing
`withTableSample(TableSampleInfo)` is a real break of a released ABI, not just
MiMa noise to exclude. The cleanest fix is to keep it as a deprecated bridge:
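```scala
// Deprecated bridge: keeps the released 3.5.x signature linkable for
// existing callers and subclasses.
@deprecated("Use withTableSampleClause(String) instead", "4.2.0")
def withTableSample(sample: TableSampleInfo): JdbcSQLQueryBuilder = {
  // Same body as before the PR, so old callers see unchanged behavior.
  tableSampleClause = dialect.getTableSample(sample)
  this
}

// New entry point: the caller supplies an already-compiled TABLESAMPLE clause.
def withTableSampleClause(clause: String): JdbcSQLQueryBuilder = {
  tableSampleClause = clause
  this
}
```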
I'd keep the bridge body calling `getTableSample` rather than
`compileTableSample` for behavior parity — the old method body was identical,
so any third-party caller sees the same behavior as before.
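The bridge pattern can be exercised in isolation. This sketch uses hypothetical stand-ins (`SampleSpec`, `BridgeDialect`, `BridgeQueryBuilder`), not Spark's classes, to check that the deprecated entry point and the new one leave the builder in the same state when given equivalent input.

```scala
// Hypothetical, simplified stand-ins for TableSampleInfo, JdbcDialect and
// JdbcSQLQueryBuilder; they only model the deprecated-bridge pattern.
final case class SampleSpec(fraction: Double)

class BridgeDialect {
  def getTableSample(sample: SampleSpec): String =
    s"TABLESAMPLE BERNOULLI (${sample.fraction * 100})"
}

class BridgeQueryBuilder(dialect: BridgeDialect) {
  private var tableSampleClause: String = ""

  // Old signature kept as a deprecated bridge; body unchanged.
  @deprecated("Use withTableSampleClause(String) instead", "4.2.0")
  def withTableSample(sample: SampleSpec): BridgeQueryBuilder = {
    tableSampleClause = dialect.getTableSample(sample)
    this
  }

  // New entry point: the clause is compiled elsewhere and passed in as-is.
  def withTableSampleClause(clause: String): BridgeQueryBuilder = {
    tableSampleClause = clause
    this
  }

  // Renders a minimal query; the sample clause is appended only when set.
  def build(table: String): String =
    if (tableSampleClause.isEmpty) s"SELECT * FROM $table"
    else s"SELECT * FROM $table $tableSampleClause"
}
```

Because the bridge body still calls `getTableSample`, an old caller and a new caller that passes the same compiled clause produce identical SQL.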
--
This is an automated message from the Apache Git Service.