This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a1c727f3867 [SPARK-41666][PYTHON] Support parameterized SQL by `sql()`
a1c727f3867 is described below

commit a1c727f386724156f680953fa34ec51bb35348a4
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Fri Dec 23 12:30:30 2022 +0300

    [SPARK-41666][PYTHON] Support parameterized SQL by `sql()`
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to extend the `sql()` method in PySpark to support
    parameterized SQL queries (see https://github.com/apache/spark/pull/38864)
    and to add a new parameter `args` of type `Dict[str, str]`. This parameter
    maps named parameters that can occur in the input SQL query to SQL literals
    such as 1, INTERVAL '1-1' YEAR TO MONTH, and DATE'2022-12-22' (see
    [the docs](https://spark.apache.org/docs/latest/sql-ref-literals.html) for
    supported literals).
    
    For example:
    ```python
        >>> spark.sql("SELECT * FROM range(10) WHERE id > :minId", args={"minId": "7"})
           id
        0   8
        1   9
    ```
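
    A similar call could bind the other literal kinds listed above, such as DATE
    and INTERVAL values (a sketch for illustration only, not taken from this
    patch; the aliases `start_date` and `span` are made up for the example):
    ```python
        >>> # `args` values are SQL literal text, so typed literals work too.
        >>> df = spark.sql(
        ...     "SELECT :startDate AS start_date, :span AS span",
        ...     args={"startDate": "DATE'2022-12-22'",
        ...           "span": "INTERVAL '1-1' YEAR TO MONTH"})
    ```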
    
    Closes #39159
    
    ### Why are the changes needed?
    To achieve feature parity with the Scala/Java API, and to provide PySpark
    users with the same feature.
    
    ### Does this PR introduce _any_ user-facing change?
    No, it shouldn't.
    
    ### How was this patch tested?
    Checked the examples locally and ran the tests:
    ```
    $ python/run-tests --modules=pyspark-sql --parallelism=1
    ```
    
    Closes #39183 from MaxGekk/parameterized-sql-pyspark-dict.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   |  6 +++---
 .../source/migration_guide/pyspark_3.3_to_3.4.rst  |  2 ++
 python/pyspark/pandas/sql_formatter.py             | 20 +++++++++++++++++--
 python/pyspark/sql/session.py                      | 23 ++++++++++++++++++----
 4 files changed, 42 insertions(+), 9 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index ff235e80dbb..95db9005d02 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -813,7 +813,7 @@
   },
   "INVALID_SQL_ARG" : {
     "message" : [
-      "The argument <name> of `sql()` is invalid. Consider to replace it by a 
SQL literal statement."
+      "The argument <name> of `sql()` is invalid. Consider to replace it by a 
SQL literal."
     ]
   },
   "INVALID_SQL_SYNTAX" : {
@@ -1164,7 +1164,7 @@
   },
   "UNBOUND_SQL_PARAMETER" : {
     "message" : [
-      "Found the unbound parameter: <name>. Please, fix `args` and provide a 
mapping of the parameter to a SQL literal statement."
+      "Found the unbound parameter: <name>. Please, fix `args` and provide a 
mapping of the parameter to a SQL literal."
     ]
   },
   "UNCLOSED_BRACKETED_COMMENT" : {
@@ -5225,4 +5225,4 @@
       "grouping() can only be used with GroupingSets/Cube/Rollup"
     ]
   }
-}
\ No newline at end of file
+}
diff --git a/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst b/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst
index b3baa8345aa..ca942c54979 100644
--- a/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst
+++ b/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst
@@ -39,3 +39,5 @@ Upgrading from PySpark 3.3 to 3.4
 * In Spark 3.4, the ``Series.concat`` sort parameter will be respected to follow pandas 1.4 behaviors.
 
 * In Spark 3.4, the ``DataFrame.__setitem__`` will make a copy and replace pre-existing arrays, which will NOT be over-written to follow pandas 1.4 behaviors.
+
+* In Spark 3.4, the ``SparkSession.sql`` and the Pandas on Spark API ``sql`` have got a new parameter ``args`` which provides binding of named parameters to their SQL literals.
diff --git a/python/pyspark/pandas/sql_formatter.py b/python/pyspark/pandas/sql_formatter.py
index 45c615161d9..9103366c192 100644
--- a/python/pyspark/pandas/sql_formatter.py
+++ b/python/pyspark/pandas/sql_formatter.py
@@ -17,7 +17,7 @@
 
 import os
 import string
-from typing import Any, Optional, Union, List, Sequence, Mapping, Tuple
+from typing import Any, Dict, Optional, Union, List, Sequence, Mapping, Tuple
 import uuid
 import warnings
 
@@ -43,6 +43,7 @@ _CAPTURE_SCOPES = 3
 def sql(
     query: str,
     index_col: Optional[Union[str, List[str]]] = None,
+    args: Dict[str, str] = {},
     **kwargs: Any,
 ) -> DataFrame:
     """
@@ -57,6 +58,8 @@ def sql(
         * pandas Series
         * string
 
+    Also the method can bind named parameters to SQL literals from `args`.
+
     Parameters
     ----------
     query : str
@@ -99,6 +102,12 @@ def sql(
             e      f       3  6
 
             Also note that the index name(s) should be matched to the existing name.
+    args : dict
+        A dictionary of named parameters that begin from the `:` marker and
+        their SQL literals for substituting.
+
+        .. versionadded:: 3.4.0
+
     kwargs
         other variables that the user want to set that can be referenced in the query
 
@@ -152,6 +161,13 @@ def sql(
     0  1
     1  2
     2  3
+
+    And substitute named parameters with the `:` prefix by SQL literals.
+
+    >>> ps.sql("SELECT * FROM range(10) WHERE id > :bound1", args={"bound1":"7"})
+       id
+    0   8
+    1   9
     """
     if os.environ.get("PYSPARK_PANDAS_SQL_LEGACY") == "1":
         from pyspark.pandas import sql_processor
@@ -166,7 +182,7 @@ def sql(
     session = default_session()
     formatter = PandasSQLStringFormatter(session)
     try:
-        sdf = session.sql(formatter.format(query, **kwargs))
+        sdf = session.sql(formatter.format(query, **kwargs), args)
     finally:
         formatter.clear()
 
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index ff1f1cedd76..de93b34a272 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -1293,20 +1293,26 @@ class SparkSession(SparkConversionMixin):
         df._schema = struct
         return df
 
-    def sql(self, sqlQuery: str, **kwargs: Any) -> DataFrame:
+    def sql(self, sqlQuery: str, args: Dict[str, str] = {}, **kwargs: Any) -> DataFrame:
         """Returns a :class:`DataFrame` representing the result of the given 
query.
         When ``kwargs`` is specified, this method formats the given string by 
using the Python
-        standard formatter.
+        standard formatter. The method binds named parameters to SQL literals from `args`.
 
         .. versionadded:: 2.0.0
 
         .. versionchanged:: 3.4.0
-            Support Spark Connect.
+            Support Spark Connect and parameterized SQL.
 
         Parameters
         ----------
         sqlQuery : str
             SQL query string.
+        args : dict
+            A dictionary of named parameters that begin from the `:` marker and
+            their SQL literals for substituting.
+
+            .. versionadded:: 3.4.0
+
         kwargs : dict
             Other variables that the user wants to set that can be referenced in the query
 
@@ -1380,13 +1386,22 @@ class SparkSession(SparkConversionMixin):
         |  2|  4|
         |  3|  6|
         +---+---+
+
+        And substitute named parameters with the `:` prefix by SQL literals.
+
+        >>> spark.sql("SELECT * FROM {df} WHERE {df[B]} > :minB", {"minB" : 
"5"}, df=mydf).show()
+        +---+---+
+        |  A|  B|
+        +---+---+
+        |  3|  6|
+        +---+---+
         """
 
         formatter = SQLStringFormatter(self)
         if len(kwargs) > 0:
             sqlQuery = formatter.format(sqlQuery, **kwargs)
         try:
-            return DataFrame(self._jsparkSession.sql(sqlQuery), self)
+            return DataFrame(self._jsparkSession.sql(sqlQuery, args), self)
         finally:
             if len(kwargs) > 0:
                 formatter.clear()

