Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21650#discussion_r199204000 --- Diff: python/pyspark/sql/tests.py --- @@ -5060,6 +5049,138 @@ def test_type_annotation(self): df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id')) self.assertEqual(df.first()[0], 0) + def test_mixed_udf(self): + import pandas as pd + from pyspark.sql.functions import udf, pandas_udf + + df = self.spark.range(0, 1).toDF('v') + + @udf('int') + def f1(x): + assert type(x) == int + return x + 1 + + @pandas_udf('int') + def f2(x): + assert type(x) == pd.Series + return x + 10 + + @udf('int') + def f3(x): + assert type(x) == int + return x + 100 + + @pandas_udf('int') + def f4(x): + assert type(x) == pd.Series + return x + 1000 + + # Test mixed udfs in a single projection + df1 = df.withColumn('f1', f1(df['v'])) + df1 = df1.withColumn('f2', f2(df1['v'])) + df1 = df1.withColumn('f3', f3(df1['v'])) + df1 = df1.withColumn('f4', f4(df1['v'])) + df1 = df1.withColumn('f2_f1', f2(df1['f1'])) + df1 = df1.withColumn('f3_f1', f3(df1['f1'])) + df1 = df1.withColumn('f4_f1', f4(df1['f1'])) + df1 = df1.withColumn('f3_f2', f3(df1['f2'])) + df1 = df1.withColumn('f4_f2', f4(df1['f2'])) + df1 = df1.withColumn('f4_f3', f4(df1['f3'])) + df1 = df1.withColumn('f3_f2_f1', f3(df1['f2_f1'])) + df1 = df1.withColumn('f4_f2_f1', f4(df1['f2_f1'])) + df1 = df1.withColumn('f4_f3_f1', f4(df1['f3_f1'])) + df1 = df1.withColumn('f4_f3_f2', f4(df1['f3_f2'])) + df1 = df1.withColumn('f4_f3_f2_f1', f4(df1['f3_f2_f1'])) + + # Test mixed udfs in a single expression + df2 = df.withColumn('f1', f1(df['v'])) + df2 = df2.withColumn('f2', f2(df['v'])) + df2 = df2.withColumn('f3', f3(df['v'])) + df2 = df2.withColumn('f4', f4(df['v'])) + df2 = df2.withColumn('f2_f1', f2(f1(df['v']))) + df2 = df2.withColumn('f3_f1', f3(f1(df['v']))) + df2 = df2.withColumn('f4_f1', f4(f1(df['v']))) + df2 = df2.withColumn('f3_f2', f3(f2(df['v']))) + df2 = 
df2.withColumn('f4_f2', f4(f2(df['v']))) + df2 = df2.withColumn('f4_f3', f4(f3(df['v']))) + df2 = df2.withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) + df2 = df2.withColumn('f4_f2_f1', f4(f2(f1(df['v'])))) + df2 = df2.withColumn('f4_f3_f1', f4(f3(f1(df['v'])))) + df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v'])))) + df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v']))))) + + df3 = df.withColumn('f1', df['v'] + 1) + df3 = df3.withColumn('f2', df['v'] + 10) + df3 = df3.withColumn('f3', df['v'] + 100) + df3 = df3.withColumn('f4', df['v'] + 1000) + df3 = df3.withColumn('f2_f1', df['v'] + 11) + df3 = df3.withColumn('f3_f1', df['v'] + 101) + df3 = df3.withColumn('f4_f1', df['v'] + 1001) + df3 = df3.withColumn('f3_f2', df['v'] + 110) + df3 = df3.withColumn('f4_f2', df['v'] + 1010) + df3 = df3.withColumn('f4_f3', df['v'] + 1100) + df3 = df3.withColumn('f3_f2_f1', df['v'] + 111) + df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011) + df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101) + df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110) + df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111) + + self.assertEquals(df3.collect(), df1.collect()) + self.assertEquals(df3.collect(), df2.collect()) + + def test_mixed_udf_and_sql(self): + import pandas as pd + from pyspark.sql.functions import udf, pandas_udf + + df = self.spark.range(0, 1).toDF('v') + + @udf('int') + def f1(x): + assert type(x) == int + return x + 1 + + def f2(x): --- End diff -- Added comments in test
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org