Arpan,

Which version of Spark are you using? Could you try master or the 1.1
branch, which can spill data to disk during groupByKey()?
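If you do move to 1.1, a rough sketch of the relevant settings (property
names as of the 1.1 branch; 512m is just the default threshold, tune it to
your executors):

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName(JOB_NAME)
            .setMaster(master)
            # allow aggregations to spill to disk instead of crashing the worker
            .set("spark.shuffle.spill", "true")
            # how much a Python worker buffers before spilling to SPARK_LOCAL_DIRS
            .set("spark.python.worker.memory", "512m"))
    sc = SparkContext(conf=conf)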
PS: It's better to use reduceByKey() or combineByKey() to reduce the data
size during the shuffle (a concrete sketch for your job is at the bottom of
this message). There may also be a huge key in the data set; since
countByKey() returns a dict in PySpark, you can find the biggest keys like
this:

    sorted(rdd.countByKey().items(), key=lambda x: x[1], reverse=True)[:10]

Davies

On Wed, Aug 13, 2014 at 12:21 PM, Arpan Ghosh <ar...@automatic.com> wrote:
> Hi,
>
> Let me begin by describing my Spark setup on EC2 (launched using the
> provided spark-ec2.py script):
>
> 100 c3.2xlarge workers (8 cores & 15 GB memory each)
> 1 c3.2xlarge master (only running the master daemon)
> Spark 1.0.2
> 8 GB mounted at / & 80 GB mounted at /mnt
>
> spark-defaults.conf (a lot of config options have been added here while
> trying to fix the problem; I also encounter it when running with the
> default options):
>
> spark.executor.memory 12991m
> spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
> spark.executor.extraClassPath /root/ephemeral-hdfs/conf
> spark.shuffle.file.buffer.kb 1024
> spark.reducer.maxMbInFlight 96
> spark.serializer.objectStreamReset 100000
> spark.akka.frameSize 100
> spark.akka.threads 32
> spark.akka.timeout 1000
> spark.serializer org.apache.spark.serializer.KryoSerializer
>
> spark-env.sh (same caveat: many options added while trying to fix the
> problem; it also occurs with the defaults):
>
> export SPARK_LOCAL_DIRS="/mnt/spark,/mnt2/spark"
> export SPARK_MASTER_OPTS="-Dspark.worker.timeout=900"
> export SPARK_WORKER_INSTANCES=1
> export SPARK_WORKER_CORES=8
> export HADOOP_HOME="/root/ephemeral-hdfs"
> export SPARK_MASTER_IP=<Master's public DNS, as added by the spark-ec2.py script>
> export MASTER=`cat /root/spark-ec2/cluster-url`
> export SPARK_SUBMIT_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/"
> export SPARK_SUBMIT_CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf"
> export SPARK_PUBLIC_DNS=<wget command to get the public hostname, as added by the spark-ec2.py script>
>
> # Set a high ulimit for large shuffles
> ulimit -n 10000000
>
> I am trying to run a very simple job which reads ~124 GB of CSV data from
> an S3 bucket, groups it by a key, and counts the number of groups. The
> number of partitions for the input textFile() is set to 1600, and the
> number of partitions for the groupByKey() operation is also 1600:
>
> conf = SparkConf().setAppName(JOB_NAME).setMaster(master)
> sc = SparkContext(conf=conf)
>
> drive = sc.textFile(raw_drive_record_path, raw_drive_data_partitions)
>
> drive_grouped_by_user_vin_and_week = \
>     drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week)\
>          .groupByKey(numPartitions=user_vin_week_group_partitions)\
>          .count()
>
> Stage 1 (flatMap()) launches 1601 tasks, all of which complete in 159
> seconds. Then Stage 0 (groupByKey()) is launched with 1600 tasks, out of
> which 1595 complete in under a minute.
> The same 5 TIDs consistently fail with the following errors in the logs
> of their respective executors:
>
> 14/08/13 02:45:15 ERROR executor.Executor: Exception in task ID 2203
> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:141)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException
>         at java.io.DataInputStream.readInt(DataInputStream.java:392)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)
>         ... 10 more
>
> 14/08/13 02:45:30 ERROR python.PythonRDD: Python worker exited unexpectedly (crashed)
> java.net.SocketException: Connection reset
>         at java.net.SocketInputStream.read(SocketInputStream.java:196)
>         at java.net.SocketInputStream.read(SocketInputStream.java:122)
>         at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
>         at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
>         at java.io.DataInputStream.readInt(DataInputStream.java:387)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> 14/08/13 02:45:30 ERROR python.PythonRDD: This may have been caused by a prior exception:
> java.net.SocketException: Broken pipe
>         at java.net.SocketOutputStream.socketWrite0(Native Method)
>         at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
>         at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>         at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>         at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
>         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
>         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>         at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>
> 14/08/13 02:45:30 ERROR executor.Executor: Exception in task ID 2840
> java.net.SocketException: Broken pipe
>         at java.net.SocketOutputStream.socketWrite0(Native Method)
>         at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
>         at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>         at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>         at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
>         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
>         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>         at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>
> The final error reported to the driver program is:
>
> 14/08/13 19:03:43 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
> 14/08/13 19:03:43 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled
> 14/08/13 19:03:43 INFO scheduler.DAGScheduler: Failed to run count at /root/data_infrastructure/src/GroupRawDriveDataByUserVinWeek.py:122
> Traceback (most recent call last):
>   File "/root/data_infrastructure/src/GroupRawDriveDataByUserVinWeek.py", line 122, in <module>
>     .groupByKey(numPartitions=user_vin_week_group_partitions)\
>   File "/root/spark/python/pyspark/rdd.py", line 737, in count
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/root/spark/python/pyspark/rdd.py", line 728, in sum
>     return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>   File "/root/spark/python/pyspark/rdd.py", line 648, in reduce
>     vals = self.mapPartitions(func).collect()
>   File "/root/spark/python/pyspark/rdd.py", line 612, in collect
>     bytesInJava = self._jrdd.collect().iterator()
>   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
>   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o45.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:602 failed 4 times, most recent failure: Exception failure in TID 3212 on host ip-10-146-221-202.ec2.internal: java.net.SocketException: Broken pipe
>         java.net.SocketOutputStream.socketWrite0(Native Method)
>         java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
>         java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>         java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>         java.io.DataOutputStream.write(DataOutputStream.java:107)
>         java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>         org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
>         org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
>         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>         org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>         org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> I also noticed some AssociationErrors in the log of each worker (in
> /root/spark/logs):
>
> 14/08/13 19:03:44 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkWorker@ip-10-142-182-124.ec2.internal:57142] -> [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]: Error [Association failed with [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]] [
> akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]
> Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-10-142-182-124.ec2.internal/10.142.182.124:51159]
>
> It looks like the error occurs during the shuffle, when the reduce tasks
> are fetching their corresponding map outputs and the connection over which
> they fetch the data is reset or prematurely terminated. The same job runs
> fine on this setup with a smaller dataset (~62 GB). I am unable to debug
> this further; any help would be appreciated.
>
> Thanks
>
> Arpan
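P.S. To make the reduceByKey() suggestion concrete for this job: it only
counts the number of (user, vin, week) groups, so the groups never need to
be materialized. A rough sketch, assuming
parse_raw_drive_record_and_key_by_user_vin_week emits (key, value) pairs as
the flatMap above suggests:

    # Count the distinct groups without shipping every record for a key
    # to a single reducer: map to (key, 1), combine counts map-side,
    # then count the surviving keys.
    key_counts = drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week) \
                      .map(lambda kv: (kv[0], 1)) \
                      .reduceByKey(lambda a, b: a + b,
                                   numPartitions=user_vin_week_group_partitions)
    num_groups = key_counts.count()

    # The same RDD exposes the skew directly: the ten biggest groups by size.
    biggest = key_counts.map(lambda kv: (kv[1], kv[0])).sortByKey(False).take(10)

This moves per-key counts through the shuffle instead of all the records,
so one hot key can no longer blow up a single Python worker.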