This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 38a3d326bd62 [SPARK-53417][PYTHON][TESTS][FOLLOW-UP] Add more tests for aggregation and window
38a3d326bd62 is described below

commit 38a3d326bd629e2c63065af11e8418a4f252ac87
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Mon Sep 1 07:59:07 2025 +0900

    [SPARK-53417][PYTHON][TESTS][FOLLOW-UP] Add more tests for aggregation and window
    
    ### What changes were proposed in this pull request?
    followup of https://github.com/apache/spark/pull/52162
    
    ### Why are the changes needed?
    to improve test coverage
    
    ### Does this PR introduce _any_ user-facing change?
    no, test-only
    
    ### How was this patch tested?
    ci
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #52179 from zhengruifeng/arrow_agg_time.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/tests/arrow/test_arrow_udf_grouped_agg.py  | 29 ++++++++++++++++++++
 .../sql/tests/arrow/test_arrow_udf_window.py       | 31 ++++++++++++++++++++++
 2 files changed, 60 insertions(+)

diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py b/python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py
index c0b87a3d4c75..3545801c4b5a 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py
@@ -778,6 +778,35 @@ class GroupedAggArrowUDFTestsMixin:
 
         self.assertEqual(expected1.collect(), result1.collect())
 
+    def test_time_min(self):
+        import pyarrow as pa
+
+        df = self.spark.sql(
+            """
+            SELECT * FROM VALUES
+            (1, TIME '12:34:56'),
+            (1, TIME '1:2:3'),
+            (2, TIME '0:58:59'),
+            (2, TIME '10:58:59'),
+            (2, TIME '10:00:03')
+            AS tab(i, t)
+            """
+        )
+
+        @arrow_udf("time", ArrowUDFType.GROUPED_AGG)
+        def agg_min_time(v):
+            assert isinstance(v, pa.Array)
+            assert isinstance(v, pa.Time64Array)
+            return pa.compute.min(v)
+
+        expected1 = df.select(sf.min("t").alias("res"))
+        result1 = df.select(agg_min_time("t").alias("res"))
+        self.assertEqual(expected1.collect(), result1.collect())
+
+        expected2 = df.groupby("i").agg(sf.min("t").alias("res")).sort("i")
+        result2 = df.groupby("i").agg(agg_min_time("t").alias("res")).sort("i")
+        self.assertEqual(expected2.collect(), result2.collect())
+
     def test_return_type_coercion(self):
         import pyarrow as pa
 
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf_window.py b/python/pyspark/sql/tests/arrow/test_arrow_udf_window.py
index a66ccc0bd717..fde9d7243375 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf_window.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf_window.py
@@ -652,6 +652,37 @@ class WindowArrowUDFTestsMixin:
 
         self.assertEqual(expected1.collect(), result1.collect())
 
+    def test_time_min(self):
+        import pyarrow as pa
+
+        df = self.spark.sql(
+            """
+            SELECT * FROM VALUES
+            (1, TIME '12:34:56'),
+            (1, TIME '1:2:3'),
+            (2, TIME '0:58:59'),
+            (2, TIME '10:58:59'),
+            (2, TIME '10:00:03')
+            AS tab(i, t)
+            """
+        )
+        w1 = Window.partitionBy("i").orderBy("t")
+        w2 = Window.orderBy("t")
+
+        @arrow_udf("time", ArrowUDFType.GROUPED_AGG)
+        def agg_min_time(v):
+            assert isinstance(v, pa.Array)
+            assert isinstance(v, pa.Time64Array)
+            return pa.compute.min(v)
+
+        expected1 = df.withColumn("res", sf.min("t").over(w1))
+        result1 = df.withColumn("res", agg_min_time("t").over(w1))
+        self.assertEqual(expected1.collect(), result1.collect())
+
+        expected2 = df.withColumn("res", sf.min("t").over(w2))
+        result2 = df.withColumn("res", agg_min_time("t").over(w2))
+        self.assertEqual(expected2.collect(), result2.collect())
+
     def test_return_type_coercion(self):
         import pyarrow as pa
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to