Hi, Sparkers:
After our first 2 weeks of running Spark in our production cluster, and with more familiarity with 
Spark, we are more confident about avoiding "Lost Executor" problems caused by memory issues. So 
far, most of our jobs no longer fail or slow down due to "Lost executor".
But sometimes, I observed that individual tasks failed due to 
"sendMessageReliably failed because ack was not received within 120 sec". 
Here is the basic information:
Spark 1.3.1, with 1 master + 42 worker boxes in standalone deployment.
The cluster also runs Hadoop MapReduce, so we allocate about 25% of the resources to Spark. We 
are conservative with the Spark jobs, using a low number of cores plus high 
parallelism/partitions to control the memory usage in each job, and so far we have 
been able to avoid "lost executor".
We have one big daily job running with the following configuration:
/opt/spark/bin/spark-shell --jars spark-avro.jar --conf spark.ui.port=4042 
--executor-memory 20G --total-executor-cores 168 --conf 
spark.storage.memoryFraction=0.1 --conf spark.sql.shuffle.partitions=6000 
--conf spark.default.parallelism=6000 --conf 
spark.shuffle.blockTransferService=nio -i spark.script
168 total cores means each executor runs with 4 threads (168 / 42 = 4).
There is no caching needed, so I set the storage memoryFraction very low.
nio has been much more robust than netty in our experience.
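In case it is easier to read, here is roughly the same set of knobs expressed as a SparkConf in Scala (just a sketch to summarize them; we actually pass everything via --conf on the command line as shown above):

import org.apache.spark.SparkConf

// Sketch only: summarizes the settings we pass via spark-shell --conf above.
val conf = new SparkConf()
  .set("spark.executor.memory", "20g")              // --executor-memory 20G
  .set("spark.cores.max", "168")                     // --total-executor-cores 168; 168 / 42 workers = 4 cores per executor
  .set("spark.storage.memoryFraction", "0.1")        // no caching needed, so keep the storage fraction low
  .set("spark.sql.shuffle.partitions", "6000")
  .set("spark.default.parallelism", "6000")
  .set("spark.shuffle.blockTransferService", "nio")  // nio has been more robust than netty for us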
This big daily job generates over 20,000 tasks. Usually they all finish 
without this issue, but sometimes, for the same job, tasks keep failing with 
this error and retrying.
Even in those cases, the tasks just fail with this error and are retried. Retries 
may be part of life in a distributed environment, but I want to know what root 
cause could be behind this and how to avoid it.
Should I increase "spark.core.connection.ack.wait.timeout" to fix this error? When 
this happened, I saw no executors lost; all were alive. 
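If bumping the timeout is the right workaround, I assume it would just be a matter of adding something like this to the launch command above (300 seconds is only a guess at a value, not something we have tested):

--conf spark.core.connection.ack.wait.timeout=300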
Below is an example message from the log; it complains about a timeout 
connecting to host-121.
FetchFailed(BlockManagerId(31, host-121, 38930), shuffleId=3, mapId=17, reduceId=2577, message=
org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed because ack was not received within 120 sec
  at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
  at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:154)
  at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:149)
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: sendMessageReliably failed because ack was not received within 120 sec
  at org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:929)
  at org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:928)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:928)
  at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
  at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
  at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
  ... 1 more
Then I checked the GC log of the executor on host-121 and didn't see any obvious 
problem:
12327.896: [GC [PSYoungGen: 4660710K->2330106K(4660736K)] 
13954033K->12324769K(18641920K), 0.9448550 secs] [Times: user=16.40 sys=0.03, 
real=0.94 secs] 
12331.085: [GC [PSYoungGen: 4660730K->2330106K(4660736K)] 
14655393K->12988787K(18641920K), 0.8691010 secs] [Times: user=14.76 sys=0.03, 
real=0.86 secs] 
12334.869: [GC [PSYoungGen: 4660730K->2330080K(4660736K)] 
15319411K->13631046K(18641920K), 0.8940860 secs] [Times: user=15.20 sys=0.04, 
real=0.89 secs] 
12339.050: [GC [PSYoungGen: 4660704K->1990303K(4660736K)] 
15961670K->13964847K(18641920K), 0.7512230 secs] [Times: user=13.25 sys=0.02, 
real=0.75 secs] 
12345.046: [GC [PSYoungGen: 4320927K->994234K(4660736K)] 
16295471K->14101940K(18641920K), 0.8231710 secs] [Times: user=14.37 sys=0.04, 
real=0.83 secs] 
12345.869: [Full GC [PSYoungGen: 994234K->0K(4660736K)] [ParOldGen: 
13107705K->11473827K(13981184K)] 14101940K->11473827K(18641920K) [PSPermGen: 
56819K->56819K(57344K)], 8.7725670 secs] [Times: user=143.61 sys=0.13, 
real=8.77 secs] 
12361.444: [GC [PSYoungGen: 2330624K->544K(4660736K)] 
13804451K->11474371K(18641920K), 0.0183270 secs] [Times: user=0.26 sys=0.00, 
real=0.02 secs] 
12368.445: [GC [PSYoungGen: 2331168K->352K(4660736K)] 
13804995K->11474320K(18641920K), 0.0330490 secs] [Times: user=0.43 sys=0.02, 
real=0.03 secs] 
12375.528: [GC [PSYoungGen: 2330976K->352K(4660736K)] 
13804944K->11474368K(18641920K), 0.0253410 secs] [Times: user=0.35 sys=0.00, 
real=0.03 secs] 
12382.833: [GC [PSYoungGen: 2330976K->384K(4660736K)] 
13804992K->11474424K(18641920K), 0.0190140 secs] [Times: user=0.28 sys=0.00, 
real=0.02 secs] 
12390.006: [GC [PSYoungGen: 2331008K->576K(4660736K)] 
13805048K->11474632K(18641920K), 0.0166370 secs] [Times: user=0.25 sys=0.01, 
real=0.02 secs] 
12397.345: [GC [PSYoungGen: 2331200K->416K(4660736K)] 
13805256K->11474504K(18641920K), 0.0159600 secs] [Times: user=0.24 sys=0.00, 
real=0.01 secs] 
12405.098: [GC [PSYoungGen: 2331040K->416K(4660736K)] 
13805128K->11474568K(18641920K), 0.0162000 secs] [Times: user=0.23 sys=0.00, 
real=0.01 secs] 
12412.492: [GC [PSYoungGen: 2331040K->416K(4660736K)] 
13805192K->11474608K(18641920K), 0.0281690 secs] [Times: user=0.44 sys=0.00, 
real=0.03 secs] 
12419.666: [GC [PSYoungGen: 2331040K->416K(4660736K)] 
13805232K->11474624K(18641920K), 0.0155110 secs] [Times: user=0.23 sys=0.00, 
real=0.02 secs] 
12427.648: [GC [PSYoungGen: 2331040K->384K(4660736K)] 
13805248K->11474616K(18641920K), 0.0474190 secs] [Times: user=0.51 sys=0.01, 
real=0.04 secs] 
12435.858: [GC [PSYoungGen: 2331008K->320K(4660736K)] 
13805240K->11474576K(18641920K), 0.0271780 secs] [Times: user=0.32 sys=0.00, 
real=0.03 secs] 

So the minor GCs finish in tens of milliseconds, and even the full GC is done 
in several seconds. 
Any ideas on how to avoid this?
Thanks
Yong
                                          
