Updated Branches: refs/heads/trunk b022dce9f -> 1c967b8e0
GIRAPH-498: We should check input splits status from ZooKeeper once per worker, not once per split thread (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/1c967b8e Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/1c967b8e Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/1c967b8e Branch: refs/heads/trunk Commit: 1c967b8e0b282deb9a3d49231f410ffcabd9e9de Parents: b022dce Author: Maja Kabiljo <[email protected]> Authored: Tue Feb 5 10:34:54 2013 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Tue Feb 5 10:36:21 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 3 + .../apache/giraph/conf/GiraphConfiguration.java | 10 + .../org/apache/giraph/worker/BspServiceWorker.java | 28 ++- .../giraph/worker/EdgeInputSplitsCallable.java | 14 +- .../worker/EdgeInputSplitsCallableFactory.java | 21 +- .../giraph/worker/InputSplitPathOrganizer.java | 103 +------- .../apache/giraph/worker/InputSplitsCallable.java | 167 +------------ .../apache/giraph/worker/InputSplitsHandler.java | 199 +++++++++++++++ .../giraph/worker/VertexInputSplitsCallable.java | 14 +- .../worker/VertexInputSplitsCallableFactory.java | 21 +- .../test/java/org/apache/giraph/TestBspBasic.java | 8 +- 11 files changed, 280 insertions(+), 308 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index d50e1e5..926e404 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-498: We should check input splits status from ZooKeeper once per worker, + not once per split thread (majakabiljo) + GIRAPH-497: Limiting number of open requests doesn't work with multithreading (majakabiljo via ereisman) GIRAPH-461: 
Convert static assignment of in-memory partitions with LRU cache (claudio) http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 796047d..79b12d3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -645,4 +645,14 @@ public class GiraphConfiguration extends Configuration set("mapreduce.job.credentials.binary", hadoopTokenFilePath); } } + + /** + * Check if we want to prioritize input splits which reside on the host. + * + * @return True iff we want to use input split locality + */ + public boolean useInputSplitLocality() { + return getBoolean(GiraphConstants.USE_INPUT_SPLIT_LOCALITY, + GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index a48c5ea..ff1033e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -297,14 +297,24 @@ public class BspServiceWorker<I extends WritableComparable, INPUT_SUPERSTEP, 0, 0, getContext(), getGraphTaskManager(), null, null); + InputSplitPathOrganizer splitOrganizer = + new InputSplitPathOrganizer(getZkExt(), + inputSplitPathList, getWorkerInfo().getHostname(), + getConfiguration().useInputSplitLocality()); + 
InputSplitsHandler splitsHandler = new InputSplitsHandler( + splitOrganizer, + getZkExt(), + getContext(), + BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE, + BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE); + VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory = new VertexInputSplitsCallableFactory<I, V, E, M>( getContext(), graphState, getConfiguration(), this, - inputSplitPathList, - getWorkerInfo(), + splitsHandler, getZkExt()); return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory); @@ -324,14 +334,24 @@ public class BspServiceWorker<I extends WritableComparable, INPUT_SUPERSTEP, 0, 0, getContext(), getGraphTaskManager(), null, null); + InputSplitPathOrganizer splitOrganizer = + new InputSplitPathOrganizer(getZkExt(), + inputSplitPathList, getWorkerInfo().getHostname(), + getConfiguration().useInputSplitLocality()); + InputSplitsHandler splitsHandler = new InputSplitsHandler( + splitOrganizer, + getZkExt(), + getContext(), + BspService.EDGE_INPUT_SPLIT_RESERVED_NODE, + BspService.EDGE_INPUT_SPLIT_FINISHED_NODE); + EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory = new EdgeInputSplitsCallableFactory<I, V, E, M>( getContext(), graphState, getConfiguration(), this, - inputSplitPathList, - getWorkerInfo(), + splitsHandler, getZkExt()); return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory). 
http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java index 7d40dfb..80c341c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java @@ -39,7 +39,6 @@ import org.apache.log4j.Logger; import com.yammer.metrics.core.Counter; import java.io.IOException; -import java.util.List; /** * Load as many edge input splits as possible. @@ -73,9 +72,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, * @param graphState Graph state * @param configuration Configuration * @param bspServiceWorker service worker - * @param inputSplitPathList List of the paths of the input splits - * @param workerInfo This worker's info - * @param threadId Id of input split thread + * @param splitsHandler Handler for input splits * @param zooKeeperExt Handle to ZooKeeperExt */ public EdgeInputSplitsCallable( @@ -83,15 +80,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, GraphState<I, V, E, M> graphState, ImmutableClassesGiraphConfiguration<I, V, E, M> configuration, BspServiceWorker<I, V, E, M> bspServiceWorker, - List<String> inputSplitPathList, - WorkerInfo workerInfo, - int threadId, + InputSplitsHandler splitsHandler, ZooKeeperExt zooKeeperExt) { super(context, graphState, configuration, bspServiceWorker, - inputSplitPathList, workerInfo, threadId, zooKeeperExt, - BspServiceWorker.EDGE_INPUT_SPLIT_RESERVED_NODE, - BspServiceWorker.EDGE_INPUT_SPLIT_FINISHED_NODE, - bspServiceWorker.getEdgeInputSplitsEvents()); + splitsHandler, zooKeeperExt); inputSplitMaxEdges = configuration.getInputSplitMaxEdges(); 
http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java index 1adcd73..9297ac1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java @@ -25,8 +25,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import java.util.List; - /** * Factory for {@link EdgeInputSplitsCallable}s. * @@ -46,10 +44,8 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable, private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration; /** {@link BspServiceWorker} we're running on. */ private final BspServiceWorker<I, V, E, M> bspServiceWorker; - /** List of input split paths. */ - private final List<String> inputSplitPathList; - /** Worker info. */ - private final WorkerInfo workerInfo; + /** Handler for input splits */ + private final InputSplitsHandler splitsHandler; /** {@link ZooKeeperExt} for this worker. 
*/ private final ZooKeeperExt zooKeeperExt; @@ -60,8 +56,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable, * @param graphState Graph state * @param configuration Configuration * @param bspServiceWorker Calling {@link BspServiceWorker} - * @param inputSplitPathList List of input split paths - * @param workerInfo Worker info + * @param splitsHandler Handler for input splits * @param zooKeeperExt {@link ZooKeeperExt} for this worker */ public EdgeInputSplitsCallableFactory( @@ -69,16 +64,14 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable, GraphState<I, V, E, M> graphState, ImmutableClassesGiraphConfiguration<I, V, E, M> configuration, BspServiceWorker<I, V, E, M> bspServiceWorker, - List<String> inputSplitPathList, - WorkerInfo workerInfo, + InputSplitsHandler splitsHandler, ZooKeeperExt zooKeeperExt) { this.context = context; this.graphState = graphState; this.configuration = configuration; this.bspServiceWorker = bspServiceWorker; - this.inputSplitPathList = inputSplitPathList; - this.workerInfo = workerInfo; this.zooKeeperExt = zooKeeperExt; + this.splitsHandler = splitsHandler; } @Override @@ -88,9 +81,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable, graphState, configuration, bspServiceWorker, - inputSplitPathList, - workerInfo, - threadId, + splitsHandler, zooKeeperExt); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java index bfaefd2..463601c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java @@ -17,8 +17,8 @@ */ 
package org.apache.giraph.worker; -import com.google.common.base.Objects; import com.google.common.collect.Lists; + import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.io.Text; import org.apache.zookeeper.KeeperException; @@ -42,15 +42,13 @@ import java.util.List; * down on the number of ZK reads workers perform before locating * an unclaimed InputSplit. */ -public class InputSplitPathOrganizer implements Iterable<String> { +public class InputSplitPathOrganizer { /** The worker's local ZooKeeperExt ref */ private final ZooKeeperExt zooKeeper; /** The List of InputSplit znode paths */ private final List<String> pathList; /** The worker's hostname */ private final String hostName; - /** The adjusted base offset by which to iterate on the path list */ - private int baseOffset; /** * Constructor @@ -58,16 +56,13 @@ public class InputSplitPathOrganizer implements Iterable<String> { * @param zooKeeper the worker's ZkExt * @param zkPathList the path to read from * @param hostName the worker's host name (for matching) - * @param port the port number for this worker - * @param threadId id of the input split thread * @param useLocality whether to prioritize local input splits */ public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper, - final String zkPathList, final String hostName, final int port, - final int threadId, final boolean useLocality) - throws KeeperException, InterruptedException { + final String zkPathList, final String hostName, + final boolean useLocality) throws KeeperException, InterruptedException { this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true), - hostName, port, threadId, useLocality); + hostName, useLocality); } /** @@ -76,48 +71,29 @@ public class InputSplitPathOrganizer implements Iterable<String> { * @param zooKeeper the worker's ZkExt * @param inputSplitPathList path of input splits to read from * @param hostName the worker's host name (for matching) - * @param port the port number for this worker - * 
@param threadId id of the input split thread * @param useLocality whether to prioritize local input splits */ public InputSplitPathOrganizer( final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList, - final String hostName, final int port, final int threadId, - final boolean useLocality) - throws KeeperException, InterruptedException { + final String hostName, final boolean useLocality) { this.zooKeeper = zooKeeper; this.pathList = Lists.newArrayList(inputSplitPathList); this.hostName = hostName; + // Shuffle input splits in case several workers exist on this host + Collections.shuffle(pathList); if (useLocality) { - prioritizeLocalInputSplits(port, threadId); - } else { - this.baseOffset = computeBaseOffset(port, threadId); + prioritizeLocalInputSplits(); } } /** - * Compute base offset to start iterating from, - * in order to avoid collisions with other workers/threads. - * - * @param port the port number for this worker - * @param threadId id of the input split thread - * @return the offset to start iterating from - */ - private int computeBaseOffset(final int port, final int threadId) { - return pathList.isEmpty() ? 0 : - Math.abs(Objects.hashCode(hostName, port, threadId) % pathList.size()); - } - - /** * Re-order list of InputSplits so files local to this worker node's * disk are the first it will iterate over when attempting to claim * a split to read. This will increase locality of data reads with greater * probability as the % of total nodes in the cluster hosting data and workers * BOTH increase towards 100%. Replication increases our chances of a "hit." 
- * @param port the port number to hash against - * @param threadId the threadId to hash against */ - private void prioritizeLocalInputSplits(final int port, final int threadId) { + private void prioritizeLocalInputSplits() { List<String> sortedList = new ArrayList<String>(); String hosts; for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) { @@ -136,13 +112,7 @@ public class InputSplitPathOrganizer implements Iterable<String> { iterator.remove(); // remove local block from list } } - // shuffle the local blocks in case several workers exist on this host - Collections.shuffle(sortedList); - // set the base offset for the split iterator based on the insertion - // point of the local list items back into the nonlocal split list. - baseOffset = computeBaseOffset(port, threadId); - // re-insert local paths at "adjusted index zero" for caller to iterate on - pathList.addAll(baseOffset, sortedList); + pathList.addAll(0, sortedList); } /** @@ -162,54 +132,11 @@ public class InputSplitPathOrganizer implements Iterable<String> { } /** - * Utility accessor for Input Split znode path list size - * - * @return the size of <code>this.pathList</code> - */ - public int getPathListSize() { - return this.pathList.size(); - } - - /** - * Iterator for the pathList + * Get the ordered input splits paths. * - * @return an iterator for our list of input split paths + * @return Ordered input splits paths */ - public Iterator<String> iterator() { - return new PathListIterator(); - } - - /** - * Iterator for path list that handles the locality and hash offsetting. - */ - public class PathListIterator implements Iterator<String> { - /** the current iterator index */ - private int currentIndex = 0; - - /** - * Do we have more list to iterate upon? 
- * - * @return true if more path strings are available - */ - @Override - public boolean hasNext() { - return currentIndex < pathList.size(); - } - - /** - * Return the next pathList element - * - * @return the next input split path - */ - @Override - public String next() { - return pathList.get((baseOffset + currentIndex++) % pathList.size()); - } - - /** Just a placeholder; should not do anything! */ - @Override - public void remove() { - throw new UnsupportedOperationException("Remove is not allowed."); - } + public Iterable<String> getPathList() { + return pathList; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java index d09ca2b..5487ab7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java @@ -20,10 +20,8 @@ package org.apache.giraph.worker; import org.apache.giraph.comm.WorkerClientRequestProcessor; import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; -import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.GraphState; -import org.apache.giraph.graph.InputSplitEvents; import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; @@ -36,15 +34,11 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.ReflectionUtils; import org.apache.log4j.Logger; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; -import 
org.apache.zookeeper.data.Stat; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; -import java.util.List; import java.util.concurrent.Callable; /** @@ -82,17 +76,11 @@ public abstract class InputSplitsCallable<I extends WritableComparable, * Stores and processes the list of InputSplits advertised * in a tree of child znodes by the master. */ - private final InputSplitPathOrganizer splitOrganizer; + private final InputSplitsHandler splitsHandler; /** ZooKeeperExt handle */ private final ZooKeeperExt zooKeeperExt; /** Get the start time in nanos */ private final long startNanos = TIME.getNanoseconds(); - /** ZooKeeper input split reserved node. */ - private final String inputSplitReservedNode; - /** ZooKeeper input split finished node. */ - private final String inputSplitFinishedNode; - /** Input split events. */ - private final InputSplitEvents inputSplitEvents; /** Whether to prioritize local input splits. */ private final boolean useLocality; @@ -104,26 +92,16 @@ public abstract class InputSplitsCallable<I extends WritableComparable, * @param graphState Graph state * @param configuration Configuration * @param bspServiceWorker service worker - * @param inputSplitPathList List of the paths of the input splits - * @param workerInfo This worker's info - * @param threadId Id of input split thread + * @param splitsHandler Handler for input splits * @param zooKeeperExt Handle to ZooKeeperExt - * @param inputSplitReservedNode Path to input split reserved - * @param inputSplitFinishedNode Path to input split finsished - * @param inputSplitEvents Input split events */ public InputSplitsCallable( Mapper<?, ?, ?, ?>.Context context, GraphState<I, V, E, M> graphState, ImmutableClassesGiraphConfiguration<I, V, E, M> configuration, BspServiceWorker<I, V, E, M> bspServiceWorker, - List<String> inputSplitPathList, - WorkerInfo workerInfo, - int threadId, - ZooKeeperExt zooKeeperExt, - String inputSplitReservedNode, - String 
inputSplitFinishedNode, - InputSplitEvents inputSplitEvents) { + InputSplitsHandler splitsHandler, + ZooKeeperExt zooKeeperExt) { this.zooKeeperExt = zooKeeperExt; this.context = context; this.workerClientRequestProcessor = @@ -133,24 +111,9 @@ public abstract class InputSplitsCallable<I extends WritableComparable, graphState.getTotalNumVertices(), graphState.getTotalNumEdges(), context, graphState.getGraphTaskManager(), workerClientRequestProcessor, null); - this.useLocality = configuration.getBoolean( - GiraphConstants.USE_INPUT_SPLIT_LOCALITY, - GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT); - try { - splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt, - inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort(), - threadId, this.useLocality); - } catch (KeeperException e) { - throw new IllegalStateException( - "InputSplitsCallable: KeeperException", e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "InputSplitsCallable: InterruptedException", e); - } + this.useLocality = configuration.useInputSplitLocality(); + this.splitsHandler = splitsHandler; this.configuration = configuration; - this.inputSplitReservedNode = inputSplitReservedNode; - this.inputSplitFinishedNode = inputSplitFinishedNode; - this.inputSplitEvents = inputSplitEvents; } // CHECKSTYLE: resume ParameterNumberCheck @@ -174,7 +137,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable, String inputSplitPath; int inputSplitsProcessed = 0; try { - while ((inputSplitPath = reserveInputSplit()) != null) { + while ((inputSplitPath = splitsHandler.reserveInputSplit()) != null) { vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount( loadInputSplit(inputSplitPath, graphState)); @@ -214,120 +177,6 @@ public abstract class InputSplitsCallable<I extends WritableComparable, } /** - * Try to reserve an InputSplit for loading. While InputSplits exists that - * are not finished, wait until they are. 
- * - * NOTE: iterations on the InputSplit list only halt for each worker when it - * has scanned the entire list once and found every split marked RESERVED. - * When a worker fails, its Ephemeral RESERVED znodes will disappear, - * allowing other iterating workers to claim it's previously read splits. - * Only when the last worker left iterating on the list fails can a danger - * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently - * causes job failure, this is OK. As the failure model evolves, this - * behavior might need to change. - * - * @return reserved InputSplit or null if no unfinished InputSplits exist - * @throws org.apache.zookeeper.KeeperException - * @throws InterruptedException - */ - private String reserveInputSplit() - throws KeeperException, InterruptedException { - String reservedInputSplitPath = null; - Stat reservedStat; - while (true) { - int reservedInputSplits = 0; - for (String nextSplitToClaim : splitOrganizer) { - context.progress(); - String tmpInputSplitReservedPath = nextSplitToClaim + - inputSplitReservedNode; - reservedStat = - zooKeeperExt.exists(tmpInputSplitReservedPath, true); - if (reservedStat == null) { - try { - // Attempt to reserve this InputSplit - zooKeeperExt.createExt(tmpInputSplitReservedPath, - null, - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL, - false); - reservedInputSplitPath = nextSplitToClaim; - if (LOG.isInfoEnabled()) { - float percentFinished = - reservedInputSplits * 100.0f / - splitOrganizer.getPathListSize(); - LOG.info("reserveInputSplit: Reserved input " + - "split path " + reservedInputSplitPath + - ", overall roughly " + - + percentFinished + - "% input splits reserved"); - } - return reservedInputSplitPath; - } catch (KeeperException.NodeExistsException e) { - LOG.info("reserveInputSplit: Couldn't reserve " + - "(already reserved) inputSplit" + - " at " + tmpInputSplitReservedPath); - } catch (KeeperException e) { - throw new IllegalStateException( - "reserveInputSplit: 
KeeperException on reserve", e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "reserveInputSplit: InterruptedException " + - "on reserve", e); - } - } else { - ++reservedInputSplits; - } - } - if (LOG.isInfoEnabled()) { - LOG.info("reserveInputSplit: reservedPath = " + - reservedInputSplitPath + ", " + reservedInputSplits + - " of " + splitOrganizer.getPathListSize() + - " InputSplits are finished."); - } - if (reservedInputSplits == splitOrganizer.getPathListSize()) { - return null; - } - context.progress(); - // Wait for either a reservation to go away or a notification that - // an InputSplit has finished. - context.progress(); - inputSplitEvents.getStateChanged().waitMsecs( - 60 * 1000); - inputSplitEvents.getStateChanged().reset(); - } - } - - /** - * Mark an input split path as completed by this worker. This notifies - * the master and the other workers that this input split has not only - * been reserved, but also marked processed. - * - * @param inputSplitPath Path to the input split. - */ - private void markInputSplitPathFinished(String inputSplitPath) { - String inputSplitFinishedPath = - inputSplitPath + inputSplitFinishedNode; - try { - zooKeeperExt.createExt(inputSplitFinishedPath, - null, - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true); - } catch (KeeperException.NodeExistsException e) { - LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath + - " already exists!"); - } catch (KeeperException e) { - throw new IllegalStateException( - "markInputSplitPathFinished: KeeperException on " + - inputSplitFinishedPath, e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "markInputSplitPathFinished: InterruptedException on " + - inputSplitFinishedPath, e); - } - } - - /** * Extract vertices from input split, saving them into a mini cache of * partitions. Periodically flush the cache of vertices when a limit is * reached in readVerticeFromInputSplit. 
@@ -354,7 +203,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable, LOG.info("loadFromInputSplit: Finished loading " + inputSplitPath + " " + vertexEdgeCount); } - markInputSplitPathFinished(inputSplitPath); + splitsHandler.markInputSplitPathFinished(inputSplitPath); return vertexEdgeCount; } http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java new file mode 100644 index 0000000..f7d11a3 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.worker; + +import org.apache.giraph.zk.ZooKeeperExt; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.Stat; + +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Stores the list of input split paths, and provides thread-safe way for + * reserving input splits. + */ +public class InputSplitsHandler implements Watcher { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(InputSplitsHandler.class); + + /** The List of InputSplit znode paths */ + private final List<String> pathList; + /** Current position in the path list */ + private final AtomicInteger currentIndex; + /** The worker's local ZooKeeperExt ref */ + private final ZooKeeperExt zooKeeper; + /** Context for reporting progress */ + private final Mapper<?, ?, ?, ?>.Context context; + /** ZooKeeper input split reserved node. */ + private final String inputSplitReservedNode; + /** ZooKeeper input split finished node. 
*/ + private final String inputSplitFinishedNode; + + /** + * Constructor + * + * Copies the organizer's path list into a new list owned by this handler; + * process() may later append splits whose reservation was lost. + * + * @param splitOrganizer Input splits organizer + * @param zooKeeper The worker's local ZooKeeperExt ref + * @param context Context for reporting progress + * @param inputSplitReservedNode ZooKeeper input split reserved node + * @param inputSplitFinishedNode ZooKeeper input split finished node + */ + public InputSplitsHandler(InputSplitPathOrganizer splitOrganizer, + ZooKeeperExt zooKeeper, Mapper<?, ?, ?, ?>.Context context, + String inputSplitReservedNode, String inputSplitFinishedNode) { + this.pathList = Lists.newArrayList(splitOrganizer.getPathList()); + this.currentIndex = new AtomicInteger(0); + this.zooKeeper = zooKeeper; + this.context = context; + this.inputSplitReservedNode = inputSplitReservedNode; + this.inputSplitFinishedNode = inputSplitFinishedNode; + } + + + /** + * Try to reserve an InputSplit for loading. Walks forward through the + * shared path list (via the atomic currentIndex) and reserves the first + * split whose reserved znode does not exist yet; it does not wait for + * splits that are reserved elsewhere but not yet finished. + * + * NOTE: iterations on the InputSplit list only halt for each worker when it + * has scanned the entire list once and found every split marked RESERVED. + * When a worker fails, its Ephemeral RESERVED znodes will disappear, + * allowing other iterating workers to claim its previously read splits. + * Only when the last worker left iterating on the list fails can a danger + * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently + * causes job failure, this is OK. As the failure model evolves, this + * behavior might need to change. We could add watches on + * inputSplitFinishedNodes and stop iterating only when all these nodes + * have been created. 
+ * + * NOTE(review): pathList (a Lists.newArrayList copy) is appended to by + * process() -- a watcher callback, presumably run on the ZooKeeper event + * thread -- while holding its monitor, but the size() and get() calls below + * read it without that lock. Unsynchronized reads racing an ArrayList + * append are not safe; consider reading under synchronized (pathList) or + * using a thread-safe list. Also, currentIndex is incremented even when + * null is returned, so a split appended after the index has passed the end + * of the list may never be handed out by this method. + * + * @return path of the reserved InputSplit, or null once every path in the + * list has been tried + * @throws KeeperException if checking the reserved znode fails (errors while + * creating it are rethrown as IllegalStateException) + * @throws InterruptedException if interrupted while checking the reserved + * znode + */ + public String reserveInputSplit() throws KeeperException, + InterruptedException { + String reservedInputSplitPath; + Stat reservedStat; + while (true) { + int splitToTry = currentIndex.getAndIncrement(); + if (splitToTry >= pathList.size()) { + return null; + } + String nextSplitToClaim = pathList.get(splitToTry); + context.progress(); + String tmpInputSplitReservedPath = + nextSplitToClaim + inputSplitReservedNode; + reservedStat = + zooKeeper.exists(tmpInputSplitReservedPath, this); + if (reservedStat == null) { + try { + // Attempt to reserve this InputSplit + zooKeeper.createExt(tmpInputSplitReservedPath, + null, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL, + false); + reservedInputSplitPath = nextSplitToClaim; + if (LOG.isInfoEnabled()) { + float percentFinished = + splitToTry * 100.0f / pathList.size(); + LOG.info("reserveInputSplit: Reserved input " + + "split path " + reservedInputSplitPath + + ", overall roughly " + + +percentFinished + + "% input splits reserved"); + } + return reservedInputSplitPath; + } catch (KeeperException.NodeExistsException e) { + 
LOG.info("reserveInputSplit: Couldn't reserve " + + "(already reserved) inputSplit" + + " at " + tmpInputSplitReservedPath); + } catch (KeeperException e) { + throw new IllegalStateException( + "reserveInputSplit: KeeperException on reserve", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "reserveInputSplit: InterruptedException " + + "on reserve", e); + } + } + } + } + + /** + * Mark an input split path as completed by this worker. This notifies + * the master and the other workers that this input split has not only + * been reserved, but also marked processed. + * + * Creates the persistent znode inputSplitPath + inputSplitFinishedNode; if + * it already exists, only a warning is logged. + * + * @param inputSplitPath Path to the input split. 
+ */ + public void markInputSplitPathFinished(String inputSplitPath) { + String inputSplitFinishedPath = + inputSplitPath + inputSplitFinishedNode; + try { + zooKeeper.createExt(inputSplitFinishedPath, + null, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath + + " already exists!"); + } catch (KeeperException e) { + throw new IllegalStateException( + "markInputSplitPathFinished: KeeperException on " + + inputSplitFinishedPath, e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "markInputSplitPathFinished: InterruptedException on " + + inputSplitFinishedPath, e); + } + } + + @Override + public void process(WatchedEvent event) { + if (event.getPath() == null) { + LOG.warn("process: Problem with zookeeper, got event with path null, " + + "state " + event.getState() + ", event type " + event.getType()); + return; + } + // Check if the reservation for the input split was lost + // (some worker died); if so, append the split to pathList so that + // reserveInputSplit() can hand it out again. NOTE(review): indexOf() + // cuts at the first occurrence of inputSplitReservedNode rather than at + // the suffix matched by endsWith(), and the finished znode is not + // checked before re-queuing, so an already-finished split could be + // reserved again if its reserved znode is deleted afterwards -- confirm + // this is intended. + if (event.getPath().endsWith(inputSplitReservedNode) && + event.getType() == Watcher.Event.EventType.NodeDeleted) { + synchronized (pathList) { + String split = event.getPath(); + split = split.substring(0, split.indexOf(inputSplitReservedNode)); + pathList.add(split); + if (LOG.isInfoEnabled()) { + LOG.info("process: Input split " + split + " lost reservation"); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java index a4f98e1..a192aeb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java @@ -20,7 +20,6 @@ package org.apache.giraph.worker; import com.yammer.metrics.core.Counter; import java.io.IOException; -import java.util.List; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.VertexEdgeCount; @@ -78,9 +77,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable, * @param graphState Graph state * @param configuration Configuration * @param bspServiceWorker service worker - * @param inputSplitPathList List of the paths of the input splits - * @param workerInfo This worker's info - * @param threadId Id of input split thread + * @param splitsHandler Handler for input splits * @param zooKeeperExt Handle to ZooKeeperExt */ public VertexInputSplitsCallable( @@ -88,15 +85,10 @@ public class VertexInputSplitsCallable<I extends WritableComparable, GraphState<I, V, E, M> graphState, ImmutableClassesGiraphConfiguration<I, V, E, M> configuration, BspServiceWorker<I, V, E, M> bspServiceWorker, - List<String> inputSplitPathList, - WorkerInfo workerInfo, - int threadId, + InputSplitsHandler splitsHandler, ZooKeeperExt zooKeeperExt) { super(context, graphState, configuration, bspServiceWorker, - inputSplitPathList, workerInfo, threadId, zooKeeperExt, - BspServiceWorker.VERTEX_INPUT_SPLIT_RESERVED_NODE, - BspServiceWorker.VERTEX_INPUT_SPLIT_FINISHED_NODE, - bspServiceWorker.getVertexInputSplitsEvents()); + splitsHandler, zooKeeperExt); inputSplitMaxVertices = configuration.getInputSplitMaxVertices(); this.bspServiceWorker = bspServiceWorker; http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java index 0d617dc..aebca81 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java @@ -25,8 +25,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import java.util.List; - /** * Factory for {@link VertexInputSplitsCallable}s. * @@ -46,10 +44,8 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable, private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration; /** {@link BspServiceWorker} we're running on. */ private final BspServiceWorker<I, V, E, M> bspServiceWorker; - /** List of input split paths. */ - private final List<String> inputSplitPathList; - /** Worker info. */ - private final WorkerInfo workerInfo; + /** Handler for input splits */ + private final InputSplitsHandler splitsHandler; /** {@link ZooKeeperExt} for this worker. 
*/ private final ZooKeeperExt zooKeeperExt; @@ -60,8 +56,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable, * @param graphState Graph state * @param configuration Configuration * @param bspServiceWorker Calling {@link BspServiceWorker} - * @param inputSplitPathList List of input split paths - * @param workerInfo Worker info + * @param splitsHandler Handler for input splits * @param zooKeeperExt {@link ZooKeeperExt} for this worker */ public VertexInputSplitsCallableFactory( @@ -69,16 +64,14 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable, GraphState<I, V, E, M> graphState, ImmutableClassesGiraphConfiguration<I, V, E, M> configuration, BspServiceWorker<I, V, E, M> bspServiceWorker, - List<String> inputSplitPathList, - WorkerInfo workerInfo, + InputSplitsHandler splitsHandler, ZooKeeperExt zooKeeperExt) { this.context = context; this.graphState = graphState; this.configuration = configuration; this.bspServiceWorker = bspServiceWorker; - this.inputSplitPathList = inputSplitPathList; - this.workerInfo = workerInfo; this.zooKeeperExt = zooKeeperExt; + this.splitsHandler = splitsHandler; } @Override @@ -88,9 +81,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable, graphState, configuration, bspServiceWorker, - inputSplitPathList, - workerInfo, - threadId, + splitsHandler, zooKeeperExt); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java index 987f51c..faf4126 100644 --- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java +++ b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java @@ -60,6 +60,7 @@ import org.apache.zookeeper.KeeperException; import org.junit.Test; import 
com.google.common.base.Charsets; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Closeables; @@ -368,11 +369,8 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/ when(zk.getData("remote2", false, null)).thenReturn(remote2); when(zk.getData("local", false, null)).thenReturn(local); InputSplitPathOrganizer lis = - new InputSplitPathOrganizer(zk, testListName, localHost, 0, 0, true); - final List<String> resultList = new ArrayList<String>(); - for (String next : lis) { - resultList.add(next); - } + new InputSplitPathOrganizer(zk, testListName, localHost, true); + final List<String> resultList = Lists.newArrayList(lis.getPathList()); assertEquals("local", resultList.get(0)); }
