[ https://issues.apache.org/jira/browse/SPARK-33021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-33021:
---------------------------------
    Fix Version/s: 3.0.2

> Move functions related test cases into test_functions.py
> --------------------------------------------------------
>
>                 Key: SPARK-33021
>                 URL: https://issues.apache.org/jira/browse/SPARK-33021
>             Project: Spark
>          Issue Type: Test
>          Components: PySpark
>    Affects Versions: 3.1.0
>            Reporter: Hyukjin Kwon
>            Assignee: Hyukjin Kwon
>            Priority: Minor
>             Fix For: 3.0.2, 3.1.0
>
>
> Function-related test cases should be located in {{test_functions.py}}. However, the tests below are currently located in {{test_context.py}}.
> {code}
> def test_window_functions(self):
>     df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
>     w = Window.partitionBy("value").orderBy("key")
>     from pyspark.sql import functions as F
>     sel = df.select(df.value, df.key,
>                     F.max("key").over(w.rowsBetween(0, 1)),
>                     F.min("key").over(w.rowsBetween(0, 1)),
>                     F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
>                     F.row_number().over(w),
>                     F.rank().over(w),
>                     F.dense_rank().over(w),
>                     F.ntile(2).over(w),
>                     F.nth_value("key", 2))
>     rs = sorted(sel.collect())
>     expected = [
>         ("1", 1, 1, 1, 1, 1, 1, 1, 1),
>         ("2", 1, 1, 1, 3, 1, 1, 1, 1),
>         ("2", 1, 2, 1, 3, 2, 1, 1, 1),
>         ("2", 2, 2, 2, 3, 3, 3, 2, 2)
>     ]
>     for r, ex in zip(rs, expected):
>         self.assertEqual(tuple(r), ex[:len(r)])
>
> def test_window_functions_without_partitionBy(self):
>     df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
>     w = Window.orderBy("key", df.value)
>     from pyspark.sql import functions as F
>     sel = df.select(df.value, df.key,
>                     F.max("key").over(w.rowsBetween(0, 1)),
>                     F.min("key").over(w.rowsBetween(0, 1)),
>                     F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
>                     F.row_number().over(w),
>                     F.rank().over(w),
>                     F.dense_rank().over(w),
>                     F.ntile(2).over(w))
>     rs = sorted(sel.collect())
>     expected = [
>         ("1", 1, 1, 1, 4, 1, 1, 1, 1),
>         ("2", 1, 1, 1, 4, 2, 2, 2, 1),
>         ("2", 1, 2, 1, 4, 3, 2, 2, 2),
>         ("2", 2, 2, 2, 4, 4, 4, 3, 2)
>     ]
>     for r, ex in zip(rs, expected):
>         self.assertEqual(tuple(r), ex[:len(r)])
>
> def test_window_functions_cumulative_sum(self):
>     df = self.spark.createDataFrame([("one", 1), ("two", 2)], ["key", "value"])
>     from pyspark.sql import functions as F
>
>     # Test cumulative sum
>     sel = df.select(
>         df.key,
>         F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding, 0)))
>     rs = sorted(sel.collect())
>     expected = [("one", 1), ("two", 3)]
>     for r, ex in zip(rs, expected):
>         self.assertEqual(tuple(r), ex[:len(r)])
>
>     # Test boundary values less than JVM's Long.MinValue and make sure we don't overflow
>     sel = df.select(
>         df.key,
>         F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding - 1, 0)))
>     rs = sorted(sel.collect())
>     expected = [("one", 1), ("two", 3)]
>     for r, ex in zip(rs, expected):
>         self.assertEqual(tuple(r), ex[:len(r)])
>
>     # Test boundary values greater than JVM's Long.MaxValue and make sure we don't overflow
>     frame_end = Window.unboundedFollowing + 1
>     sel = df.select(
>         df.key,
>         F.sum(df.value).over(Window.rowsBetween(Window.currentRow, frame_end)))
>     rs = sorted(sel.collect())
>     expected = [("one", 3), ("two", 2)]
>     for r, ex in zip(rs, expected):
>         self.assertEqual(tuple(r), ex[:len(r)])
>
> def test_collect_functions(self):
>     df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
>     from pyspark.sql import functions
>
>     self.assertEqual(
>         sorted(df.select(functions.collect_set(df.key).alias('r')).collect()[0].r),
>         [1, 2])
>     self.assertEqual(
>         sorted(df.select(functions.collect_list(df.key).alias('r')).collect()[0].r),
>         [1, 1, 1, 2])
>     self.assertEqual(
>         sorted(df.select(functions.collect_set(df.value).alias('r')).collect()[0].r),
>         ["1", "2"])
>     self.assertEqual(
>         sorted(df.select(functions.collect_list(df.value).alias('r')).collect()[0].r),
>         ["1", "2", "2", "2"])
>
> def test_datetime_functions(self):
>     from pyspark.sql import functions
>     from datetime import date
>     df = self.spark.range(1).selectExpr("'2017-01-22' as dateCol")
>     parse_result = df.select(functions.to_date(functions.col("dateCol"))).first()
>     self.assertEquals(date(2017, 1, 22), parse_result['to_date(dateCol)'])
> {code}
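For context, below is a minimal sketch of how one of the quoted tests could look once relocated into {{test_functions.py}}; it is not part of this ticket or its patch. The {{FunctionsTests}} class name and the {{ReusedSQLTestCase}} base class are assumptions about the existing layout of {{pyspark/sql/tests}}, not something this issue specifies.

{code}
# Hypothetical sketch: one of the tests above moved into test_functions.py.
# FunctionsTests and ReusedSQLTestCase are assumed names, not taken from this ticket.
from datetime import date

from pyspark.sql import functions
from pyspark.testing.sqlutils import ReusedSQLTestCase


class FunctionsTests(ReusedSQLTestCase):

    def test_datetime_functions(self):
        # Same assertion as the test quoted above, now grouped with the other
        # function-related tests instead of the SQLContext tests.
        df = self.spark.range(1).selectExpr("'2017-01-22' as dateCol")
        parse_result = df.select(functions.to_date(functions.col("dateCol"))).first()
        self.assertEqual(date(2017, 1, 22), parse_result['to_date(dateCol)'])
{code}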
--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org