This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 110c0575448 [SPARK-41503][CONNECT][PYTHON] Implement Partition Transformation Functions
110c0575448 is described below

commit 110c0575448c63c1d40e670e8e27b7ee6fb74907
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Sat Dec 24 13:11:12 2022 -0800

    [SPARK-41503][CONNECT][PYTHON] Implement Partition Transformation Functions
    
    ### What changes were proposed in this pull request?
    Implement [Partition Transformation Functions](https://github.com/apache/spark/blob/master/python/docs/source/reference/pyspark.sql/functions.rst#partition-transformation-functions)
    
    ### Why are the changes needed?
    For API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, this adds the partition transformation functions (`bucket`, `years`, `months`, `days`, `hours`) to the Python Spark Connect client.
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #39203 from zhengruifeng/connect_function_transform_partition.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/connect/planner/SparkConnectPlanner.scala  | 21 +++++++++++
 python/pyspark/sql/connect/functions.py            | 44 ++++++++++++++++++++++
 2 files changed, 65 insertions(+)
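
For context, a minimal sketch of how the newly added functions are meant to be used. The session variable `spark`, the table name, and the sample schema below are hypothetical, and the `writeTo(...).partitionedBy(...)` pattern is the standard PySpark DataFrameWriterV2 API rather than something introduced by this patch (the diff does not show whether the Connect client supported `writeTo` at this commit):

    from pyspark.sql.connect.functions import bucket, col, years

    # Hypothetical data: an id column plus a timestamp derived from it.
    df = spark.range(100).withColumn("ts", col("id").cast("timestamp"))

    # Partition transforms are intended for DataFrameWriterV2.partitionedBy,
    # not for ordinary projections.
    df.writeTo("catalog.db.events").partitionedBy(
        years(col("ts")),      # one partition per year of ts
        bucket(4, col("id")),  # hash id into 4 buckets
    ).createOrReplace()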

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index dce3a8c8e55..cb8d30b180c 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -646,6 +646,27 @@ class SparkConnectPlanner(session: SparkSession) {
         }
         Some(NthValue(children(0), children(1), ignoreNulls))
 
+      case "bucket" if fun.getArgumentsCount == 2 =>
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        (children.head, children.last) match {
+          case (numBuckets: Literal, child) if numBuckets.dataType == IntegerType =>
+            Some(Bucket(numBuckets, child))
+          case (other, _) =>
+            throw InvalidPlanInput(s"numBuckets should be a literal integer, but got $other")
+        }
+
+      case "years" if fun.getArgumentsCount == 1 =>
+        Some(Years(transformExpression(fun.getArguments(0))))
+
+      case "months" if fun.getArgumentsCount == 1 =>
+        Some(Months(transformExpression(fun.getArguments(0))))
+
+      case "days" if fun.getArgumentsCount == 1 =>
+        Some(Days(transformExpression(fun.getArguments(0))))
+
+      case "hours" if fun.getArgumentsCount == 1 =>
+        Some(Hours(transformExpression(fun.getArguments(0))))
+
       case _ => None
     }
   }
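
Note the asymmetry between the new planner cases: the unary transforms (`years`, `months`, `days`, `hours`) accept any child expression, while `bucket` requires its first argument to arrive as an integer literal and rejects anything else with `InvalidPlanInput`. A rough sketch of how that surfaces on the Python side, assuming a hypothetical Connect session and inferring the failure mode from the hunk above:

    from pyspark.sql.connect.functions import bucket, col, lit

    bucket(4, col("id"))       # ok: the int is wrapped in lit() client-side
    bucket(lit(4), col("id"))  # ok: an integer literal passes the planner check
    # bucket(col("n"), col("id")) would build fine on the client, but the
    # server-side planner would reject it with InvalidPlanInput because
    # col("n") is not a literal integer.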
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index c2b1c7d61c8..407e7536f03 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2070,6 +2070,50 @@ def timestamp_seconds(col: "ColumnOrName") -> Column:
 timestamp_seconds.__doc__ = pysparkfuncs.timestamp_seconds.__doc__
 
 
+# Partition Transformation Functions
+
+
+def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
+    if isinstance(numBuckets, int):
+        _numBuckets = lit(numBuckets)
+    elif isinstance(numBuckets, Column):
+        _numBuckets = numBuckets
+    else:
+        raise TypeError("numBuckets should be a Column or an int, got {}".format(type(numBuckets)))
+
+    return _invoke_function("bucket", _numBuckets, _to_col(col))
+
+
+bucket.__doc__ = pysparkfuncs.bucket.__doc__
+
+
+def years(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("years", col)
+
+
+years.__doc__ = pysparkfuncs.years.__doc__
+
+
+def months(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("months", col)
+
+
+months.__doc__ = pysparkfuncs.months.__doc__
+
+
+def days(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("days", col)
+
+
+days.__doc__ = pysparkfuncs.days.__doc__
+
+
+def hours(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("hours", col)
+
+
+hours.__doc__ = pysparkfuncs.hours.__doc__
+
 # Misc Functions
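
A design note on the Python side: the four unary wrappers are deliberately spelled out as identical two-liners with copied docstrings, matching how the rest of this module mirrors `pyspark.sql.functions`. If the repetition ever became a concern, a factory along the following lines would be behaviorally equivalent inside `python/pyspark/sql/connect/functions.py`; this is a hypothetical alternative for illustration, not what the patch does:

    from typing import Callable

    def _unary_partition_transform(name: str) -> Callable[["ColumnOrName"], Column]:
        # Hypothetical factory producing a wrapper equivalent to the
        # years/months/days/hours definitions above.
        def wrapper(col: "ColumnOrName") -> Column:
            return _invoke_function_over_columns(name, col)
        wrapper.__name__ = name
        wrapper.__doc__ = getattr(pysparkfuncs, name).__doc__
        return wrapper

    years = _unary_partition_transform("years")  # same behavior as the explicit def

The explicit definitions have the advantage that static type checkers and IDEs see each function directly, which is likely why the module spells them out.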
 
 

