[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15440817#comment-15440817 ]
Tomer Kaftan commented on SPARK-17110: -------------------------------------- Hi Miao, That setup wouldn't cause this bug to appear (However, I can try the most recent master tomorrow anyway just in case someone else has fixed it) I should have explicitly specified 2 slaves, not 2 nodes (as I suppose that is too ambiguous. I've updated the description). It is also critical that each slave is set to use *only 1 worker core* (as I did specify above) for this example. This is because this specific example & setup is designed to cause (non-deterministically, but with high probability) a situation where one of the pyspark workers reads data non-locally, which is what I have observed to cause this error consistently. To provide a mental model of how this example & code snippet forces this situation: 1. The workers initially cache the data, forcing it to be stored in memory locally. Worker A contains the large number (1000), Worker B contains only small numbers (1). 2. The two workers each process their local numbers one at a time. 3. Once Worker A hits the large wait, Worker B continues on to process all of its local data 4. Because Worker A is taking so long to finish its task (1000 seconds, but it can be set much smaller), the spark.locality.wait setting leads Worker B to begin processing data that is stored on Worker A 5. Worker B attempts to read non-local data not stored on that node, leading the stream corrupted exception to occur. The case in which this does not happen is the one run every few times where Worker A processes the large number (1000) last, as then there will be no data remaining on Worker A to attempt to launch on Worker B. > Pyspark with locality ANY throw java.io.StreamCorruptedException > ---------------------------------------------------------------- > > Key: SPARK-17110 > URL: https://issues.apache.org/jira/browse/SPARK-17110 > Project: Spark > Issue Type: Bug > Components: PySpark > Affects Versions: 2.0.0 > Environment: Cluster of 2 AWS r3.xlarge nodes launched via ec2 > scripts, Spark 2.0.0, hadoop: yarn, pyspark shell > Reporter: Tomer Kaftan > Priority: Critical > > In Pyspark 2.0.0, any task that accesses cached data non-locally throws a > StreamCorruptedException like the stacktrace below: > {noformat} > WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): > java.io.StreamCorruptedException: invalid stream header: 12010A80 > at > java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807) > at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302) > at > org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) > at > org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) > at > org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) > at > org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146) > at > org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524) > at > org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522) > at scala.Option.map(Option.scala:146) > at > org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522) > at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) > at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {noformat} > The simplest way I have found to reproduce this is by running the following > code in the pyspark shell, on a cluster of 2 slaves set to use only one > worker core each: > {code} > x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache() > x.count() > import time > def waitMap(x): > time.sleep(x) > return x > x.map(waitMap).count() > {code} > Or by running the following via spark-submit: > {code} > from pyspark import SparkContext > sc = SparkContext() > x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache() > x.count() > import time > def waitMap(x): > time.sleep(x) > return x > x.map(waitMap).count() > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org