Hey Josh,

I am still trying to prune this to a minimal example, but it has been tricky since scale seems to be a factor. The job runs over ~720GB of data (the cluster's total RAM is ~900GB, split across 32 executors). I've managed to run it over a vastly smaller data set without issues. Curiously, when I run it over a slightly smaller data set of ~230GB (using sort-based shuffle), my job also fails, but I see no shuffle errors in the executor logs. All I see is the error below from the driver (this is also what the driver prints when erroring out on the large data set, but I assumed the executor errors to be the root cause).

Any idea on where to look in the interim for more hints? I'll continue to try to get to a minimal repro.

2014-12-30 21:35:34,539 INFO [sparkDriver-akka.actor.default-dispatcher-14] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-60.us-west-1.compute.internal:39739
2014-12-30 21:35:39,512 INFO [sparkDriver-akka.actor.default-dispatcher-17] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-62.us-west-1.compute.internal:42277
2014-12-30 21:35:58,893 WARN [sparkDriver-akka.actor.default-dispatcher-16] remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkyar...@ip-10-20-80-64.us-west-1.compute.internal:49584] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2014-12-30 21:35:59,044 ERROR [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Yarn application has already exited with state FINISHED!
2014-12-30 21:35:59,056 INFO [Yarn application state monitor] handler.ContextHandler (ContextHandler.java:doStop(788)) - stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
[...]
2014-12-30 21:35:59,111 INFO [Yarn application state monitor] ui.SparkUI (Logging.scala:logInfo(59)) - Stopped Spark web UI at http://ip-10-20-80-37.us-west-1.compute.internal:4040
2014-12-30 21:35:59,130 INFO [Yarn application state monitor] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stopping DAGScheduler
2014-12-30 21:35:59,131 INFO [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Shutting down all executors
2014-12-30 21:35:59,132 INFO [sparkDriver-akka.actor.default-dispatcher-14] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Asking each executor to shut down
2014-12-30 21:35:59,132 INFO [Thread-2] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 1 failed: collect at /home/hadoop/test_scripts/test.py:63, took 980.751936 s
Traceback (most recent call last):
  File "/home/hadoop/test_scripts/test.py", line 63, in <module>
    result = j.collect()
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError2014-12-30 21:35:59,140 INFO [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Stopped
: An error occurred while calling o117.collect.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thank you!
-Sven

On Tue, Dec 30, 2014 at 12:15 PM, Josh Rosen <rosenvi...@gmail.com> wrote:

> Hi Sven,
>
> Do you have a small example program that you can share which will allow me
> to reproduce this issue?
> If you have a workload that runs into this, you
> should be able to keep iteratively simplifying the job and reducing the
> data set size until you hit a fairly minimal reproduction (assuming the
> issue is deterministic, which it sounds like it is).
>
> On Tue, Dec 30, 2014 at 9:49 AM, Sven Krasser <kras...@gmail.com> wrote:
>
>> Hey all,
>>
>> Since upgrading to 1.2.0 a pyspark job that worked fine in 1.1.1 fails
>> during shuffle. I've tried reverting from the sort-based shuffle back to
>> the hash one, and that fails as well. Does anyone see similar problems or
>> has an idea on where to look next?
>>
>> For the sort-based shuffle I get a bunch of exception like this in the
>> executor logs:
>>
>> 2014-12-30 03:13:04,061 ERROR [Executor task launch worker-2] executor.Executor (Logging.scala:logError(96)) - Exception in task 4523.0 in stage 1.0 (TID 4524)
>> org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@130dc7ad)
>>     at org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:307)
>>     at org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:305)
>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219)
>>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> For the hash-based shuffle, there are now a bunch of these
>> exceptions in the logs:
>>
>> 2014-12-30 04:14:01,688 ERROR [Executor task launch worker-0] executor.Executor (Logging.scala:logError(96)) - Exception in task 4479.0 in stage 1.0 (TID 4480)
>> java.io.FileNotFoundException: /mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1419905501183_0004/spark-local-20141230035728-8fc0/23/merged_shuffle_1_68_0 (No such file or directory)
>>     at java.io.FileOutputStream.open(Native Method)
>>     at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>>     at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
>>     at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
>>     at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
>>     at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>     at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> Thank you!
>> -Sven
>>
>> --
>> http://sites.google.com/site/krasser/?utm_source=sig
>>
>

--
http://sites.google.com/site/krasser/?utm_source=sig