[ https://issues.apache.org/jira/browse/SPARK-31386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17080257#comment-17080257 ]

JinxinTang edited comment on SPARK-31386 at 4/10/20, 5:32 AM:
--------------------------------------------------------------

Alternatively, could you please try Spark 2.4.5? The problem could not be reproduced there either.

[download link|https://www.apache.org/dyn/closer.lua/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz]
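
To confirm the retest actually runs against the intended build, a quick sanity check inside the pyspark shell may help (this assumes the shell's built-in `spark` session; the fallback string is only illustrative):

```
# Shell started with: pyspark --conf spark.executor.pyspark.memory=500m
print(spark.version)  # expect '2.4.5'
print(spark.conf.get("spark.executor.pyspark.memory", "<unset>"))
```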



> Reading broadcast in UDF raises MemoryError when 
> spark.executor.pyspark.memory is set
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-31386
>                 URL: https://issues.apache.org/jira/browse/SPARK-31386
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.4
>         Environment: Spark 2.4.4 or AWS EMR
> `pyspark --conf spark.executor.pyspark.memory=500m`
>            Reporter: Viacheslav Krot
>            Priority: Major
>         Attachments: 选区_267.png
>
>
> The following UDF code raises MemoryError when 
> `spark.executor.pyspark.memory` is set:
> ```
> from pyspark.sql.functions import udf
> from pyspark.sql.types import BooleanType
>
> df = spark.createDataFrame([
>     ('Alice', 10),
>     ('Bob', 12)
> ], ['name', 'cnt'])
>
> broadcast = spark.sparkContext.broadcast([1, 2, 3])
>
> @udf(BooleanType())
> def f(cnt):
>     # Reading broadcast.value inside the UDF is what triggers the failure
>     return cnt < len(broadcast.value)
>
> df.filter(f(df.cnt)).count()
> ```
> The same code works fine when spark.executor.pyspark.memory is not set. 
> The snippet itself is not meant to do anything useful; it is just the 
> simplest code that reproduces the bug.
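> 
> For context, a rough paraphrase of how the PySpark worker enforces this 
> setting in 2.4.x (not the exact source; the mechanism is an address-space 
> rlimit, so any allocation past the cap raises MemoryError in the worker):
> ```
> import resource
>
> # spark.executor.pyspark.memory=500m, converted to bytes
> new_limit = 500 * 1024 * 1024
> soft, hard = resource.getrlimit(resource.RLIMIT_AS)
> if soft == resource.RLIM_INFINITY or new_limit < soft:
>     # Cap the worker's virtual address space at the configured size
>     resource.setrlimit(resource.RLIMIT_AS, (new_limit, new_limit))
> ```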
>  
> Error:
> ```
> 20/04/08 13:16:50 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 6, ip-172-31-32-201.us-east-2.compute.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py", line 377, in main
>     process()
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py", line 372, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
>     self.serializer.dump_stream(self._batched(iterator), stream)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
>     for obj in iterator:
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py", line 334, in _batched
>     for item in iterator:
>   File "<string>", line 1, in <lambda>
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
>     return lambda *a: f(*a)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/util.py", line 113, in wrapper
>     return f(*args, **kwargs)
>   File "<stdin>", line 3, in f
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/broadcast.py", line 148, in value
>     self._value = self.load_from_path(self._path)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/broadcast.py", line 124, in load_from_path
>     with open(path, 'rb', 1 << 20) as f:
> MemoryError
>   at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
>   at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
>   at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
>   at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> ```
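> 
> The failure mode can be reproduced outside Spark on Linux by capping the 
> process's address space the same way and then requesting a buffer beyond 
> the cap (sizes are illustrative):
> ```
> import resource
>
> # Cap the virtual address space at 1 GiB, as the worker does for
> # spark.executor.pyspark.memory
> limit = 1024 * 1024 * 1024
> resource.setrlimit(resource.RLIMIT_AS, (limit, limit))
>
> try:
>     buf = bytearray(2 * limit)  # allocation exceeds the cap
> except MemoryError:
>     # Same error as broadcast.py's open(path, 'rb', 1 << 20) above
>     print("MemoryError")
> ```
> 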
> PySpark launch command:
> `pyspark --conf spark.executor.pyspark.memory=500m`
>  
> EMR version: 5.27.0


