cloud-fan commented on a change in pull request #34451:
URL: https://github.com/apache/spark/pull/34451#discussion_r741619164



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -225,13 +226,33 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       withProjection
   }
 
+  def applySample(plan: LogicalPlan): LogicalPlan = plan.transform {

Review comment:
       nit: `pushDownSample`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -370,6 +371,10 @@ abstract class JdbcDialect extends Serializable with 
Logging{
    * returns whether the dialect supports limit or not
    */
   def supportsLimit(): Boolean = true
+
+  def supportsTableSample: Boolean = false
+
+  def getTableSample(sample: TableSampleInfo): String = ""

Review comment:
       shall we throw `UnsupportedOperationException` by default?

##########
File path: 
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
##########
@@ -284,4 +288,141 @@ private[v2] trait V2JDBCTest extends SharedSparkSession 
with DockerIntegrationFu
       testIndexUsingSQL(s"$catalogName.new_table")
     }
   }
+
+  def supportsTableSample: Boolean = false
+
+  test("SPARK-37038: Test TABLESAMPLE") {
+    require(supportsTableSample)
+    withTable(s"$catalogName.new_table") {
+      sql(s"CREATE TABLE $catalogName.new_table (col1 INT, col2 INT)")
+      spark.range(10).select($"id" * 2, $"id" * 2 + 
1).write.insertInto(s"$catalogName.new_table")
+
+      val df1 = sql(s"SELECT col1 FROM $catalogName.new_table TABLESAMPLE 
(BUCKET 6 OUT OF 10)" +

Review comment:
       nit: it would be better to put some comments about the test, e.g. `// 
sample pushdown + column prunning` here

##########
File path: 
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
##########
@@ -284,4 +288,141 @@ private[v2] trait V2JDBCTest extends SharedSparkSession 
with DockerIntegrationFu
       testIndexUsingSQL(s"$catalogName.new_table")
     }
   }
+
+  def supportsTableSample: Boolean = false
+
+  test("SPARK-37038: Test TABLESAMPLE") {
+    require(supportsTableSample)
+    withTable(s"$catalogName.new_table") {
+      sql(s"CREATE TABLE $catalogName.new_table (col1 INT, col2 INT)")
+      spark.range(10).select($"id" * 2, $"id" * 2 + 
1).write.insertInto(s"$catalogName.new_table")
+
+      val df1 = sql(s"SELECT col1 FROM $catalogName.new_table TABLESAMPLE 
(BUCKET 6 OUT OF 10)" +

Review comment:
       nit: it would be better to put some comments about the test, e.g. `// 
sample pushdown + column pruning` here

##########
File path: 
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
##########
@@ -284,4 +288,141 @@ private[v2] trait V2JDBCTest extends SharedSparkSession 
with DockerIntegrationFu
       testIndexUsingSQL(s"$catalogName.new_table")
     }
   }
+
+  def supportsTableSample: Boolean = false
+
+  test("SPARK-37038: Test TABLESAMPLE") {
+    require(supportsTableSample)
+    withTable(s"$catalogName.new_table") {
+      sql(s"CREATE TABLE $catalogName.new_table (col1 INT, col2 INT)")
+      spark.range(10).select($"id" * 2, $"id" * 2 + 
1).write.insertInto(s"$catalogName.new_table")
+
+      val df1 = sql(s"SELECT col1 FROM $catalogName.new_table TABLESAMPLE 
(BUCKET 6 OUT OF 10)" +
+        " REPEATABLE (12345)")
+      val scan1 = df1.queryExecution.optimizedPlan.collectFirst {
+        case s: DataSourceV2ScanRelation => s
+      }.get
+      assert(scan1.schema.names.sameElements(Seq("col1")))
+
+      val sample1 = df1.queryExecution.optimizedPlan.collect {
+        case s: Sample => s
+      }
+      assert(sample1.isEmpty)
+      assert(df1.collect().length <= 7)
+
+      val df2 = sql(s"SELECT * FROM $catalogName.new_table TABLESAMPLE (50 
PERCENT)" +
+        " REPEATABLE (12345)")
+      val sample2 = df2.queryExecution.optimizedPlan.collect {
+        case s: Sample => s
+      }
+      assert(sample2.isEmpty)

Review comment:
       can we write a small method for this check? `def assertSamplePushed(df: 
DataFrame) ...`

##########
File path: 
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
##########
@@ -284,4 +288,141 @@ private[v2] trait V2JDBCTest extends SharedSparkSession 
with DockerIntegrationFu
       testIndexUsingSQL(s"$catalogName.new_table")
     }
   }
+
+  def supportsTableSample: Boolean = false
+
+  test("SPARK-37038: Test TABLESAMPLE") {
+    require(supportsTableSample)
+    withTable(s"$catalogName.new_table") {
+      sql(s"CREATE TABLE $catalogName.new_table (col1 INT, col2 INT)")
+      spark.range(10).select($"id" * 2, $"id" * 2 + 
1).write.insertInto(s"$catalogName.new_table")
+
+      val df1 = sql(s"SELECT col1 FROM $catalogName.new_table TABLESAMPLE 
(BUCKET 6 OUT OF 10)" +
+        " REPEATABLE (12345)")
+      val scan1 = df1.queryExecution.optimizedPlan.collectFirst {
+        case s: DataSourceV2ScanRelation => s
+      }.get
+      assert(scan1.schema.names.sameElements(Seq("col1")))
+
+      val sample1 = df1.queryExecution.optimizedPlan.collect {
+        case s: Sample => s
+      }
+      assert(sample1.isEmpty)
+      assert(df1.collect().length <= 7)
+
+      val df2 = sql(s"SELECT * FROM $catalogName.new_table TABLESAMPLE (50 
PERCENT)" +
+        " REPEATABLE (12345)")
+      val sample2 = df2.queryExecution.optimizedPlan.collect {
+        case s: Sample => s
+      }
+      assert(sample2.isEmpty)

Review comment:
       We can also add `def assertFilterPushed`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to