[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15440786#comment-15440786
 ] 

Miao Wang commented on SPARK-17110:
-----------------------------------

I set up a two-node cluster (one master, one worker) with 48 cores and 1G of 
memory. Running the above code in pyspark works fine, with no exception. It 
seems that this bug has been fixed in the latest master branch. Can you upgrade 
and try again?

> Pyspark with locality ANY throw java.io.StreamCorruptedException
> ----------------------------------------------------------------
>
>                 Key: SPARK-17110
>                 URL: https://issues.apache.org/jira/browse/SPARK-17110
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.0
>         Environment: Cluster of 2 AWS r3.xlarge nodes launched via ec2 
> scripts, Spark 2.0.0, hadoop: yarn, pyspark shell
>            Reporter: Tomer Kaftan
>            Priority: Critical
>
> In Pyspark 2.0.0, any task that accesses cached data non-locally throws a 
> StreamCorruptedException like the stacktrace below:
> {noformat}
> WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80
>         at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807)
>         at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302)
>         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
>         at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
>         at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
>         at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
>         at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524)
>         at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522)
>         at scala.Option.map(Option.scala:146)
>         at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522)
>         at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609)
>         at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
>         at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The simplest way I have found to reproduce this is by running the following 
> code in the pyspark shell, on a cluster of 2 nodes set to use only one worker 
> core each:
> {code}
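> import time
>
> # Nine single-element partitions, cached; one element is far larger than the rest.
> x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
> x.count()  # first pass materializes the cache
>
> def waitMap(seconds):
>     # Sleep for the element's value, so one task runs much longer than the others.
>     time.sleep(seconds)
>     return seconds
>
> x.map(waitMap).count()  # second pass reads the cached partitions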
> {code}
> Or by running the following via spark-submit:
> {code}
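> import time
>
> from pyspark import SparkContext
>
> sc = SparkContext()
> # Nine single-element partitions, cached; one element is far larger than the rest.
> x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
> x.count()  # first pass materializes the cache
>
> def waitMap(seconds):
>     # Sleep for the element's value, so one task runs much longer than the others.
>     time.sleep(seconds)
>     return seconds
>
> x.map(waitMap).count()  # second pass reads the cached partitions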
> {code}
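
As a reading aid for the `invalid stream header: 12010A80` message in the trace above: `java.io.ObjectInputStream` expects every stream to begin with the magic number 0xACED followed by the version 0x0005, and it raises `StreamCorruptedException` when the first four bytes differ. The sketch below only illustrates that check in Python; the function name `is_java_serialization_header` is hypothetical and is not part of Spark or PySpark, and the sketch makes no claim about which format actually produced the bytes that were fetched.

```python
import struct

# Constants defined by java.io.ObjectStreamConstants.
STREAM_MAGIC = 0xACED
STREAM_VERSION = 0x0005


def is_java_serialization_header(data: bytes) -> bool:
    """Return True if the first four bytes form a Java object stream header.

    Hypothetical helper for illustration only; not a Spark API.
    """
    if len(data) < 4:
        return False
    # The header is two big-endian unsigned 16-bit values: magic, then version.
    magic, version = struct.unpack(">HH", data[:4])
    return magic == STREAM_MAGIC and version == STREAM_VERSION


# Header bytes reported in the stack trace.
reported = bytes.fromhex("12010A80")
# Header written by a java.io.ObjectOutputStream.
valid = bytes.fromhex("ACED0005")

print(is_java_serialization_header(reported))  # False
print(is_java_serialization_header(valid))     # True
```

In other words, the remotely fetched block did not start with a Java serialization header, which is consistent with the failure occurring inside `JavaSerializerInstance.deserializeStream`.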



