The errors also occur at the exact same point in the job: right at the end of the groupByKey(), when 5 tasks are left.
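(For what it's worth, a quick way to test whether those 5 tasks correspond to heavily skewed shuffle partitions is to repartition by key and count records per partition. A minimal sketch; `keyed` here is a stand-in for the (key, value) RDD built by the flatMap in the job quoted further down:)

    # Repartition by hash of key, the way groupByKey() shuffles, then count
    # records per partition; a few huge partitions would explain the same
    # 5 tasks failing on every run.
    sizes = keyed.partitionBy(1600).mapPartitionsWithIndex(
        lambda i, it: [(i, sum(1 for _ in it))]).collect()
    print(sorted(sizes, key=lambda kv: kv[1], reverse=True)[:10])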
On Thu, Aug 14, 2014 at 12:59 PM, Arpan Ghosh <ar...@automatic.com> wrote:
> Hi Davies,
>
> I tried the second option and launched my EC2 cluster with master on all
> the slaves by providing the latest commit hash of master as the
> "--spark-version" option to the spark-ec2 script. However, I am getting
> the same errors as before. I am running the job with the original
> spark-defaults.conf and spark-env.sh.
>
> java.net.SocketException: Connection reset
>         at java.net.SocketInputStream.read(SocketInputStream.java:196)
>         at java.net.SocketInputStream.read(SocketInputStream.java:122)
>         at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
>         at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
>         at java.io.DataInputStream.readInt(DataInputStream.java:387)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:101)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> 14/08/14 19:48:54 ERROR python.PythonRDD: This may have been caused by a prior exception:
> java.net.SocketException: Broken pipe
>         at java.net.SocketOutputStream.socketWrite0(Native Method)
>         at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
>         at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>         at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>         at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:329)
>         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:327)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:327)
>         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
>         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1249)
>         at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
>
> 14/08/14 19:48:54 ERROR executor.Executor: Exception in task 1112.0 in stage 0.0 (TID 3513)
> java.net.SocketException: Broken pipe
>         at java.net.SocketOutputStream.socketWrite0(Native Method)
>         at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
>         at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>         at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>         at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:329)
>         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:327)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:327)
>         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
>         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1249)
>         at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
>
> 14/08/14 19:48:53 ERROR executor.Executor: Exception in task 315.0 in stage 0.0 (TID 2716)
> java.net.SocketException: Connection reset
>         at java.net.SocketInputStream.read(SocketInputStream.java:196)
>         at java.net.SocketInputStream.read(SocketInputStream.java:122)
>         at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
>         at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
>         at java.io.DataInputStream.readInt(DataInputStream.java:387)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:101)
>         at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> On Wed, Aug 13, 2014 at 5:07 PM, Davies Liu <dav...@databricks.com> wrote:
>> For the hottest key, the Python worker will need about 1-2 GB of memory
>> to do the groupByKey().
>>
>> These configurations cannot help with the memory of the Python worker.
>>
>> So, two options:
>>
>> 1) Use reduceByKey() or combineByKey() to reduce the memory
>> consumption in the Python worker.
>> 2) Try master or the 1.1 branch, which has the feature of spilling in
>> Python.
>>
>> Davies
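(A minimal sketch of what option 1 could look like, assuming the downstream logic only needs per-key aggregates rather than the full groups; `keyed` is a stand-in for the (key, value) RDD built by the flatMap in the job quoted further down:)

    from operator import add

    # Option 1, sketched: aggregate per key instead of materializing a
    # whole group inside one Python worker. A per-key record count:
    counts = keyed.map(lambda kv: (kv[0], 1)).reduceByKey(add)

    # combineByKey() generalizes this when something richer than a count
    # is needed, e.g. a bounded sample of values per key (the cap of 100
    # is arbitrary):
    def create(v):
        return [v]

    def merge_value(acc, v):
        if len(acc) < 100:
            acc.append(v)
        return acc

    def merge_combiners(a, b):
        return (a + b)[:100]

    samples = keyed.combineByKey(create, merge_value, merge_combiners)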
>>
>> On Wed, Aug 13, 2014 at 4:08 PM, Arpan Ghosh <ar...@automatic.com> wrote:
>> > Here are the biggest keys:
>> >
>> > [(17634, 87874097),
>> >  (8407, 38395833),
>> >  (20092, 14403311),
>> >  (9295, 4142636),
>> >  (14359, 3129206),
>> >  (13051, 2608708),
>> >  (14133, 2073118),
>> >  (4571, 2053514),
>> >  (16175, 2021669),
>> >  (5268, 1908557),
>> >  (3669, 1687313),
>> >  (14051, 1628416),
>> >  (19660, 1619860),
>> >  (10206, 1546037),
>> >  (3740, 1527272),
>> >  (426, 1522788),
>> >
>> > Should I try to increase spark.shuffle.memoryFraction and decrease
>> > spark.storage.memoryFraction?
>> >
>> > On Wed, Aug 13, 2014 at 1:39 PM, Davies Liu <dav...@databricks.com> wrote:
>> >> Arpan,
>> >>
>> >> Which version of Spark are you using? Could you try the master or the
>> >> 1.1 branch, which can spill the data to disk during groupByKey()?
>> >>
>> >> PS: it's better to use reduceByKey() or combineByKey() to reduce the
>> >> data size during the shuffle.
>> >>
>> >> Maybe there is a huge key in the data set; you can find it this way:
>> >>
>> >>     sorted(rdd.countByKey().items(), key=lambda kv: kv[1], reverse=True)[:10]
>> >>
>> >> Davies
>> >>
>> >> On Wed, Aug 13, 2014 at 12:21 PM, Arpan Ghosh <ar...@automatic.com> wrote:
>> >> > Hi,
>> >> >
>> >> > Let me begin by describing my Spark setup on EC2 (launched using the
>> >> > provided spark-ec2.py script):
>> >> >
>> >> > 100 c3.2xlarge workers (8 cores & 15 GB memory each)
>> >> > 1 c3.2xlarge master (only running the master daemon)
>> >> > Spark 1.0.2
>> >> > 8 GB mounted at / and 80 GB mounted at /mnt
>> >> >
>> >> > spark-defaults.conf (a lot of config options have been added here to
>> >> > try and fix the problem; I also encounter the problem when running
>> >> > with the default options):
>> >> >
>> >> >     spark.executor.memory 12991m
>> >> >     spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
>> >> >     spark.executor.extraClassPath /root/ephemeral-hdfs/conf
>> >> >     spark.shuffle.file.buffer.kb 1024
>> >> >     spark.reducer.maxMbInFlight 96
>> >> >     spark.serializer.objectStreamReset 100000
>> >> >     spark.akka.frameSize 100
>> >> >     spark.akka.threads 32
>> >> >     spark.akka.timeout 1000
>> >> >     spark.serializer org.apache.spark.serializer.KryoSerializer
>> >> >
>> >> > spark-env.sh (a lot of config options have been added here to try
>> >> > and fix the problem; I also encounter the problem when running with
>> >> > the default options):
>> >> >
>> >> >     export SPARK_LOCAL_DIRS="/mnt/spark,/mnt2/spark"
>> >> >     export SPARK_MASTER_OPTS="-Dspark.worker.timeout=900"
>> >> >     export SPARK_WORKER_INSTANCES=1
>> >> >     export SPARK_WORKER_CORES=8
>> >> >     export HADOOP_HOME="/root/ephemeral-hdfs"
>> >> >     export SPARK_MASTER_IP=<Master's public DNS, as added by the spark-ec2.py script>
>> >> >     export MASTER=`cat /root/spark-ec2/cluster-url`
>> >> >     export SPARK_SUBMIT_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/"
>> >> >     export SPARK_SUBMIT_CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf"
>> >> >     export SPARK_PUBLIC_DNS=<wget command to get the public hostname, as added by the spark-ec2.py script>
>> >> >
>> >> >     # Set a high ulimit for large shuffles
>> >> >     ulimit -n 10000000
>> >> >
>> >> > I am trying to run a very simple job which reads in CSV data (~124 GB)
>> >> > from an S3 bucket, groups it by a key, and counts the number of
>> >> > groups. The number of partitions for the input textFile() is set to
>> >> > 1600, and the number of partitions for the groupByKey() operation is
>> >> > also 1600:
>> >> >
>> >> >     from pyspark import SparkConf, SparkContext
>> >> >
>> >> >     conf = SparkConf().setAppName(JOB_NAME).setMaster(master)
>> >> >     sc = SparkContext(conf=conf)
>> >> >
>> >> >     drive = sc.textFile(raw_drive_record_path, raw_drive_data_partitions)
>> >> >
>> >> >     drive_grouped_by_user_vin_and_week = \
>> >> >         drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week) \
>> >> >              .groupByKey(numPartitions=user_vin_week_group_partitions) \
>> >> >              .count()
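(An aside on the snippet above: since the job only needs the number of groups, the groupByKey() could be dropped entirely; the number of groups is just the number of distinct keys, which reduceByKey() can compute with bounded per-key state. A sketch reusing the names above:)

    # Count the groups without building them: after the reduce, each key
    # appears exactly once, so count() equals the number of groups.
    num_groups = drive \
        .flatMap(parse_raw_drive_record_and_key_by_user_vin_week) \
        .map(lambda kv: (kv[0], None)) \
        .reduceByKey(lambda a, b: a,
                     numPartitions=user_vin_week_group_partitions) \
        .count()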
>> >> >
>> >> > Stage 1 (flatMap()) launches 1601 tasks, all of which complete in
>> >> > 159 seconds. Then Stage 0 (groupByKey()) is launched with 1600 tasks,
>> >> > of which 1595 complete in under a minute. The same 5 TIDs consistently
>> >> > fail with the following errors in the logs of their respective
>> >> > executors:
>> >> >
>> >> > 14/08/13 02:45:15 ERROR executor.Executor: Exception in task ID 2203
>> >> > org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>> >> >         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:141)
>> >> >         at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
>> >> >         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
>> >> >         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> >> >         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> >> >         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>> >> >         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>> >> >         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>> >> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >> >         at java.lang.Thread.run(Thread.java:745)
>> >> > Caused by: java.io.EOFException
>> >> >         at java.io.DataInputStream.readInt(DataInputStream.java:392)
>> >> >         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)
>> >> >         ... 10 more
>> >> >
>> >> > 14/08/13 02:45:30 ERROR python.PythonRDD: Python worker exited unexpectedly (crashed)
>> >> > java.net.SocketException: Connection reset
>> >> >         at java.net.SocketInputStream.read(SocketInputStream.java:196)
>> >> >         at java.net.SocketInputStream.read(SocketInputStream.java:122)
>> >> >         at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
>> >> >         at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
>> >> >         at java.io.DataInputStream.readInt(DataInputStream.java:387)
>> >> >         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)
>> >> >         at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
>> >> >         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
>> >> >         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> >> >         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> >> >         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>> >> >         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>> >> >         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>> >> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >> >         at java.lang.Thread.run(Thread.java:745)
>> >> >
>> >> > 14/08/13 02:45:30 ERROR python.PythonRDD: This may have been caused by a prior exception:
>> >> > java.net.SocketException: Broken pipe
>> >> >         at java.net.SocketOutputStream.socketWrite0(Native Method)
>> >> >         at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
>> >> >         at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>> >> >         at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>> >> >         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>> >> >         at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>> >> >         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
>> >> >         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
>> >> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
>> >> >         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>> >> >         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >> >         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >> >         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>> >> >         at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>> >> >
>> >> > 14/08/13 02:45:30 ERROR executor.Executor: Exception in task ID 2840
>> >> > java.net.SocketException: Broken pipe
>> >> >         at java.net.SocketOutputStream.socketWrite0(Native Method)
>> >> >         at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
>> >> >         at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>> >> >         at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>> >> >         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>> >> >         at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>> >> >         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
>> >> >         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
>> >> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
>> >> >         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>> >> >         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >> >         at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >> >         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>> >> >         at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>> >> >
>> >> > The final error reported to the driver program is:
>> >> >
>> >> > 14/08/13 19:03:43 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
>> >> > 14/08/13 19:03:43 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled
>> >> > 14/08/13 19:03:43 INFO scheduler.DAGScheduler: Failed to run count at /root/data_infrastructure/src/GroupRawDriveDataByUserVinWeek.py:122
>> >> > Traceback (most recent call last):
>> >> >   File "/root/data_infrastructure/src/GroupRawDriveDataByUserVinWeek.py", line 122, in <module>
>> >> >     .groupByKey(numPartitions=user_vin_week_group_partitions)\
>> >> >   File "/root/spark/python/pyspark/rdd.py", line 737, in count
>> >> >     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>> >> >   File "/root/spark/python/pyspark/rdd.py", line 728, in sum
>> >> >     return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>> >> >   File "/root/spark/python/pyspark/rdd.py", line 648, in reduce
>> >> >     vals = self.mapPartitions(func).collect()
>> >> >   File "/root/spark/python/pyspark/rdd.py", line 612, in collect
>> >> >     bytesInJava = self._jrdd.collect().iterator()
>> >> >   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
>> >> >   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
>> >> > py4j.protocol.Py4JJavaError: An error occurred while calling o45.collect.
>> >> > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:602 failed 4 times, most recent failure: Exception failure in TID 3212 on host ip-10-146-221-202.ec2.internal: java.net.SocketException: Broken pipe
>> >> >         java.net.SocketOutputStream.socketWrite0(Native Method)
>> >> >         java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
>> >> >         java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>> >> >         java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>> >> >         java.io.DataOutputStream.write(DataOutputStream.java:107)
>> >> >         java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>> >> >         org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
>> >> >         org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
>> >> >         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> >         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >         org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
>> >> >         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>> >> >         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >> >         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >> >         org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>> >> >         org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>> >> > Driver stacktrace:
>> >> >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>> >> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>> >> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>> >> >         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >> >         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >> >         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>> >> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>> >> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>> >> >         at scala.Option.foreach(Option.scala:236)
>> >> >         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>> >> >         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>> >> >         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> >> >         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> >> >         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> >> >         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> >> >         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> >> >         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >> >         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >> >         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >> >         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >> >
>> >> > I also noticed some AssociationErrors in the log of each worker (in /root/spark/logs):
>> >> >
>> >> > 14/08/13 19:03:44 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkWorker@ip-10-142-182-124.ec2.internal:57142] -> [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]: Error [Association failed with [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]] [
>> >> > akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]
>> >> > Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-10-142-182-124.ec2.internal/10.142.182.124:51159]
>> >> >
>> >> > It looks like the error is occurring during the shuffle, when the
>> >> > reduce tasks are trying to fetch their corresponding map outputs and
>> >> > the connection over which they are fetching this data is being reset
>> >> > or prematurely terminated. This job runs fine on the same setup with
>> >> > a smaller dataset (~62 GB). I am unable to debug this further; any
>> >> > help would be appreciated.
>> >> >
>> >> > Thanks
>> >> >
>> >> > Arpan