Joshuawangzj updated SPARK-21045:
---------------------------------
Environment: The problem occurs only in Python 2.x; Python 3.x is fine.
Summary: Spark executor blocks instead of throwing an exception when the Python worker fails while sending exception info to the Java Gateway in Python 2.x (was: Spark executor blocked instead of throwing exception because exception occur when python worker send exception info to Java Gateway)

Spark executor blocks instead of throwing an exception when the Python worker fails while sending exception info to the Java Gateway in Python 2.x
--------------------------------------------------------------------------------------------------------------------------------------------------

Key: SPARK-21045
URL: https://issues.apache.org/jira/browse/SPARK-21045
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 2.0.1, 2.0.2, 2.1.1
Environment: The problem occurs only in Python 2.x; Python 3.x is fine.
Reporter: Joshuawangzj

My PySpark program always blocks in our production YARN cluster. I took a jstack dump of the executor and found:

{code}
"Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 tid=0x00007fb2f44e3000 nid=0xa003 runnable [0x0000000123b4a000]
   java.lang.Thread.State: RUNNABLE
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:170)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	- locked <0x00000007acab1c98> (a java.io.BufferedInputStream)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

The executor is blocked in a socket read.
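For intuition, here is a minimal standalone sketch (plain Python sockets and hypothetical names, not Spark code) of the state the jstack shows: the JVM side does a blocking read of a 4-byte big-endian int, and because the worker process is still alive, the socket never reaches EOF, so the read simply never returns. The timeout below exists only so the demo terminates; the real DataInputStream.readInt() has none.

{code:title=blocking_read_demo.py|borderStyle=solid}
import socket
import struct

# Two connected sockets stand in for the executor <-> Python worker pipe.
jvm_side, worker_side = socket.socketpair()

# The worker has failed before writing anything, but its socket is still
# open -- so the reader sees neither data nor EOF.
jvm_side.settimeout(2)  # demo only; DataInputStream.readInt() never times out
try:
    data = jvm_side.recv(4)  # analogue of readInt() at PythonRDD.scala:190
    print(struct.unpack(">i", data)[0])
except socket.timeout:
    print("blocked: peer is alive but sent nothing")
{code}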
I checked the log of the blocked executor and found this error:

{code}
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 178, in main
    write_with_length(traceback.format_exc().encode("utf-8"), outfile)
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 618: ordinal not in range(128)
{code}

Eventually I found the cause, in the worker's exception handler:

{code:title=worker.py|borderStyle=solid}
# around line 178 in Spark 2.1.1
except Exception:
    try:
        write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
        write_with_length(traceback.format_exc().encode("utf-8"), outfile)
    except IOError:
        # JVM closed the socket
        pass
    except Exception:
        # Write the error to stderr if it happened while serializing
        print("PySpark worker failed with exception:", file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
{code}

When write_with_length(traceback.format_exc().encode("utf-8"), outfile) itself raises an exception such as UnicodeDecodeError, the Python worker cannot send the traceback. But the JVM side has already received PYTHON_EXCEPTION_THROWN, so it next tries to read the traceback's length and blocks there forever:

{code:title=PythonRDD.scala|borderStyle=solid}
// around line 190 in Spark 2.1.1
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
  // Signals that an exception has been thrown in python
  val exLength = stream.readInt() // can block forever
{code}

{color:red}We can trigger the bug with a simple program:{color}

{code:title=test.py|borderStyle=solid}
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').getOrCreate()
rdd = spark.sparkContext.parallelize(['中']).map(lambda x: x.encode("utf8"))
rdd.collect()
{code}
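For reference, the underlying Python 2 pitfall reproduces without Spark at all: traceback.format_exc() returns a byte string (str) in Python 2, and calling .encode("utf-8") on a byte string makes Python first decode it implicitly with the ascii codec, so any non-ASCII byte in the traceback text blows up the error-reporting path itself. A minimal sketch (Python 2 only):

{code:title=py2_pitfall.py|borderStyle=solid}
# Python 2 only: reproduces the UnicodeDecodeError seen in the worker log.
import traceback

try:
    # The exception message carries raw UTF-8 bytes (0xe4 0xb8 0xad),
    # so the formatted traceback is a byte string with non-ASCII bytes.
    raise ValueError('\xe4\xb8\xad')
except ValueError:
    tb = traceback.format_exc()  # type 'str' (bytes) in Python 2
    # .encode("utf-8") on a byte string implicitly does tb.decode('ascii')
    # first: UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 ...
    tb.encode("utf-8")
{code}

In the repro above, x.encode("utf8") fails the same way inside the worker; reporting that failure then fails again at worker.py line 178 once the formatted traceback contains any non-ASCII byte.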
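One possible defensive direction, shown only as a sketch (the helper name report_exception is hypothetical, and this is not necessarily the patch Spark upstream adopted): prepare the payload before writing the marker, so nothing fallible runs between the PYTHON_EXCEPTION_THROWN marker and the length-prefixed traceback, and avoid re-encoding what Python 2 already gives us as bytes.

{code:title=defensive_sketch.py|borderStyle=solid}
# Sketch of a defensive error-reporting path for worker.py (hypothetical
# helper name; the imports are the real pyspark.serializers utilities).
from __future__ import print_function
import sys
import traceback

from pyspark.serializers import SpecialLengths, write_int, write_with_length


def report_exception(outfile):
    """Call from inside an except block to ship the traceback to the JVM."""
    try:
        tb = traceback.format_exc()
        if not isinstance(tb, bytes):
            # Python 3: format_exc() is text, so encode it.
            tb = tb.encode("utf-8")
        # Python 2: format_exc() is already bytes; re-encoding would
        # implicitly ascii-decode it and can raise UnicodeDecodeError.
        # Only after the payload is safely in hand do we write the marker,
        # so the JVM never sees PYTHON_EXCEPTION_THROWN without a length.
        write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
        write_with_length(tb, outfile)
    except IOError:
        pass  # JVM closed the socket
    except Exception:
        print("PySpark worker failed with exception:", file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
{code}

With this ordering, a formatting failure is reported to stderr instead of leaving the JVM blocked mid-frame in stream.readInt().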