Hi, Sparkers:
After the first two weeks of running Spark in our production cluster, we have
become more familiar with it and more confident about avoiding "Lost Executor"
problems caused by memory issues. So far, most of our jobs no longer fail or
slow down due to lost executors.
But sometimes I see individual tasks fail with
"sendMessageReliably failed because ack was not received within 120 sec".
Here is the basic information:
- Spark 1.3.1, with 1 master + 42 worker boxes in a standalone deployment.
- The cluster also runs Hadoop + MapReduce, so we allocate about 25% of its
resources to Spark.
- We are conservative with our Spark jobs, using a low number of cores plus
high parallelism/partition counts to control memory usage; so far this has
let us avoid lost executors.
We have one big daily job that runs with the following configuration:
/opt/spark/bin/spark-shell --jars spark-avro.jar --conf spark.ui.port=4042
--executor-memory 20G --total-executor-cores 168 --conf
spark.storage.memoryFraction=0.1 --conf spark.sql.shuffle.partitions=6000
--conf spark.default.parallelism=6000 --conf
spark.shuffle.blockTransferService=nio -i spark.script
- 168 total cores means each executor runs with 4 threads (168 / 42 = 4).
- No caching is needed, so I set the storage memoryFraction very low.
- nio has been much more robust than netty in our experience.
This big daily job generates over 20,000 tasks. Usually they all finish
without this issue, but sometimes, for the same job, tasks keep failing with
this error and retrying. Retries may just be part of life in a distributed
environment, but I want to know what root cause could be behind this and how
to avoid it.
Should I increase "spark.core.connection.ack.wait.timeout" to fix this error?
When it happened, I saw no executors lost; all were alive.
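If raising that timeout turns out to be the thing to try, I assume it can be
passed like the other settings in our launch command. The sketch below is our
existing command with one added --conf; the 300-second value is an arbitrary
example I picked for illustration, not a tuned recommendation:

```shell
# Same launch command as above, plus an explicit ack timeout (in seconds).
# 300 is a placeholder value, not a recommendation.
/opt/spark/bin/spark-shell --jars spark-avro.jar \
  --conf spark.ui.port=4042 \
  --executor-memory 20G --total-executor-cores 168 \
  --conf spark.storage.memoryFraction=0.1 \
  --conf spark.sql.shuffle.partitions=6000 \
  --conf spark.default.parallelism=6000 \
  --conf spark.shuffle.blockTransferService=nio \
  --conf spark.core.connection.ack.wait.timeout=300 \
  -i spark.script
```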
Below is an example message from the log; it complains about a timeout
connecting to host-121.
FetchFailed(BlockManagerId(31, host-121, 38930), shuffleId=3, mapId=17, reduceId=2577, message=
org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed because ack was not received within 120 sec
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:154)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:149)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: sendMessageReliably failed because ack was not received within 120 sec
    at org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:929)
    at org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:928)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:928)
    at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
    at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
    at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
    ... 1 more
Then I checked the GC log of the executor on host-121 and didn't see any
obvious problem:
12327.896: [GC [PSYoungGen: 4660710K->2330106K(4660736K)]
13954033K->12324769K(18641920K), 0.9448550 secs] [Times: user=16.40 sys=0.03,
real=0.94 secs]
12331.085: [GC [PSYoungGen: 4660730K->2330106K(4660736K)]
14655393K->12988787K(18641920K), 0.8691010 secs] [Times: user=14.76 sys=0.03,
real=0.86 secs]
12334.869: [GC [PSYoungGen: 4660730K->2330080K(4660736K)]
15319411K->13631046K(18641920K), 0.8940860 secs] [Times: user=15.20 sys=0.04,
real=0.89 secs]
12339.050: [GC [PSYoungGen: 4660704K->1990303K(4660736K)]
15961670K->13964847K(18641920K), 0.7512230 secs] [Times: user=13.25 sys=0.02,
real=0.75 secs]
12345.046: [GC [PSYoungGen: 4320927K->994234K(4660736K)]
16295471K->14101940K(18641920K), 0.8231710 secs] [Times: user=14.37 sys=0.04,
real=0.83 secs]
12345.869: [Full GC [PSYoungGen: 994234K->0K(4660736K)] [ParOldGen:
13107705K->11473827K(13981184K)] 14101940K->11473827K(18641920K) [PSPermGen:
56819K->56819K(57344K)], 8.7725670 secs] [Times: user=143.61 sys=0.13,
real=8.77 secs]
12361.444: [GC [PSYoungGen: 2330624K->544K(4660736K)]
13804451K->11474371K(18641920K), 0.0183270 secs] [Times: user=0.26 sys=0.00,
real=0.02 secs]
12368.445: [GC [PSYoungGen: 2331168K->352K(4660736K)]
13804995K->11474320K(18641920K), 0.0330490 secs] [Times: user=0.43 sys=0.02,
real=0.03 secs]
12375.528: [GC [PSYoungGen: 2330976K->352K(4660736K)]
13804944K->11474368K(18641920K), 0.0253410 secs] [Times: user=0.35 sys=0.00,
real=0.03 secs]
12382.833: [GC [PSYoungGen: 2330976K->384K(4660736K)]
13804992K->11474424K(18641920K), 0.0190140 secs] [Times: user=0.28 sys=0.00,
real=0.02 secs]
12390.006: [GC [PSYoungGen: 2331008K->576K(4660736K)]
13805048K->11474632K(18641920K), 0.0166370 secs] [Times: user=0.25 sys=0.01,
real=0.02 secs]
12397.345: [GC [PSYoungGen: 2331200K->416K(4660736K)]
13805256K->11474504K(18641920K), 0.0159600 secs] [Times: user=0.24 sys=0.00,
real=0.01 secs]
12405.098: [GC [PSYoungGen: 2331040K->416K(4660736K)]
13805128K->11474568K(18641920K), 0.0162000 secs] [Times: user=0.23 sys=0.00,
real=0.01 secs]
12412.492: [GC [PSYoungGen: 2331040K->416K(4660736K)]
13805192K->11474608K(18641920K), 0.0281690 secs] [Times: user=0.44 sys=0.00,
real=0.03 secs]
12419.666: [GC [PSYoungGen: 2331040K->416K(4660736K)]
13805232K->11474624K(18641920K), 0.0155110 secs] [Times: user=0.23 sys=0.00,
real=0.02 secs]
12427.648: [GC [PSYoungGen: 2331040K->384K(4660736K)]
13805248K->11474616K(18641920K), 0.0474190 secs] [Times: user=0.51 sys=0.01,
real=0.04 secs]
12435.858: [GC [PSYoungGen: 2331008K->320K(4660736K)]
13805240K->11474576K(18641920K), 0.0271780 secs] [Times: user=0.32 sys=0.00,
real=0.03 secs]

So the minor GCs finished in tens of milliseconds, and even the full GC
completed in several seconds.
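As a quick sanity check against the 120-second ack window, I filtered the GC
log for any long pause. The one-liner below is just how I scanned it; gc.log
is a placeholder path and the 5-second threshold is an arbitrary choice:

```shell
# Print GC log lines whose wall-clock pause ("real=X.XX secs") exceeds
# 5 seconds. Parses the PrintGCDetails-style lines shown above; gc.log
# and the 5-second threshold are placeholder choices.
awk -F'real=' 'NF > 1 { split($2, parts, " "); if (parts[1] + 0 > 5) print }' gc.log
```

Only the 8.77-second full GC showed up, which is still well under 120 seconds.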
Any idea how to avoid this case?
Thanks
Yong