This is an automated email from the ASF dual-hosted git repository. ueshin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 58e5d86cc07 [SPARK-44503][SQL] Add SQL grammar for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls 58e5d86cc07 is described below commit 58e5d86cc076d4546dac5e1f594977d615ec1e7a Author: Daniel Tenedorio <daniel.tenedo...@databricks.com> AuthorDate: Mon Jul 24 16:08:46 2023 -0700 [SPARK-44503][SQL] Add SQL grammar for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls ### What changes were proposed in this pull request? This PR adds SQL grammar for the PARTITION BY and ORDER BY clauses after TABLE arguments in table-valued function (TVF) calls. Note that this PR includes only the SQL grammar and parsing changes; analysis support is not implemented yet and will come in a follow-up. Examples: ``` select * from tvf(arg1 => table(t1) partition by col1); select * from tvf(arg1 => table(t1) partition by col1 order by col2 asc); select * from tvf(arg1 => table(t1) partition by col1, col2 order by col2 asc, col3 desc); select * from tvf(arg1 => table(select col1, col2, col3 from v2) partition by col1, col2 order by col2 asc, col3 desc); ``` ### Why are the changes needed? This will provide a way for the TVF caller to indicate the desired semantics for dividing up the rows of the input table into partitions for consumption by the underlying algorithm. ### Does this PR introduce _any_ user-facing change? Yes, it adds new SQL grammar. ### How was this patch tested? This PR adds new parser unit tests. Currently the parser returns a "not implemented yet" error for these cases; analysis support for them will be implemented next. Closes #42100 from dtenedor/partition-by-clause. 
Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com> Signed-off-by: Takuya UESHIN <ues...@databricks.com> --- .../spark/sql/catalyst/parser/SqlBaseParser.g4 | 11 +++-- .../spark/sql/catalyst/parser/AstBuilder.scala | 7 +++ .../sql/catalyst/parser/PlanParserSuite.scala | 51 ++++++++++++++++++++++ 3 files changed, 66 insertions(+), 3 deletions(-) diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 85dbc499fbd..372e0b54732 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -789,9 +789,14 @@ inlineTable ; functionTableSubqueryArgument - : TABLE identifierReference - | TABLE LEFT_PAREN identifierReference RIGHT_PAREN - | TABLE LEFT_PAREN query RIGHT_PAREN + : TABLE identifierReference tableArgumentPartitioning? + | TABLE LEFT_PAREN identifierReference RIGHT_PAREN tableArgumentPartitioning? + | TABLE LEFT_PAREN query RIGHT_PAREN tableArgumentPartitioning? + ; + +tableArgumentPartitioning + : (PARTITION | DISTRIBUTE) BY expressionSeq + ((ORDER | SORT) BY sortItem (COMMA sortItem)*)? 
; functionTableNamedArgumentExpression diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 7a28efa3e42..ccfcd13440c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1564,6 +1564,13 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging { }.getOrElse { plan(ctx.query) } + val partitioning = Option(ctx.tableArgumentPartitioning) + if (partitioning.isDefined) { + // The PARTITION BY clause is not implemented yet for TABLE arguments to table valued function + // calls. + operationNotAllowed( + "Specifying the PARTITION BY clause for TABLE arguments is not implemented yet", ctx) + } FunctionTableSubqueryArgumentExpression(p) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 4a5d0a0ae29..1d50e3bb479 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -1492,6 +1492,57 @@ class PlanParserSuite extends AnalysisTest { stop = sql1.length - 2)) } + test("SPARK-44503: Support PARTITION BY and ORDER BY clause for TVF TABLE arguments") { + Seq("partition", "distribute").foreach { partition => + Seq("order", "sort").foreach { order => + val sql1suffix = s"table(v1) $partition by col1" + val sql1 = s"select * from my_tvf(arg1 => $sql1suffix)" + val startIndex = 29 + val message = + "Specifying the PARTITION BY clause for TABLE arguments is not implemented yet" + checkError( + exception = parseException(sql1), + errorClass = "_LEGACY_ERROR_TEMP_0035", + parameters = Map("message" -> message), + context = 
ExpectedContext( + fragment = sql1suffix, + start = startIndex, + stop = sql1.length - 2)) + val sql2suffix = s"table(v1) $partition by col1 $order by col2 asc" + val sql2 = s"select * from my_tvf(arg1 => $sql2suffix)" + checkError( + exception = parseException(sql2), + errorClass = "_LEGACY_ERROR_TEMP_0035", + parameters = Map("message" -> message), + context = ExpectedContext( + fragment = sql2suffix, + start = startIndex, + stop = sql2.length - 2)) + val sql3suffix = s"table(v1) $partition by col1, col2 $order by col2 asc, col3 desc" + val sql3 = s"select * from my_tvf(arg1 => $sql3suffix)" + checkError( + exception = parseException(sql3), + errorClass = "_LEGACY_ERROR_TEMP_0035", + parameters = Map("message" -> message), + context = ExpectedContext( + fragment = sql3suffix, + start = startIndex, + stop = sql3.length - 2)) + val sql4Suffix = s"table(select col1, col2, col3 from v2) $partition by col1, col2 " + + s"$order by col2 asc, col3 desc" + val sql4 = s"select * from my_tvf(arg1 => $sql4Suffix)" + checkError( + exception = parseException(sql4), + errorClass = "_LEGACY_ERROR_TEMP_0035", + parameters = Map("message" -> message), + context = ExpectedContext( + fragment = sql4Suffix, + start = startIndex, + stop = sql4.length - 2)) + } + } + } + test("SPARK-32106: TRANSFORM plan") { // verify schema less assertEqual( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org