I've filed a ticket for this issue here: https://issues.apache.org/jira/browse/SPARK-5209
(the ticket reproduces the problem at a smaller cluster size).

-Sven
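For anyone who wants to poke at this without a cluster, here is a minimal sketch of the pipeline shape the repro exercises. Only the map/reduceByKey/filter chain is taken from Davies' message further down in the thread; the inline generator and all names in it (inpdata, the 'p'/'c' record classes, the key range) are assumptions standing in for the actual generator in the linked Gists, which is not reproduced here:

from pyspark import SparkContext

sc = SparkContext(appName="pairwise-rdd-repro-sketch")

# Hypothetical stand-in for the Gist's data generator: (class, key) pairs.
# 'p' marks one kind of record; 'c' is an assumed name for the other.
# Even keys appear under both classes and should survive the filter below.
inpdata = sc.parallelize(range(100000)).flatMap(
    lambda i: [('p', i), ('c', i)] if i % 2 == 0 else [('p', i)])

# The reduction from Davies' message, rewritten without Python 2 tuple
# unpacking: encode the class as a bit ('p' -> 2, anything else -> 1),
# OR the bits together per key, and keep keys seen under both classes
# (2 | 1 == 3).
result = (inpdata
          .map(lambda kv: (kv[1], 2 if kv[0] == 'p' else 1))
          .reduceByKey(lambda x, y: x | y)
          .filter(lambda kv: kv[1] == 3)
          .collect())

print(len(result))  # 50000 even keys appear under both classes

The collect() result stays small (only the joint keys come back), so the interesting work is the shuffle triggered by reduceByKey -- which is where the PairwiseRDD error below surfaces.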
On Wed, Jan 7, 2015 at 11:13 AM, Sven Krasser <kras...@gmail.com> wrote:
> Could you try it on AWS using EMR? That'd give you an exact replica of the
> environment that causes the error.
>
> Sent from my iPhone
>
>> On Jan 7, 2015, at 10:53 AM, Davies Liu <dav...@databricks.com> wrote:
>>
>> Hey Sven,
>>
>> I tried with all of your configurations, 2 nodes with 2 executors each,
>> but in standalone mode, and it worked fine.
>>
>> Could you try to narrow down the configuration change that triggers it?
>>
>> Davies
>>
>>> On Tue, Jan 6, 2015 at 8:03 PM, Sven Krasser <kras...@gmail.com> wrote:
>>> Hey Davies,
>>>
>>> Here are some more details on a configuration that causes this error for
>>> me. Launch an AWS Spark EMR cluster as follows:
>>>
>>> aws emr create-cluster --region us-west-1 --no-auto-terminate \
>>>     --ec2-attributes KeyName=your-key-here,SubnetId=your-subnet-here \
>>>     --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args='["-g"]' \
>>>     --ami-version 3.3 \
>>>     --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \
>>>                       InstanceGroupType=CORE,InstanceCount=10,InstanceType=r3.xlarge \
>>>     --name "Spark Issue Repro" --visible-to-all-users --applications Name=Ganglia
>>>
>>> This is a 10-node cluster (not sure if this makes a difference outside of
>>> HDFS block locality). Then use this Gist as your spark-defaults file
>>> (it will also configure 2 executors per job):
>>> https://gist.github.com/skrasser/9b978d3d572735298d16
>>>
>>> With that, I am seeing this again:
>>>
>>> 2015-01-07 03:43:51,751 ERROR [Executor task launch worker-1]
>>> executor.Executor (Logging.scala:logError(96)) - Exception in task 13.0
>>> in stage 0.0 (TID 27)
>>> org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@4cfae71c)
>>>
>>> Thanks for the performance pointers -- the repro script is fairly
>>> unpolished (just enough to cause the aforementioned exception).
>>>
>>> Hope this sheds some light on the error. From what I can tell so far,
>>> something in the spark-defaults file triggers it (with other settings the
>>> job completes just fine).
>>>
>>> Thanks for your help!
>>> -Sven
>>>
>>>> On Tue, Jan 6, 2015 at 12:29 PM, Davies Liu <dav...@databricks.com> wrote:
>>>>
>>>> I still cannot reproduce it with 2 nodes (4 CPUs).
>>>>
>>>> Your repro.py can also be made faster (10 min instead of 22 min):
>>>>
>>>> inpdata.map(lambda (pc, x): (x, pc == 'p' and 2 or 1)).reduceByKey(
>>>>     lambda x, y: x | y).filter(lambda (x, pc): pc == 3).collect()
>>>>
>>>> (also, no cache is needed anymore)
>>>>
>>>> Davies
>>>>
>>>>> On Tue, Jan 6, 2015 at 9:02 AM, Sven Krasser <kras...@gmail.com> wrote:
>>>>> The issue has been sensitive to the number of executors and the input
>>>>> data size. I'm using 2 executors with 4 cores each, 25GB of memory, and
>>>>> 3800MB of memory overhead for YARN. This fits onto Amazon r3 instance
>>>>> types.
>>>>> -Sven
>>>>>
>>>>> On Tue, Jan 6, 2015 at 12:46 AM, Davies Liu <dav...@databricks.com> wrote:
>>>>>>
>>>>>> I ran your scripts on a 5-node (2 CPUs, 8G mem) cluster and cannot
>>>>>> reproduce your failure. Should I test it with a big-memory node?
>>>>>>
>>>>>>> On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser <kras...@gmail.com> wrote:
>>>>>>> Thanks for the input!
>>>>>>> I've managed to come up with a repro of the error with test data only
>>>>>>> (and without any of the custom code in the original script), please
>>>>>>> see here:
>>>>>>>
>>>>>>> https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md
>>>>>>>
>>>>>>> The Gist contains a data generator and the script reproducing the
>>>>>>> error (plus driver and executor logs). If I run using full cluster
>>>>>>> capacity (32 executors with 28GB each), there are no issues. If I run
>>>>>>> on only two, the error appears again and the job fails:
>>>>>>>
>>>>>>> org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@294b55b7)
>>>>>>>
>>>>>>> Any thoughts, or any obvious problems you can spot by any chance?
>>>>>>>
>>>>>>> Thank you!
>>>>>>> -Sven
>>>>>>>
>>>>>>> On Sun, Jan 4, 2015 at 1:11 PM, Josh Rosen <rosenvi...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> It doesn't seem like there are a whole lot of clues to go on here
>>>>>>>> without seeing the job code. The original
>>>>>>>> "org.apache.spark.SparkException: PairwiseRDD: unexpected value:
>>>>>>>> List([B@130dc7ad)" error suggests that maybe there's an issue with
>>>>>>>> PySpark's serialization / tracking of types, but it's hard to say
>>>>>>>> from this error trace alone.
>>>>>>>>
>>>>>>>> On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com)
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hey Josh,
>>>>>>>>
>>>>>>>> I am still trying to prune this down to a minimal example, but it has
>>>>>>>> been tricky since scale seems to be a factor. The job runs over
>>>>>>>> ~720GB of data (the cluster's total RAM is around ~900GB, split
>>>>>>>> across 32 executors). I've managed to run it over a vastly smaller
>>>>>>>> data set without issues. Curiously, when I run it over a slightly
>>>>>>>> smaller data set of ~230GB (using sort-based shuffle), my job also
>>>>>>>> fails, but I see no shuffle errors in the executor logs. All I see is
>>>>>>>> the error below from the driver (this is also what the driver prints
>>>>>>>> when erroring out on the large data set, but there I assumed the
>>>>>>>> executor errors to be the root cause).
>>>>>>>>
>>>>>>>> Any idea where to look for more hints in the interim? I'll continue
>>>>>>>> trying to get to a minimal repro.
>>>>>>>>
>>>>>>>> 2014-12-30 21:35:34,539 INFO [sparkDriver-akka.actor.default-dispatcher-14]
>>>>>>>> spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to
>>>>>>>> send map output locations for shuffle 0 to
>>>>>>>> sparkexecu...@ip-10-20-80-60.us-west-1.compute.internal:39739
>>>>>>>> 2014-12-30 21:35:39,512 INFO [sparkDriver-akka.actor.default-dispatcher-17]
>>>>>>>> spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to
>>>>>>>> send map output locations for shuffle 0 to
>>>>>>>> sparkexecu...@ip-10-20-80-62.us-west-1.compute.internal:42277
>>>>>>>> 2014-12-30 21:35:58,893 WARN [sparkDriver-akka.actor.default-dispatcher-16]
>>>>>>>> remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) -
>>>>>>>> Association with remote system
>>>>>>>> [akka.tcp://sparkyar...@ip-10-20-80-64.us-west-1.compute.internal:49584]
>>>>>>>> has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>>>>>>>> 2014-12-30 21:35:59,044 ERROR [Yarn application state monitor]
>>>>>>>> cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Yarn
>>>>>>>> application has already exited with state FINISHED!
>>>>>>>> 2014-12-30 21:35:59,056 INFO [Yarn application state monitor]
>>>>>>>> handler.ContextHandler (ContextHandler.java:doStop(788)) - stopped
>>>>>>>> o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
>>>>>>>>
>>>>>>>> [...]
>>>>>>>>
>>>>>>>> 2014-12-30 21:35:59,111 INFO [Yarn application state monitor] ui.SparkUI
>>>>>>>> (Logging.scala:logInfo(59)) - Stopped Spark web UI at
>>>>>>>> http://ip-10-20-80-37.us-west-1.compute.internal:4040
>>>>>>>> 2014-12-30 21:35:59,130 INFO [Yarn application state monitor]
>>>>>>>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stopping DAGScheduler
>>>>>>>> 2014-12-30 21:35:59,131 INFO [Yarn application state monitor]
>>>>>>>> cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Shutting
>>>>>>>> down all executors
>>>>>>>> 2014-12-30 21:35:59,132 INFO [sparkDriver-akka.actor.default-dispatcher-14]
>>>>>>>> cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Asking
>>>>>>>> each executor to shut down
>>>>>>>> 2014-12-30 21:35:59,132 INFO [Thread-2] scheduler.DAGScheduler
>>>>>>>> (Logging.scala:logInfo(59)) - Job 1 failed: collect at
>>>>>>>> /home/hadoop/test_scripts/test.py:63, took 980.751936 s
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File "/home/hadoop/test_scripts/test.py", line 63, in <module>
>>>>>>>>     result = j.collect()
>>>>>>>>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 676, in collect
>>>>>>>>     bytesInJava = self._jrdd.collect().iterator()
>>>>>>>>   File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>>>>>>>>     line 538, in __call__
>>>>>>>>   File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>>>>>>>>     line 300, in get_return_value
>>>>>>>> py4j.protocol.Py4JJavaError2014-12-30 21:35:59,140 INFO [Yarn application
>>>>>>>> state monitor] cluster.YarnClientSchedulerBackend
>>>>>>>> (Logging.scala:logInfo(59)) - Stopped
>>>>>>>> : An error occurred while calling o117.collect.
>>>>>>>> : org.apache.spark.SparkException: Job cancelled because SparkContext
>>>>>>>> was shut down
>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
>>>>>>>>     at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
>>>>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
>>>>>>>>     at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>>>>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
>>>>>>>>     at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>>>>>>>>     at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>>>>>>>>     at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>>>>>>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>>>>>>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>>>>>>>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>
>>>>>>>> Thank you!
>>>>>>>> -Sven
>>>>>>>>
>>>>>>>> On Tue, Dec 30, 2014 at 12:15 PM, Josh Rosen <rosenvi...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Sven,
>>>>>>>>>
>>>>>>>>> Do you have a small example program that you can share which will
>>>>>>>>> allow me to reproduce this issue? If you have a workload that runs
>>>>>>>>> into this, you should be able to keep iteratively simplifying the
>>>>>>>>> job and reducing the data set size until you hit a fairly minimal
>>>>>>>>> reproduction (assuming the issue is deterministic, which it sounds
>>>>>>>>> like it is).
>>>>>>>>>
>>>>>>>>> On Tue, Dec 30, 2014 at 9:49 AM, Sven Krasser <kras...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hey all,
>>>>>>>>>>
>>>>>>>>>> Since upgrading to 1.2.0, a PySpark job that worked fine in 1.1.1
>>>>>>>>>> fails during shuffle. I've tried reverting from the sort-based
>>>>>>>>>> shuffle back to the hash-based one, and that fails as well. Does
>>>>>>>>>> anyone see similar problems or have an idea on where to look next?
>>>>>>>>>>
>>>>>>>>>> For the sort-based shuffle I get a bunch of exceptions like this in
>>>>>>>>>> the executor logs:
>>>>>>>>>>
>>>>>>>>>> 2014-12-30 03:13:04,061 ERROR [Executor task launch worker-2]
>>>>>>>>>> executor.Executor (Logging.scala:logError(96)) - Exception in task
>>>>>>>>>> 4523.0 in stage 1.0 (TID 4524)
>>>>>>>>>> org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@130dc7ad)
>>>>>>>>>>     at org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:307)
>>>>>>>>>>     at org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:305)
>>>>>>>>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>>>>     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219)
>>>>>>>>>>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
>>>>>>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>
>>>>>>>>>> For the hash-based shuffle, there are now a bunch of these
>>>>>>>>>> exceptions in the logs:
>>>>>>>>>>
>>>>>>>>>> 2014-12-30 04:14:01,688 ERROR [Executor task launch worker-0]
>>>>>>>>>> executor.Executor (Logging.scala:logError(96)) - Exception in task
>>>>>>>>>> 4479.0 in stage 1.0 (TID 4480)
>>>>>>>>>> java.io.FileNotFoundException:
>>>>>>>>>> /mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1419905501183_0004/spark-local-20141230035728-8fc0/23/merged_shuffle_1_68_0
>>>>>>>>>> (No such file or directory)
>>>>>>>>>>     at java.io.FileOutputStream.open(Native Method)
>>>>>>>>>>     at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>>>>>>>>>>     at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
>>>>>>>>>>     at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
>>>>>>>>>>     at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
>>>>>>>>>>     at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
>>>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>     at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
>>>>>>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>
>>>>>>>>>> Thank you!
>>>>>>>>>> -Sven
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> http://sites.google.com/site/krasser/?utm_source=sig
>>>>>>>>
>>>>>>>> --
>>>>>>>> http://sites.google.com/site/krasser/?utm_source=sig
>>>>>>>
>>>>>>> --
>>>>>>> http://sites.google.com/site/krasser/?utm_source=sig
>>>>>
>>>>> --
>>>>> http://sites.google.com/site/krasser/?utm_source=sig
>>>
>>> --
>>> http://sites.google.com/site/krasser/?utm_source=sig

--
http://sites.google.com/site/krasser/?utm_source=sig
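The spark-defaults Gist that triggers the failure is linked above but not inlined in the thread. Based solely on the settings Sven describes (yarn-client mode, 2 executors with 4 cores each, 25GB of executor memory, 3800MB of YARN memory overhead), a rough equivalent expressed through SparkConf would look like the sketch below. The property names are standard Spark 1.x settings, but whether the actual Gist sets anything beyond these is unknown:

from pyspark import SparkConf, SparkContext

# Rough equivalent of the configuration described in the thread. These
# mirror spark-defaults.conf keys; the real Gist may set more or different
# values. spark.yarn.executor.memoryOverhead is in MB for Spark 1.x.
conf = (SparkConf()
        .setMaster("yarn-client")
        .setAppName("spark-5209-config-sketch")
        .set("spark.executor.instances", "2")
        .set("spark.executor.cores", "4")
        .set("spark.executor.memory", "25g")
        .set("spark.yarn.executor.memoryOverhead", "3800"))

sc = SparkContext(conf=conf)

Per the thread, the failure only shows up at low executor counts (it disappears at full cluster capacity with 32 executors), so spark.executor.instances is the setting to keep pinned when reproducing.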