Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r234790403 --- Diff: python/pyspark/sql/tests.py --- @@ -7064,12 +7098,104 @@ def test_invalid_args(self): foo_udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP) df.withColumn('v2', foo_udf(df['v']).over(w)) - with QuietTest(self.sc): - with self.assertRaisesRegexp( - AnalysisException, - '.*Only unbounded window frame is supported.*'): - df.withColumn('mean_v', mean_udf(df['v']).over(ow)) + def test_bounded_simple(self): + from pyspark.sql.functions import mean, max, min, count + + df = self.data + w1 = self.sliding_row_window + w2 = self.shrinking_range_window + + plus_one = self.python_plus_one + count_udf = self.pandas_agg_count_udf + mean_udf = self.pandas_agg_mean_udf + max_udf = self.pandas_agg_max_udf + min_udf = self.pandas_agg_min_udf + + result1 = df.withColumn('mean_v', mean_udf(plus_one(df['v'])).over(w1))\ + .withColumn('count_v', count_udf(df['v']).over(w2)) \ + .withColumn('max_v', max_udf(df['v']).over(w2)) \ + .withColumn('min_v', min_udf(df['v']).over(w1)) \ + + expected1 = df.withColumn('mean_v', mean(plus_one(df['v'])).over(w1))\ + .withColumn('count_v', count(df['v']).over(w2)) \ + .withColumn('max_v', max(df['v']).over(w2)) \ + .withColumn('min_v', min(df['v']).over(w1)) \ + + self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + + def test_growing_window(self): + from pyspark.sql.functions import mean + + df = self.data + w1 = self.growing_row_window + w2 = self.growing_range_window + mean_udf = self.pandas_agg_mean_udf + + result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \ + .withColumn('m2', mean_udf(df['v']).over(w2)) + + expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \ + .withColumn('m2', mean(df['v']).over(w2)) + + self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + + def test_sliding_window(self): + from pyspark.sql.functions import mean + + df = self.data + w1 = self.sliding_row_window + w2 = self.sliding_range_window + + mean_udf = self.pandas_agg_mean_udf + + result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \ + .withColumn('m2', mean_udf(df['v']).over(w2)) + + expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \ + .withColumn('m2', mean(df['v']).over(w2)) + + result1.show() + expected1.show() --- End diff -- Thanks! Removed
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org