In Spark (Scala/Java), it will spill the data to disk, but in PySpark,
it will not.

On Wed, Aug 13, 2014 at 2:10 PM, Arpan Ghosh <> wrote:
> So you are saying that in spite of spark.shuffle.spill being set to true by
> default, version 1.0.2 does not spill data to disk during a groupByKey()?
> On Wed, Aug 13, 2014 at 2:05 PM, Davies Liu <> wrote:
>> The 1.1 release will come out this month or next. We would really
>> appreciate it if you could test it with your real use case.
>> Davies
>> On Wed, Aug 13, 2014 at 1:57 PM, Arpan Ghosh <> wrote:
>> > Thanks Davies. I am running Spark 1.0.2 (which seems to be the latest
>> > release)
>> >
>> > I'll try changing it to a reduceByKey(), check the size of the largest
>> > key, and post the results here.
>> >
>> > UPDATE: If I run this job and DO NOT specify the number of partitions for
>> > the input textFile() (124 GB being read in from S3), Spark launches 41
>> > tasks for the flatMap(). However, this time none of the flatMap() tasks
>> > complete, and I start seeing the same "Connection Reset" errors.
>> >
>> >
>> > On Wed, Aug 13, 2014 at 1:39 PM, Davies Liu <> wrote:
>> >>
>> >> Arpan,
>> >>
>> >> Which version of Spark are you using? Could you try the master or 1.1
>> >> branch, which can spill the data to disk during groupByKey()?
>> >>
>> >> PS: it's better to use reduceByKey() or combineByKey() to reduce the data
>> >> size during the shuffle.
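A minimal local sketch in plain Python (no Spark needed) of why reduceByKey()/combineByKey() shrink the shuffle: it simulates map-side combining on two hypothetical in-memory partitions and counts how many records would cross the shuffle with and without it. The function names and sample data are illustrative assumptions, not Spark APIs, and a real shuffle moves serialized blocks rather than counted records.

```python
import operator
from typing import Callable, Dict, Hashable, List, Tuple

Pair = Tuple[Hashable, int]


def records_shuffled_without_combine(partitions: List[List[Pair]]) -> int:
    # groupByKey()-style: every (key, value) pair is sent across the shuffle.
    return sum(len(p) for p in partitions)


def combine_partition(partition: List[Pair],
                      func: Callable[[int, int], int]) -> Dict[Hashable, int]:
    # reduceByKey()-style map-side combine: merge values per key locally first.
    local: Dict[Hashable, int] = {}
    for key, value in partition:
        local[key] = func(local[key], value) if key in local else value
    return local


def records_shuffled_with_combine(partitions: List[List[Pair]],
                                  func: Callable[[int, int], int]) -> int:
    # After combining, each partition sends at most one record per distinct key.
    return sum(len(combine_partition(p, func)) for p in partitions)


# Two hypothetical map-side partitions of (key, 1) pairs.
parts = [
    [("a", 1), ("a", 1), ("b", 1)],
    [("a", 1), ("b", 1), ("b", 1)],
]
print(records_shuffled_without_combine(parts))             # 6
print(records_shuffled_with_combine(parts, operator.add))  # 4
```

The saving grows with the number of values per key in each partition; with many distinct keys and few repeats, combining helps little.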
>> >>
>> >> Maybe there is a huge key in the data set. You can find it this way
>> >> (countByKey() returns a dict to the driver, so sort it in Python):
>> >>
>> >> sorted(rdd.countByKey().items(), key=lambda x: x[1], reverse=True)[:10]
>> >>
>> >> Davies
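Because countByKey() pulls one entry per distinct key back to the driver, it can itself be heavy when there are many keys. A distributed variant would be something like `rdd.map(lambda kv: (kv[0], 1)).reduceByKey(operator.add).takeOrdered(10, key=lambda kv: -kv[1])`, though that is untested here against 1.0.2. The plain-Python sketch below shows the same counting-then-top-N logic on a small hypothetical sample; `top_keys` is an illustrative helper, not a PySpark function.

```python
import heapq
from collections import Counter
from typing import Hashable, Iterable, List, Tuple


def top_keys(pairs: Iterable[Tuple[Hashable, object]],
             n: int = 10) -> List[Tuple[Hashable, int]]:
    # Count records per key (local analogue of map to (k, 1) + reduceByKey(add)).
    counts = Counter(key for key, _ in pairs)
    # Keep the n keys with the most records, largest first (like takeOrdered).
    return heapq.nlargest(n, counts.items(), key=lambda kv: kv[1])


# Hypothetical (key, record) pairs; "u1" is the heaviest key.
sample = [("u1", "r1"), ("u2", "r2"), ("u1", "r3"),
          ("u3", "r4"), ("u1", "r5"), ("u2", "r6")]
print(top_keys(sample, n=2))  # [('u1', 3), ('u2', 2)]
```

A single key whose count dwarfs the rest would land in one reduce partition, which is consistent with the same few tasks failing every time.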
>> >>
>> >>
>> >> On Wed, Aug 13, 2014 at 12:21 PM, Arpan Ghosh <> wrote:
>> >> > Hi,
>> >> >
>> >> > Let me begin by describing my Spark setup on EC2 (launched using the
>> >> > provided script):
>> >> >
>> >> > 100 c3.2xlarge workers (8 cores & 15GB memory each)
>> >> > 1 c3.2xlarge Master (only running master daemon)
>> >> > Spark 1.0.2
>> >> > 8GB mounted at / & 80 GB mounted at /mnt
>> >> >
>> >> > spark-defaults.conf (A lot of config options have been added here to try
>> >> > and fix the problem. I also encounter the problem while running with the
>> >> > default options)
>> >> >
>> >> > spark.executor.memory   12991m
>> >> > spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
>> >> > spark.executor.extraClassPath   /root/ephemeral-hdfs/conf
>> >> > spark.shuffle.file.buffer.kb    1024
>> >> > spark.reducer.maxMbInFlight     96
>> >> > spark.serializer.objectStreamReset      100000
>> >> > spark.akka.frameSize    100
>> >> > spark.akka.threads      32
>> >> > spark.akka.timeout      1000
>> >> > spark.serializer        org.apache.spark.serializer.KryoSerializer
>> >> >
>> >> > (A lot of config options have been added here to try and fix the
>> >> > problem. I also encounter the problem while running with the default
>> >> > options)
>> >> >
>> >> > export SPARK_LOCAL_DIRS="/mnt/spark,/mnt2/spark"
>> >> > export SPARK_MASTER_OPTS="-Dspark.worker.timeout=900"
>> >> > export SPARK_WORKER_CORES=8
>> >> > export HADOOP_HOME="/root/ephemeral-hdfs"
>> >> > export SPARK_MASTER_IP=<Master's Public DNS, as added by script>
>> >> > export MASTER=`cat /root/spark-ec2/cluster-url`
>> >> > export SPARK_SUBMIT_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/"
>> >> > export
>> >> > export SPARK_PUBLIC_DNS=<wget command to get the public hostname, as added by script>
>> >> >
>> >> > # Set a high ulimit for large shuffles
>> >> >
>> >> > ulimit -n 10000000
>> >> >
>> >> >
>> >> > I am trying to run a very simple job which reads in CSV data (~124 GB)
>> >> > from an S3 bucket, groups it by a key, and counts the number of groups.
>> >> > The number of partitions for the input textFile() is set to 1600, and
>> >> > the number of partitions for the groupByKey() operation is also 1600.
>> >> >
>> >> > from pyspark import SparkConf, SparkContext
>> >> >
>> >> > conf = SparkConf().setAppName(JOB_NAME).setMaster(master)
>> >> > sc = SparkContext(conf=conf)
>> >> >
>> >> > drive = sc.textFile(raw_drive_record_path, raw_drive_data_partitions)
>> >> >
>> >> > # Key each record by (user, VIN, week), group by key, count the groups
>> >> > drive_grouped_by_user_vin_and_week = drive \
>> >> >     .flatMap(parse_raw_drive_record_and_key_by_user_vin_week) \
>> >> >     .groupByKey(numPartitions=user_vin_week_group_partitions) \
>> >> >     .count()
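Since the job only counts the groups, groupByKey().count() equals the number of distinct keys, so the per-key value lists never need to be built. A minimal local sketch in plain Python, assuming hypothetical (user, VIN, week) key tuples, showing that both computations agree; in PySpark the grouping-free analogue would be something like `drive.flatMap(...).map(lambda kv: kv[0]).distinct().count()`, which still shuffles but does not hold every value of a key in one Python worker. This is a suggestion, not something tested in this thread.

```python
from collections import defaultdict
from typing import Hashable, Iterable, List, Tuple


def count_groups_by_grouping(pairs: Iterable[Tuple[Hashable, object]]) -> int:
    # groupByKey().count() analogue: materialize every value list, then count keys.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return len(groups)


def count_distinct_keys(pairs: Iterable[Tuple[Hashable, object]]) -> int:
    # distinct-keys analogue: keep only the keys, never build value lists.
    return len({key for key, _ in pairs})


# Hypothetical records keyed by (user, VIN, week).
records: List[Tuple[Tuple[str, str, int], str]] = [
    (("user1", "VIN1", 32), "trip-1"),
    (("user1", "VIN1", 32), "trip-2"),
    (("user2", "VIN7", 32), "trip-3"),
    (("user1", "VIN1", 33), "trip-4"),
]
print(count_groups_by_grouping(records))  # 3
print(count_distinct_keys(records))       # 3
```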
>> >> >
>> >> >
>> >> > Stage 1 (flatMap()) launches 1601 tasks, all of which complete in 159
>> >> > seconds. Then Stage 0 (groupByKey()) is launched with 1600 tasks, of
>> >> > which 1595 complete in under a minute. The same 5 TIDs consistently
>> >> > fail with the following errors in the logs of their respective
>> >> > Executors:
>> >> >
>> >> > 14/08/13 02:45:15 ERROR executor.Executor: Exception in task ID 2203
>> >> > org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>> >> >     at org.apache.spark.api.python.PythonRDD$$anon$
>> >> >     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
>> >> >     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
>> >> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> >> >     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>> >> >     at
>> >> >     at org.apache.spark.executor.Executor$
>> >> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> >> >     at java.util.concurrent.ThreadPoolExecutor$
>> >> >     at
>> >> > Caused by:
>> >> >     at
>> >> >     at org.apache.spark.api.python.PythonRDD$$anon$
>> >> >     ... 10 more
>> >> >
>> >> > 14/08/13 02:45:30 ERROR python.PythonRDD: Python worker exited unexpectedly (crashed)
>> >> > Connection reset
>> >> >     at
>> >> >     at
>> >> >     at
>> >> >     at
>> >> >     at
>> >> >     at org.apache.spark.api.python.PythonRDD$$anon$
>> >> >     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
>> >> >     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
>> >> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> >> >     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>> >> >     at
>> >> >     at org.apache.spark.executor.Executor$
>> >> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> >> >     at java.util.concurrent.ThreadPoolExecutor$
>> >> >     at
>> >> >
>> >> > 14/08/13 02:45:30 ERROR python.PythonRDD: This may have been caused by a prior exception:
>> >> > Broken pipe
>> >> >     at Method)
>> >> >     at
>> >> >     at
>> >> >     at
>> >> >     at
>> >> >     at
>> >> >     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
>> >> >     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
>> >> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
>> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >> >     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>> >> >     at org.apache.spark.api.python.PythonRDD$
>> >> >
>> >> > 14/08/13 02:45:30 ERROR executor.Executor: Exception in task ID 2840
>> >> > Broken pipe
>> >> >     at Method)
>> >> >     at
>> >> >     at
>> >> >     at
>> >> >     at
>> >> >     at
>> >> >     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
>> >> >     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
>> >> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
>> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >> >     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>> >> >     at org.apache.spark.api.python.PythonRDD$
>> >> >
>> >> >
>> >> > The final error reported to the driver program is:
>> >> >
>> >> > 14/08/13 19:03:43 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
>> >> > 14/08/13 19:03:43 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled
>> >> > 14/08/13 19:03:43 INFO scheduler.DAGScheduler: Failed to run count at /root/data_infrastructure/src/
>> >> >
>> >> > Traceback (most recent call last):
>> >> >   File "/root/data_infrastructure/src/", line 122, in <module>
>> >> >     .groupByKey(numPartitions=user_vin_week_group_partitions)\
>> >> >   File "/root/spark/python/pyspark/", line 737, in count
>> >> >     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>> >> >   File "/root/spark/python/pyspark/", line 728, in sum
>> >> >     return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>> >> >   File "/root/spark/python/pyspark/", line 648, in reduce
>> >> >     vals = self.mapPartitions(func).collect()
>> >> >   File "/root/spark/python/pyspark/", line 612, in collect
>> >> >     bytesInJava = self._jrdd.collect().iterator()
>> >> >   File "/root/spark/python/lib/", line 537, in __call__
>> >> >   File "/root/spark/python/lib/", line 300, in get_return_value
>> >> > py4j.protocol.Py4JJavaError: An error occurred while calling o45.collect.
>> >> >
>> >> > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:602 failed 4 times, most recent failure: Exception failure in TID 3212 on host ip-10-146-221-202.ec2.internal: Broken pipe
>> >> >         Method)
>> >> >         org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
>> >> >         org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
>> >> >         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> >         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >         org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
>> >> >         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>> >> >         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >> >         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >> >         org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>> >> >         org.apache.spark.api.python.PythonRDD$
>> >> >
>> >> > Driver stacktrace:
>> >> >     at $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>> >> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>> >> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>> >> >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >> >     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>> >> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>> >> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>> >> >     at scala.Option.foreach(Option.scala:236)
>> >> >     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>> >> >     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>> >> >     at
>> >> >     at
>> >> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> >> >     at
>> >> >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> >> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(
>> >> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>> >> >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>> >> >     at
>> >> >
>> >> > I also noticed some AssociationErrors in the log of each Worker (in
>> >> > /root/spark/logs):
>> >> >
>> >> > 14/08/13 19:03:44 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkWorker@ip-10-142-182-124.ec2.internal:57142] -> [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]: Error [Association failed with [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]] [
>> >> > akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]
>> >> > Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-10-142-182-124.ec2.internal/]
>> >> >
>> >> >
>> >> > It looks like the error occurs during the shuffle, when the reduce tasks
>> >> > try to fetch their corresponding map outputs and the connection over
>> >> > which they fetch this data is reset or terminated prematurely. This job
>> >> > runs fine on the same setup with a smaller dataset (~62 GB). I am unable
>> >> > to debug this further. Any help would be appreciated.
>> >> >
>> >> > Thanks
>> >> >
>> >> > Arpan
>> >> >
>> >> >
>> >
>> >
