The closed information I can found online related to this error 
ishttps://issues.apache.org/jira/browse/SPARK-3633
But it is quite different in our case. In our case, we never saw the "(Too many 
open files)" error, the log just simple show the 120 sec time out.
I checked all the GC output from all 42 executors, the max full gc real=11.79 
secs is what I can find, way less than 120 seconds time out.
>From 42 executors, there is on executor's stdout/stderr page hangs, I cannot 
>see any gc or log information for this executor, but it is shown as "LOADING" 
>in the master page, and I think the reason is just the "WorkerUI" cannot bind 
>to 8081 somehow during the boot time, and bind to 8082 instead, master UI 
>didn't catch that information.
Anyway, my only option now is to increase the timeout of both 
"spark.core.connection.ack.wait.timeout" and "spark.akka.timeout" to 600, as 
suggested in the jira, and will report back what I find later.
This same daily job runs about 12 hours in the Hive/MR, and can finish about 4 
hours in Spark (with 25% allocated cluster resource). On this point, Spark is 
faster and great, but IF (big IF) every tasks run smoothly.
In Hive/MR, if the job is setup, it will finish, maybe slow, but smoothly. In 
Spark, in this case, it does retry the failed partitions only, but we saw 4 or 
5 times retry sometimes, make it in fact much much slower.
Yong
From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Any suggestion about "sendMessageReliably failed because ack was not 
received within 120 sec"
Date: Thu, 20 Aug 2015 20:49:52 -0400




Hi, Sparkers:
After first 2 weeks of Spark in our production cluster, with more familiar with 
Spark, we are more confident to avoid "Lost Executor" due to memory issue. So 
far, most of our jobs won't fail or slow down due to "Lost executor".
But sometimes, I observed that individual tasks failed due to 
"sendMessageReliably failed because ack was not received within 120 sec". 
Here is the basic information:
Spark 1.3.1 in 1 master + 42 worker boxes in standalone deploymentThe cluster 
also runs Hadoop + MapReduce, so we allocate about 25% resource to Spark. We 
are conservative for the Spark jobs, with low number of cores  + big 
parallelism/partitions to control the memory usage in the job, so far we are 
happen to avoid "lost executor".
We have one big daily job is running with following configuration:
/opt/spark/bin/spark-shell --jars spark-avro.jar --conf spark.ui.port=4042 
--executor-memory 20G --total-executor-cores 168 --conf 
spark.storage.memoryFraction=0.1 --conf spark.sql.shuffle.partitions=6000 
--conf spark.default.parallelism=6000 --conf 
spark.shuffle.blockTransferService=nio -i spark.script
168 cores will make each executor run with 4 thread (168 / 42 = 4)There is no 
cache needed, so I make the storage memoryFraction very lownio is much robust 
than netty in our experience
For this big daily job generating over 20000 of tasks, they all could finish 
without this issue, but sometimes, for the same job, tasks keep failing due to 
this error and retry.
But even in this case, I saw the task failed due to this error and retry. Retry 
maybe part of life for distribute environment, but I want to know what root 
cause could behind it and how to avoid it.
Do I increase "spark.core.connection.ack.wait.timeout" to fix this error? When 
this happened, I saw there is no executor lost, all are alive. 
Below is the message in the log, for example, it complained about timeout to 
connect to host-121.
FetchFailed(BlockManagerId(31, host-121, 38930), shuffleId=3, mapId=17, 
reduceId=2577, message=org.apache.spark.shuffle.FetchFailedException: 
sendMessageReliably failed because ack was not received within 120 sec  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)  at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)  
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)  
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)  at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:154)
  at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:149)
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)  at 
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)  at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)  at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)  at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)  at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)  at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)  at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)  at 
org.apache.spark.scheduler.Task.run(Task.scala:64)  at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
 at java.lang.Thread.run(Thread.java:745)Caused by: java.io.IOException: 
sendMessageReliably failed because ack was not received within 120 sec  at 
org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:929)
  at 
org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:928)
  at scala.Option.foreach(Option.scala:236)  at 
org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:928)
  at 
io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
  at 
io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
  at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)  ... 
