Re: pandas_udf is very slow

2020-04-06 Thread Gourav Sengupta
Hi Leon,

Please refer to this link:
https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html

I have found using GROUPED_MAP to be a bit tricky; please refer to this
statement from that page: "All data for a group is loaded into memory before
the function is applied. This can lead to out of memory exceptions, especially
if the group sizes are skewed. The configuration for maxRecordsPerBatch is not
applied on groups and it is up to you to ensure that the grouped data will fit
into the available memory."
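
A quick way to see whether that is a risk is to look at the group sizes first.
A rough sketch, assuming df is your input dataframe and "id" is the grouping key:

from pyspark.sql import functions as F

# distribution of group sizes: if the largest group is far above the
# average, GROUPED_MAP is likely to struggle on that group
group_sizes = df.groupBy("id").count()
group_sizes.select(
    F.max("count").alias("largest_group"),
    F.avg("count").alias("avg_group"),
    F.expr("percentile_approx(`count`, 0.99)").alias("p99_group")
).show()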

Pandas UDFs are definitely faster than general Python UDFs, but once again it
all depends on the data volume you are testing against and the way the UDF has
been written.
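
On the last point: row-by-row iteration inside the pandas function is usually
the first thing to look at, since vectorized pandas operations over whole
columns tend to be far faster. A purely hypothetical sketch (the column names
follow your example, but the logic is made up, not your actual code):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("id long, score double", PandasUDFType.GROUPED_MAP)
def slow_version(pdf):
    # row-by-row iteration: slow in pandas
    scores = []
    for _, row in pdf.iterrows():
        scores.append(len(row["txt"]) * 0.5)
    out = pdf[["id"]].copy()
    out["score"] = scores
    return out

@pandas_udf("id long, score double", PandasUDFType.GROUPED_MAP)
def fast_version(pdf):
    # same result, computed on whole columns at once
    out = pdf[["id"]].copy()
    out["score"] = pdf["txt"].str.len() * 0.5
    return out

# df.groupBy("id").apply(fast_version)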

Thanks and Regards,
Gourav


On Sun, Apr 5, 2020 at 8:28 AM Lian Jiang  wrote:



Re: pandas_udf is very slow

2020-04-05 Thread Lian Jiang
Thanks Silvio. I need a grouped map pandas UDF, which takes a Spark dataframe as 
input and outputs a Spark dataframe with a different shape from the input. 
Grouped map is somewhat unique to pandas UDFs, and I have trouble finding a 
similar non-pandas UDF for an apples-to-apples comparison. Let me know if you 
have a better idea for investigating the grouped map pandas UDF slowness.
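
To make the shape change concrete, here is a simplified illustration (not the
real logic): many input rows per id go in, and a differently shaped dataframe
comes out.

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("id long, n_rows long", PandasUDFType.GROUPED_MAP)
def summarize(pdf):
    # one output row per group, with columns that differ from the input
    return pd.DataFrame({"id": [pdf["id"].iloc[0]], "n_rows": [len(pdf)]})

summary = df.groupBy("id").apply(summarize)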

One potential workaround could be grouping the 250M records by id and, for each 
group, doing groupby('id').apply(pd_udf). I am not sure which way is more 
promising compared with repartition + mapPartitions, reduceByKey, or combineByKey.
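
For comparison, the repartition + mapPartitions route I have in mind would look
roughly like this (untested sketch; it assumes each partition fits in memory
after repartitioning by id, and it reuses the same per-group function pd_udf_func):

import pandas as pd
from pyspark.sql import Row

def process_partition(rows):
    # materialize one partition as a pandas dataframe, then apply the
    # per-group function to each id within the partition
    pdf = pd.DataFrame([r.asDict() for r in rows])
    if pdf.empty:
        return
    for _, group in pdf.groupby("id"):
        out = pd_udf_func(group)
        for rec in out.to_dict("records"):
            yield Row(**rec)

result = (df.repartition(140, "id")
            .rdd
            .mapPartitions(process_partition)
            .toDF())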

Appreciate any clue.

Sent from my iPhone

> On Apr 5, 2020, at 6:18 AM, Silvio Fiorito wrote:

Re: pandas_udf is very slow

2020-04-05 Thread Silvio Fiorito
Your 2 examples are doing different things.

The Pandas UDF is doing a grouped map, whereas your Python UDF is doing an 
aggregate.

I think you want your Pandas UDF to be PandasUDFType.GROUPED_AGG? Is your 
result the same?
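
If the result really is a single value per group, a grouped aggregate pandas UDF
would look roughly like this (sketch only, reusing the column names from your example):

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def agg_func(txt):
    # txt arrives as a pandas Series holding the column values of one group
    return "hello"

df2 = df.groupBy("id").agg(agg_func("txt").alias("udfadd"))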

From: Lian Jiang 
Date: Sunday, April 5, 2020 at 3:28 AM
To: user 
Subject: pandas_udf is very slow


pandas_udf is very slow

2020-04-05 Thread Lian Jiang
Hi,

I am using a pandas UDF in PySpark 2.4.3 on EMR 5.21.0. Pandas UDFs are favored
over non-pandas UDFs per
https://www.twosigma.com/wp-content/uploads/Jin_-_Improving_Python__Spark_Performance_-_Spark_Summit_West.pdf.


My data has about 250M records, and the pandas UDF code looks like this:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

def pd_udf_func(data):
    return pd.DataFrame(["id"])

pd_udf = pandas_udf(
    pd_udf_func,
    returnType="id int",
    functionType=PandasUDFType.GROUPED_MAP
)
df3 = df.groupBy("id").apply(pd_udf)
df3.explain()
"""
== Physical Plan ==
FlatMapGroupsInPandas [id#9L], pd_udf_func(id#9L, txt#10), [id#30]
+- *(2) Sort [id#9L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#9L, 200)
      +- *(1) Project [id#9L, id#9L, txt#10]
         +- Scan ExistingRDD[id#9L,txt#10]
"""

As you can see, this pandas UDF does nothing but return a trivial one-row
pandas dataframe. In reality, the pandas UDF has complicated logic (e.g.
iterating through the pandas dataframe rows and doing some calculations); the
simplification is just to reduce noise from application-specific logic. This
pandas UDF takes hours to run using 10 executors (14 cores and 64 GB of memory
each). On the other hand, the non-pandas UDF below finishes in minutes:

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def udf_func(data_list):
    return "hello"

udf = udf(udf_func, StringType())
df2 = (df.groupBy("id")
         .agg(F.collect_list('txt').alias('txt1'))
         .withColumn('udfadd', udf('txt1')))
df2.explain()
"""
== Physical Plan ==
*(1) Project [id#9L, txt1#16, pythonUDF0#24 AS udfadd#20]
+- BatchEvalPython [udf_func(txt1#16)], [id#9L, txt1#16, pythonUDF0#24]
   +- ObjectHashAggregate(keys=[id#9L], functions=[collect_list(txt#10, 0, 0)])
      +- Exchange hashpartitioning(id#9L, 200)
         +- ObjectHashAggregate(keys=[id#9L], functions=[partial_collect_list(txt#10, 0, 0)])
            +- Scan ExistingRDD[id#9L,txt#10]
"""

The physical plans show that the pandas UDF path groups via a Sort followed by
FlatMapGroupsInPandas (slower), while the non-pandas UDF uses
ObjectHashAggregate (faster).

Below is what I have tried to improve the performance of the pandas UDF, but
none of it worked:

1. Repartition before the groupby, e.g. df.repartition(140,
"id").groupBy("id").apply(pd_udf), where 140 matches
spark.sql.shuffle.partitions. I hoped the groupby would benefit from the
repartition, but according to the execution plan the repartition seems to be
ignored, since the groupby does its own partitioning.

2. Although the slowness is more likely caused by the pandas UDF rather than
the groupby, I still played with shuffle settings such as
spark.shuffle.compress=True and spark.shuffle.spill.compress=True.

3. I played with SerDe settings such as
spark.serializer=org.apache.spark.serializer.KryoSerializer. I also tried
PyArrow settings such as spark.sql.execution.arrow.enabled=True and
spark.sql.execution.arrow.maxRecordsPerBatch=10 (a rough sketch of how these
were set follows this list).

4. I tried to replace the "groupby + pandas UDF" solution with combineByKey,
reduceByKey, or repartition + mapPartitions, but that is not easy since the
pandas UDF has complicated logic.
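
Roughly how the settings were applied (assuming spark is the active
SparkSession; the shuffle and serializer settings are read at application
startup, so those went in via spark-submit / spark-defaults):

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10")

# passed at launch time:
#   --conf spark.shuffle.compress=true
#   --conf spark.shuffle.spill.compress=true
#   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer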

My questions:

1. Why is the pandas UDF so slow?
2. Is there a way to improve the performance of pandas_udf?
3. In case this is a known issue with pandas UDFs, what other remedies can I
use? I guess I need to think harder about combineByKey, reduceByKey, and
repartition + mapPartitions, but I want to know if I missed anything obvious.

Any clue is highly appreciated.

Thanks
Leon