cloud-fan commented on code in PR #56092:
URL: https://github.com/apache/spark/pull/56092#discussion_r3299488638


##########
connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala:
##########
@@ -789,6 +802,27 @@ private[v2] trait V2JDBCTest
         checkSamplePushed(df8, false)
         checkFilterPushed(df8)
         assert(df8.collect().length < 10)
+
+        // SYSTEM sampling pushdown
+        if (supportsTableSampleSystem) {
+          val df9 = sql(s"SELECT * FROM $catalogName.new_table $tableOptions " +
+            "TABLESAMPLE SYSTEM (50 PERCENT)")
+          checkSamplePushed(df9)
+          if (partitioningEnabled) {
+            multiplePartitionAdditionalCheck(df1, partitionInfo)
+          }
+          assert(df6.collect().length <= 10)

Review Comment:
   `df6` is bounded by `LIMIT 2` (line 781), so `df6.collect().length <= 10` is 
trivially true and never exercises the SYSTEM-sampled row count. The two `df6` 
references in the new SYSTEM block (this line and line 824) look like 
copy-paste from the BERNOULLI cases; they should reference `df9` and `df10` 
respectively. Note also that PG `TABLESAMPLE SYSTEM` samples whole blocks, so 
on a 10-row, single-block table at 50% the result will commonly be all 10 rows 
or 0 rows, and `<= 10` holds either way. Consider asserting something stronger, 
e.g. also verifying column pruning for `df10` by inspecting the collected rows.
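
   To illustrate the all-or-nothing behavior described above, here is a minimal, 
self-contained sketch of row-level versus block-level sampling. It is only a 
model, not PostgreSQL or Spark code: `BlockSamplingSketch`, `sampleRows`, and 
`sampleBlocks` are hypothetical names, and the "table" is just a sequence of 
blocks of integers.

   ```scala
   import scala.util.Random

   // Hypothetical model of the two sampling styles; not Spark or PostgreSQL code.
   object BlockSamplingSketch {
     // Row-level (BERNOULLI-style): each row is kept independently.
     def sampleRows(rows: Seq[Int], fraction: Double, rng: Random): Seq[Int] =
       rows.filter(_ => rng.nextDouble() < fraction)

     // Block-level (SYSTEM-style): each block is kept or dropped as a whole.
     def sampleBlocks(blocks: Seq[Seq[Int]], fraction: Double, rng: Random): Seq[Int] =
       blocks.filter(_ => rng.nextDouble() < fraction).flatten

     def main(args: Array[String]): Unit = {
       // A 10-row table that fits in a single block, as in the test fixture.
       val singleBlock: Seq[Seq[Int]] = Seq(1 to 10)
       val sizes = (1 to 20).map { seed =>
         sampleBlocks(singleBlock, 0.5, new Random(seed)).size
       }
       // Every observed size is either 0 or 10, so `<= 10` can never fail.
       println(sizes.distinct.sorted)
     }
   }
   ```

   Under this model an upper-bound assertion on the row count passes whether or 
not the sample was pushed down, which is why a plan-level or column-level check 
carries more signal.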



##########
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala:
##########
@@ -868,11 +868,26 @@ abstract class JdbcDialect extends Serializable with Logging {
    */
   def supportsOffset: Boolean = false
 
+  @deprecated("Use compileTableSample instead", "4.2.0")
   def supportsTableSample: Boolean = false
 
+  @deprecated("Use compileTableSample instead", "4.2.0")
   def getTableSample(sample: TableSampleInfo): String =
     throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3183")
 
+  /**
+   * Compile a [[org.apache.spark.sql.execution.datasources.v2.TableSampleInfo]] into a
+   * SQL `TABLESAMPLE` clause, or return [[scala.None]] if the  dialect cannot represent
+   * the requested sampling semantics (e.g. sampling with replacement).
+   *
+   * The default implementation delegates to [[getTableSample]] when [[supportsTableSample]]
+   * is true, and returns [[scala.None]] otherwise.
+   */
+  @Since("4.2.0")
+  def compileTableSample(sample: TableSampleInfo): Option[String] = {
+    if (supportsTableSample) Some(getTableSample(sample)) else None
+  }

Review Comment:
   The default delegation to `getTableSample(sample)` ignores 
`sample.sampleMethod` and `sample.withReplacement`. A third-party dialect 
compiled against 3.5.x that opted in via the deprecated `supportsTableSample = 
true` / `getTableSample(...)` was written under the old contract (BERNOULLI 
only, `withReplacement` blindly accepted). With the PR's change, 
`JDBCScanBuilder.pushTableSample` no longer gates on 
`dialect.supportsTableSample` and always calls `compileTableSample` with the 
real `sampleMethod` / `withReplacement`. As a result, such a dialect can be 
asked to compile a SYSTEM or with-replacement sample and will silently return 
the old BERNOULLI-style string: Spark believes the sample was pushed down with 
the requested semantics, while the connector runs different semantics. This is 
the exact correctness bug the PR cites as motivation, just shifted onto legacy 
third-party dialects.
   
   Suggested fix — gate the legacy path to the original contract:
   
   ```suggestion
     def compileTableSample(sample: TableSampleInfo): Option[String] = {
       if (supportsTableSample &&
           !sample.withReplacement &&
           sample.sampleMethod == SampleMethod.Bernoulli) {
         Some(getTableSample(sample))
       } else {
         None
       }
     }
   ```
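
   As a rough illustration of the gating suggested above, here is a 
self-contained sketch using hypothetical stand-in types. `SampleMethod`, 
`TableSampleInfo`, `Dialect`, and `LegacyDialect` below are simplified 
assumptions for demonstration, not the real Spark classes; only the shape of 
`compileTableSample` mirrors the suggestion.

   ```scala
   // Hypothetical stand-ins for the Spark types; names and fields are assumptions.
   sealed trait SampleMethod
   object SampleMethod {
     case object Bernoulli extends SampleMethod
     case object System extends SampleMethod
   }

   final case class TableSampleInfo(
       lowerBound: Double,
       upperBound: Double,
       withReplacement: Boolean,
       seed: Long,
       sampleMethod: SampleMethod)

   abstract class Dialect {
     def supportsTableSample: Boolean = false

     def getTableSample(sample: TableSampleInfo): String =
       throw new UnsupportedOperationException("table sample is not supported")

     // The legacy path is only taken for the original contract:
     // BERNOULLI sampling without replacement.
     def compileTableSample(sample: TableSampleInfo): Option[String] = {
       if (supportsTableSample &&
           !sample.withReplacement &&
           sample.sampleMethod == SampleMethod.Bernoulli) {
         Some(getTableSample(sample))
       } else {
         None
       }
     }
   }

   // A legacy-style dialect that only overrides the deprecated hooks.
   object LegacyDialect extends Dialect {
     override def supportsTableSample: Boolean = true

     override def getTableSample(sample: TableSampleInfo): String = {
       val percent = (sample.upperBound - sample.lowerBound) * 100
       s"TABLESAMPLE BERNOULLI ($percent) REPEATABLE (${sample.seed})"
     }
   }

   object GatingDemo {
     def main(args: Array[String]): Unit = {
       val bernoulli = TableSampleInfo(0.0, 0.5, false, 42L, SampleMethod.Bernoulli)
       // Old contract: the legacy clause is produced.
       println(LegacyDialect.compileTableSample(bernoulli))
       // SYSTEM or with-replacement requests are declined instead of mis-compiled.
       println(LegacyDialect.compileTableSample(bernoulli.copy(sampleMethod = SampleMethod.System)))
       println(LegacyDialect.compileTableSample(bernoulli.copy(withReplacement = true)))
     }
   }
   ```

   In this sketch the legacy dialect returns `None` for anything outside the 
old contract, so the sample would stay in Spark rather than being pushed down 
with the wrong semantics.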



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala:
##########
@@ -32,6 +32,8 @@ import org.apache.spark.sql.connector.catalog.functions._
 import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
 import org.apache.spark.sql.connector.expressions.{BucketTransform, Cast => V2Cast, Expression => V2Expression, FieldReference, GeneralScalarExpression, IdentityTransform, Literal => V2Literal, NamedReference, NamedTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder, SortValue, Transform}
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue}
+import org.apache.spark.sql.connector.read.{SampleMethod => V2SampleMethod}
+import org.apache.spark.sql.catalyst.plans.logical.{SampleMethod}

Review Comment:
   Two nits on these new imports:
   - The `catalyst.plans.logical` block is out of order — it should be folded 
into the existing import on line 29 (`{LocalRelation, LogicalPlan, 
SampleMethod}`), rather than appearing below `connector.read`.
   - The braces around a single import (`{SampleMethod}`) are unnecessary.



##########
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala:
##########
@@ -868,11 +868,26 @@ abstract class JdbcDialect extends Serializable with Logging {
    */
   def supportsOffset: Boolean = false
 
+  @deprecated("Use compileTableSample instead", "4.2.0")
   def supportsTableSample: Boolean = false
 
+  @deprecated("Use compileTableSample instead", "4.2.0")
   def getTableSample(sample: TableSampleInfo): String =
     throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3183")
 
+  /**
+   * Compile a [[org.apache.spark.sql.execution.datasources.v2.TableSampleInfo]] into a
+   * SQL `TABLESAMPLE` clause, or return [[scala.None]] if the  dialect cannot represent

Review Comment:
   Typo: double space — "if the  dialect" → "if the dialect".



##########
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcSQLQueryBuilder.scala:
##########
@@ -170,10 +169,10 @@ class JdbcSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions) {
   }
 
   /**
-   * Constructs the table sample clause that following dialect's SQL syntax.
+   * Sets a pre-compiled table sample clause directly.
    */
-  def withTableSample(sample: TableSampleInfo): JdbcSQLQueryBuilder = {
-    tableSampleClause = dialect.getTableSample(sample)
+  def withTableSampleClause(clause: String): JdbcSQLQueryBuilder = {

Review Comment:
   Re: your self-comment on MIMA — `JdbcSQLQueryBuilder` is `@since 3.5.0` and 
is the documented extension point (Oracle / MySQL subclass it via 
`getJdbcSQLQueryBuilder`, third parties can too), so removing 
`withTableSample(TableSampleInfo)` is a real released-ABI break, not just MIMA 
noise to exclude. Cleanest fix is to keep it as a deprecated bridge:
   
   ```scala
   @deprecated("Use withTableSampleClause(String) instead", "4.2.0")
   def withTableSample(sample: TableSampleInfo): JdbcSQLQueryBuilder = {
     tableSampleClause = dialect.getTableSample(sample)
     this
   }
   
   def withTableSampleClause(clause: String): JdbcSQLQueryBuilder = {
     tableSampleClause = clause
     this
   }
   ```
   
   I'd keep the bridge body calling `getTableSample` rather than 
`compileTableSample` for behavior parity — the old method body was identical, 
so any third-party caller sees the same behavior as before.
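
   A minimal sketch of the parity argument, using hypothetical stand-ins 
(`SampleSpec`, `StubDialect`, and `StubQueryBuilder` are assumptions for 
illustration, not the real Spark classes): the deprecated bridge and the new 
clause-based setter should yield the same query when given the same dialect 
output.

   ```scala
   // Hypothetical stand-ins; only the two builder methods mirror the bridge above.
   final case class SampleSpec(fraction: Double)

   class StubDialect {
     def getTableSample(sample: SampleSpec): String =
       s"TABLESAMPLE BERNOULLI (${sample.fraction * 100})"
   }

   class StubQueryBuilder(dialect: StubDialect) {
     protected var tableSampleClause: String = ""

     // Deprecated bridge: same body as the pre-change method.
     @deprecated("Use withTableSampleClause(String) instead", "4.2.0")
     def withTableSample(sample: SampleSpec): StubQueryBuilder = {
       tableSampleClause = dialect.getTableSample(sample)
       this
     }

     // New entry point: takes a pre-compiled clause.
     def withTableSampleClause(clause: String): StubQueryBuilder = {
       tableSampleClause = clause
       this
     }

     def build(): String = s"SELECT * FROM t $tableSampleClause".trim
   }

   object BridgeDemo {
     def main(args: Array[String]): Unit = {
       val dialect = new StubDialect
       val sample = SampleSpec(0.5)
       val viaBridge = new StubQueryBuilder(dialect).withTableSample(sample).build()
       val viaClause = new StubQueryBuilder(dialect)
         .withTableSampleClause(dialect.getTableSample(sample))
         .build()
       // Both paths are expected to produce the same SQL text.
       println(viaBridge == viaClause)
     }
   }
   ```

   Existing callers of the old method keep linking and only see a deprecation 
warning, while new code can move to the clause-based setter.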



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

