Hi,

Let me begin by describing my Spark setup on EC2 (launched using the
provided spark-ec2.py script):

   - 100 c3.2xlarge workers (8 cores & 15GB memory each)
   - 1 c3.2xlarge Master (only running master daemon)
   - Spark 1.0.2
   - 8GB mounted at */* & 80 GB mounted at */mnt*

*spark-defaults.conf *(A lot of config options have been added here to try
and fix the problem. I also encounter the problem while running with the
default options)

spark.executor.memory   12991m
spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
spark.executor.extraClassPath   /root/ephemeral-hdfs/conf
spark.shuffle.file.buffer.kb    1024
spark.reducer.maxMbInFlight     96
spark.serializer.objectStreamReset      100000
spark.akka.frameSize    100
spark.akka.threads      32
spark.akka.timeout      1000
spark.serializer        org.apache.spark.serializer.KryoSerializer

*spark-env.sh *(A lot of config options have been added here to try and fix
the problem. I also encounter the problem while running with the default
options)

export SPARK_LOCAL_DIRS="/mnt/spark,/mnt2/spark"
export SPARK_MASTER_OPTS="-Dspark.worker.timeout=900"
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_CORES=8
export HADOOP_HOME="/root/ephemeral-hdfs"
export SPARK_MASTER_IP=*<Master's Public DNS, as added by spark-ec2.py
script>*
export MASTER=`cat /root/spark-ec2/cluster-url`
export SPARK_SUBMIT_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH
:/root/ephemeral-hdfs/lib/native/"
export SPARK_SUBMIT_CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH
:/root/ephemeral-hdfs/conf"
export SPARK_PUBLIC_DNS=*<wget command to get the public hostname, as added
by spark-ec2.py script>*

# Set a high ulimit for large shuffles

ulimit -n 10000000


I am trying to run a very simple Job which reads in CSV data (~ 124 GB)
from a S3 bucket, tries to group it based on a key and counts the number of
groups. The number of partitions for the input *textFile()* is set to 1600
and the number of partitions for the *groupByKey()* operation is also 1600

*conf = SparkConf().setAppName(JOB_NAME).setMaster(master)*
*sc = SparkContext(conf=sconf)*

*drive = sc.textFile(raw_drive_record_path, raw_drive_data_partitions)*



*drive_grouped_by_user_vin_and_week =
drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week)\*

*        .groupByKey(numPartitions=user_vin_week_group_partitions)\*

*        .count()*


Stage 1 (*flatMap()*) launches 1601 tasks all of which complete in 159
seconds. Then Stage 0 (groupByKey()) is launched with 1600 tasks out of
which 1595 complete in under a minute. The same 5 TIDs consistently fail
with the following errors in the logs of their respective Executors:

14/08/13 02:45:15 ERROR executor.Executor: Exception in task ID 2203

org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:141)

at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)

at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)

at org.apache.spark.scheduler.Task.run(Task.scala:51)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.io.EOFException

at java.io.DataInputStream.readInt(DataInputStream.java:392)

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)

... 10 more

14/08/13 02:45:30 ERROR python.PythonRDD: Python worker exited unexpectedly
(crashed)

java.net.SocketException: Connection reset

at java.net.SocketInputStream.read(SocketInputStream.java:196)

at java.net.SocketInputStream.read(SocketInputStream.java:122)

at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)

at java.io.BufferedInputStream.read(BufferedInputStream.java:254)

at java.io.DataInputStream.readInt(DataInputStream.java:387)

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)

at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)

at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)

at org.apache.spark.scheduler.Task.run(Task.scala:51)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

14/08/13 02:45:30 ERROR python.PythonRDD: This may have been caused by a
prior exception:

java.net.SocketException: Broken pipe

at java.net.SocketOutputStream.socketWrite0(Native Method)

at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)

at java.net.SocketOutputStream.write(SocketOutputStream.java:159)

at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)

at java.io.DataOutputStream.write(DataOutputStream.java:107)

at java.io.FilterOutputStream.write(FilterOutputStream.java:97)

at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)

at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)

at scala.collection.Iterator$class.foreach(Iterator.scala:727)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)

at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)

at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)

at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)

at
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)

14/08/13 02:45:30 ERROR executor.Executor: Exception in task ID 2840

java.net.SocketException: Broken pipe

at java.net.SocketOutputStream.socketWrite0(Native Method)

at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)

at java.net.SocketOutputStream.write(SocketOutputStream.java:159)

at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)

at java.io.DataOutputStream.write(DataOutputStream.java:107)

at java.io.FilterOutputStream.write(FilterOutputStream.java:97)

at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)

at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)

at scala.collection.Iterator$class.foreach(Iterator.scala:727)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)

at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)

at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)

at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)

at
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)


The final error reported to the driver program is:

14/08/13 19:03:43 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0

14/08/13 19:03:43 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled

14/08/13 19:03:43 INFO scheduler.DAGScheduler: Failed to run count at
/root/data_infrastructure/src/GroupRawDriveDataByUserVinWeek.py:122

Traceback (most recent call last):

  File "/root/data_infrastructure/src/GroupRawDriveDataByUserVinWeek.py",
line 122, in <module>

    .groupByKey(numPartitions=user_vin_week_group_partitions)\

  File "/root/spark/python/pyspark/rdd.py", line 737, in count

    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

  File "/root/spark/python/pyspark/rdd.py", line 728, in sum

    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

  File "/root/spark/python/pyspark/rdd.py", line 648, in reduce

    vals = self.mapPartitions(func).collect()

  File "/root/spark/python/pyspark/rdd.py", line 612, in collect

    bytesInJava = self._jrdd.collect().iterator()

  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 537, in __call__

  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line
300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o45.collect.

: org.apache.spark.SparkException: Job aborted due to stage failure: Task
0.0:602 failed 4 times, most recent failure: Exception failure in TID 3212
on host ip-10-146-221-202.ec2.internal: java.net.SocketException: Broken
pipe

        java.net.SocketOutputStream.socketWrite0(Native Method)

        java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)

        java.net.SocketOutputStream.write(SocketOutputStream.java:159)

        java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)

        java.io.DataOutputStream.write(DataOutputStream.java:107)

        java.io.FilterOutputStream.write(FilterOutputStream.java:97)


org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)


org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)

        scala.collection.Iterator$class.foreach(Iterator.scala:727)

        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)


org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)


org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)


org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)


org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)

        org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)


org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)

Driver stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)

at scala.Option.foreach(Option.scala:236)

at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)

at akka.actor.ActorCell.invoke(ActorCell.scala:456)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)

at akka.dispatch.Mailbox.run(Mailbox.scala:219)

at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I also noticed some AssociationError's in the log of each Worker (in
*/root/spark/logs*):

14/08/13 19:03:44 ERROR remote.EndpointWriter: AssociationError
[akka.tcp://[email protected]:57142] ->
[akka.tcp://[email protected]:51159]: Error
[Association failed with
[akka.tcp://[email protected]:51159]] [

akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://[email protected]:51159]

Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: ip-10-142-182-124.ec2.internal/10.142.182.124:51159]


It looks like the error is occurring during the shuffle when the reduce
tasks are trying to fetch their corresponding map outputs and the
connection over which they are fetching this data is getting reset or
prematurely terminated. This Job runs fine when I run it on the same setup
with a smaller dataset (~ 62 GB). I am unable to debug this further. Any
help would be appreciated.

Thanks

Arpan

Reply via email to