1 more
Then I check the GC log of executor of host-121, I didn't see any obvious 
problem, as following:
12327.896: [GC [PSYoungGen: 4660710K->2330106K(4660736K)] 
13954033K->12324769K(18641920K), 0.9448550 secs] [Times: user=16.40 sys=0.03, 
real=0.94 secs] 
12331.085: [GC [PSYoungGen: 4660730K->2330106K(4660736K)] 
14655393K->12988787K(18641920K), 0.8691010 secs] [Times: user=14.76 sys=0.03, 
real=0.86 secs] 
12334.869: [GC [PSYoungGen: 4660730K->2330080K(4660736K)] 
15319411K->13631046K(18641920K), 0.8940860 secs] [Times: user=15.20 sys=0.04, 
real=0.89 secs] 
12339.050: [GC [PSYoungGen: 4660704K->1990303K(4660736K)] 
15961670K->13964847K(18641920K), 0.7512230 secs] [Times: user=13.25 sys=0.02, 
real=0.75 secs] 
12345.046: [GC [PSYoungGen: 4320927K->994234K(4660736K)] 
16295471K->14101940K(18641920K), 0.8231710 secs] [Times: user=14.37 sys=0.04, 
real=0.83 secs] 
12345.869: [Full GC [PSYoungGen: 994234K->0K(4660736K)] [ParOldGen: 
13107705K->11473827K(13981184K)] 14101940K->11473827K(18641920K) [PSPermGen: 
56819K->56819K(57344K)], 8.7725670 secs] [Times: user=143.61 sys=0.13, 
real=8.77 secs] 
12361.444: [GC [PSYoungGen: 2330624K->544K(4660736K)] 
13804451K->11474371K(18641920K), 0.0183270 secs] [Times: user=0.26 sys=0.00, 
real=0.02 secs] 
12368.445: [GC [PSYoungGen: 2331168K->352K(4660736K)] 
13804995K->11474320K(18641920K), 0.0330490 secs] [Times: user=0.43 sys=0.02, 
real=0.03 secs] 
12375.528: [GC [PSYoungGen: 2330976K->352K(4660736K)] 
13804944K->11474368K(18641920K), 0.0253410 secs] [Times: user=0.35 sys=0.00, 
real=0.03 secs] 
12382.833: [GC [PSYoungGen: 2330976K->384K(4660736K)] 
13804992K->11474424K(18641920K), 0.0190140 secs] [Times: user=0.28 sys=0.00, 
real=0.02 secs] 
12390.006: [GC [PSYoungGen: 2331008K->576K(4660736K)] 
13805048K->11474632K(18641920K), 0.0166370 secs] [Times: user=0.25 sys=0.01, 
real=0.02 secs] 
12397.345: [GC [PSYoungGen: 2331200K->416K(4660736K)] 
13805256K->11474504K(18641920K), 0.0159600 secs] [Times: user=0.24 sys=0.00, 
real=0.01 secs] 
12405.098: [GC [PSYoungGen: 2331040K->416K(4660736K)] 
13805128K->11474568K(18641920K), 0.0162000 secs] [Times: user=0.23 sys=0.00, 
real=0.01 secs] 
12412.492: [GC [PSYoungGen: 2331040K->416K(4660736K)] 
13805192K->11474608K(18641920K), 0.0281690 secs] [Times: user=0.44 sys=0.00, 
real=0.03 secs] 
12419.666: [GC [PSYoungGen: 2331040K->416K(4660736K)] 
13805232K->11474624K(18641920K), 0.0155110 secs] [Times: user=0.23 sys=0.00, 
real=0.02 secs] 
12427.648: [GC [PSYoungGen: 2331040K->384K(4660736K)] 
13805248K->11474616K(18641920K), 0.0474190 secs] [Times: user=0.51 sys=0.01, 
real=0.04 secs] 
12435.858: [GC [PSYoungGen: 2331008K->320K(4660736K)] 
13805240K->11474576K(18641920K), 0.0271780 secs] [Times: user=0.32 sys=0.00, 
real=0.03 secs] So the small GC finished tens of milli-second, and full gc just 
be done in several seconds. 
Any idea how to avoid this case?
Thanks
Yong
                                                                                
  

Reply via email to