This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 616c492aa1f  [SPARK-43884] Param markers in DDL
616c492aa1f is described below

commit 616c492aa1f4915e85b7f153d31ec7cdfe02202a
Author: srielau <se...@rielau.com>
AuthorDate: Sat Jun 10 22:03:14 2023 -0700

    [SPARK-43884] Param markers in DDL

    ### What changes were proposed in this pull request?

    In this change we allow parameter markers (e.g. :parm) to be used in various
    DDL statements, specifically as input to the IDENTIFIER clause, to allow
    templating of statements such as CREATE TABLE IDENTIFIER(:mytab).
    We block parameter markers in view bodies to prevent them from appearing in
    the persisted view definition.

    ### Why are the changes needed?

    This change is needed to allow use of the IDENTIFIER clause in DDL statements.

    ### Does this PR introduce _any_ user-facing change?

    Yes

    ### How was this patch tested?

    Added new tests to the parameter suite.

    Closes #41395 from srielau/SPARK-43884-param-markers-in-ddl.

    Lead-authored-by: srielau <se...@rielau.com>
    Co-authored-by: Gengliang Wang <gengli...@apache.org>
    Co-authored-by: Serge Rielau <srie...@users.noreply.github.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 core/src/main/resources/error/error-classes.json  |  2 +-
 .../spark/sql/catalyst/analysis/parameters.scala  | 19 +--------
 .../spark/sql/errors/QueryParsingErrors.scala     |  9 ++++
 .../spark/sql/execution/SparkSqlParser.scala      | 28 ++++++++++++-
 .../analyzer-results/identifier-clause.sql.out    | 10 ++---
 .../sql-tests/inputs/identifier-clause.sql        |  2 +-
 .../sql-tests/results/identifier-clause.sql.out   | 10 ++---
 .../org/apache/spark/sql/ParametersSuite.scala    | 49 ++++++++++++++++++++--
 8 files changed, 92 insertions(+), 37 deletions(-)
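For a concrete picture of the templating this enables, here is a minimal sketch
using the Scala API. The table name, schema, and parameter names are
illustrative, not taken from the patch; it assumes a SparkSession `spark` with
the `sql(sqlText, args)` overload:

    // Templating a DDL statement: the :mytab marker is substituted into the
    // IDENTIFIER clause before the plan is built.
    spark.sql(
      "CREATE TABLE IDENTIFIER(:mytab)(c1 INT) USING parquet",
      args = Map("mytab" -> "my_schema.my_table"))

    // Markers stay blocked inside a view body, because the original query
    // text is persisted. The next call would raise
    // UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT:
    // spark.sql("CREATE VIEW v AS SELECT :p AS p", args = Map("p" -> 1))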
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 8c3c076ce74..7b39ab7266c 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -2335,7 +2335,7 @@
     },
     "PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT" : {
       "message" : [
-        "Parameter markers in unexpected statement: <statement>. Parameter markers must only be used in a query, or DML statement."
+        "Parameter markers are not allowed in <statement>."
       ]
     },
     "PARTITION_WITH_NESTED_COLUMN_IS_UNSUPPORTED" : {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
index 29c36300673..2a31e90465c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, LeafExpression, Literal, SubqueryExpression, Unevaluable}
-import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, InsertIntoStatement, LogicalPlan, MergeIntoTable, UnaryNode, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH}
 import org.apache.spark.sql.errors.QueryErrorsBase
@@ -72,23 +72,6 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase {
     // We should wait for `CTESubstitution` to resolve CTE before binding parameters, as CTE
     // relations are not children of `UnresolvedWith`.
     case p @ ParameterizedQuery(child, args) if !child.containsPattern(UNRESOLVED_WITH) =>
-      // Some commands may store the original SQL text, like CREATE VIEW, GENERATED COLUMN, etc.
-      // We can't store the original SQL text with parameters, as we don't store the arguments and
-      // are not able to resolve it after parsing it back. Since parameterized query is mostly
-      // used to avoid SQL injection for SELECT queries, we simply forbid non-DML commands here.
-      child match {
-        case _: InsertIntoStatement => // OK
-        case _: UpdateTable => // OK
-        case _: DeleteFromTable => // OK
-        case _: MergeIntoTable => // OK
-        case cmd: Command =>
-          child.failAnalysis(
-            errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
-            messageParameters = Map("statement" -> cmd.nodeName)
-          )
-        case _ => // OK
-      }
-
       args.find(!_._2.isInstanceOf[Literal]).foreach { case (name, expr) =>
         expr.failAnalysis(
           errorClass = "INVALID_SQL_ARG",
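With the allowlist removed, BindParameters no longer decides which statement
kinds may carry markers; that check moves into the parser (see SparkSqlParser
below). What remains here is argument validation. A hedged sketch of the
surviving behavior, assuming a SparkSession `spark` and a table `t` with one
INT column as in the suite's INSERT test:

    // Literal arguments still bind as before in DML:
    spark.sql("INSERT INTO t VALUES (:p)", args = Map("p" -> 1))

    // A non-literal argument expression is still rejected during analysis
    // with INVALID_SQL_ARG, per the args.find(...) check kept above
    // (illustrative):
    import org.apache.spark.sql.functions.expr
    spark.sql("SELECT :p", args = Map("p" -> expr("rand()")))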
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 0eafd2bf278..d2831f27e37 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -530,6 +530,15 @@ private[sql] object QueryParsingErrors extends QueryErrorsBase {
     new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0052", ctx)
   }
 
+  def parameterMarkerNotAllowed(statement: String, origin: Origin): Throwable = {
+    new ParseException(
+      command = origin.sqlText,
+      start = origin,
+      stop = origin,
+      errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
+      messageParameters = Map("statement" -> statement))
+  }
+
   def defineTempViewWithIfNotExistsError(ctx: CreateViewContext): Throwable = {
     new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0053", ctx)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index c59444f3661..e3ae1b83a16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
 import org.apache.spark.sql.errors.QueryParsingErrors
 import org.apache.spark.sql.execution.command._
@@ -440,6 +441,21 @@ class SparkSqlAstBuilder extends AstBuilder {
 
   }
 
+  private def checkInvalidParameter(plan: LogicalPlan, statement: String):
+    Unit = {
+    plan.foreach { p =>
+      p.expressions.foreach { expr =>
+        if (expr.containsPattern(PARAMETER)) {
+          throw QueryParsingErrors.parameterMarkerNotAllowed(statement, p.origin)
+        }
+      }
+    }
+    plan.children.foreach(p => checkInvalidParameter(p, statement))
+    plan.innerChildren.collect {
+      case child: LogicalPlan => checkInvalidParameter(child, statement)
+    }
+  }
+
   /**
    * Create or replace a view. This creates a [[CreateViewCommand]].
    *
@@ -488,6 +504,14 @@ class SparkSqlAstBuilder extends AstBuilder {
     } else {
       LocalTempView
     }
+    val qPlan: LogicalPlan = plan(ctx.query)
+
+    // Disallow parameter markers in the body of the view.
+    // We need this limitation because we store the original query text, pre substitution.
+    // To lift this we would need to reconstitute the body with parameter markers replaced with the
+    // values given at CREATE VIEW time, or we would need to store the parameter values alongside
+    // the text.
+    checkInvalidParameter(qPlan, "CREATE VIEW body")
     if (viewType == PersistedView) {
       val originalText = source(ctx.query)
       assert(Option(originalText).isDefined,
@@ -498,7 +522,7 @@ class SparkSqlAstBuilder extends AstBuilder {
         visitCommentSpecList(ctx.commentSpec()),
         properties,
         Some(originalText),
-        plan(ctx.query),
+        qPlan,
         ctx.EXISTS != null,
         ctx.REPLACE != null)
     } else {
@@ -522,7 +546,7 @@ class SparkSqlAstBuilder extends AstBuilder {
         visitCommentSpecList(ctx.commentSpec()),
         properties,
         Option(source(ctx.query)),
-        plan(ctx.query),
+        qPlan,
         ctx.EXISTS != null,
         ctx.REPLACE != null,
         viewType = viewType)
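Because checkInvalidParameter recurses through children and innerChildren and
inspects every expression for the PARAMETER tree pattern, markers buried in
CTEs or scalar subqueries are caught as well. An illustrative call, mirroring
the new tests at the end of this patch (assumes a SparkSession `spark`):

    // Rejected at parse time with
    // UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT, even
    // though :p is nested inside a scalar subquery within a CTE.
    spark.sql(
      "CREATE VIEW v AS WITH cte(a) AS (SELECT (SELECT :p) AS a) SELECT a FROM cte",
      args = Map("p" -> 1))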
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
index 98a65374619..7c98d4d1670 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
@@ -116,15 +116,13 @@ org.apache.spark.SparkUnsupportedOperationException
 
 -- !query
 MERGE INTO IDENTIFIER('ta' || 'b') AS t USING IDENTIFIER('ta' || 'b') AS s ON s.c1 = t.c1
-  WHEN MATCHED UPDATE SET c1 = 3
+  WHEN MATCHED THEN UPDATE SET c1 = 3
 -- !query analysis
-org.apache.spark.sql.catalyst.parser.ParseException
+org.apache.spark.SparkUnsupportedOperationException
 {
-  "errorClass" : "PARSE_SYNTAX_ERROR",
-  "sqlState" : "42601",
+  "errorClass" : "_LEGACY_ERROR_TEMP_2096",
   "messageParameters" : {
-    "error" : "'UPDATE'",
-    "hint" : ""
+    "ddl" : "MERGE INTO TABLE"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql b/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
index 6330c1798b9..93e67411172 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
@@ -24,7 +24,7 @@ INSERT INTO IDENTIFIER('ta' || 'b') VALUES(1);
 DELETE FROM IDENTIFIER('ta' || 'b') WHERE 1=0;
 UPDATE IDENTIFIER('ta' || 'b') SET c1 = 2;
 MERGE INTO IDENTIFIER('ta' || 'b') AS t USING IDENTIFIER('ta' || 'b') AS s ON s.c1 = t.c1
-  WHEN MATCHED UPDATE SET c1 = 3;
+  WHEN MATCHED THEN UPDATE SET c1 = 3;
 SELECT * FROM IDENTIFIER('tab');
 SELECT * FROM IDENTIFIER('s.tab');
 SELECT * FROM IDENTIFIER('`s`.`tab`');
diff --git a/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out b/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
index 8f44095f677..97220606317 100644
--- a/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
@@ -127,17 +127,15 @@ org.apache.spark.SparkUnsupportedOperationException
 
 -- !query
 MERGE INTO IDENTIFIER('ta' || 'b') AS t USING IDENTIFIER('ta' || 'b') AS s ON s.c1 = t.c1
-  WHEN MATCHED UPDATE SET c1 = 3
+  WHEN MATCHED THEN UPDATE SET c1 = 3
 -- !query schema
 struct<>
 -- !query output
-org.apache.spark.sql.catalyst.parser.ParseException
+org.apache.spark.SparkUnsupportedOperationException
 {
-  "errorClass" : "PARSE_SYNTAX_ERROR",
-  "sqlState" : "42601",
+  "errorClass" : "_LEGACY_ERROR_TEMP_2096",
   "messageParameters" : {
-    "error" : "'UPDATE'",
-    "hint" : ""
+    "ddl" : "MERGE INTO TABLE"
   }
 }
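A side correction in these golden files: the test's MERGE statement was missing
the mandatory THEN keyword, so it used to fail with PARSE_SYNTAX_ERROR before
the IDENTIFIER clause was ever exercised. With the syntax fixed, the expected
failure becomes the later _LEGACY_ERROR_TEMP_2096 ("MERGE INTO TABLE"), as the
updated outputs show. The corrected shape, wrapped for the Scala API
(illustrative):

    // WHEN MATCHED requires THEN before its action; this now parses and
    // fails later because MERGE is not supported for this test table.
    spark.sql(
      """MERGE INTO IDENTIFIER('ta' || 'b') AS t
        |USING IDENTIFIER('ta' || 'b') AS s
        |ON s.c1 = t.c1
        |WHEN MATCHED THEN UPDATE SET c1 = 3""".stripMargin)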
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
index 2319761f55d..985d0373c4f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
@@ -122,6 +122,15 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
       Row(1))
   }
 
+  test("parameter in identifier clause in DDL and utility commands") {
+    spark.sql("CREATE VIEW IDENTIFIER(:p1)(c1) AS SELECT 1", args = Map("p1" -> "v"))
+    spark.sql("ALTER VIEW IDENTIFIER(:p1) AS SELECT 2 AS c1", args = Map("p1" -> "v"))
+    checkAnswer(
+      spark.sql("SHOW COLUMNS FROM IDENTIFIER(:p1)", args = Map("p1" -> "v")),
+      Row("c1"))
+    spark.sql("DROP VIEW IDENTIFIER(:p1)", args = Map("p1" -> "v"))
+  }
+
   test("parameters in INSERT") {
     withTable("t") {
       sql("CREATE TABLE t (col INT) USING json")
@@ -130,7 +139,7 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("parameters not allowed in DDL commands") {
+  test("parameters not allowed in view body ") {
     val sqlText = "CREATE VIEW v AS SELECT :p AS p"
     val args = Map("p" -> 1)
     checkError(
@@ -138,9 +147,43 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
         spark.sql(sqlText, args)
       },
       errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
-      parameters = Map("statement" -> "CreateView"),
+      parameters = Map("statement" -> "CREATE VIEW body"),
+      context = ExpectedContext(
+        fragment = sqlText,
+        start = 0,
+        stop = sqlText.length - 1))
+  }
+
+  test("parameters not allowed in view body - WITH and scalar subquery") {
+    val sqlText = "CREATE VIEW v AS WITH cte(a) AS (SELECT (SELECT :p) AS a) SELECT a FROM cte"
+    val args = Map("p" -> 1)
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql(sqlText, args)
+      },
+      errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
+      parameters = Map("statement" -> "CREATE VIEW body"),
+      context = ExpectedContext(
+        fragment = sqlText,
+        start = 0,
+        stop = sqlText.length - 1))
+  }
+
+  test("parameters not allowed in view body - nested WITH and EXIST") {
+    val sqlText =
+      """CREATE VIEW v AS
+        |SELECT a as a
+        |FROM (WITH cte(a) AS (SELECT CASE WHEN EXISTS(SELECT :p) THEN 1 END AS a)
+        |SELECT a FROM cte)""".stripMargin
+    val args = Map("p" -> 1)
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql(sqlText, args)
+      },
+      errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
+      parameters = Map("statement" -> "CREATE VIEW body"),
       context = ExpectedContext(
-        fragment = "CREATE VIEW v AS SELECT :p AS p",
+        fragment = sqlText,
         start = 0,
         stop = sqlText.length - 1))
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org