This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 58e5d86cc07 [SPARK-44503][SQL] Add SQL grammar for PARTITION BY and 
ORDER BY clause after TABLE arguments for TVF calls
58e5d86cc07 is described below

commit 58e5d86cc076d4546dac5e1f594977d615ec1e7a
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Mon Jul 24 16:08:46 2023 -0700

    [SPARK-44503][SQL] Add SQL grammar for PARTITION BY and ORDER BY clause 
after TABLE arguments for TVF calls
    
    ### What changes were proposed in this pull request?
    
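    This PR adds SQL grammar for the PARTITION BY and ORDER BY clauses that may
    follow TABLE arguments in TVF calls. The grammar also accepts DISTRIBUTE BY
    and SORT BY as alternatives to PARTITION BY and ORDER BY, respectively.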
    
    Note that this PR includes only the SQL grammar and parsing part; analysis
    support is not implemented yet and will come in a follow-up.
    
    Examples:
    
    ```
    select * from tvf(arg1 => table(t1) partition by col1);
    select * from tvf(arg1 => table(t1) partition by col1 order by col2 asc);
    select * from tvf(arg1 => table(t1) partition by col1, col2 order by col2 
asc, col3 desc);
    select * from tvf(arg1 => table(select col1, col2, col3 from v2)
        partition by col1, col2 order by col2 asc, col3 desc);
    ```
    
    ### Why are the changes needed?
    
    This will provide a way for the TVF caller to indicate desired semantics 
for dividing up the rows of the input table into partitions for consumption by 
the underlying algorithm.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it adds new SQL grammar.
    
    ### How was this patch tested?
    
    This PR adds new parser unit tests. Currently the parser returns a "not
    implemented yet" error for these cases; analysis support for them will be
    implemented next.
    
    Closes #42100 from dtenedor/partition-by-clause.
    
    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Takuya UESHIN <ues...@databricks.com>
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     | 11 +++--
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  7 +++
 .../sql/catalyst/parser/PlanParserSuite.scala      | 51 ++++++++++++++++++++++
 3 files changed, 66 insertions(+), 3 deletions(-)

diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 85dbc499fbd..372e0b54732 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -789,9 +789,14 @@ inlineTable
     ;
 
 functionTableSubqueryArgument
-    : TABLE identifierReference
-    | TABLE LEFT_PAREN identifierReference RIGHT_PAREN
-    | TABLE LEFT_PAREN query RIGHT_PAREN
+    : TABLE identifierReference tableArgumentPartitioning?
+    | TABLE LEFT_PAREN identifierReference RIGHT_PAREN 
tableArgumentPartitioning?
+    | TABLE LEFT_PAREN query RIGHT_PAREN tableArgumentPartitioning?
+    ;
+
+tableArgumentPartitioning
+    : (PARTITION | DISTRIBUTE) BY expressionSeq
+      ((ORDER | SORT) BY sortItem (COMMA sortItem)*)?
     ;
 
 functionTableNamedArgumentExpression
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 7a28efa3e42..ccfcd13440c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1564,6 +1564,13 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
     }.getOrElse {
       plan(ctx.query)
     }
+    val partitioning = Option(ctx.tableArgumentPartitioning)
+    if (partitioning.isDefined) {
+      // The PARTITION BY clause is not implemented yet for TABLE arguments to 
table valued function
+      // calls.
+      operationNotAllowed(
+        "Specifying the PARTITION BY clause for TABLE arguments is not 
implemented yet", ctx)
+    }
     FunctionTableSubqueryArgumentExpression(p)
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 4a5d0a0ae29..1d50e3bb479 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -1492,6 +1492,57 @@ class PlanParserSuite extends AnalysisTest {
         stop = sql1.length - 2))
   }
 
+  test("SPARK-44503: Support PARTITION BY and ORDER BY clause for TVF TABLE 
arguments") {
+    Seq("partition", "distribute").foreach { partition =>
+      Seq("order", "sort").foreach { order =>
+        val sql1suffix = s"table(v1) $partition by col1"
+        val sql1 = s"select * from my_tvf(arg1 => $sql1suffix)"
+        val startIndex = 29
+        val message =
+          "Specifying the PARTITION BY clause for TABLE arguments is not 
implemented yet"
+        checkError(
+          exception = parseException(sql1),
+          errorClass = "_LEGACY_ERROR_TEMP_0035",
+          parameters = Map("message" -> message),
+          context = ExpectedContext(
+            fragment = sql1suffix,
+            start = startIndex,
+            stop = sql1.length - 2))
+        val sql2suffix = s"table(v1) $partition by col1 $order by col2 asc"
+        val sql2 = s"select * from my_tvf(arg1 => $sql2suffix)"
+        checkError(
+          exception = parseException(sql2),
+          errorClass = "_LEGACY_ERROR_TEMP_0035",
+          parameters = Map("message" -> message),
+          context = ExpectedContext(
+            fragment = sql2suffix,
+            start = startIndex,
+            stop = sql2.length - 2))
+        val sql3suffix = s"table(v1) $partition by col1, col2 $order by col2 
asc, col3 desc"
+        val sql3 = s"select * from my_tvf(arg1 => $sql3suffix)"
+        checkError(
+          exception = parseException(sql3),
+          errorClass = "_LEGACY_ERROR_TEMP_0035",
+          parameters = Map("message" -> message),
+          context = ExpectedContext(
+            fragment = sql3suffix,
+            start = startIndex,
+            stop = sql3.length - 2))
+        val sql4Suffix = s"table(select col1, col2, col3 from v2) $partition 
by col1, col2 " +
+            s"$order by col2 asc, col3 desc"
+        val sql4 = s"select * from my_tvf(arg1 => $sql4Suffix)"
+        checkError(
+          exception = parseException(sql4),
+          errorClass = "_LEGACY_ERROR_TEMP_0035",
+          parameters = Map("message" -> message),
+          context = ExpectedContext(
+            fragment = sql4Suffix,
+            start = startIndex,
+            stop = sql4.length - 2))
+      }
+    }
+  }
+
   test("SPARK-32106: TRANSFORM plan") {
     // verify schema less
     assertEqual(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to