[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469295#comment-15469295 ]

Tomer Kaftan commented on SPARK-17110:
--------------------------------------

Thanks all who helped out with this!

> Pyspark with locality ANY throw java.io.StreamCorruptedException
> ----------------------------------------------------------------
>
>                 Key: SPARK-17110
>                 URL: https://issues.apache.org/jira/browse/SPARK-17110
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.0
>         Environment: Cluster of 2 AWS r3.xlarge slaves launched via ec2 scripts, Spark 2.0.0, hadoop: yarn, pyspark shell
>            Reporter: Tomer Kaftan
>            Assignee: Josh Rosen
>            Priority: Critical
>             Fix For: 2.0.1, 2.1.0
>
>
> In Pyspark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stacktrace below:
> {noformat}
> WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80
>     at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807)
>     at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302)
>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
>     at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
>     at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
>     at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
>     at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524)
>     at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522)
>     at scala.Option.map(Option.scala:146)
>     at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522)
>     at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The simplest way I have found to reproduce this is by running the following code in the pyspark shell, on a cluster of 2 slaves set to use only one worker core each:
> {code}
> x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
> x.count()
> import time
> def waitMap(x):
>     time.sleep(x)
>     return x
> x.map(waitMap).count()
> {code}
> Or by running the following via spark-submit:
> {code}
> from pyspark import SparkContext
> sc = SparkContext()
> x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
> x.count()
> import time
> def waitMap(x):
>     time.sleep(x)
>     return x
> x.map(waitMap).count()
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
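For readers unfamiliar with the "invalid stream header: 12010A80" message in the stack trace above: java.io.ObjectInputStream expects every stream to start with the magic number 0xACED followed by the version 0x0005, and throws StreamCorruptedException otherwise. Below is a minimal sketch of that header check, written in Python purely for illustration. The helper name `looks_like_java_object_stream` is hypothetical and is not part of Spark or the JDK.

```python
import struct

# Constants defined by the Java Object Serialization Stream Protocol.
STREAM_MAGIC = 0xACED
STREAM_VERSION = 0x0005


def looks_like_java_object_stream(header: bytes) -> bool:
    """Return True if the first four bytes match the Java serialization header.

    Hypothetical helper for illustration; it mirrors the check that
    ObjectInputStream.readStreamHeader performs on the magic and version.
    """
    if len(header) < 4:
        return False
    # Two big-endian unsigned shorts: magic number, then stream version.
    magic, version = struct.unpack(">HH", header[:4])
    return magic == STREAM_MAGIC and version == STREAM_VERSION


# The four header bytes reported in the stack trace.
reported = bytes.fromhex("12010A80")
print(looks_like_java_object_stream(reported))
print(looks_like_java_object_stream(bytes.fromhex("ACED0005")))
```

The reported bytes do not match the expected header, which suggests that the remotely fetched block was not written by a Java ObjectOutputStream. The messages in this thread do not say which format the block was actually in.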
[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1544#comment-1544 ]

Tomer Kaftan commented on SPARK-17110:
--------------------------------------

Hi Miao,

With the fully default configuration and 2 slaves, all that is needed is to set the number of worker cores per slave to 1. That is, put the following in /spark/conf/spark-env.sh:
{code}
SPARK_WORKER_CORES=1
{code}

More concretely (if you're looking for exact steps), the way I start up the cluster & reproduce this example is as follows.

I use the Spark EC2 scripts from the PR here: https://github.com/amplab/spark-ec2/pull/46

I launch the cluster on 2 r3.xlarge machines (any machine should work, though you may need to change a later sed command):
{code}
./spark-ec2 -k ec2_ssh_key -i path_to_key_here -s 2 -t r3.xlarge launch temp-cluster --spot-price=1.00 --spark-version=2.0.0 --region=us-west-2 --hadoop-major-version=yarn
{code}

I update the number of worker cores and launch the pyspark shell:
{code}
sed -i'f' 's/SPARK_WORKER_CORES=4/SPARK_WORKER_CORES=1/g' /root/spark/conf/spark-env.sh
~/spark-ec2/copy-dir ~/spark/conf/
~/spark/sbin/stop-all.sh
~/spark/sbin/start-all.sh
~/spark/bin/pyspark
{code}

And then I run the example I included at the start:
{code}
x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
x.count()
import time
def waitMap(x):
    time.sleep(x)
    return x
x.map(waitMap).count()
{code}
[jira] [Updated] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tomer Kaftan updated SPARK-17110:
---------------------------------
    Environment: Cluster of 2 AWS r3.xlarge slaves launched via ec2 scripts, Spark 2.0.0, hadoop: yarn, pyspark shell  (was: Cluster of 2 AWS r3.xlarge nodes launched via ec2 scripts, Spark 2.0.0, hadoop: yarn, pyspark shell)
[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15440817#comment-15440817 ]

Tomer Kaftan commented on SPARK-17110:
--------------------------------------

Hi Miao,

That setup wouldn't cause this bug to appear. (However, I can try the most recent master tomorrow anyway, just in case someone else has fixed it.)

I should have explicitly specified 2 slaves, not 2 nodes, as I suppose that is too ambiguous; I've updated the description. It is also critical that each slave is set to use *only 1 worker core* (as I did specify above) for this example.

This is because this specific example & setup is designed to cause (non-deterministically, but with high probability) a situation where one of the pyspark workers reads data non-locally, which is what I have observed to cause this error consistently.

To provide a mental model of how this example & code snippet forces this situation:
1. The workers initially cache the data, forcing it to be stored in memory locally. Worker A contains the large number (1000), Worker B contains only small numbers (1).
2. The two workers each process their local numbers one at a time.
3. Once Worker A hits the large wait, Worker B continues on to process all of its local data.
4. Because Worker A is taking so long to finish its task (1000 seconds, but it can be set much smaller), the spark.locality.wait setting leads Worker B to begin processing data that is stored on Worker A.
5. Worker B attempts to read non-local data not stored on that node, which triggers the StreamCorruptedException.

The case in which this does not happen is the occasional run in which Worker A processes the large number (1000) last, as then there will be no data remaining on Worker A to attempt to launch on Worker B.
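The five steps in the comment above can be sketched as a toy scheduler simulation. This is a simplified model and not Spark's actual delay-scheduling code. The worker names, the assignment of partitions to workers, and the `simulate` helper are all assumptions made for illustration, and an idle worker is assumed to wait `locality_wait` seconds before it takes a non-local task.

```python
def simulate(tasks, locality_wait):
    """Toy model of two single-core workers with a locality wait.

    tasks: list of (home_worker, duration_seconds); the home worker is the
    one assumed to hold the cached partition.
    Returns a list of (task_index, worker, start_time, is_local) launches.
    """
    pending = list(range(len(tasks)))
    free_at = {"A": 0.0, "B": 0.0}        # time at which each worker is idle
    waited = {"A": False, "B": False}     # has the worker sat out the wait?
    launches = []
    while pending:
        # Next event: the worker that becomes idle first (ties go to "A").
        worker = min(free_at, key=lambda w: (free_at[w], w))
        now = free_at[worker]
        local = [i for i in pending if tasks[i][0] == worker]
        if local:
            idx, is_local = local[0], True
        elif waited[worker]:
            # Locality wait expired: take a task cached on the other worker.
            idx, is_local = pending[0], False
        else:
            # No local work left: stay idle for the locality wait first.
            free_at[worker] = now + locality_wait
            waited[worker] = True
            continue
        pending.remove(idx)
        waited[worker] = False
        launches.append((idx, worker, now, is_local))
        free_at[worker] = now + tasks[idx][1]
    return launches


# Assumed layout: Worker A holds four partitions, Worker B holds five.
slow_early = [("A", 1), ("A", 1000), ("A", 1), ("A", 1)] + [("B", 1)] * 5
slow_last = [("A", 1), ("A", 1), ("A", 1), ("A", 1000)] + [("B", 1)] * 5

for name, layout in [("slow task early", slow_early), ("slow task last", slow_last)]:
    non_local = [l for l in simulate(layout, locality_wait=3.0) if not l[3]]
    print(name, "-> non-local launches:", non_local)
```

In this model, Worker B only picks up Worker A's partitions when the slow task starts before A's other partitions have been processed. That matches the observation in the comment that the error does not appear when the large number is processed last.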
[jira] [Updated] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tomer Kaftan updated SPARK-17110:
---------------------------------
    Description: updated; the reproduction steps now specify "a cluster of 2 slaves set to use only one worker core each".
[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435418#comment-15435418 ]

Tomer Kaftan commented on SPARK-17110:
--------------------------------------

[~radost...@gmail.com] Unfortunately we haven't been able to find any for our project other than staying with spark 1.6 for the time being.
[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15425752#comment-15425752 ]

Tomer Kaftan commented on SPARK-17110:
--------------------------------------

It is with the default configuration settings, and this worked fine with spark 1.6.0. Still, it's not unthinkable to me that the default configurations have changed since 1.6.

It should happen with any wait time greater than the spark.locality.wait setting (forcing the task to be launched on a different node than the one the data was cached on).

The initial workload we came across this in had each task taking ~20 seconds, and this error would consistently happen near the end of the stage, as soon as tasks began being executed non-locally. That workload also ran fine in spark 1.6.
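As a rough illustration of the point about spark.locality.wait, the reproduction can be parameterised so that the slow task's sleep is derived from the locality wait instead of being hard-coded to 1000 seconds. This is only a sketch under assumptions: `build_sleep_times`, `run_repro`, and the `margin_s` value are hypothetical, the default of 3 seconds is the value documented for `spark.locality.wait`, and the margin needed in practice depends on how long the other worker takes to drain its local tasks.

```python
import time


def build_sleep_times(locality_wait_s=3.0, margin_s=10.0, num_tasks=9, slow_index=5):
    """Return per-task sleep times: 1 second each, except one slow task.

    The slow task sleeps for locality_wait_s + margin_s so that it outlasts
    the locality wait. margin_s is an assumed safety margin, not a value
    taken from the issue.
    """
    times = [1.0] * num_tasks
    times[slow_index] = locality_wait_s + margin_s
    return times


def wait_map(x):
    """Sleep for x seconds and return x, as waitMap does in the issue."""
    time.sleep(x)
    return x


def run_repro(sc, sleep_times):
    """Run the reproduction on an existing SparkContext `sc`."""
    # One partition per element, cached so each has a preferred location.
    rdd = sc.parallelize(sleep_times, numSlices=len(sleep_times)).cache()
    rdd.count()  # materialise the cache
    return rdd.map(wait_map).count()


print(build_sleep_times())
```

In the pyspark shell this would be invoked as `run_repro(sc, build_sleep_times())`, on the same cluster of 2 slaves with one worker core each.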
[jira] [Updated] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tomer Kaftan updated SPARK-17110:
---------------------------------
    Environment: Cluster of 2 AWS r3.xlarge nodes launched via ec2 scripts, Spark 2.0.0, hadoop: yarn, pyspark shell  (was: Cluster of 2 AWS r3.xlarge nodes launched via ec2 scripts, pyspark shell)
[jira] [Updated] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tomer Kaftan updated SPARK-17110: - Description: In Pyspark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stacktrace below: {noformat} WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80 at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807) at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522) at scala.Option.map(Option.scala:146) at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {noformat} The simplest way I have found to reproduce this is by running the following code in the pyspark shell, on a cluster of 2 nodes set to use only one worker core each: {code} x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache() x.count() import time def waitMap(x): time.sleep(x) return x x.map(waitMap).count() {code} Or by running the following via spark-submit: {code} from pyspark import SparkContext sc = SparkContext() x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache() x.count() import time def waitMap(x): time.sleep(x) return x x.map(waitMap).count() {code} was: In Pyspark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stacktrace below: {noformat} WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80 at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807) at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522) at scala.Option.map(Option.scala:146) at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) at
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at
[jira] [Updated] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tomer Kaftan updated SPARK-17110: - Description: In Pyspark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stacktrace below: {noformat} WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80 at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807) at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522) at scala.Option.map(Option.scala:146) at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {noformat} The simplest way I have found to reproduce this is by running the following code in the pyspark shell, on a cluster of 2 nodes set to use only one worker core each: {code} x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache() x.count() import time def waitMap(x): time.sleep(x) return x x.map(waitMap).count() {code} Or by running the following via spark-submit: {code} from pyspark import SparkContext sc = SparkContext() x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache() x.count() import time def waitMap(x): time.sleep(x) return x x.map(waitMap).count() {code} was: In Pyspark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stacktrace below: ``` WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80 at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807) at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522) at scala.Option.map(Option.scala:146) at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) at
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at
[jira] [Updated] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tomer Kaftan updated SPARK-17110: - Description: In Pyspark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stacktrace below: ``` WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80 at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807) at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522) at scala.Option.map(Option.scala:146) at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) ``` The simplest way I have found to reproduce this is by running the following code in the pyspark shell, on a cluster of 2 nodes set to use only one worker core each: {code} x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache() x.count() import time def waitMap(x): time.sleep(x) return x x.map(waitMap).count() {code} Or by running the following via spark-submit: {code} from pyspark import SparkContext sc = SparkContext() x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache() x.count() import time def waitMap(x): time.sleep(x) return x x.map(waitMap).count() {code} was: In Pyspark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stacktrace below: ``` WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80 at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807) at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522) at scala.Option.map(Option.scala:146) at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) at
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[jira] [Updated] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tomer Kaftan updated SPARK-17110: - Description: In Pyspark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stacktrace below: ``` WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80 at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807) at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522) at scala.Option.map(Option.scala:146) at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) ``` The simplest way I have found to reproduce this is by running the following code in the pyspark shell, on a cluster of 2 nodes set to use only one worker core each: {code} x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache() x.count() import time def waitMap(x): time.sleep(x) return x x.map(waitMap).count() {code} was: In Pyspark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stacktrace below: ``` WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80 at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807) at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522) at scala.Option.map(Option.scala:146) at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) ``` The simplest way I have found to reproduce this is by running the following code in the pyspark shell, on a cluster of 2 nodes set to use only
[jira] [Updated] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tomer Kaftan updated SPARK-17110: - Description: In Pyspark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stacktrace below: ``` WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80 at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807) at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522) at scala.Option.map(Option.scala:146) at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) ``` The simplest way I have found to reproduce this is by running the following code in the pyspark shell, on a cluster of 2 nodes set to use only one worker core each: {code} x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache() import time def waitMap(x): time.sleep(x) return x x.map(waitMap).count() {code} was: In Pyspark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stacktrace below: ``` WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80 at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807) at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524) at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522) at scala.Option.map(Option.scala:146) at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) ``` The simplest way I have found to reproduce this is by running the following code in the pyspark shell, on a cluster of 2 nodes set to use only one
[jira] [Created] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException
Tomer Kaftan created SPARK-17110:

Summary: Pyspark with locality ANY throw java.io.StreamCorruptedException
Key: SPARK-17110
URL: https://issues.apache.org/jira/browse/SPARK-17110
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 2.0.0
Environment: Cluster of 2 AWS r3.xlarge nodes launched via ec2 scripts, pyspark shell
Reporter: Tomer Kaftan
Priority: Critical

In Pyspark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stacktrace below:

```
WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
    at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
    at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
    at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
    at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524)
    at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522)
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
```

The simplest way I have found to reproduce this is by running the following code in the pyspark shell, on a cluster of 2 nodes set to use only one worker core each:

```python
x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
import time
def waitMap(x):
    time.sleep(x)
    return x
x.map(waitMap).count()
```
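A note on reading the exception message, as a minimal sketch rather than a diagnosis: `java.io.ObjectInputStream` expects every stream to begin with the two-byte magic number `0xACED` followed by the version `0x0005`, and it reports the four bytes it actually read when they do not match. The helper names below are hypothetical, introduced only for illustration. The check shows that the reported header `12010A80` is not a Java serialization header; it does not say which format the remote block was actually written in.

```python
import struct

# Values written at the start of every Java serialization stream
# (java.io.ObjectStreamConstants.STREAM_MAGIC and STREAM_VERSION).
JAVA_STREAM_MAGIC = 0xACED
JAVA_STREAM_VERSION = 0x0005


def parse_stream_header(header_hex):
    """Split the 4-byte header quoted in the exception into (magic, version)."""
    magic, version = struct.unpack(">HH", bytes.fromhex(header_hex))
    return magic, version


def is_java_serialization_header(header_hex):
    """Return True if the header is what ObjectInputStream expects to read."""
    return parse_stream_header(header_hex) == (JAVA_STREAM_MAGIC, JAVA_STREAM_VERSION)


if __name__ == "__main__":
    reported = "12010A80"  # header bytes quoted in the stack trace
    magic, version = parse_stream_header(reported)
    print("magic=0x%04X version=0x%04X" % (magic, version))
    print("Java serialization header:", is_java_serialization_header(reported))
```

In other words, the deserializer on the reading executor was handed bytes that were never produced by Java serialization, which is consistent with the failure showing up only on the remote-read path (`BlockManager.getRemoteValues`) in the trace.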
[jira] [Created] (SPARK-5433) Spark EC2 doesn't mount local disks for some instance types
Tomer Kaftan created SPARK-5433:

Summary: Spark EC2 doesn't mount local disks for some instance types
Key: SPARK-5433
URL: https://issues.apache.org/jira/browse/SPARK-5433
Project: Spark
Issue Type: Bug
Components: EC2
Affects Versions: 1.2.0
Reporter: Tomer Kaftan
Priority: Critical

Launching a cluster using spark-ec2 will currently mount all local disks for the r3 instance types. Branch 1.3.0 of the ec2 scripts has also been updated to mount one local ssd disk for the i2 instance types. At the very least the i2 instance types need to have all local disks mounted, and there may be other instance types I'm not aware of that also need to be mounting local disks but aren't.
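To make the request concrete, here is a minimal sketch of a per-instance-type disk table and the mount points it would imply. Everything in it is an assumption for illustration: the disk counts are recalled from the AWS instance-store listings and should be checked against current AWS documentation, and the table name, function name and `/mnt`, `/mnt2`, ... naming are hypothetical rather than taken from the spark-ec2 scripts.

```python
# Assumed number of local (instance-store) disks per instance type.
# These figures are not from the issue; verify them before relying on them.
LOCAL_DISKS = {
    "r3.xlarge": 1,
    "i2.xlarge": 1,
    "i2.2xlarge": 2,
    "i2.4xlarge": 4,
    "i2.8xlarge": 8,
}


def mount_points(instance_type):
    """Return one hypothetical mount point per local disk: /mnt, /mnt2, ..."""
    count = LOCAL_DISKS.get(instance_type, 0)  # unknown types get no disks
    return ["/mnt" if i == 0 else "/mnt%d" % (i + 1) for i in range(count)]


if __name__ == "__main__":
    for itype in sorted(LOCAL_DISKS):
        print(itype, mount_points(itype))
```

Under these assumptions, mounting only one disk on an `i2.8xlarge` would leave seven of its eight local disks unused, which is the gap the issue describes.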
[jira] [Updated] (SPARK-5433) Spark EC2 doesn't mount local disks for all instance types
[ https://issues.apache.org/jira/browse/SPARK-5433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tomer Kaftan updated SPARK-5433:

Description:
Launching a cluster using spark-ec2 will currently mount all local disks for the r3 instance types. Branch 1.3.0 of the ec2 scripts has also been updated to mount one local ssd disk for the i2 instance types. At the very least the i2 instance types need to have all local disks mounted. We also need to find if there are any other instance types that also need to be updated.

was:
Launching a cluster using spark-ec2 will currently mount all local disks for the r3 instance types. Branch 1.3.0 of the ec2 scripts has also been updated to mount one local ssd disk for the i2 instance types. At the very least the i2 instance types need to have all local disks mounted, and there may be other instance types I'm not aware of that also need to be mounting local disks but aren't.

Spark EC2 doesn't mount local disks for all instance types

Key: SPARK-5433
URL: https://issues.apache.org/jira/browse/SPARK-5433
Project: Spark
Issue Type: Bug
Components: EC2
Affects Versions: 1.2.0
Reporter: Tomer Kaftan
Priority: Critical

Launching a cluster using spark-ec2 will currently mount all local disks for the r3 instance types. Branch 1.3.0 of the ec2 scripts has also been updated to mount one local ssd disk for the i2 instance types. At the very least the i2 instance types need to have all local disks mounted. We also need to find if there are any other instance types that also need to be updated.
[jira] [Updated] (SPARK-5433) Spark EC2 doesn't mount local disks for all instance types
[ https://issues.apache.org/jira/browse/SPARK-5433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tomer Kaftan updated SPARK-5433:

Summary: Spark EC2 doesn't mount local disks for all instance types (was: Spark EC2 doesn't mount local disks for some instance types)

Spark EC2 doesn't mount local disks for all instance types

Key: SPARK-5433
URL: https://issues.apache.org/jira/browse/SPARK-5433
Project: Spark
Issue Type: Bug
Components: EC2
Affects Versions: 1.2.0
Reporter: Tomer Kaftan
Priority: Critical

Launching a cluster using spark-ec2 will currently mount all local disks for the r3 instance types. Branch 1.3.0 of the ec2 scripts has also been updated to mount one local ssd disk for the i2 instance types. At the very least the i2 instance types need to have all local disks mounted, and there may be other instance types I'm not aware of that also need to be mounting local disks but aren't.