Hi Thomas,

Sorry for such a late reply.  I don't have any super-useful advice, but
this seems like something that is important to follow up on.  To answer
your immediate question: no, there should not be any hard limit to the
number of tasks that the MapOutputTracker can handle.  Of course, as
things get bigger the overheads increase, which is why you might hit
timeouts.

Two other minor suggestions:
(1) Increase spark.akka.askTimeout -- that's the timeout you are running
into, and it defaults to 30 seconds.
(2) As you've noted, you've needed to play w/ other timeouts b/c of long GC
pauses -- it's possible some GC tuning might help, though it's a bit of a
black art, so it's hard to say what you should try.  You could always try
the Concurrent Mark Sweep collector to avoid the long pauses, but of course
that will probably hurt overall throughput.  (A rough sketch of both
settings is below.)
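
Just to make that concrete, here's a rough sketch of how you could set both
(the 300-second value and the app name are placeholders, not
recommendations -- the same settings could also go into spark-defaults.conf
or be passed with --conf on spark-submit):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-app")                  // placeholder name
  .set("spark.akka.askTimeout", "300")   // seconds; the default is 30
  .set("spark.executor.extraJavaOptions",
       "-XX:+UseConcMarkSweepGC -XX:+PrintGCDetails")  // CMS + GC logging on executors
val sc = new SparkContext(conf)

Note that spark.executor.extraJavaOptions only takes effect when the
executors are launched, so it needs to be in place when you submit the
application.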

Can you share any more details about what you are trying to do?

Since you're fetching shuffle blocks in a shuffle map task, I guess you've
got two shuffles back-to-back, e.g.
someRDD.reduceByKey{...}.map{...}.filter{...}.combineByKey{...}.  Do you
expect to be doing a lot of GC in between the two shuffles?  E.g., in the
little example I gave, that would happen if lots of objects created in the
map & filter steps survive long enough to make it out of the eden space.
One possible solution would be to force the first shuffle to complete
before running any of the subsequent transformations, e.g. by forcing
materialization to the cache first:

import org.apache.spark.storage.StorageLevel

val intermediateRDD = someRDD.reduceByKey{...}.persist(StorageLevel.DISK_ONLY)
// force the shuffle to complete, without trying to do our complicated
// downstream logic at the same time
intermediateRDD.count()

val finalResult = intermediateRDD.map{...}.filter{...}.combineByKey{...}

Also, can you share your data size?  Do you expect the shuffle to be
skewed, or do you think it will be well-balanced?  Not that I'll have any
suggestions based on the answer, but it may help us reproduce the problem
and try to fix whatever the root cause is.

thanks,
Imran



On Wed, Mar 4, 2015 at 12:30 PM, Thomas Gerber <thomas.ger...@radius.com>
wrote:

> I meant spark.default.parallelism of course.
>
> On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber <thomas.ger...@radius.com>
> wrote:
>
>> Follow up:
>> We re-retried, this time after *decreasing* spark.parallelism. It was set
>> to 16000 before, (5 times the number of cores in our cluster). It is now
>> down to 6400 (2 times the number of cores).
>>
>> And it got past the point where it failed before.
>>
>> Does the MapOutputTracker have a limit on the number of tasks it can
>> track?
>>
>>
>> On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber <thomas.ger...@radius.com>
>> wrote:
>>
>>> Hello,
>>>
>>> We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge
>>> workers). We use spark-submit to start an application.
>>>
>>> We got the following error which leads to a failed stage:
>>>
>>> Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, 
>>> most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, 
>>> ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error 
>>> communicating with MapOutputTracker
>>>
>>>
>>> We tried the whole application again, and it failed on the same stage
>>> (but it got more tasks completed on that stage) with the same error.
>>>
>>> We then looked at executors stderr, and all show similar logs, on both
>>> runs (see below). As far as we can tell, executors and master have disk
>>> space left.
>>>
>>> *Any suggestion on where to look to understand why the communication
>>> with the MapOutputTracker fails?*
>>>
>>> Thanks
>>> Thomas
>>> ====
>>> In case it matters, our akka settings:
>>> spark.akka.frameSize 50
>>> spark.akka.threads 8
>>> // those below are 10* the default, to cope with large GCs
>>> spark.akka.timeout 1000
>>> spark.akka.heartbeat.pauses 60000
>>> spark.akka.failure-detector.threshold 3000.0
>>> spark.akka.heartbeat.interval 10000
>>>
>>> Appendix: executor logs, where it starts going awry
>>>
>>> 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 
>>> 298525
>>> 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 
>>> 298525)
>>> 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with 
>>> curMem=5543008799, maxMem=18127202549
>>> 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as 
>>> bytes in memory (estimated size 1473.0 B, free 11.7 GB)
>>> 15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block 
>>> broadcast_339_piece0
>>> 15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 
>>> took 224 ms
>>> 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with 
>>> curMem=5543010272, maxMem=18127202549
>>> 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in 
>>> memory (estimated size 2.5 KB, free 11.7 GB)
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker 
>>> actor = 
>>> Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:30 ERROR MapOutputTrackerWorker: Error communicating with 
>>> MapOutputTracker
>>> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>     at 
>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>     at 
>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>     at scala.concurrent.Await$.result(package.scala:107)
>>>     at 
>>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:112)
>>>     at 
>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:163)
>>>     at 
>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>>>     at 
>>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>>>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>     at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>     at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> 15/03/04 11:45:30 INFO MapOutputTrackerWorker: Doing the fetch; tracker 
>>> actor = 
>>> Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
>>> 15/03/04 11:45:30 ERROR Executor: Exception in task 32.0 in stage 140.0 
>>> (TID 295474)
>>> org.apache.spark.SparkException: Error communicating with MapOutputTracker
>>>     at 
>>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:116)
>>>     at 
>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:163)
>>>     at 
>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>>>
>>> ===
>>> and then later a lot of those:
>>> ===
>>>
>>> 15/03/04 11:51:50 ERROR TransportRequestHandler: Error sending result 
>>> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=29906093434, 
>>> chunkIndex=25}, 
>>> buffer=FileSegmentManagedBuffer{file=/mnt/spark/spark-3f8c4cbe-a1f8-4a66-ac17-0a3d3daaffaf/spark-92cb6108-35af-4ad0-82f6-ac904b677eff/spark-8fc6043c-df95-4c48-9215-5b9907014b55/spark-99219c49-778b-4b5f-8454-24d2d3b82b81/0d/shuffle_18_6718_0.data,
>>>  offset=182070, length=166}} to /10.0.12.24:33174; closing connection
>>> java.nio.channels.ClosedChannelException
>>>
>>>
>>
>
