[ 
https://issues.apache.org/jira/browse/SPARK-25139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DENG FEI updated SPARK-25139:
-----------------------------
    Description: 
We run pyspark streaming on YARN, the executor will die caused by the error.

The executor trace log is below:
18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: 
Getting local block input-0-1534485138800 18/08/17 13:52:20 Executor task 
launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire 
read lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch 
worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for 
input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker for task 
137 DEBUG BlockManager: Level for block input-0-1534485138800 is 
StorageLevel(disk, memory, 1 replicas) 18/08/17 13:52:20 Executor task launch 
worker for task 137 INFO BlockManager: Found block input-0-1534485138800 
locally 18/08/17 13:52:20 Executor task launch worker for task 137 INFO 
PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0 18/08/17 
13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing 
lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker 
for task 137 INFO Executor: 1 block locks were not released by TID = 137: 
[input-0-1534485138800] 18/08/17 13:52:20 stdout writer for python ERROR Utils: 
Uncaught exception in thread stdout writer for python java.lang.AssertionError: 
assertion failed: Block input-0-1534485138800 is not locked for reading at 
scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299) at 
org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769) at 
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
 at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
 at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
 at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
 at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991) at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
 18/08/17 13:52:20 stdout writer for python ERROR 
SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout 
writer for python,5,main]

I think shoud wait WriterThread after Task#run.

  was:
We run pyspark streaming on YARN, the executor will die caused by the error.

The executor trace log is below:
{noformat}
18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: 
Getting local block input-0-1534485138800 18/08/17 13:52:20 Executor task 
launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire 
read lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch 
worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for 
input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker for task 
137 DEBUG BlockManager: Level for block input-0-1534485138800 is 
StorageLevel(disk, memory, 1 replicas) 18/08/17 13:52:20 Executor task launch 
worker for task 137 INFO BlockManager: Found block input-0-1534485138800 
locally 18/08/17 13:52:20 Executor task launch worker for task 137 INFO 
PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0 18/08/17 
13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing 
lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker 
for task 137 INFO Executor: 1 block locks were not released by TID = 137: 
[input-0-1534485138800] 18/08/17 13:52:20 stdout writer for python ERROR Utils: 
Uncaught exception in thread stdout writer for python java.lang.AssertionError: 
assertion failed: Block input-0-1534485138800 is not locked for reading at 
scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299) at 
org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769) at 
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
 at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
 at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
 at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
 at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991) at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
 18/08/17 13:52:20 stdout writer for python ERROR 
SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout 
writer for python,5,main]
{noformat}

I think shoud wait WriterThread after Task#run.


> PythonRunner#WriterThread released block after TaskRunner finally block which 
>  invoke BlockManager#releaseAllLocksForTask
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25139
>                 URL: https://issues.apache.org/jira/browse/SPARK-25139
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager
>    Affects Versions: 2.3.1
>            Reporter: DENG FEI
>            Priority: Major
>
> We run pyspark streaming on YARN, the executor will die caused by the error.
> The executor trace log is below:
> 18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG 
> BlockManager: Getting local block input-0-1534485138800 18/08/17 13:52:20 
> Executor task launch worker for task 137 TRACE BlockInfoManager: Task 137 
> trying to acquire read lock for input-0-1534485138800 18/08/17 13:52:20 
> Executor task launch worker for task 137 TRACE BlockInfoManager: Task 137 
> acquired read lock for input-0-1534485138800 18/08/17 13:52:20 Executor task 
> launch worker for task 137 DEBUG BlockManager: Level for block 
> input-0-1534485138800 is StorageLevel(disk, memory, 1 replicas) 18/08/17 
> 13:52:20 Executor task launch worker for task 137 INFO BlockManager: Found 
> block input-0-1534485138800 locally 18/08/17 13:52:20 Executor task launch 
> worker for task 137 INFO PythonRunner: Times: total = 8, boot = 3, init = 5, 
> finish = 0 18/08/17 13:52:20 stdout writer for python TRACE BlockInfoManager: 
> Task 137 releasing lock for input-0-1534485138800 18/08/17 13:52:20 Executor 
> task launch worker for task 137 INFO Executor: 1 block locks were not 
> released by TID = 137: [input-0-1534485138800] 18/08/17 13:52:20 stdout 
> writer for python ERROR Utils: Uncaught exception in thread stdout writer for 
> python java.lang.AssertionError: assertion failed: Block 
> input-0-1534485138800 is not locked for reading at 
> scala.Predef$.assert(Predef.scala:170) at 
> org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299) 
> at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769) 
> at 
> org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
>  at 
> org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
>  at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33) 
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
> scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
>  at 
> org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
>  at 
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
>  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991) at 
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
>  18/08/17 13:52:20 stdout writer for python ERROR 
> SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout 
> writer for python,5,main]
> I think shoud wait WriterThread after Task#run.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to