Repository: giraph Updated Branches: refs/heads/trunk 4b743f163 -> c8078a2f3
Gets rid of spammy ZooKeeper messages regarding 'lost reservation' at the end of the workers' log. Summary: Once a Giraph job with more than one worker finishes running, ZooKeeper usually triggers many unnecessary messages that mistakenly report a 'lost reservation' of input splits. This happens because one worker closes its EPHEMERAL ZooKeeper node, which raises an event on the other workers for each existing input split. Since there is one instance of the EPHEMERAL ZooKeeper node, closing it translates into the deletion/loss of input splits as seen by the other workers. These events are watched by ZooKeeperExt in the other workers, causing 'lost reservation' messages, one for each input split, at the end of the worker log. The fix is to ignore the aforementioned events once the computation has passed the INPUT_SUPERSTEP. [JIRA number is GIRAPH-1009] Test Plan: Running a Giraph job on more than one worker without this patch produces the 'lost reservation' messages at the end of the worker log. Applying this patch eliminates these messages. 'mvn clean verify' passes with this patch. 
Reviewers: dionysis.logothetis, avery.ching Reviewed By: avery.ching Differential Revision: https://reviews.facebook.net/D38949 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c8078a2f Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c8078a2f Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c8078a2f Branch: refs/heads/trunk Commit: c8078a2f3348f53d2abd276977323aff74c34f1c Parents: 4b743f1 Author: Hassan Eslami <[email protected]> Authored: Tue May 26 12:56:20 2015 -0700 Committer: Avery Ching <[email protected]> Committed: Tue May 26 13:05:39 2015 -0700 ---------------------------------------------------------------------- CHANGELOG | 3 +++ .../apache/giraph/worker/BspServiceWorker.java | 22 ++++++++++++++++---- .../giraph/worker/InputSplitsHandler.java | 17 +++++++++++---- 3 files changed, 34 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/c8078a2f/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index bb5e7e8..8836f3c 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 1.2.0 - unreleased + GIRAPH-1009: Spammy 'lost reservation' messages from ZooKeeper in workers' log at the end of + the computation. 
(heslami via aching) + GIRAPH-1008: Create Computation per thread instead of per partition (majakabiljo) GIRAPH-1004: Allow changing hadoop output format (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/c8078a2f/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 2f1c2ef..5ec1872 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -180,6 +180,11 @@ public class BspServiceWorker<I extends WritableComparable, /** Time spent waiting on requests to finish */ private GiraphTimer waitRequestsTimer; + /** InputSplit handlers used in INPUT_SUPERSTEP for vertex splits */ + private InputSplitsHandler vertexSplitsHandler; + /** InputSplit handlers used in INPUT_SUPERSTEP for edge splits */ + private InputSplitsHandler edgeSplitsHandler; + /** * Constructor for setting up the worker. 
* @@ -235,6 +240,8 @@ public class BspServiceWorker<I extends WritableComparable, null; GiraphMetrics.get().addSuperstepResetObserver(this); + vertexSplitsHandler = null; + edgeSplitsHandler = null; } @Override @@ -390,7 +397,7 @@ public class BspServiceWorker<I extends WritableComparable, new InputSplitPathOrganizer(getZkExt(), inputSplitPathList, getWorkerInfo().getHostname(), getConfiguration().useInputSplitLocality()); - InputSplitsHandler splitsHandler = new InputSplitsHandler( + vertexSplitsHandler = new InputSplitsHandler( splitOrganizer, getZkExt(), getContext(), @@ -403,7 +410,7 @@ public class BspServiceWorker<I extends WritableComparable, getContext(), getConfiguration(), this, - splitsHandler, + vertexSplitsHandler, getZkExt()); return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory); @@ -424,7 +431,7 @@ public class BspServiceWorker<I extends WritableComparable, new InputSplitPathOrganizer(getZkExt(), inputSplitPathList, getWorkerInfo().getHostname(), getConfiguration().useInputSplitLocality()); - InputSplitsHandler splitsHandler = new InputSplitsHandler( + edgeSplitsHandler = new InputSplitsHandler( splitOrganizer, getZkExt(), getContext(), @@ -437,7 +444,7 @@ public class BspServiceWorker<I extends WritableComparable, getContext(), getConfiguration(), this, - splitsHandler, + edgeSplitsHandler, getZkExt()); return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory). 
@@ -895,6 +902,13 @@ public class BspServiceWorker<I extends WritableComparable, if (getSuperstep() != INPUT_SUPERSTEP) { postSuperstepCallbacks(); + } else { + if (getConfiguration().hasVertexInputFormat()) { + vertexSplitsHandler.setDoneReadingGraph(true); + } + if (getConfiguration().hasEdgeInputFormat()) { + edgeSplitsHandler.setDoneReadingGraph(true); + } } globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor); http://git-wip-us.apache.org/repos/asf/giraph/blob/c8078a2f/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java index f7d11a3..e2099eb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java @@ -54,6 +54,9 @@ public class InputSplitsHandler implements Watcher { private final String inputSplitReservedNode; /** ZooKeeper input split finished node. */ private final String inputSplitFinishedNode; + /** Specifies if we finished execution of INPUT_SUPERSTEP. The variable may + * be accessed via different threads. */ + private volatile boolean doneReadingGraph; /** * Constructor @@ -73,10 +76,14 @@ public class InputSplitsHandler implements Watcher { this.context = context; this.inputSplitReservedNode = inputSplitReservedNode; this.inputSplitFinishedNode = inputSplitFinishedNode; + this.doneReadingGraph = false; } + public void setDoneReadingGraph(boolean doneReadingGraph) { + this.doneReadingGraph = doneReadingGraph; + } - /** + /** * Try to reserve an InputSplit for loading. While InputSplits exists that * are not finished, wait until they are. 
* @@ -182,10 +189,12 @@ public class InputSplitsHandler implements Watcher { "state " + event.getState() + ", event type " + event.getType()); return; } - // Check if the reservation for the input split was lost - // (some worker died) + // Check if the reservation for the input split was lost in INPUT_SUPERSTEP + // (some worker died). If INPUT_SUPERSTEP has already completed, we ignore + // this event. if (event.getPath().endsWith(inputSplitReservedNode) && - event.getType() == Watcher.Event.EventType.NodeDeleted) { + event.getType() == Watcher.Event.EventType.NodeDeleted && + !doneReadingGraph) { synchronized (pathList) { String split = event.getPath(); split = split.substring(0, split.indexOf(inputSplitReservedNode));
