Hi Davies,

I tried the second option and launched my EC2 cluster with master on all
the slaves by passing the latest commit hash of master as the
"--spark-version" option to the spark-ec2 script. However, I am getting the
same errors as before. I am running the job with the original
spark-defaults.conf and spark-env.sh.

java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:196)
        at java.net.SocketInputStream.read(SocketInputStream.java:122)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
        at java.io.DataInputStream.readInt(DataInputStream.java:387)
        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:101)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
14/08/14 19:48:54 ERROR python.PythonRDD: This may have been caused by a prior exception:
java.net.SocketException: Broken pipe
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:329)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:327)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1249)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)

14/08/14 19:48:54 ERROR executor.Executor: Exception in task 1112.0 in stage 0.0 (TID 3513)
java.net.SocketException: Broken pipe
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:329)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:327)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1249)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)


14/08/14 19:48:53 ERROR executor.Executor: Exception in task 315.0 in stage 0.0 (TID 2716)
java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:196)
        at java.net.SocketInputStream.read(SocketInputStream.java:122)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
        at java.io.DataInputStream.readInt(DataInputStream.java:387)
        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:101)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
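
For reference, option 1 from Davies's reply quoted below (replacing
groupByKey() with reduceByKey()) could be sketched roughly as follows for
this job, which only needs the number of groups. This is a minimal sketch
under assumptions, not a verified fix: count_groups and its parameter names
are hypothetical, and it assumes the input is an RDD of (key, value) pairs
such as the output of the flatMap() step.

```python
from operator import add


def count_groups(keyed_rdd, num_partitions=None):
    """Count distinct keys without building the list of values per key.

    keyed_rdd is assumed to be an RDD of (key, value) pairs;
    num_partitions is passed through to reduceByKey().
    """
    # Replace every value with 1 so only a small integer is shuffled per record.
    ones = keyed_rdd.map(lambda kv: (kv[0], 1))
    # reduceByKey() combines values per key as it goes, so a hot key is
    # represented by a single running total rather than all of its records.
    per_key_counts = ones.reduceByKey(add, numPartitions=num_partitions)
    # One (key, count) pair remains per group, so count() is the group count.
    return per_key_counts.count()
```

Called as count_groups(drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week),
user_vin_week_group_partitions), this should keep one integer per key in the
Python worker instead of the full value list for the hottest key.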



On Wed, Aug 13, 2014 at 5:07 PM, Davies Liu <dav...@databricks.com> wrote:

> For the hottest key, the Python worker will need about 1-2 GB of
> memory to do groupByKey().
>
> These configurations cannot help with the memory of the Python worker.
>
> So, two options:
>
> 1) use reduceByKey() or combineByKey() to reduce the memory
> consumption in the Python worker.
> 2) try the master or 1.1 branch, which has the spilling feature in Python.
>
> Davies
>
> On Wed, Aug 13, 2014 at 4:08 PM, Arpan Ghosh <ar...@automatic.com> wrote:
> > Here are the biggest keys:
> >
> > [   (17634, 87874097),
> >     (8407, 38395833),
> >     (20092, 14403311),
> >     (9295, 4142636),
> >     (14359, 3129206),
> >     (13051, 2608708),
> >     (14133, 2073118),
> >     (4571, 2053514),
> >     (16175, 2021669),
> >     (5268, 1908557),
> >     (3669, 1687313),
> >     (14051, 1628416),
> >     (19660, 1619860),
> >     (10206, 1546037),
> >     (3740, 1527272),
> >     (426, 1522788),
> >
> >
> > Should I try to increase spark.shuffle.memoryFraction and decrease
> > spark.storage.memoryFraction ?
> >
> >
> >
> > On Wed, Aug 13, 2014 at 1:39 PM, Davies Liu <dav...@databricks.com> wrote:
> >>
> >> Arpan,
> >>
> >> Which version of Spark are you using? Could you try the master or 1.1
> >> branch, which can spill data to disk during groupByKey()?
> >>
> >> PS: it's better to use reduceByKey() or combineByKey() to reduce data
> >> size during shuffle.
> >>
> >> Maybe there is a huge key in the data set; you can find it this way
> >> (countByKey() returns a dict on the driver, so sort its items):
> >>
> >> sorted(rdd.countByKey().items(), key=lambda x: x[1], reverse=True)[:10]
> >>
> >> Davies
> >>
> >>
> >> On Wed, Aug 13, 2014 at 12:21 PM, Arpan Ghosh <ar...@automatic.com> wrote:
> >> > Hi,
> >> >
> >> > Let me begin by describing my Spark setup on EC2 (launched using the
> >> > provided spark-ec2.py script):
> >> >
> >> > 100 c3.2xlarge workers (8 cores & 15GB memory each)
> >> > 1 c3.2xlarge Master (only running master daemon)
> >> > Spark 1.0.2
> >> > 8GB mounted at / & 80 GB mounted at /mnt
> >> >
> >> > spark-defaults.conf (A lot of config options have been added here to
> >> > try and fix the problem. I also encounter the problem while running
> >> > with the default options)
> >> >
> >> > spark.executor.memory   12991m
> >> > spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
> >> > spark.executor.extraClassPath   /root/ephemeral-hdfs/conf
> >> > spark.shuffle.file.buffer.kb    1024
> >> > spark.reducer.maxMbInFlight     96
> >> > spark.serializer.objectStreamReset      100000
> >> > spark.akka.frameSize    100
> >> > spark.akka.threads      32
> >> > spark.akka.timeout      1000
> >> > spark.serializer        org.apache.spark.serializer.KryoSerializer
> >> >
> >> > spark-env.sh (A lot of config options have been added here to try and
> >> > fix the problem. I also encounter the problem while running with the
> >> > default options)
> >> >
> >> > export SPARK_LOCAL_DIRS="/mnt/spark,/mnt2/spark"
> >> > export SPARK_MASTER_OPTS="-Dspark.worker.timeout=900"
> >> > export SPARK_WORKER_INSTANCES=1
> >> > export SPARK_WORKER_CORES=8
> >> > export HADOOP_HOME="/root/ephemeral-hdfs"
> >> > export SPARK_MASTER_IP=<Master's Public DNS, as added by spark-ec2.py script>
> >> > export MASTER=`cat /root/spark-ec2/cluster-url`
> >> > export SPARK_SUBMIT_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/"
> >> > export SPARK_SUBMIT_CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf"
> >> > export SPARK_PUBLIC_DNS=<wget command to get the public hostname, as added by spark-ec2.py script>
> >> >
> >> > # Set a high ulimit for large shuffles
> >> >
> >> > ulimit -n 10000000
> >> >
> >> >
> >> > I am trying to run a very simple job which reads in CSV data (~124 GB)
> >> > from an S3 bucket, groups it by key, and counts the number of groups.
> >> > The number of partitions for the input textFile() is set to 1600, and
> >> > the number of partitions for the groupByKey() operation is also 1600.
> >> > conf = SparkConf().setAppName(JOB_NAME).setMaster(master)
> >> > sc = SparkContext(conf=conf)
> >> >
> >> > drive = sc.textFile(raw_drive_record_path, raw_drive_data_partitions)
> >> >
> >> >
> >> > drive_grouped_by_user_vin_and_week = \
> >> >     drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week)\
> >> >         .groupByKey(numPartitions=user_vin_week_group_partitions)\
> >> >         .count()
> >> >
> >> >
> >> > Stage 1 (flatMap()) launches 1601 tasks, all of which complete in 159
> >> > seconds. Then Stage 0 (groupByKey()) is launched with 1600 tasks, of
> >> > which 1595 complete in under a minute. The same 5 TIDs consistently
> >> > fail with the following errors in the logs of their respective Executors:
> >> >
> >> > 14/08/13 02:45:15 ERROR executor.Executor: Exception in task ID 2203
> >> > org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
> >> >     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:141)
> >> >     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
> >> >     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
> >> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >> >     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> >> >     at org.apache.spark.scheduler.Task.run(Task.scala:51)
> >> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
> >> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >> >     at java.lang.Thread.run(Thread.java:745)
> >> > Caused by: java.io.EOFException
> >> >     at java.io.DataInputStream.readInt(DataInputStream.java:392)
> >> >     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)
> >> >     ... 10 more
> >> >
> >> > 14/08/13 02:45:30 ERROR python.PythonRDD: Python worker exited unexpectedly (crashed)
> >> > java.net.SocketException: Connection reset
> >> >     at java.net.SocketInputStream.read(SocketInputStream.java:196)
> >> >     at java.net.SocketInputStream.read(SocketInputStream.java:122)
> >> >     at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
> >> >     at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
> >> >     at java.io.DataInputStream.readInt(DataInputStream.java:387)
> >> >     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)
> >> >     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
> >> >     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
> >> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >> >     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> >> >     at org.apache.spark.scheduler.Task.run(Task.scala:51)
> >> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
> >> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >> >     at java.lang.Thread.run(Thread.java:745)
> >> >
> >> > 14/08/13 02:45:30 ERROR python.PythonRDD: This may have been caused by a prior exception:
> >> > java.net.SocketException: Broken pipe
> >> >     at java.net.SocketOutputStream.socketWrite0(Native Method)
> >> >     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
> >> >     at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
> >> >     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
> >> >     at java.io.DataOutputStream.write(DataOutputStream.java:107)
> >> >     at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
> >> >     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
> >> >     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
> >> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> >     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
> >> >     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
> >> >
> >> > 14/08/13 02:45:30 ERROR executor.Executor: Exception in task ID 2840
> >> > java.net.SocketException: Broken pipe
> >> >     at java.net.SocketOutputStream.socketWrite0(Native Method)
> >> >     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
> >> >     at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
> >> >     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
> >> >     at java.io.DataOutputStream.write(DataOutputStream.java:107)
> >> >     at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
> >> >     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
> >> >     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
> >> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> >     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
> >> >     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
> >> >
> >> >
> >> > The final error reported to the driver program is:
> >> >
> >> > 14/08/13 19:03:43 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
> >> > 14/08/13 19:03:43 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled
> >> > 14/08/13 19:03:43 INFO scheduler.DAGScheduler: Failed to run count at /root/data_infrastructure/src/GroupRawDriveDataByUserVinWeek.py:122
> >> > Traceback (most recent call last):
> >> >   File "/root/data_infrastructure/src/GroupRawDriveDataByUserVinWeek.py", line 122, in <module>
> >> >     .groupByKey(numPartitions=user_vin_week_group_partitions)\
> >> >   File "/root/spark/python/pyspark/rdd.py", line 737, in count
> >> >     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> >> >   File "/root/spark/python/pyspark/rdd.py", line 728, in sum
> >> >     return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
> >> >   File "/root/spark/python/pyspark/rdd.py", line 648, in reduce
> >> >     vals = self.mapPartitions(func).collect()
> >> >   File "/root/spark/python/pyspark/rdd.py", line 612, in collect
> >> >     bytesInJava = self._jrdd.collect().iterator()
> >> >   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
> >> >   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> >> > py4j.protocol.Py4JJavaError: An error occurred while calling o45.collect.
> >> >
> >> > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:602 failed 4 times, most recent failure: Exception failure in TID 3212 on host ip-10-146-221-202.ec2.internal: java.net.SocketException: Broken pipe
> >> >         java.net.SocketOutputStream.socketWrite0(Native Method)
> >> >         java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
> >> >         java.net.SocketOutputStream.write(SocketOutputStream.java:159)
> >> >         java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
> >> >         java.io.DataOutputStream.write(DataOutputStream.java:107)
> >> >         java.io.FilterOutputStream.write(FilterOutputStream.java:97)
> >> >         org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
> >> >         org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
> >> >         scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> >         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> >         org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
> >> >         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
> >> >         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
> >> >         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
> >> >         org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
> >> >         org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
> >> >
> >> > Driver stacktrace:
> >> >     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
> >> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
> >> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
> >> >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >> >     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
> >> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
> >> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
> >> >     at scala.Option.foreach(Option.scala:236)
> >> >     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
> >> >     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
> >> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >> >     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >> >     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >> >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> >
> >> >
> >> > I also noticed some AssociationError's in the log of each Worker (in
> >> > /root/spark/logs):
> >> >
> >> > 14/08/13 19:03:44 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkWorker@ip-10-142-182-124.ec2.internal:57142] -> [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]: Error [Association failed with [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]] [
> >> > akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]
> >> > Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-10-142-182-124.ec2.internal/10.142.182.124:51159]
> >> >
> >> >
> >> > It looks like the error occurs during the shuffle, when the reduce
> >> > tasks are fetching their corresponding map outputs and the connection
> >> > over which they fetch this data is reset or prematurely terminated.
> >> > This job runs fine on the same setup with a smaller dataset (~62 GB).
> >> > I am unable to debug this further. Any help would be appreciated.
> >> >
> >> > Thanks
> >> >
> >> > Arpan
> >> >
> >> >
> >
> >
>
