Hi,

I am using a pandas udf in pyspark 2.4.3 on EMR 5.21.0. pandas udfs are favored
over non-pandas udfs per
https://www.twosigma.com/wp-content/uploads/Jin_-_Improving_Python__Spark_Performance_-_Spark_Summit_West.pdf.


My data has about 250M records, and the pandas udf code looks like this:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

def pd_udf_func(data):
    # Simplified stand-in for the real logic: return a single-row
    # pandas dataframe whose "id" column holds None.
    return pd.DataFrame({"id": [None]})

pd_udf = pandas_udf(
    pd_udf_func,
    returnType="id int",
    functionType=PandasUDFType.GROUPED_MAP
)
df3 = df.groupBy("id").apply(pd_udf)
df3.explain()
"""
== Physical Plan ==
FlatMapGroupsInPandas [id#9L], pd_udf_func(id#9L, txt#10), [id#30]
+- *(2) Sort [id#9L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#9L, 200)
      +- *(1) Project [id#9L, id#9L, txt#10]
         +- Scan ExistingRDD[id#9L,txt#10]
"""

As you can see, this pandas udf does nothing but return a single-row pandas
dataframe holding None values. In reality the pandas udf has complicated logic
(e.g. it iterates through the rows of the pandas dataframe and does some
calculation); the simplification here is only to remove noise from
application-specific logic. Even so, this pandas udf takes hours to run on 10
executors (14 cores and 64G mem each). On the other hand, the non-pandas udf
below finishes in minutes:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def udf_func(data_list):
    # Simplified stand-in: ignore the collected list and return a constant.
    return "hello"

udf = F.udf(udf_func, StringType())
df2 = (df.groupBy("id")
         .agg(F.collect_list("txt").alias("txt1"))
         .withColumn("udfadd", udf("txt1")))
df2.explain()
"""
== Physical Plan ==
*(1) Project [id#9L, txt1#16, pythonUDF0#24 AS udfadd#20]
+- BatchEvalPython [udf_func(txt1#16)], [id#9L, txt1#16, pythonUDF0#24]
   +- ObjectHashAggregate(keys=[id#9L], functions=[collect_list(txt#10, 0, 0)])
      +- Exchange hashpartitioning(id#9L, 200)
         +- ObjectHashAggregate(keys=[id#9L], functions=[partial_collect_list(txt#10, 0, 0)])
            +- Scan ExistingRDD[id#9L,txt#10]
"""

The physical plans show that the pandas udf path shuffles and then sorts the
whole dataset by key (Sort + FlatMapGroupsInPandas, slower), while the
non-pandas udf path uses ObjectHashAggregate with partial aggregation before
the shuffle (faster).

Below is what I have tried to improve the performance of the pandas udf, but
none of it worked:
1. Repartition before the groupBy, e.g.
df.repartition(140, "id").groupBy("id").apply(pd_udf), where 140 equals
spark.sql.shuffle.partitions. I hoped the groupBy could benefit from the
repartition, but according to the execution plan the repartition seems to be
ignored, since the groupBy does its own partitioning.

2. Although the slowness is more likely caused by the pandas udf than by the
groupBy, I still played with shuffle settings such as
spark.shuffle.compress=true and spark.shuffle.spill.compress=true.

3. I played with serde settings such as
spark.serializer=org.apache.spark.serializer.KryoSerializer, and with pyarrow
settings such as spark.sql.execution.arrow.enabled=true and
spark.sql.execution.arrow.maxRecordsPerBatch=100000 (a sketch of how the
settings in items 2 and 3 are applied follows this list).

4. I tried to replace the "groupby + pandas udf" solution with combineByKey,
reduceByKey, or repartition + mapPartitions (a rough sketch of the
mapPartitions route follows this list), but it is not easy since the real
pandas udf has complicated logic.
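
For reference, a minimal sketch of how the settings in items 2 and 3 can be
applied (shown on the SparkSession builder; passing them via spark-submit
--conf or the EMR configuration works the same way, and the app name here is
just a placeholder):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("pandas-udf-test")  # placeholder name
         .config("spark.shuffle.compress", "true")
         .config("spark.shuffle.spill.compress", "true")
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .config("spark.sql.execution.arrow.enabled", "true")
         .config("spark.sql.execution.arrow.maxRecordsPerBatch", "100000")
         .getOrCreate())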
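And here is a rough sketch of the repartition + mapPartitions route from item
4, assuming the same df(id, txt) as above; per_group_logic and out_schema are
hypothetical placeholders for the real calculation and its output schema, so
this only shows the shape of the approach:

import pandas as pd
from pyspark.sql.types import StructType, StructField, LongType

out_schema = StructType([StructField("id", LongType())])

def per_group_logic(group_pdf):
    # Placeholder for the real per-group calculation; returns one output
    # tuple matching out_schema.
    return (int(group_pdf["id"].iloc[0]),)

def process_partition(rows):
    # Materialize one partition into pandas, then handle each id locally.
    pdf = pd.DataFrame([r.asDict() for r in rows], columns=["id", "txt"])
    for _, group_pdf in pdf.groupby("id"):
        yield per_group_logic(group_pdf)

df4 = (df.repartition(140, "id")   # co-locate all rows of an id
         .rdd
         .mapPartitions(process_partition)
         .toDF(out_schema))
df4.explain()

The obvious caveat is that a whole partition (not just one group) then has to
fit in memory as a pandas dataframe.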

My questions:

1. Why is the pandas udf so slow?
2. Is there a way to improve the performance of the pandas udf?
3. In case this is a known issue with pandas udfs, what other remedies can I
use? I guess I need to think harder about combineByKey, reduceByKey, and
repartition + mapPartitions, but I want to know if I missed anything obvious.

Any clue is highly appreciated.

Thanks
Leon
