Hey Davies,

Here are some more details on a configuration that causes this error for me.
Launch an AWS Spark EMR cluster as follows:

aws emr create-cluster --region us-west-1 --no-auto-terminate \
  --ec2-attributes KeyName=your-key-here,SubnetId=your-subnet-here \
  --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args='["-g"]' \
  --ami-version 3.3 --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \
  InstanceGroupType=CORE,InstanceCount=10,InstanceType=r3.xlarge --name "Spark Issue Repro" \
  --visible-to-all-users --applications Name=Ganglia

This is a 10-node cluster (not sure if this makes a difference outside of HDFS
block locality).

Then use this Gist as your spark-defaults file (it also configures 2 executors
per job): https://gist.github.com/skrasser/9b978d3d572735298d16
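The executor sizing in that file boils down to the setup I mention further down
in the thread (2 executors, 4 cores each, 25g heap, 3800m of YARN overhead), so
roughly the following -- this is only an illustrative excerpt, the Gist is the
authoritative version:

# illustrative spark-defaults excerpt; see the Gist for the exact file
spark.executor.instances              2
spark.executor.cores                  4
spark.executor.memory                 25g
spark.yarn.executor.memoryOverhead    3800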
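For context on what the job does, the repro script essentially boils down to
the pipeline you rewrote below. Here is a simplified, self-contained sketch
(Python 2 lambdas, same style as your one-liner) -- the inline sample data and
the 'p'/'c' tags are just stand-ins; the Gist further down in the thread has the
actual generator and script, and the failure only shows up at scale with few
executors:

from pyspark import SparkContext

sc = SparkContext(appName="pairwise-rdd-repro-sketch")

# Tiny inline stand-in for the generated test data from the repro Gist.
inpdata = sc.parallelize([('p', 'k1'), ('c', 'k1'), ('p', 'k2'), ('c', 'k3')])

# Flag each key with a bit depending on its tag, OR the bits together during
# the shuffle, and keep only keys seen with both tags (1 | 2 == 3). The
# reduceByKey below is the shuffle that fails with
# "PairwiseRDD: unexpected value: List([B@...)" on the 2-executor setup.
result = (inpdata
          .map(lambda (pc, x): (x, 2 if pc == 'p' else 1))
          .reduceByKey(lambda a, b: a | b)
          .filter(lambda (x, bits): bits == 3)
          .collect())

print result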
With that, I am seeing this again:

2015-01-07 03:43:51,751 ERROR [Executor task launch worker-1] executor.Executor
(Logging.scala:logError(96)) - Exception in task 13.0 in stage 0.0 (TID 27)
org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@4cfae71c)

Thanks for the performance pointers -- the repro script is fairly unpolished
(just enough to cause the aforementioned exception).

Hope this sheds some light on the error. From what I can tell so far, something
in the spark-defaults file triggers it (with other settings it completes just
fine).

Thanks for your help!
-Sven

On Tue, Jan 6, 2015 at 12:29 PM, Davies Liu <dav...@databricks.com> wrote:
> I still can not reproduce it with 2 nodes (4 CPUs).
>
> Your repro.py could be faster (10 min) than before (22 min):
>
> inpdata.map(lambda (pc, x): (x, pc=='p' and 2 or 1)).reduceByKey(lambda x, y: x|y).filter(lambda (x, pc): pc==3).collect()
>
> (also, no cache needed anymore)
>
> Davies
>
> On Tue, Jan 6, 2015 at 9:02 AM, Sven Krasser <kras...@gmail.com> wrote:
> > The issue has been sensitive to the number of executors and input data size.
> > I'm using 2 executors with 4 cores each, 25GB of memory, 3800MB of memory
> > overhead for YARN. This will fit onto Amazon r3 instance types.
> > -Sven
> >
> > On Tue, Jan 6, 2015 at 12:46 AM, Davies Liu <dav...@databricks.com> wrote:
> >>
> >> I had ran your scripts in 5 nodes (2 CPUs, 8G mem) cluster, can not
> >> reproduce your failure. Should I test it with big memory node?
> >>
> >> On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser <kras...@gmail.com> wrote:
> >> > Thanks for the input! I've managed to come up with a repro of the error
> >> > with test data only (and without any of the custom code in the original
> >> > script), please see here:
> >> > https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md
> >> >
> >> > The Gist contains a data generator and the script reproducing the error
> >> > (plus driver and executor logs). If I run using full cluster capacity (32
> >> > executors with 28GB), there are no issues. If I run on only two, the error
> >> > appears again and the job fails:
> >> >
> >> > org.apache.spark.SparkException: PairwiseRDD: unexpected value:
> >> > List([B@294b55b7)
> >> >
> >> > Any thoughts or any obvious problems you can spot by any chance?
> >> >
> >> > Thank you!
> >> > -Sven
> >> >
> >> > On Sun, Jan 4, 2015 at 1:11 PM, Josh Rosen <rosenvi...@gmail.com> wrote:
> >> >>
> >> >> It doesn't seem like there's a whole lot of clues to go on here without
> >> >> seeing the job code.
The original "org.apache.spark.SparkException: > >> >> PairwiseRDD: unexpected value: List([B@130dc7ad)” error suggests > that > >> >> maybe > >> >> there’s an issue with PySpark’s serialization / tracking of types, > but > >> >> it’s > >> >> hard to say from this error trace alone. > >> >> > >> >> On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com) > >> >> wrote: > >> >> > >> >> Hey Josh, > >> >> > >> >> I am still trying to prune this to a minimal example, but it has been > >> >> tricky since scale seems to be a factor. The job runs over ~720GB of > >> >> data > >> >> (the cluster's total RAM is around ~900GB, split across 32 > executors). > >> >> I've > >> >> managed to run it over a vastly smaller data set without issues. > >> >> Curiously, > >> >> when I run it over slightly smaller data set of ~230GB (using > >> >> sort-based > >> >> shuffle), my job also fails, but I see no shuffle errors in the > >> >> executor > >> >> logs. All I see is the error below from the driver (this is also what > >> >> the > >> >> driver prints when erroring out on the large data set, but I assumed > >> >> the > >> >> executor errors to be the root cause). > >> >> > >> >> Any idea on where to look in the interim for more hints? I'll > continue > >> >> to > >> >> try to get to a minimal repro. > >> >> > >> >> 2014-12-30 21:35:34,539 INFO > >> >> [sparkDriver-akka.actor.default-dispatcher-14] > >> >> spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked > >> >> to > >> >> send map output locations for shuffle 0 to > >> >> sparkexecu...@ip-10-20-80-60.us-west-1.compute.internal:39739 > >> >> 2014-12-30 21:35:39,512 INFO > >> >> [sparkDriver-akka.actor.default-dispatcher-17] > >> >> spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked > >> >> to > >> >> send map output locations for shuffle 0 to > >> >> sparkexecu...@ip-10-20-80-62.us-west-1.compute.internal:42277 > >> >> 2014-12-30 21:35:58,893 WARN > >> >> [sparkDriver-akka.actor.default-dispatcher-16] > >> >> remote.ReliableDeliverySupervisor > (Slf4jLogger.scala:apply$mcV$sp(71)) > >> >> - > >> >> Association with remote system > >> >> > >> >> [akka.tcp://sparkyar...@ip-10-20-80-64.us-west-1.compute.internal:49584] > has > >> >> failed, address is now gated for [5000] ms. Reason is: > [Disassociated]. > >> >> 2014-12-30 21:35:59,044 ERROR [Yarn application state monitor] > >> >> cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - > Yarn > >> >> application has already exited with state FINISHED! > >> >> 2014-12-30 21:35:59,056 INFO [Yarn application state monitor] > >> >> handler.ContextHandler (ContextHandler.java:doStop(788)) - stopped > >> >> o.e.j.s.ServletContextHandler{/stages/stage/kill,null} > >> >> > >> >> [...] 
> >> >>
> >> >> 2014-12-30 21:35:59,111 INFO [Yarn application state monitor] ui.SparkUI
> >> >> (Logging.scala:logInfo(59)) - Stopped Spark web UI at
> >> >> http://ip-10-20-80-37.us-west-1.compute.internal:4040
> >> >> 2014-12-30 21:35:59,130 INFO [Yarn application state monitor]
> >> >> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stopping DAGScheduler
> >> >> 2014-12-30 21:35:59,131 INFO [Yarn application state monitor]
> >> >> cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Shutting
> >> >> down all executors
> >> >> 2014-12-30 21:35:59,132 INFO [sparkDriver-akka.actor.default-dispatcher-14]
> >> >> cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Asking each
> >> >> executor to shut down
> >> >> 2014-12-30 21:35:59,132 INFO [Thread-2] scheduler.DAGScheduler
> >> >> (Logging.scala:logInfo(59)) - Job 1 failed: collect at
> >> >> /home/hadoop/test_scripts/test.py:63, took 980.751936 s
> >> >> Traceback (most recent call last):
> >> >>   File "/home/hadoop/test_scripts/test.py", line 63, in <module>
> >> >>     result = j.collect()
> >> >>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 676, in collect
> >> >>     bytesInJava = self._jrdd.collect().iterator()
> >> >>   File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> >> >> line 538, in __call__
> >> >>   File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
> >> >> line 300, in get_return_value
> >> >> py4j.protocol.Py4JJavaError2014-12-30 21:35:59,140 INFO [Yarn application
> >> >> state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59))
> >> >> - Stopped
> >> >> : An error occurred while calling o117.collect.
> >> >> : org.apache.spark.SparkException: Job cancelled because SparkContext was
> >> >> shut down
> >> >>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
> >> >>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
> >> >>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
> >> >>         at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
> >> >>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
> >> >>         at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
> >> >>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
> >> >>         at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> >> >>         at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> >> >>         at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> >> >>         at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> >> >>         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> >> >>         at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> >> >>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >> >>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> >> >>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> >>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> >>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> >>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> >>
> >> >> Thank you!
> >> >> -Sven
> >> >>
> >> >> On Tue, Dec 30, 2014 at 12:15 PM, Josh Rosen <rosenvi...@gmail.com> wrote:
> >> >>>
> >> >>> Hi Sven,
> >> >>>
> >> >>> Do you have a small example program that you can share which will allow
> >> >>> me to reproduce this issue? If you have a workload that runs into this, you
> >> >>> should be able to keep iteratively simplifying the job and reducing the data
> >> >>> set size until you hit a fairly minimal reproduction (assuming the issue is
> >> >>> deterministic, which it sounds like it is).
> >> >>>
> >> >>> On Tue, Dec 30, 2014 at 9:49 AM, Sven Krasser <kras...@gmail.com> wrote:
> >> >>>>
> >> >>>> Hey all,
> >> >>>>
> >> >>>> Since upgrading to 1.2.0 a pyspark job that worked fine in 1.1.1 fails
> >> >>>> during shuffle. I've tried reverting from the sort-based shuffle back to the
> >> >>>> hash one, and that fails as well. Does anyone see similar problems or has an
> >> >>>> idea on where to look next?
> >> >>>>
> >> >>>> For the sort-based shuffle I get a bunch of exception like this in the
> >> >>>> executor logs:
> >> >>>>
> >> >>>> 2014-12-30 03:13:04,061 ERROR [Executor task launch worker-2]
> >> >>>> executor.Executor (Logging.scala:logError(96)) - Exception in task 4523.0
> >> >>>> in stage 1.0 (TID 4524)
> >> >>>> org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@130dc7ad)
> >> >>>>         at org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:307)
> >> >>>>         at org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:305)
> >> >>>>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >> >>>>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219)
> >> >>>>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
> >> >>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> >> >>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> >> >>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
> >> >>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> >> >>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >> >>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >> >>>>         at java.lang.Thread.run(Thread.java:745)
> >> >>>>
> >> >>>> For the hash-based shuffle, there are now a bunch of these exceptions in
> >> >>>> the logs:
> >> >>>>
> >> >>>> 2014-12-30 04:14:01,688 ERROR [Executor task launch worker-0]
> >> >>>> executor.Executor (Logging.scala:logError(96)) - Exception in task 4479.0
> >> >>>> in stage 1.0 (TID 4480)
> >> >>>> java.io.FileNotFoundException:
> >> >>>> /mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1419905501183_0004/spark-local-20141230035728-8fc0/23/merged_shuffle_1_68_0
> >> >>>> (No such file or directory)
> >> >>>>         at java.io.FileOutputStream.open(Native Method)
> >> >>>>         at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> >> >>>>         at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
> >> >>>>         at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
> >> >>>>         at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
> >> >>>>         at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
> >> >>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> >>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> >>>>         at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
> >> >>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> >> >>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> >> >>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
> >> >>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> >> >>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >> >>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >> >>>>         at java.lang.Thread.run(Thread.java:745)
> >> >>>>
> >> >>>> Thank you!
> >> >>>> -Sven
> >> >>>>
> >> >>>> --
> >> >>>> http://sites.google.com/site/krasser/?utm_source=sig

--
http://sites.google.com/site/krasser/?utm_source=sig