This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a1c727f3867 [SPARK-41666][PYTHON] Support parameterized SQL by `sql()` a1c727f3867 is described below commit a1c727f386724156f680953fa34ec51bb35348a4 Author: Max Gekk <max.g...@gmail.com> AuthorDate: Fri Dec 23 12:30:30 2022 +0300 [SPARK-41666][PYTHON] Support parameterized SQL by `sql()` ### What changes were proposed in this pull request? In the PR, I propose to extend the `sql()` method in PySpark to support parameterized SQL queries, see https://github.com/apache/spark/pull/38864, and add a new parameter - `args` of the type `Dict[str, str]`. This parameter maps named parameters that can occur in the input SQL query to SQL literals like 1, INTERVAL '1-1' YEAR TO MONTH, DATE'2022-12-22' (see [the doc](https://spark.apache.org/docs/latest/sql-ref-literals.html) of supported literals). For example: ```python >>> spark.sql("SELECT * FROM range(10) WHERE id > :minId", args = {"minId" : "7"}) id 0 8 1 9 ``` Closes #39159 ### Why are the changes needed? To achieve feature parity with the Scala/Java API, and provide PySpark users the same feature. ### Does this PR introduce _any_ user-facing change? No, it shouldn't. ### How was this patch tested? Checked the examples locally, and by running the tests: ``` $ python/run-tests --modules=pyspark-sql --parallelism=1 ``` Closes #39183 from MaxGekk/parameterized-sql-pyspark-dict. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- core/src/main/resources/error/error-classes.json | 6 +++--- .../source/migration_guide/pyspark_3.3_to_3.4.rst | 2 ++ python/pyspark/pandas/sql_formatter.py | 20 +++++++++++++++++-- python/pyspark/sql/session.py | 23 ++++++++++++++++++---- 4 files changed, 42 insertions(+), 9 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index ff235e80dbb..95db9005d02 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -813,7 +813,7 @@ }, "INVALID_SQL_ARG" : { "message" : [ - "The argument <name> of `sql()` is invalid. Consider to replace it by a SQL literal statement." + "The argument <name> of `sql()` is invalid. Consider to replace it by a SQL literal." ] }, "INVALID_SQL_SYNTAX" : { @@ -1164,7 +1164,7 @@ }, "UNBOUND_SQL_PARAMETER" : { "message" : [ - "Found the unbound parameter: <name>. Please, fix `args` and provide a mapping of the parameter to a SQL literal statement." + "Found the unbound parameter: <name>. Please, fix `args` and provide a mapping of the parameter to a SQL literal." ] }, "UNCLOSED_BRACKETED_COMMENT" : { @@ -5225,4 +5225,4 @@ "grouping() can only be used with GroupingSets/Cube/Rollup" ] } -} \ No newline at end of file +} diff --git a/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst b/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst index b3baa8345aa..ca942c54979 100644 --- a/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst +++ b/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst @@ -39,3 +39,5 @@ Upgrading from PySpark 3.3 to 3.4 * In Spark 3.4, the ``Series.concat`` sort parameter will be respected to follow pandas 1.4 behaviors. * In Spark 3.4, the ``DataFrame.__setitem__`` will make a copy and replace pre-existing arrays, which will NOT be over-written to follow pandas 1.4 behaviors. 
+ +* In Spark 3.4, the ``SparkSession.sql`` and the Pandas on Spark API ``sql`` have got new parameter ``args`` which provides binding of named parameters to their SQL literals. diff --git a/python/pyspark/pandas/sql_formatter.py b/python/pyspark/pandas/sql_formatter.py index 45c615161d9..9103366c192 100644 --- a/python/pyspark/pandas/sql_formatter.py +++ b/python/pyspark/pandas/sql_formatter.py @@ -17,7 +17,7 @@ import os import string -from typing import Any, Optional, Union, List, Sequence, Mapping, Tuple +from typing import Any, Dict, Optional, Union, List, Sequence, Mapping, Tuple import uuid import warnings @@ -43,6 +43,7 @@ _CAPTURE_SCOPES = 3 def sql( query: str, index_col: Optional[Union[str, List[str]]] = None, + args: Dict[str, str] = {}, **kwargs: Any, ) -> DataFrame: """ @@ -57,6 +58,8 @@ def sql( * pandas Series * string + Also the method can bind named parameters to SQL literals from `args`. + Parameters ---------- query : str @@ -99,6 +102,12 @@ def sql( e f 3 6 Also note that the index name(s) should be matched to the existing name. + args : dict + A dictionary of named parameters that begin from the `:` marker and + their SQL literals for substituting. + + .. versionadded:: 3.4.0 + kwargs other variables that the user want to set that can be referenced in the query @@ -152,6 +161,13 @@ def sql( 0 1 1 2 2 3 + + And substitude named parameters with the `:` prefix by SQL literals. 
+ + >>> ps.sql("SELECT * FROM range(10) WHERE id > :bound1", args={"bound1":"7"}) + id + 0 8 + 1 9 """ if os.environ.get("PYSPARK_PANDAS_SQL_LEGACY") == "1": from pyspark.pandas import sql_processor @@ -166,7 +182,7 @@ def sql( session = default_session() formatter = PandasSQLStringFormatter(session) try: - sdf = session.sql(formatter.format(query, **kwargs)) + sdf = session.sql(formatter.format(query, **kwargs), args) finally: formatter.clear() diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index ff1f1cedd76..de93b34a272 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -1293,20 +1293,26 @@ class SparkSession(SparkConversionMixin): df._schema = struct return df - def sql(self, sqlQuery: str, **kwargs: Any) -> DataFrame: + def sql(self, sqlQuery: str, args: Dict[str, str] = {}, **kwargs: Any) -> DataFrame: """Returns a :class:`DataFrame` representing the result of the given query. When ``kwargs`` is specified, this method formats the given string by using the Python - standard formatter. + standard formatter. The method binds named parameters to SQL literals from `args`. .. versionadded:: 2.0.0 .. versionchanged:: 3.4.0 - Support Spark Connect. + Support Spark Connect and parameterized SQL. Parameters ---------- sqlQuery : str SQL query string. + args : dict + A dictionary of named parameters that begin from the `:` marker and + their SQL literals for substituting. + + .. versionadded:: 3.4.0 + kwargs : dict Other variables that the user wants to set that can be referenced in the query @@ -1380,13 +1386,22 @@ class SparkSession(SparkConversionMixin): | 2| 4| | 3| 6| +---+---+ + + And substitude named parameters with the `:` prefix by SQL literals. 
+ + >>> spark.sql("SELECT * FROM {df} WHERE {df[B]} > :minB", {"minB" : "5"}, df=mydf).show() + +---+---+ + | A| B| + +---+---+ + | 3| 6| + +---+---+ """ formatter = SQLStringFormatter(self) if len(kwargs) > 0: sqlQuery = formatter.format(sqlQuery, **kwargs) try: - return DataFrame(self._jsparkSession.sql(sqlQuery), self) + return DataFrame(self._jsparkSession.sql(sqlQuery, args), self) finally: if len(kwargs) > 0: formatter.clear() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org