[ https://issues.apache.org/jira/browse/SPARK-21935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166086#comment-16166086 ]

Nikolaos Tsipas commented on SPARK-21935:
-----------------------------------------

Thanks for the response [~srowen]! The problem only appears when a Python UDF 
is used; the same UDF written in Scala doesn't cause any memory issues. 

However, if you still think the issue is somewhere else in the app, what would 
be the best way to debug it? Should I focus on Spark or YARN? Also, if you can 
think of any more specific debugging steps, please make suggestions.
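
In the meantime, one way to isolate the UDF path (rough sketch below, reusing 
the same input and output paths as the repro) would be to run the identical 
transformation with the built-in {{upper}} column function, which stays 
entirely inside the JVM; if that completes without container kills, it points 
at Python worker memory rather than the job logic:

{code}
from pyspark.sql.functions import col, upper

# Same job as the repro, but with the built-in upper() Column expression
# instead of a Python UDF, so no Python worker processes are spawned and
# no rows are serialised out of the JVM.
path = 's3://some/parquet/dir/myfile.parquet'  # same input as the repro
df = spark.read.load(path)
df = df.withColumn('test_field', upper(col('std_useragent')))
df.write.parquet('/output.parquet')
{code}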

> Pyspark UDF causing ExecutorLostFailure 
> ----------------------------------------
>
>                 Key: SPARK-21935
>                 URL: https://issues.apache.org/jira/browse/SPARK-21935
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.1.0
>            Reporter: Nikolaos Tsipas
>              Labels: pyspark, udf
>         Attachments: cpu.png, Screen Shot 2017-09-06 at 11.30.28.png, Screen 
> Shot 2017-09-06 at 11.31.13.png, Screen Shot 2017-09-06 at 11.31.31.png
>
>
> Hi,
> I'm using Spark 2.1.0 on AWS EMR (YARN) and trying to use a UDF in Python as 
> follows:
> {code}
> from pyspark.sql.functions import col, udf
> from pyspark.sql.types import StringType
> path = 's3://some/parquet/dir/myfile.parquet'
> df = spark.read.load(path)
> def _test_udf(useragent):
>     return useragent.upper()
> test_udf = udf(_test_udf, StringType())
> df = df.withColumn('test_field', test_udf(col('std_useragent')))
> df.write.parquet('/output.parquet')
> {code}
> The following config is used in {{spark-defaults.conf}} (with 
> {{maximizeResourceAllocation}} enabled in EMR):
> {code}
> ...
> spark.executor.instances         4
> spark.executor.cores             8
> spark.driver.memory              8G
> spark.executor.memory            9658M
> spark.default.parallelism        64
> spark.driver.maxResultSize       3G
> ...
> {code}
> The cluster has 4 worker nodes (+1 master) with the following specs: 8 vCPU, 
> 15 GiB memory, 160 GB SSD storage.
> The above example fails every single time with errors like the following:
> {code}
> 17/09/06 09:58:08 WARN TaskSetManager: Lost task 26.1 in stage 1.0 (TID 50, 
> ip-172-31-7-125.eu-west-1.compute.internal, executor 10): ExecutorLostFailure 
> (executor 10 exited caused by one of the running tasks) Reason: Container 
> killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical 
> memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> {code}
> I tried increasing {{spark.yarn.executor.memoryOverhead}} to 3000, which 
> delays the errors, but they still appear before the end of the job and the 
> job eventually fails.
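> For reference, the overhead was raised roughly like this (a sketch, assuming 
> it was added to {{spark-defaults.conf}}; it could equally be passed via 
> {{--conf}} on {{spark-submit}}), with the value in MB:
> {code}
> spark.yarn.executor.memoryOverhead   3000
> {code}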
> !Screen Shot 2017-09-06 at 11.31.31.png|width=800!
> If I run the same job in Scala, everything works as expected (without having 
> to adjust the memoryOverhead):
> {code}
> import org.apache.spark.sql.functions.{col, udf}
> val upper: String => String = _.toUpperCase
> val df = spark.read.load("s3://some/parquet/dir/myfile.parquet")
> val upperUDF = udf(upper)
> val newdf = df.withColumn("test_field", upperUDF(col("std_useragent")))
> newdf.write.parquet("/output.parquet")
> {code}
> !Screen Shot 2017-09-06 at 11.31.13.png|width=800!
> CPU utilisation is very poor with PySpark:
> !cpu.png|width=800!
> Is this a known bug with PySpark and UDFs, or is it a matter of bad 
> configuration?
> Looking forward to suggestions. Thanks!


