[ https://issues.apache.org/jira/browse/SPARK-18406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113702#comment-16113702 ]

Taichi Sano commented on SPARK-18406:
-------------------------------------

Hello,
I am experiencing an issue very similar to this one. I am running a
groupByKeyAndWindow() with a batch interval of 1, a window duration of 80,
and a slide duration of 1 over data streamed from Kafka (ver 0.10) with the
direct stream. Every once in a while, I encounter an AssertionError like
this:

{code}
17/08/03 22:32:19 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 20936.0 (TID 4409)
java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:156)
        at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
        at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
        at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:342)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
17/08/03 22:32:19 ERROR org.apache.spark.executor.Executor: Exception in task 0.1 in stage 20936.0 (TID 4410)
java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:156)
        at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
        at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
        at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
        at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:342)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
17/08/03 22:32:19 ERROR org.apache.spark.util.Utils: Uncaught exception in thread stdout writer for /opt/conda/bin/python
java.lang.AssertionError: assertion failed: Block rdd_30291_0 is not locked for reading
        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
        at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
        at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
        at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
17/08/03 22:32:19 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /opt/conda/bin/python,5,main]
java.lang.AssertionError: assertion failed: Block rdd_30291_0 is not locked for reading
        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
        at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
        at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
        at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
{code}

This error also kills the executor. Sometimes a new executor is spawned to
pick up where the dead one left off, but sometimes the whole Spark job
crashes because of it. I'm running version 2.2.0 on Google Dataproc on a
single-node cluster.
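
For reference, here is a minimal sketch of the shape of the job. The app
name, broker address, and topic name are placeholders and the durations are
in seconds; since the Kafka 0.10 integration does not expose a Python API in
Spark 2.2, this uses the older pyspark.streaming.kafka direct-stream API as
an approximation of the setup:

{code}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="group-by-key-and-window-repro")  # placeholder name
ssc = StreamingContext(sc, batchDuration=1)  # 1 s batch interval

# Direct stream from Kafka; broker address and topic name are placeholders.
stream = KafkaUtils.createDirectStream(
    ssc,
    topics=["events"],
    kafkaParams={"metadata.broker.list": "broker:9092"})

# 80 s window sliding every 1 s, so consecutive windows overlap almost
# entirely and the same cached blocks are read by many window computations.
windowed = stream.groupByKeyAndWindow(windowDuration=80, slideDuration=1)
windowed.count().pprint()

ssc.start()
ssc.awaitTermination()
{code}

With the window sliding every batch, each cached RDD block is read by up to
80 overlapping window computations, which is presumably why the race only
fires once in a while.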

> Race between end-of-task and completion iterator read lock release
> ------------------------------------------------------------------
>
>                 Key: SPARK-18406
>                 URL: https://issues.apache.org/jira/browse/SPARK-18406
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Spark Core
>    Affects Versions: 2.0.0, 2.0.1
>            Reporter: Josh Rosen
>            Assignee: Jiang Xingbo
>             Fix For: 2.0.3, 2.1.2, 2.2.0
>
>
> The following log comes from a production streaming job where executors 
> periodically die due to uncaught exceptions during block release:
> {code}
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7921
> 16/11/07 17:11:06 INFO Executor: Running task 0.0 in stage 2390.0 (TID 7921)
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7922
> 16/11/07 17:11:06 INFO Executor: Running task 1.0 in stage 2390.0 (TID 7922)
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7923
> 16/11/07 17:11:06 INFO Executor: Running task 2.0 in stage 2390.0 (TID 7923)
> 16/11/07 17:11:06 INFO TorrentBroadcast: Started reading broadcast variable 2721
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7924
> 16/11/07 17:11:06 INFO Executor: Running task 3.0 in stage 2390.0 (TID 7924)
> 16/11/07 17:11:06 INFO MemoryStore: Block broadcast_2721_piece0 stored as bytes in memory (estimated size 5.0 KB, free 4.9 GB)
> 16/11/07 17:11:06 INFO TorrentBroadcast: Reading broadcast variable 2721 took 3 ms
> 16/11/07 17:11:06 INFO MemoryStore: Block broadcast_2721 stored as values in memory (estimated size 9.4 KB, free 4.9 GB)
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_1 locally
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_3 locally
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_2 locally
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_4 locally
> 16/11/07 17:11:06 INFO PythonRunner: Times: total = 2, boot = -566, init = 567, finish = 1
> 16/11/07 17:11:06 INFO PythonRunner: Times: total = 7, boot = -540, init = 541, finish = 6
> 16/11/07 17:11:06 INFO Executor: Finished task 2.0 in stage 2390.0 (TID 7923). 1429 bytes result sent to driver
> 16/11/07 17:11:06 INFO PythonRunner: Times: total = 8, boot = -532, init = 533, finish = 7
> 16/11/07 17:11:06 INFO Executor: Finished task 3.0 in stage 2390.0 (TID 7924). 1429 bytes result sent to driver
> 16/11/07 17:11:06 ERROR Executor: Exception in task 0.0 in stage 2390.0 (TID 7921)
> java.lang.AssertionError: assertion failed
>       at scala.Predef$.assert(Predef.scala:165)
>       at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
>       at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
>       at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
>       at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:361)
>       at scala.Option.foreach(Option.scala:236)
>       at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
>       at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:356)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:356)
>       at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:646)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:281)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7925
> 16/11/07 17:11:06 INFO Executor: Running task 0.1 in stage 2390.0 (TID 7925)
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_1 locally
> 16/11/07 17:11:06 INFO PythonRunner: Times: total = 41, boot = -536, init = 576, finish = 1
> 16/11/07 17:11:06 INFO Executor: Finished task 1.0 in stage 2390.0 (TID 7922). 1429 bytes result sent to driver
> 16/11/07 17:11:06 ERROR Utils: Uncaught exception in thread stdout writer for /databricks/python/bin/python
> java.lang.AssertionError: assertion failed: Block rdd_2741_1 is not locked for reading
>       at scala.Predef$.assert(Predef.scala:179)
>       at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:294)
>       at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:630)
>       at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:434)
>       at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
>       at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
>       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
>       at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
>       at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1882)
>       at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
> 16/11/07 17:11:06 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /databricks/python/bin/python,5,main]
> java.lang.AssertionError: assertion failed: Block rdd_2741_1 is not locked for reading
>       at scala.Predef$.assert(Predef.scala:179)
>       at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:294)
>       at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:630)
>       at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:434)
>       at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
>       at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
>       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
>       at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
>       at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1882)
>       at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala
> {code}
> I think there's some sort of internal race condition between a task
> finishing (TID 7921) and automatically releasing its locks, and the
> "automatically release locks on hitting the end of an iterator" logic
> running in a separate thread. The log above came from a production
> streaming job where executors periodically died with this type of error.
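
To make the suspected interleaving concrete, here is a toy model of the
double release in plain Python (a sketch, not Spark's actual code): the
task's end-of-task cleanup and the completion callback running on the
separate stdout-writer thread both release the same read lock, and the
reader-count invariant that BlockInfoManager asserts trips just as in both
logs above.

{code}
import threading

class BlockInfo:
    """Toy stand-in for a block's read-lock state. The assertion mirrors the
    invariant that BlockInfoManager checks: the reader count never goes
    negative."""
    def __init__(self):
        self._mutex = threading.Lock()
        self.reader_count = 1  # the task holds one read lock on the block

    def release_read_lock(self):
        with self._mutex:
            self.reader_count -= 1
            assert self.reader_count >= 0, "Block is not locked for reading"

info = BlockInfo()

# End-of-task cleanup releases every lock the task still holds ...
task_cleanup = threading.Thread(target=info.release_read_lock)
# ... while the completion iterator's callback, running on the separate
# stdout-writer thread, releases the same read lock a second time.
writer_callback = threading.Thread(target=info.release_read_lock)

task_cleanup.start(); writer_callback.start()
task_cleanup.join(); writer_callback.join()
# Whichever release runs second drives the count to -1 and trips the
# assertion, matching the "assertion failed" errors in both logs.
{code}

Here the decrements are serialized by a mutex, so one of the two releases
fails deterministically; in Spark the two threads race on real lock state,
which is why the error only surfaces intermittently.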


