Repository: giraph Updated Branches: refs/heads/trunk 5a04dc554 -> e4aa99d3f
Increase info-logging while waiting for straggler workers Summary: Keep logging info messages while waiting for task-time-out Test Plan: All unit tests are passing. Manual tests to ensure desired functionality is observed. Reviewers: maja.kabiljo Subscribers: dionysis.logothetis Differential Revision: https://reviews.facebook.net/D55467 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e4aa99d3 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e4aa99d3 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e4aa99d3 Branch: refs/heads/trunk Commit: e4aa99d3f603e70c7db3a55ae5d59470c1a37f58 Parents: 5a04dc5 Author: Tyler Serdar Bulut <[email protected]> Authored: Tue Mar 15 12:10:36 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Thu Mar 17 09:56:00 2016 -0700 ---------------------------------------------------------------------- .../apache/giraph/master/BspServiceMaster.java | 106 ++++++++++++------- 1 file changed, 65 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/e4aa99d3/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index cc70b17..e9ece66 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -1275,41 +1275,47 @@ public class BspServiceMaster<I extends WritableComparable, } String workerInfoHealthyPath = getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep()); - List<String> finishedHostnameIdList; + List<String> finishedHostnameIdList = new ArrayList<>(); long nextInfoMillis = System.currentTimeMillis(); final int defaultTaskTimeoutMsec = 10 * 60 * 1000; // from TaskTracker + final int waitBetweenLogInfoMsec = 30 * 1000; final int taskTimeoutMsec = getContext().getConfiguration().getInt( - "mapred.task.timeout", defaultTaskTimeoutMsec); + "mapred.task.timeout", defaultTaskTimeoutMsec) / 2; + long lastRegularRunTimeMsec = 0; + int eventLoopTimeout = Math.min(taskTimeoutMsec, waitBetweenLogInfoMsec); + boolean logInfoOnlyRun = false; List<WorkerInfo> deadWorkers = new ArrayList<>(); while (true) { - try { - finishedHostnameIdList = - getZkExt().getChildrenExt(finishedWorkerPath, - true, - false, - false); - } catch (KeeperException e) { - throw new IllegalStateException( - "barrierOnWorkerList: KeeperException - Couldn't get " + - "children of " + finishedWorkerPath, e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "barrierOnWorkerList: IllegalException - Couldn't get " + - "children of " + finishedWorkerPath, e); - } - if (LOG.isDebugEnabled()) { - LOG.debug("barrierOnWorkerList: Got finished worker list = " + - finishedHostnameIdList + ", size = " + - finishedHostnameIdList.size() + - ", worker list = " + - workerInfoList + ", size = " + - workerInfoList.size() + - " from " + finishedWorkerPath); + if (! logInfoOnlyRun) { + try { + finishedHostnameIdList = + getZkExt().getChildrenExt(finishedWorkerPath, + true, + false, + false); + } catch (KeeperException e) { + throw new IllegalStateException( + "barrierOnWorkerList: KeeperException - Couldn't get " + + "children of " + finishedWorkerPath, e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "barrierOnWorkerList: IllegalException - Couldn't get " + + "children of " + finishedWorkerPath, e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("barrierOnWorkerList: Got finished worker list = " + + finishedHostnameIdList + ", size = " + + finishedHostnameIdList.size() + + ", worker list = " + + workerInfoList + ", size = " + + workerInfoList.size() + + " from " + finishedWorkerPath); + } } if (LOG.isInfoEnabled() && (System.currentTimeMillis() > nextInfoMillis)) { - nextInfoMillis = System.currentTimeMillis() + 30000; + nextInfoMillis = System.currentTimeMillis() + waitBetweenLogInfoMsec; LOG.info("barrierOnWorkerList: " + finishedHostnameIdList.size() + " out of " + workerInfoList.size() + @@ -1322,29 +1328,47 @@ public class BspServiceMaster<I extends WritableComparable, LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers); } } - getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " - " + - finishedHostnameIdList.size() + - " finished out of " + - workerInfoList.size() + - " on superstep " + getSuperstep()); - if (finishedHostnameIdList.containsAll(hostnameIdList)) { - break; - } - for (WorkerInfo deadWorker : deadWorkers) { - if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) { - LOG.error("barrierOnWorkerList: no results arived from " + - "worker that was pronounced dead: " + deadWorker + - " on superstep " + getSuperstep()); - return false; + if (! logInfoOnlyRun) { + getContext().setStatus(getGraphTaskManager().getGraphFunctions() + + " - " + + finishedHostnameIdList.size() + + " finished out of " + + workerInfoList.size() + + " on superstep " + getSuperstep()); + if (finishedHostnameIdList.containsAll(hostnameIdList)) { + break; + } + + for (WorkerInfo deadWorker : deadWorkers) { + if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) { + LOG.error("barrierOnWorkerList: no results arived from " + + "worker that was pronounced dead: " + deadWorker + + " on superstep " + getSuperstep()); + return false; + } } + + // wall-clock time skew is ignored + lastRegularRunTimeMsec = System.currentTimeMillis(); } // Wait for a signal or timeout - event.waitMsecs(taskTimeoutMsec / 2); + boolean eventTriggered = event.waitMsecs(eventLoopTimeout); + long elapsedTimeSinceRegularRunMsec = System.currentTimeMillis() - + lastRegularRunTimeMsec; event.reset(); getContext().progress(); + if (eventTriggered || + taskTimeoutMsec == eventLoopTimeout || + elapsedTimeSinceRegularRunMsec >= taskTimeoutMsec) { + logInfoOnlyRun = false; + } else { + logInfoOnlyRun = true; + continue; + } + // Did a worker die? try { deadWorkers.addAll(superstepChosenWorkerAlive(
