Hi,
I'm looking at the fetchOutputs() method from ReduceTask.java, but there's
a part of the method that I don't understand. Inside the method, in the
synchronized (scheduledCopies) {...} block, I can't work out what's
happening inside the curly brackets. What's the purpose of this part of
the code?
Here's the method:
[code]
public boolean fetchOutputs() throws IOException {
(...)
// loop until we get all required outputs
while (copiedMapOutputs.size() < numMaps && mergeThrowable == null)
{
(...)
// Put the hash entries for the failed fetches.
Iterator<MapOutputLocation> locItr = retryFetches.iterator();
while (locItr.hasNext()) {
MapOutputLocation loc = locItr.next();
List<MapOutputLocation> locList =
mapLocations.get(loc.getHost());
// Check if the list exists. Map output location mapping is cleared
// once the jobtracker restarts and is rebuilt from scratch.
// Note that map-output-location mapping will be recreated and hence
// we continue with the hope that we might find some locations
// from the rebuilt map.
if (locList != null) {
// Add to the beginning of the list so that this map is
// tried again before the others and we can hasten the
// re-execution of this map should there be a problem
locList.add(0, loc);
}
}
if (retryFetches.size() > 0) {
LOG.info(reduceTask.getTaskID() + ": " +
"Got " + retryFetches.size() +
" map-outputs from previous failures");
}
// clear the "failed" fetches hashmap
retryFetches.clear();
// now walk through the cache and schedule what we can
int numScheduled = 0;
int numDups = 0;
#### HERE ########
Q1: I don't understand what is going on inside this synchronized block.
What's the purpose of this code? (My rough guess is sketched after the
listing; please correct it if it's wrong.)
synchronized (scheduledCopies) {
// Randomize the map output locations to prevent
// all reduce-tasks swamping the same tasktracker
List<String> hostList = new ArrayList<String>();
hostList.addAll(mapLocations.keySet());
Collections.shuffle(hostList, this.random);
Iterator<String> hostsItr = hostList.iterator();
while (hostsItr.hasNext()) {
String host = hostsItr.next();
List<MapOutputLocation> knownOutputsByLoc =
mapLocations.get(host);
// Check if the list exists. Map output location mapping is
// cleared once the jobtracker restarts and is rebuilt from
// scratch.
// Note that map-output-location mapping will be recreated and
// hence we continue with the hope that we might find some
// locations from the rebuilt map and add them for fetching.
if (knownOutputsByLoc == null || knownOutputsByLoc.size() == 0) {
continue;
}
// Identify duplicate hosts here
if (uniqueHosts.contains(host)) {
numDups += knownOutputsByLoc.size();
continue;
}
Long penaltyEnd = penaltyBox.get(host);
boolean penalized = false;
if (penaltyEnd != null) {
if (currentTime < penaltyEnd.longValue()) {
penalized = true;
} else {
penaltyBox.remove(host);
}
}
if (penalized)
continue;
synchronized (knownOutputsByLoc) {
locItr = knownOutputsByLoc.iterator();
while (locItr.hasNext()) {
MapOutputLocation loc = locItr.next();
// Do not schedule fetches from OBSOLETE maps
if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
locItr.remove();
continue;
}
uniqueHosts.add(host);
scheduledCopies.add(loc);
locItr.remove(); // remove from knownOutputs
numInFlight++; numScheduled++;
break; //we have a map from this host
}
}
}
scheduledCopies.notifyAll();
}
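// #### END OF THE Q1 BLOCK ####
// (if I read it right, everything since "synchronized (scheduledCopies)"
// queues at most one map-output location per host, and the notifyAll()
// wakes whatever threads are waiting on scheduledCopies to consume them)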
if (numScheduled > 0 || logNow) {
LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
         " outputs (" + penaltyBox.size() +
         " slow hosts and " + numDups + " dup hosts)");
}
if (penaltyBox.size() > 0 && logNow) {
LOG.info("Penalized(slow) Hosts: ");
for (String host : penaltyBox.keySet()) {
LOG.info(host + " Will be considered after: " +
         ((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
}
}
// if we have no copies in flight and we can't schedule anything
// new, just wait for a bit
try {
if (numInFlight == 0 && numScheduled == 0) {
// we should indicate progress as we don't want TT to think
// we're stuck and kill us
reporter.progress();
Thread.sleep(5000);
}
} catch (InterruptedException e) { } // IGNORE
while (numInFlight > 0 && mergeThrowable == null) {
LOG.debug(reduceTask.getTaskID() + " numInFlight = " +
numInFlight);
// the call to getCopyResult will either
// 1) return immediately with a null or a valid CopyResult object,
//    or
// 2) if the numInFlight is above maxInFlight, return with a
//    CopyResult object after getting a notification from a
//    fetcher thread,
// So, when getCopyResult returns null, we can be sure that
// we aren't busy enough and we should go and get more
// map-completion events from the tasktracker
CopyResult cr = getCopyResult(numInFlight);
if (cr == null) {
break;
}
if (cr.getSuccess()) { // a successful copy
numCopied++;
lastProgressTime = System.currentTimeMillis();
reduceShuffleBytes.increment(cr.getSize());
long secsSinceStart =
(System.currentTimeMillis()-startTime)/1000+1;
float mbs =
((float)reduceShuffleBytes.getCounter())/(1024*1024);
float transferRate = mbs/secsSinceStart;
copyPhase.startNextPhase();
copyPhase.setStatus("copy (" + numCopied + " of " + numMaps +
                    " at " + mbpsFormat.format(transferRate) +
                    " MB/s)");
// Note successful fetch for this mapId to invalidate
// (possibly) old fetch-failures
fetchFailedMaps.remove(cr.getLocation().getTaskId());
} else if (cr.isObsolete()) {
//ignore
LOG.info(reduceTask.getTaskID() +
         " Ignoring obsolete copy result for Map Task: " +
         cr.getLocation().getTaskAttemptId() +
         " from host: " + cr.getHost());
} else {
retryFetches.add(cr.getLocation());
// note the failed-fetch
TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
TaskID mapId = cr.getLocation().getTaskId();
totalFailures++;
Integer noFailedFetches =
mapTaskToFailedFetchesMap.get(mapTaskId);
noFailedFetches =
(noFailedFetches == null) ? 1 : (noFailedFetches + 1);
mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
LOG.info("Task " + getTaskID() + ": Failed fetch #" +
noFailedFetches + " from " + mapTaskId);
// did the fetch fail too many times?
// using a hybrid technique for notifying the jobtracker.
// a. the first notification is sent after max-retries
// b. subsequent notifications are sent after 2 retries.
if ((noFailedFetches >= maxFetchRetriesPerMap)
&& ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
synchronized (ReduceTask.this) {
taskStatus.addFetchFailedMap(mapTaskId);
LOG.info("Failed to fetch map-output from " + mapTaskId +
         " even after MAX_FETCH_RETRIES_PER_MAP retries..." +
         " reporting to the JobTracker");
}
}
// note unique failed-fetch maps
if (noFailedFetches == maxFetchRetriesPerMap) {
fetchFailedMaps.add(mapId);
// did we have too many unique failed-fetch maps?
// and did we fail on too many fetch attempts?
// and did we progress enough
// or did we wait for too long without any progress?
// check if the reducer is healthy
boolean reducerHealthy =
(((float)totalFailures / (totalFailures + numCopied))
< MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
// check if the reducer has progressed enough
boolean reducerProgressedEnough =
(((float)numCopied / numMaps)
>= MIN_REQUIRED_PROGRESS_PERCENT);
// check if the reducer is stalled for a long time
// duration for which the reducer is stalled
int stallDuration =
(int)(System.currentTimeMillis() - lastProgressTime);
// duration for which the reducer ran with progress
int shuffleProgressDuration =
(int)(lastProgressTime - startTime);
// min time the reducer should run without getting killed
int minShuffleRunDuration =
(shuffleProgressDuration > maxMapRuntime)
? shuffleProgressDuration
: maxMapRuntime;
boolean reducerStalled =
(((float)stallDuration / minShuffleRunDuration)
>= MAX_ALLOWED_STALL_TIME_PERCENT);
// kill if not healthy and has insufficient progress
if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
     fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
    && !reducerHealthy
    && (!reducerProgressedEnough || reducerStalled)) {
  LOG.fatal("Shuffle failed with too many fetch failures " +
            "and insufficient progress! " +
            "Killing task " + getTaskID() + ".");
  umbilical.shuffleError(getTaskID(),
                         "Exceeded MAX_FAILED_UNIQUE_FETCHES;" +
                         " bailing-out.");
}
}
// back off exponentially until num_retries <= max_retries
// back off by max_backoff/2 on subsequent failed attempts
currentTime = System.currentTimeMillis();
int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
                     ? BACKOFF_INIT * (1 << (noFailedFetches - 1))
                     : (this.maxBackoff * 1000 / 2);
penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
LOG.warn(reduceTask.getTaskID() + " adding host " +
cr.getHost() + " to penalty box, next contact in " +
(currentBackOff/1000) + " seconds");
}
uniqueHosts.remove(cr.getHost());
numInFlight--;
}
}
(...)
}
[/code]
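
To make the question more concrete, here is my rough guess at what that
synchronized block is for, as a stripped-down toy sketch. All the names
below (SchedulerSketch, Fetcher, workQueue) are made up by me -- this is
not the actual MapOutputCopier code, just the plain Java
synchronized/wait/notifyAll idiom I think it follows: fetchOutputs() acts
as the producer that fills scheduledCopies (at most one location per host
per pass, skipping penalized and already-busy hosts), and notifyAll()
wakes the copier threads that wait() on the same object when the queue is
empty.

[code]
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Toy producer/consumer hand-off; only the locking pattern is meant
// to mirror ReduceTask, none of the real bookkeeping.
public class SchedulerSketch {

  // plays the role of scheduledCopies: the shared work queue
  private static final List<String> workQueue = new LinkedList<String>();

  // plays the role of a copier thread (the consumer)
  static class Fetcher extends Thread {
    public void run() {
      while (true) {
        String location;
        synchronized (workQueue) {
          // sleep until the scheduler has queued something for us
          while (workQueue.isEmpty()) {
            try {
              workQueue.wait();  // releases the lock while waiting
            } catch (InterruptedException e) {
              return;
            }
          }
          location = workQueue.remove(0);
        }
        System.out.println(getName() + " copying from " + location);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    new Fetcher().start();
    new Fetcher().start();

    // plays the role of the synchronized (scheduledCopies) block in
    // fetchOutputs(): fill the queue under the lock, then wake everyone
    List<String> hosts = new ArrayList<String>();
    hosts.add("host-a"); hosts.add("host-b"); hosts.add("host-c");
    synchronized (workQueue) {
      workQueue.addAll(hosts);  // "schedule" one copy per host
      workQueue.notifyAll();    // wake any Fetcher blocked in wait()
    }

    Thread.sleep(1000);  // let the fetchers drain the queue, then quit
    System.exit(0);
  }
}
[/code]

If that reading is right, the lock does double duty: it protects
scheduledCopies from concurrent access by the copier threads, and it
pairs with their wait()/notifyAll() hand-off, while the shuffle,
uniqueHosts and penaltyBox logic inside the block just decides which
locations get queued on this pass. Is that correct?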
Thanks,
--
Pedro