GIRAPH-1033: Remove zookeeper from input splits handling Summary: Currently we use ZooKeeper for handling input splits by having each worker check each split, which becomes very slow when a lot of splits are used. Instead, the master should coordinate input split allocation, making the complexity proportional to #splits instead of #workers*#splits. The master holds all the splits, and workers send requests to it asking for splits when they need them.
Test Plan: Run a job with 200 machines and 200k small splits - without this change input superstep takes 30 minutes, and with it less than 2 minutes. Also verified correctness on sample job. mvn clean verify passes. Differential Revision: https://reviews.facebook.net/D48531 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5b0cd0e0 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5b0cd0e0 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5b0cd0e0 Branch: refs/heads/trunk Commit: 5b0cd0e0a2ddbf722b6140d28474295c8376e561 Parents: 47da751 Author: Maja Kabiljo <[email protected]> Authored: Mon Oct 12 10:56:39 2015 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Mon Oct 19 10:13:43 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/giraph/bsp/BspService.java | 334 +++---------------- .../giraph/bsp/CentralizedServiceMaster.java | 4 +- .../giraph/bsp/CentralizedServiceWorker.java | 8 + .../org/apache/giraph/comm/MasterClient.java | 9 + .../giraph/comm/netty/NettyMasterClient.java | 6 + .../handler/MasterRequestServerHandler.java | 22 +- .../comm/requests/AskForInputSplitRequest.java | 76 +++++ .../giraph/comm/requests/MasterRequest.java | 6 +- .../requests/ReplyWithInputSplitRequest.java | 81 +++++ .../giraph/comm/requests/RequestType.java | 6 +- .../requests/SendReducedToMasterRequest.java | 6 +- .../giraph/graph/FinishedSuperstepStats.java | 2 +- .../apache/giraph/graph/InputSplitEvents.java | 85 ----- .../apache/giraph/graph/InputSplitPaths.java | 88 ----- .../apache/giraph/graph/VertexEdgeCount.java | 20 +- .../java/org/apache/giraph/io/InputType.java | 31 ++ .../apache/giraph/master/BspServiceMaster.java | 281 +++------------- .../giraph/master/MasterAggregatorHandler.java | 2 +- .../giraph/master/MasterGlobalCommHandler.java | 76 +++++ .../giraph/master/MasterGlobalCommUsage.java | 49 +-- 
.../MasterGlobalCommUsageAggregators.java | 69 ++++ .../input/BasicInputSplitsMasterOrganizer.java | 46 +++ .../input/InputSplitsMasterOrganizer.java | 32 ++ ...LocalityAwareInputSplitsMasterOrganizer.java | 125 +++++++ .../MappingInputSplitsMasterOrganizer.java | 64 ++++ .../master/input/MasterInputSplitsHandler.java | 140 ++++++++ .../giraph/master/input/package-info.java | 21 ++ .../apache/giraph/partition/PartitionUtils.java | 2 +- .../apache/giraph/worker/BspServiceWorker.java | 187 ++--------- .../giraph/worker/EdgeInputSplitsCallable.java | 16 +- .../worker/EdgeInputSplitsCallableFactory.java | 13 +- .../giraph/worker/FullInputSplitCallable.java | 210 ------------ .../giraph/worker/InputSplitPathOrganizer.java | 142 -------- .../giraph/worker/InputSplitsCallable.java | 77 ++--- .../giraph/worker/InputSplitsHandler.java | 208 ------------ .../worker/MappingInputSplitsCallable.java | 28 +- .../MappingInputSplitsCallableFactory.java | 34 +- .../worker/VertexInputSplitsCallable.java | 16 +- .../VertexInputSplitsCallableFactory.java | 13 +- .../giraph/worker/WorkerInputSplitsHandler.java | 108 ++++++ .../java/org/apache/giraph/TestBspBasic.java | 69 ++-- 41 files changed, 1164 insertions(+), 1648 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index 0a5a7ba..15e4dbe 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -21,8 +21,6 @@ package org.apache.giraph.bsp; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import 
org.apache.giraph.graph.GraphTaskManager; -import org.apache.giraph.graph.InputSplitEvents; -import org.apache.giraph.graph.InputSplitPaths; import org.apache.giraph.job.JobProgressTracker; import org.apache.giraph.partition.GraphPartitionerFactory; import org.apache.giraph.utils.CheckpointingUtils; @@ -77,59 +75,13 @@ public abstract class BspService<I extends WritableComparable, /** Master job state znode above base dir */ public static final String MASTER_JOB_STATE_NODE = "/_masterJobState"; - /** Mapping input split directory about base dir */ - public static final String MAPPING_INPUT_SPLIT_DIR = "/_mappingInputSplitDir"; - /** Mapping input split done directory about base dir */ - public static final String MAPPING_INPUT_SPLIT_DONE_DIR = - "/_mappingInputSplitDoneDir"; - /** Denotes a reserved mapping input split */ - public static final String MAPPING_INPUT_SPLIT_RESERVED_NODE = - "/_mappingInputSplitReserved"; - /** Denotes a finished mapping input split */ - public static final String MAPPING_INPUT_SPLIT_FINISHED_NODE = - "/_mappingInputSplitFinished"; - /** Denotes that all the mapping input splits are are ready for consumption */ - public static final String MAPPING_INPUT_SPLITS_ALL_READY_NODE = - "/_mappingInputSplitsAllReady"; - /** Denotes that all the mapping input splits are done. 
*/ - public static final String MAPPING_INPUT_SPLITS_ALL_DONE_NODE = - "/_mappingInputSplitsAllDone"; - - /** Vertex input split directory about base dir */ - public static final String VERTEX_INPUT_SPLIT_DIR = "/_vertexInputSplitDir"; - /** Vertex input split done directory about base dir */ - public static final String VERTEX_INPUT_SPLIT_DONE_DIR = - "/_vertexInputSplitDoneDir"; - /** Denotes a reserved vertex input split */ - public static final String VERTEX_INPUT_SPLIT_RESERVED_NODE = - "/_vertexInputSplitReserved"; - /** Denotes a finished vertex input split */ - public static final String VERTEX_INPUT_SPLIT_FINISHED_NODE = - "/_vertexInputSplitFinished"; - /** Denotes that all the vertex input splits are are ready for consumption */ - public static final String VERTEX_INPUT_SPLITS_ALL_READY_NODE = - "/_vertexInputSplitsAllReady"; - /** Denotes that all the vertex input splits are done. */ - public static final String VERTEX_INPUT_SPLITS_ALL_DONE_NODE = - "/_vertexInputSplitsAllDone"; - - /** Edge input split directory about base dir */ - public static final String EDGE_INPUT_SPLIT_DIR = "/_edgeInputSplitDir"; - /** Edge input split done directory about base dir */ - public static final String EDGE_INPUT_SPLIT_DONE_DIR = - "/_edgeInputSplitDoneDir"; - /** Denotes a reserved edge input split */ - public static final String EDGE_INPUT_SPLIT_RESERVED_NODE = - "/_edgeInputSplitReserved"; - /** Denotes a finished edge input split */ - public static final String EDGE_INPUT_SPLIT_FINISHED_NODE = - "/_edgeInputSplitFinished"; - /** Denotes that all the edge input splits are are ready for consumption */ - public static final String EDGE_INPUT_SPLITS_ALL_READY_NODE = - "/_edgeInputSplitsAllReady"; - /** Denotes that all the edge input splits are done. 
*/ - public static final String EDGE_INPUT_SPLITS_ALL_DONE_NODE = - "/_edgeInputSplitsAllDone"; + /** Input splits worker done directory */ + public static final String INPUT_SPLITS_WORKER_DONE_DIR = + "/_inputSplitsWorkerDoneDir"; + /** Input splits all done node*/ + public static final String INPUT_SPLITS_ALL_DONE_NODE = + "/_inputSplitsAllDone"; + /** Directory of attempts of this application */ public static final String APPLICATION_ATTEMPTS_DIR = "/_applicationAttemptsDir"; @@ -192,18 +144,10 @@ public abstract class BspService<I extends WritableComparable, protected final String basePath; /** Path to the job state determined by the master (informative only) */ protected final String masterJobStatePath; - /** ZooKeeper paths for mapping input splits. */ - protected final InputSplitPaths mappingInputSplitsPaths; - /** ZooKeeper paths for vertex input splits. */ - protected final InputSplitPaths vertexInputSplitsPaths; - /** ZooKeeper paths for edge input splits. */ - protected final InputSplitPaths edgeInputSplitsPaths; - /** Mapping input splits events */ - protected final InputSplitEvents mappingInputSplitsEvents; - /** Vertex input split events. */ - protected final InputSplitEvents vertexInputSplitsEvents; - /** Edge input split events. 
*/ - protected final InputSplitEvents edgeInputSplitsEvents; + /** Input splits worker done directory */ + protected final String inputSplitsWorkerDonePath; + /** Input splits all done node */ + protected final String inputSplitsAllDonePath; /** Path to the application attempts) */ protected final String applicationAttemptsPath; /** Path to the cleaned up notifications */ @@ -226,6 +170,10 @@ public abstract class BspService<I extends WritableComparable, private final BspEvent addressesAndPartitionsReadyChanged; /** Application attempt changed */ private final BspEvent applicationAttemptChanged; + /** Input splits worker done */ + private final BspEvent inputSplitsWorkerDoneEvent; + /** Input splits all done */ + private final BspEvent inputSplitsAllDoneEvent; /** Superstep finished synchronization */ private final BspEvent superstepFinished; /** Master election changed for any waited on attempt */ @@ -269,23 +217,20 @@ public abstract class BspService<I extends WritableComparable, public BspService( Mapper<?, ?, ?, ?>.Context context, GraphTaskManager<I, V, E> graphTaskManager) { - this.mappingInputSplitsEvents = new InputSplitEvents(context); - this.vertexInputSplitsEvents = new InputSplitEvents(context); - this.edgeInputSplitsEvents = new InputSplitEvents(context); this.connectedEvent = new PredicateLock(context); this.workerHealthRegistrationChanged = new PredicateLock(context); this.addressesAndPartitionsReadyChanged = new PredicateLock(context); this.applicationAttemptChanged = new PredicateLock(context); + this.inputSplitsWorkerDoneEvent = new PredicateLock(context); + this.inputSplitsAllDoneEvent = new PredicateLock(context); this.superstepFinished = new PredicateLock(context); this.masterElectionChildrenChanged = new PredicateLock(context); this.cleanedUpChildrenChanged = new PredicateLock(context); registerBspEvent(connectedEvent); registerBspEvent(workerHealthRegistrationChanged); - registerBspEvent(vertexInputSplitsEvents.getAllReadyChanged()); - 
registerBspEvent(vertexInputSplitsEvents.getStateChanged()); - registerBspEvent(edgeInputSplitsEvents.getAllReadyChanged()); - registerBspEvent(edgeInputSplitsEvents.getStateChanged()); + registerBspEvent(inputSplitsWorkerDoneEvent); + registerBspEvent(inputSplitsAllDoneEvent); registerBspEvent(addressesAndPartitionsReadyChanged); registerBspEvent(applicationAttemptChanged); registerBspEvent(superstepFinished); @@ -311,16 +256,8 @@ public abstract class BspService<I extends WritableComparable, getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP, basePath); masterJobStatePath = basePath + MASTER_JOB_STATE_NODE; - mappingInputSplitsPaths = new InputSplitPaths(basePath, - MAPPING_INPUT_SPLIT_DIR, MAPPING_INPUT_SPLIT_DONE_DIR, - MAPPING_INPUT_SPLITS_ALL_READY_NODE, - MAPPING_INPUT_SPLITS_ALL_DONE_NODE); - vertexInputSplitsPaths = new InputSplitPaths(basePath, - VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR, - VERTEX_INPUT_SPLITS_ALL_READY_NODE, VERTEX_INPUT_SPLITS_ALL_DONE_NODE); - edgeInputSplitsPaths = new InputSplitPaths(basePath, - EDGE_INPUT_SPLIT_DIR, EDGE_INPUT_SPLIT_DONE_DIR, - EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE); + inputSplitsWorkerDonePath = basePath + INPUT_SPLITS_WORKER_DONE_DIR; + inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE; applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR; cleanedUpPath = basePath + CLEANED_UP_DIR; @@ -433,24 +370,6 @@ public abstract class BspService<I extends WritableComparable, } /** - * Get the input split events for edge input. - * - * @return InputSplitEvents for edge input. - */ - public InputSplitEvents getEdgeInputSplitsEvents() { - return edgeInputSplitsEvents; - } - - /** - * Get the input split events for vertex input. - * - * @return InputSplitEvents for vertex input. 
- */ - public InputSplitEvents getVertexInputSplitsEvents() { - return vertexInputSplitsEvents; - } - - /** * Generate the worker information "healthy" directory path for a * superstep * @@ -655,6 +574,14 @@ public abstract class BspService<I extends WritableComparable, return applicationAttemptChanged; } + public final BspEvent getInputSplitsWorkerDoneEvent() { + return inputSplitsWorkerDoneEvent; + } + + public final BspEvent getInputSplitsAllDoneEvent() { + return inputSplitsAllDoneEvent; + } + public final BspEvent getSuperstepFinishedEvent() { return superstepFinished; } @@ -952,9 +879,20 @@ public abstract class BspService<I extends WritableComparable, } workerHealthRegistrationChanged.signal(); eventProcessed = true; - } else if (processMappingEvent(event) || processVertexEvent(event) || - processEdgeEvent(event)) { - return; + } else if (event.getPath().contains(INPUT_SPLITS_ALL_DONE_NODE) && + event.getType() == EventType.NodeCreated) { + if (LOG.isInfoEnabled()) { + LOG.info("process: all input splits done"); + } + inputSplitsAllDoneEvent.signal(); + eventProcessed = true; + } else if (event.getPath().contains(INPUT_SPLITS_WORKER_DONE_DIR) && + event.getType() == EventType.NodeChildrenChanged) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: worker done reading input splits"); + } + inputSplitsWorkerDoneEvent.signal(); + eventProcessed = true; } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) && event.getType() == EventType.NodeCreated) { if (LOG.isInfoEnabled()) { @@ -1001,192 +939,6 @@ public abstract class BspService<I extends WritableComparable, } /** - * Process WatchedEvent for Mapping Inputsplits - * - * @param event watched event - * @return true if event processed - */ - public final boolean processMappingEvent(WatchedEvent event) { - boolean eventProcessed = false; - if (event.getPath().equals( - mappingInputSplitsPaths.getAllReadyPath()) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isInfoEnabled()) { - 
LOG.info("process: inputSplitsReadyChanged " + - "(input splits ready)"); - } - mappingInputSplitsEvents.getAllReadyChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: mappingInputSplitsStateChanged " + - "(made a reservation)"); - } - mappingInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) && - (event.getType() == EventType.NodeDeleted)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: mappingInputSplitsStateChanged " + - "(lost a reservation)"); - } - mappingInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_FINISHED_NODE) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: mappingInputSplitsStateChanged " + - "(finished inputsplit)"); - } - mappingInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_DONE_DIR) && - (event.getType() == EventType.NodeChildrenChanged)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: mappingInputSplitsDoneStateChanged " + - "(worker finished sending)"); - } - mappingInputSplitsEvents.getDoneStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().equals( - mappingInputSplitsPaths.getAllDonePath()) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: mappingInputSplitsAllDoneChanged " + - "(all entries sent from input splits)"); - } - mappingInputSplitsEvents.getAllDoneChanged().signal(); - eventProcessed = true; - } - return eventProcessed; - } - - /** - * Process WatchedEvent for Vertex Inputsplits - * - * @param event watched event - * @return true if event processed - */ - public final boolean 
processVertexEvent(WatchedEvent event) { - boolean eventProcessed = false; - if (event.getPath().equals( - vertexInputSplitsPaths.getAllReadyPath()) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: inputSplitsReadyChanged " + - "(input splits ready)"); - } - vertexInputSplitsEvents.getAllReadyChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: vertexInputSplitsStateChanged " + - "(made a reservation)"); - } - vertexInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) && - (event.getType() == EventType.NodeDeleted)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: vertexInputSplitsStateChanged " + - "(lost a reservation)"); - } - vertexInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_FINISHED_NODE) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: vertexInputSplitsStateChanged " + - "(finished inputsplit)"); - } - vertexInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_DONE_DIR) && - (event.getType() == EventType.NodeChildrenChanged)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: vertexInputSplitsDoneStateChanged " + - "(worker finished sending)"); - } - vertexInputSplitsEvents.getDoneStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().equals( - vertexInputSplitsPaths.getAllDonePath()) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: vertexInputSplitsAllDoneChanged " + - "(all vertices sent from input splits)"); - } - 
vertexInputSplitsEvents.getAllDoneChanged().signal(); - eventProcessed = true; - } - return eventProcessed; - } - - /** - * Process WatchedEvent for Edge Inputsplits - * - * @param event watched event - * @return true if event processed - */ - public final boolean processEdgeEvent(WatchedEvent event) { - boolean eventProcessed = false; - if (event.getPath().equals( - edgeInputSplitsPaths.getAllReadyPath()) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: edgeInputSplitsReadyChanged " + - "(input splits ready)"); - } - edgeInputSplitsEvents.getAllReadyChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: edgeInputSplitsStateChanged " + - "(made a reservation)"); - } - edgeInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) && - (event.getType() == EventType.NodeDeleted)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: edgeInputSplitsStateChanged " + - "(lost a reservation)"); - } - edgeInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_FINISHED_NODE) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: edgeInputSplitsStateChanged " + - "(finished inputsplit)"); - } - edgeInputSplitsEvents.getStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_DONE_DIR) && - (event.getType() == EventType.NodeChildrenChanged)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: edgeInputSplitsDoneStateChanged " + - "(worker finished sending)"); - } - edgeInputSplitsEvents.getDoneStateChanged().signal(); - eventProcessed = true; - } else if (event.getPath().equals( - edgeInputSplitsPaths.getAllDonePath()) 
&& - (event.getType() == EventType.NodeCreated)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: edgeInputSplitsAllDoneChanged " + - "(all edges sent from input splits)"); - } - edgeInputSplitsEvents.getAllDoneChanged().signal(); - eventProcessed = true; - } - return eventProcessed; - } - - /** * Get the last saved superstep. * * @return Last good superstep number http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java index 1e8d519..f05a79d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java @@ -22,8 +22,8 @@ import java.io.IOException; import java.util.List; import org.apache.giraph.master.AggregatorToGlobalCommTranslation; -import org.apache.giraph.master.MasterAggregatorHandler; import org.apache.giraph.master.MasterCompute; +import org.apache.giraph.master.MasterGlobalCommHandler; import org.apache.giraph.master.MasterInfo; import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; @@ -144,7 +144,7 @@ public interface CentralizedServiceMaster<I extends WritableComparable, * * @return Global communication handler */ - MasterAggregatorHandler getGlobalCommHandler(); + MasterGlobalCommHandler getGlobalCommHandler(); /** * Handler for aggregators to reduce/broadcast translation http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java 
b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java index f6d77d0..94cd265 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java @@ -30,6 +30,7 @@ import org.apache.giraph.metrics.GiraphTimerContext; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.partition.PartitionStore; +import org.apache.giraph.worker.WorkerInputSplitsHandler; import org.apache.giraph.worker.WorkerAggregatorHandler; import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerInfo; @@ -252,4 +253,11 @@ public interface CentralizedServiceWorker<I extends WritableComparable, * @return number of partitions owned */ int getNumPartitionsOwned(); + + /** + * Get input splits handler used during input + * + * @return Input splits handler + */ + WorkerInputSplitsHandler getInputSplitsHandler(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java index aea93fd..244dd74 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java @@ -20,6 +20,7 @@ package org.apache.giraph.comm; import java.io.IOException; +import org.apache.giraph.comm.requests.WritableRequest; import org.apache.hadoop.io.Writable; /** @@ -54,6 +55,14 @@ public interface MasterClient { void flush(); /** + * Send a request to a remote server (should be already connected) + * + * @param destTaskId Destination worker id + * @param request Request to send + */ + void sendWritableRequest(int destTaskId, 
WritableRequest request); + + /** * Closes all connections. */ void closeConnections(); http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java index e110782..9b348e8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java @@ -26,6 +26,7 @@ import org.apache.giraph.comm.MasterClient; import org.apache.giraph.comm.aggregators.AggregatorUtils; import org.apache.giraph.comm.aggregators.SendGlobalCommCache; import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest; +import org.apache.giraph.comm.requests.WritableRequest; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; @@ -117,6 +118,11 @@ public class NettyMasterClient implements MasterClient { } @Override + public void sendWritableRequest(int destTaskId, WritableRequest request) { + nettyClient.sendWritableRequest(destTaskId, request); + } + + @Override public void closeConnections() { nettyClient.stop(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java index 02c72f7..9aa88ae 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java @@ -21,13 +21,13 @@ package org.apache.giraph.comm.netty.handler; import org.apache.giraph.comm.requests.MasterRequest; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.TaskInfo; -import org.apache.giraph.master.MasterAggregatorHandler; +import org.apache.giraph.master.MasterGlobalCommHandler; /** Handler for requests on master */ public class MasterRequestServerHandler extends RequestServerHandler<MasterRequest> { /** Aggregator handler */ - private final MasterAggregatorHandler aggregatorHandler; + private final MasterGlobalCommHandler commHandler; /** * Constructor @@ -35,22 +35,22 @@ public class MasterRequestServerHandler extends * @param workerRequestReservedMap Worker request reservation map * @param conf Configuration * @param myTaskInfo Current task info - * @param aggregatorHandler Master aggregator handler + * @param commHandler Master communication handler * @param exceptionHandler Handles uncaught exceptions */ public MasterRequestServerHandler( WorkerRequestReservedMap workerRequestReservedMap, ImmutableClassesGiraphConfiguration conf, TaskInfo myTaskInfo, - MasterAggregatorHandler aggregatorHandler, + MasterGlobalCommHandler commHandler, Thread.UncaughtExceptionHandler exceptionHandler) { super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler); - this.aggregatorHandler = aggregatorHandler; + this.commHandler = commHandler; } @Override public void processRequest(MasterRequest request) { - request.doRequest(aggregatorHandler); + request.doRequest(commHandler); } /** @@ -58,15 +58,15 @@ public class MasterRequestServerHandler extends */ public static class Factory implements RequestServerHandler.Factory { /** Master aggregator handler */ - private final MasterAggregatorHandler aggregatorHandler; + private final MasterGlobalCommHandler commHandler; /** * Constructor * - * @param aggregatorHandler 
Master aggregator handler + * @param commHandler Master global communication handler */ - public Factory(MasterAggregatorHandler aggregatorHandler) { - this.aggregatorHandler = aggregatorHandler; + public Factory(MasterGlobalCommHandler commHandler) { + this.commHandler = commHandler; } @Override @@ -76,7 +76,7 @@ public class MasterRequestServerHandler extends TaskInfo myTaskInfo, Thread.UncaughtExceptionHandler exceptionHandler) { return new MasterRequestServerHandler(workerRequestReservedMap, conf, - myTaskInfo, aggregatorHandler, exceptionHandler); + myTaskInfo, commHandler, exceptionHandler); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java new file mode 100644 index 0000000..5d9e4e6 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.comm.requests; + +import org.apache.giraph.master.MasterGlobalCommHandler; +import org.apache.giraph.io.InputType; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * A request which workers will send to master to ask it to give them splits + */ +public class AskForInputSplitRequest extends WritableRequest + implements MasterRequest { + /** Type of split we are requesting */ + private InputType splitType; + /** Task id of worker which requested the split */ + private int workerTaskId; + + /** + * Constructor + * + * @param splitType Type of split we are requesting + * @param workerTaskId Task id of worker which requested the split + */ + public AskForInputSplitRequest(InputType splitType, int workerTaskId) { + this.splitType = splitType; + this.workerTaskId = workerTaskId; + } + + /** + * Constructor used for reflection only + */ + public AskForInputSplitRequest() { + } + + @Override + public void doRequest(MasterGlobalCommHandler commHandler) { + commHandler.getInputSplitsHandler().sendSplitTo(splitType, workerTaskId); + } + + @Override + void readFieldsRequest(DataInput in) throws IOException { + splitType = InputType.values()[in.readInt()]; + workerTaskId = in.readInt(); + } + + @Override + void writeRequest(DataOutput out) throws IOException { + out.writeInt(splitType.ordinal()); + out.writeInt(workerTaskId); + } + + @Override + public RequestType getType() { + return RequestType.ASK_FOR_INPUT_SPLIT_REQUEST; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java index 7fedcc5..43632b0 100644 --- 
a/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java @@ -18,7 +18,7 @@ package org.apache.giraph.comm.requests; -import org.apache.giraph.master.MasterAggregatorHandler; +import org.apache.giraph.master.MasterGlobalCommHandler; /** * Interface for requests sent to master to extend @@ -27,7 +27,7 @@ public interface MasterRequest { /** * Execute the request * - * @param aggregatorHandler Master aggregator handler + * @param commHandler Master communication handler */ - void doRequest(MasterAggregatorHandler aggregatorHandler); + void doRequest(MasterGlobalCommHandler commHandler); } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java new file mode 100644 index 0000000..6b50562 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.requests; + +import org.apache.giraph.comm.ServerData; +import org.apache.giraph.io.InputType; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * A request which master will send to workers to give them splits + */ +public class ReplyWithInputSplitRequest extends WritableRequest + implements WorkerRequest { + /** Type of input split */ + private InputType splitType; + /** Serialized input split */ + private byte[] serializedInputSplit; + + /** + * Constructor + * + * @param splitType Type of input split + * @param serializedInputSplit Serialized input split + */ + public ReplyWithInputSplitRequest(InputType splitType, + byte[] serializedInputSplit) { + this.splitType = splitType; + this.serializedInputSplit = serializedInputSplit; + } + + /** + * Constructor used for reflection only + */ + public ReplyWithInputSplitRequest() { + } + + @Override + void readFieldsRequest(DataInput in) throws IOException { + splitType = InputType.values()[in.readInt()]; + int size = in.readInt(); + serializedInputSplit = new byte[size]; + in.readFully(serializedInputSplit); + } + + @Override + void writeRequest(DataOutput out) throws IOException { + out.writeInt(splitType.ordinal()); + out.writeInt(serializedInputSplit.length); + out.write(serializedInputSplit); + } + + @Override + public void doRequest(ServerData serverData) { + serverData.getServiceWorker().getInputSplitsHandler().receivedInputSplit( + splitType, serializedInputSplit); + } + + @Override + public RequestType getType() { + return RequestType.REPLY_WITH_INPUT_SPLIT_REQUEST; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java index 343a2de..bebac28 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java @@ -60,7 +60,11 @@ else[HADOOP_NON_SECURE]*/ /** Send aggregators from worker owner to other workers */ SEND_AGGREGATORS_TO_WORKER_REQUEST(SendAggregatorsToWorkerRequest.class), /** Send message from worker to worker */ - SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class); + SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class), + /** Send request for input split from worker to master */ + ASK_FOR_INPUT_SPLIT_REQUEST(AskForInputSplitRequest.class), + /** Send request with granted input split from master to workers */ + REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class); /** Class of request which this type corresponds to */ private final Class<? 
extends WritableRequest> requestClass; http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java index 7171f04..3a1bd64 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java @@ -20,7 +20,7 @@ package org.apache.giraph.comm.requests; import java.io.IOException; -import org.apache.giraph.master.MasterAggregatorHandler; +import org.apache.giraph.master.MasterGlobalCommHandler; /** * Request to send final aggregated values from worker which owns @@ -45,9 +45,9 @@ public class SendReducedToMasterRequest extends ByteArrayRequest } @Override - public void doRequest(MasterAggregatorHandler aggregatorHandler) { + public void doRequest(MasterGlobalCommHandler commHandler) { try { - aggregatorHandler.acceptReducedValues(getDataInput()); + commHandler.getAggregatorHandler().acceptReducedValues(getDataInput()); } catch (IOException e) { throw new IllegalStateException("doRequest: " + "IOException occurred while processing request", e); http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java index cfb9799..c53b34f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java @@ -51,7 +51,7 @@ public class FinishedSuperstepStats extends VertexEdgeCount { long numEdges, boolean mustLoadCheckpoint, CheckpointStatus checkpointStatus) { - super(numVertices, numEdges); + super(numVertices, numEdges, 0); this.localVertexCount = numLocalVertices; this.allVerticesHalted = allVerticesHalted; this.mustLoadCheckpoint = mustLoadCheckpoint; http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java deleted file mode 100644 index 23be1c4..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.graph; - -import org.apache.giraph.zk.BspEvent; -import org.apache.giraph.zk.PredicateLock; -import org.apache.hadoop.util.Progressable; - -/** - * Simple container of input split events. 
- */ -public class InputSplitEvents { - /** Input splits are ready for consumption by workers */ - private final BspEvent allReadyChanged; - /** Input split reservation or finished notification and synchronization */ - private final BspEvent stateChanged; - /** Input splits are done being processed by workers */ - private final BspEvent allDoneChanged; - /** Input split done by a worker finished notification and synchronization */ - private final BspEvent doneStateChanged; - - /** - * Constructor. - * - * @param progressable {@link Progressable} to report progress - */ - public InputSplitEvents(Progressable progressable) { - allReadyChanged = new PredicateLock(progressable); - stateChanged = new PredicateLock(progressable); - allDoneChanged = new PredicateLock(progressable); - doneStateChanged = new PredicateLock(progressable); - } - - /** - * Get event for input splits all ready - * - * @return {@link BspEvent} for input splits all ready - */ - public BspEvent getAllReadyChanged() { - return allReadyChanged; - } - - /** - * Get event for input splits state - * - * @return {@link BspEvent} for input splits state - */ - public BspEvent getStateChanged() { - return stateChanged; - } - - /** - * Get event for input splits all done - * - * @return {@link BspEvent} for input splits all done - */ - public BspEvent getAllDoneChanged() { - return allDoneChanged; - } - - /** - * Get event for input split done - * - * @return {@link BspEvent} for input split done - */ - public BspEvent getDoneStateChanged() { - return doneStateChanged; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java deleted file mode 100644 index 4cf005e..0000000 --- 
a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.graph; - -/** - * Simple container of input split paths for coordination via ZooKeeper. - */ -public class InputSplitPaths { - /** Path to the input splits written by the master */ - private final String path; - /** Path to the input splits all ready to be processed by workers */ - private final String allReadyPath; - /** Path to the input splits done */ - private final String donePath; - /** Path to the input splits all done to notify the workers to proceed */ - private final String allDonePath; - - /** - * Constructor. - * - * @param basePath Base path - * @param dir Input splits path - * @param doneDir Input split done path - * @param allReadyNode Input splits all ready path - * @param allDoneNode Input splits all done path - */ - public InputSplitPaths(String basePath, - String dir, - String doneDir, - String allReadyNode, - String allDoneNode) { - path = basePath + dir; - allReadyPath = basePath + allReadyNode; - donePath = basePath + doneDir; - allDonePath = basePath + allDoneNode; - } - - /** - * Get path to the input splits. 
- * - * @return Path to input splits - */ - public String getPath() { - return path; - } - - /** - * Get path to the input splits all ready. - * - * @return Path to input splits all ready - */ - public String getAllReadyPath() { - return allReadyPath; - } - - /** Get path to the input splits done. - * - * @return Path to input splits done - */ - public String getDonePath() { - return donePath; - } - - /** - * Get path to the input splits all done. - * - * @return Path to input splits all done - */ - public String getAllDonePath() { - return allDonePath; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java index c2d13cc..1c871f0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java @@ -26,6 +26,8 @@ public class VertexEdgeCount { private final long vertexCount; /** Immutable edges */ private final long edgeCount; + /** Immutable mappings */ + private final long mappingCount; /** * Default constructor. @@ -33,6 +35,7 @@ public class VertexEdgeCount { public VertexEdgeCount() { vertexCount = 0; edgeCount = 0; + mappingCount = 0; } /** @@ -40,10 +43,12 @@ public class VertexEdgeCount { * * @param vertexCount Final number of vertices. * @param edgeCount Final number of edges. + * @param mappingCount Final number of mappings. 
*/ - public VertexEdgeCount(long vertexCount, long edgeCount) { + public VertexEdgeCount(long vertexCount, long edgeCount, long mappingCount) { this.vertexCount = vertexCount; this.edgeCount = edgeCount; + this.mappingCount = mappingCount; } public long getVertexCount() { @@ -54,6 +59,10 @@ public class VertexEdgeCount { return edgeCount; } + public long getMappingCount() { + return mappingCount; + } + /** * Increment the both the vertex edge count with a {@link VertexEdgeCount}. * @@ -64,7 +73,8 @@ public class VertexEdgeCount { VertexEdgeCount vertexEdgeCount) { return new VertexEdgeCount( vertexCount + vertexEdgeCount.getVertexCount(), - edgeCount + vertexEdgeCount.getEdgeCount()); + edgeCount + vertexEdgeCount.getEdgeCount(), + mappingCount + vertexEdgeCount.getMappingCount()); } /** @@ -78,11 +88,13 @@ public class VertexEdgeCount { long vertexCount, long edgeCount) { return new VertexEdgeCount( this.vertexCount + vertexCount, - this.edgeCount + edgeCount); + this.edgeCount + edgeCount, + this.mappingCount + mappingCount); } @Override public String toString() { - return "(v=" + getVertexCount() + ", e=" + getEdgeCount() + ")"; + return "(v=" + getVertexCount() + ", e=" + getEdgeCount() + + (mappingCount > 0 ? ", m=" + mappingCount : "") + ")"; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/io/InputType.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/InputType.java b/giraph-core/src/main/java/org/apache/giraph/io/InputType.java new file mode 100644 index 0000000..26ee966 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/InputType.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +/** + * Type of input + */ +public enum InputType { + /** Vertex input */ + VERTEX, + /** Edge input */ + EDGE, + /** Mapping input */ + MAPPING +} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 0b56a4f..0e7bb9d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -21,12 +21,8 @@ package org.apache.giraph.master; import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT; import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA; import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT; -import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; import java.io.PrintStream; import java.nio.charset.Charset; @@ -38,9 +34,6 @@ import java.util.HashSet; import java.util.List; import 
java.util.Set; import java.util.TreeSet; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import net.iharder.Base64; @@ -66,12 +59,12 @@ import org.apache.giraph.graph.GlobalStats; import org.apache.giraph.graph.GraphFunctions; import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.GraphTaskManager; -import org.apache.giraph.graph.InputSplitEvents; -import org.apache.giraph.graph.InputSplitPaths; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.GiraphInputFormat; import org.apache.giraph.io.MappingInputFormat; import org.apache.giraph.io.VertexInputFormat; +import org.apache.giraph.io.InputType; +import org.apache.giraph.master.input.MasterInputSplitsHandler; import org.apache.giraph.metrics.AggregatedMetrics; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.GiraphTimer; @@ -88,8 +81,6 @@ import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; import org.apache.giraph.utils.CheckpointingUtils; import org.apache.giraph.utils.JMapHistoDumper; -import org.apache.giraph.utils.LogStacktraceCallable; -import org.apache.giraph.utils.ProgressableUtils; import org.apache.giraph.utils.ReactiveJMapHistoDumper; import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.utils.WritableUtils; @@ -99,7 +90,6 @@ import org.apache.giraph.zk.PredicateLock; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JobID; @@ -170,7 +160,7 @@ public class BspServiceMaster<I extends WritableComparable, private final List<PartitionStats> allPartitionStatsList = new ArrayList<PartitionStats>(); /** Handler for global communication */ - private 
MasterAggregatorHandler globalCommHandler; + private MasterGlobalCommHandler globalCommHandler; /** Handler for aggregators to reduce/broadcast translation */ private AggregatorToGlobalCommTranslation aggregatorTranslation; /** Master class */ @@ -331,7 +321,7 @@ public class BspServiceMaster<I extends WritableComparable, */ private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat, int minSplitCountHint, - String inputSplitType) { + InputType inputSplitType) { String logPrefix = "generate" + inputSplitType + "InputSplits"; List<InputSplit> splits; try { @@ -604,46 +594,25 @@ public class BspServiceMaster<I extends WritableComparable, * Common method for creating vertex/edge input splits. * * @param inputFormat The vertex/edge input format - * @param inputSplitPaths ZooKeeper input split paths * @param inputSplitType Type of input split (for logging purposes) * @return Number of splits. Returns -1 on failure to create * valid input splits. */ private int createInputSplits(GiraphInputFormat inputFormat, - InputSplitPaths inputSplitPaths, - String inputSplitType) { + InputType inputSplitType) { ImmutableClassesGiraphConfiguration conf = getConfiguration(); String logPrefix = "create" + inputSplitType + "InputSplits"; // Only the 'master' should be doing this. Wait until the number of // processes that have reported health exceeds the minimum percentage. // If the minimum percentage is not met, fail the job. 
Otherwise // generate the input splits - String inputSplitsPath = inputSplitPaths.getPath(); - try { - if (getZkExt().exists(inputSplitsPath, false) != null) { - LOG.info(inputSplitsPath + " already exists, no need to create"); - return Integer.parseInt( - new String(getZkExt().getData(inputSplitsPath, false, null), - Charset.defaultCharset())); - } - } catch (KeeperException.NoNodeException e) { - if (LOG.isInfoEnabled()) { - LOG.info(logPrefix + ": Need to create the input splits at " + - inputSplitsPath); - } - } catch (KeeperException e) { - throw new IllegalStateException(logPrefix + ": KeeperException", e); - } catch (InterruptedException e) { - throw new IllegalStateException(logPrefix + ": InterruptedException", e); - } - - // When creating znodes, in case the master has already run, resume - // where it left off. List<WorkerInfo> healthyWorkerInfoList = checkWorkers(); if (healthyWorkerInfoList == null) { setJobStateFailed("Not enough healthy workers to create input splits"); return -1; } + globalCommHandler.getInputSplitsHandler().initialize(masterClient, + healthyWorkerInfoList); // Create at least as many splits as the total number of input threads. 
int minSplitCountHint = healthyWorkerInfoList.size() * @@ -671,54 +640,8 @@ public class BspServiceMaster<I extends WritableComparable, "some threads will be not used"); } - // Write input splits to zookeeper in parallel - int inputSplitThreadCount = conf.getInt(NUM_MASTER_ZK_INPUT_SPLIT_THREADS, - DEFAULT_INPUT_SPLIT_THREAD_COUNT); - if (LOG.isInfoEnabled()) { - LOG.info(logPrefix + ": Starting to write input split data " + - "to zookeeper with " + inputSplitThreadCount + " threads"); - } - try { - getZkExt().createExt(inputSplitsPath, null, - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - false); - } catch (KeeperException e) { - LOG.info(logPrefix + ": Node " + - inputSplitsPath + " keeper exception " + e); - } catch (InterruptedException e) { - throw new IllegalStateException(logPrefix + ' ' + e.getMessage(), e); - } - ExecutorService taskExecutor = - Executors.newFixedThreadPool(inputSplitThreadCount); - boolean writeLocations = USE_INPUT_SPLIT_LOCALITY.get(conf); - for (int i = 0; i < splitList.size(); ++i) { - InputSplit inputSplit = splitList.get(i); - taskExecutor.submit(new LogStacktraceCallable<Void>( - new WriteInputSplit(inputFormat, inputSplit, inputSplitsPath, i, - writeLocations))); - } - taskExecutor.shutdown(); - ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext()); - if (LOG.isInfoEnabled()) { - LOG.info(logPrefix + ": Done writing input split data to zookeeper"); - } - - // Let workers know they can start trying to load the input splits - try { - getZkExt().createExt(inputSplitPaths.getAllReadyPath(), - null, - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - false); - } catch (KeeperException.NodeExistsException e) { - LOG.info(logPrefix + ": Node " + - inputSplitPaths.getAllReadyPath() + " already exists."); - } catch (KeeperException e) { - throw new IllegalStateException(logPrefix + ": KeeperException", e); - } catch (InterruptedException e) { - throw new IllegalStateException(logPrefix + ": IllegalStateException", e); - } 
+ globalCommHandler.getInputSplitsHandler().addSplits(inputSplitType, + splitList, inputFormat); return splitList.size(); } @@ -730,8 +653,7 @@ public class BspServiceMaster<I extends WritableComparable, } MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat = getConfiguration().createWrappedMappingInputFormat(); - return createInputSplits(mappingInputFormat, mappingInputSplitsPaths, - "Mapping"); + return createInputSplits(mappingInputFormat, InputType.MAPPING); } @Override @@ -742,8 +664,7 @@ public class BspServiceMaster<I extends WritableComparable, } VertexInputFormat<I, V, E> vertexInputFormat = getConfiguration().createWrappedVertexInputFormat(); - return createInputSplits(vertexInputFormat, vertexInputSplitsPaths, - "Vertex"); + return createInputSplits(vertexInputFormat, InputType.VERTEX); } @Override @@ -754,8 +675,7 @@ public class BspServiceMaster<I extends WritableComparable, } EdgeInputFormat<I, E> edgeInputFormat = getConfiguration().createWrappedEdgeInputFormat(); - return createInputSplits(edgeInputFormat, edgeInputSplitsPaths, - "Edge"); + return createInputSplits(edgeInputFormat, InputType.EDGE); } @Override @@ -764,7 +684,7 @@ public class BspServiceMaster<I extends WritableComparable, } @Override - public MasterAggregatorHandler getGlobalCommHandler() { + public MasterGlobalCommHandler getGlobalCommHandler() { return globalCommHandler; } @@ -838,7 +758,7 @@ public class BspServiceMaster<I extends WritableComparable, }); - globalCommHandler.readFields(finalizedStream); + globalCommHandler.getAggregatorHandler().readFields(finalizedStream); aggregatorTranslation.readFields(finalizedStream); masterCompute.readFields(finalizedStream); finalizedStream.close(); @@ -911,12 +831,15 @@ public class BspServiceMaster<I extends WritableComparable, if (masterChildArr.get(0).equals(myBid)) { GiraphStats.getInstance().getCurrentMasterTaskPartition(). 
setValue(getTaskPartition()); - globalCommHandler = new MasterAggregatorHandler( - getConfiguration(), getContext()); + + globalCommHandler = new MasterGlobalCommHandler( + new MasterAggregatorHandler(getConfiguration(), getContext()), + new MasterInputSplitsHandler( + getConfiguration().useInputSplitLocality())); aggregatorTranslation = new AggregatorToGlobalCommTranslation( getConfiguration(), globalCommHandler); - globalCommHandler.initialize(this); + globalCommHandler.getAggregatorHandler().initialize(this); masterCompute = getConfiguration().createMasterCompute(); masterCompute.setMasterService(this); @@ -1128,7 +1051,7 @@ public class BspServiceMaster<I extends WritableComparable, for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) { finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo)); } - globalCommHandler.write(finalizedOutputStream); + globalCommHandler.getAggregatorHandler().write(finalizedOutputStream); aggregatorTranslation.write(finalizedOutputStream); masterCompute.write(finalizedOutputStream); finalizedOutputStream.close(); @@ -1265,12 +1188,8 @@ public class BspServiceMaster<I extends WritableComparable, @Override public void restartFromCheckpoint(long checkpoint) { // Process: - // 1. Remove all old input split data - // 2. Increase the application attempt and set to the correct checkpoint - // 3. Send command to all workers to restart their tasks - zkDeleteNode(vertexInputSplitsPaths.getPath()); - zkDeleteNode(edgeInputSplitsPaths.getPath()); - + // 1. Increase the application attempt and set to the correct checkpoint + // 2. Send command to all workers to restart their tasks setApplicationAttempt(getApplicationAttempt() + 1); setCachedSuperstep(checkpoint); setRestartedSuperstep(checkpoint); @@ -1493,37 +1412,32 @@ public class BspServiceMaster<I extends WritableComparable, /** * Coordinate the exchange of vertex/edge input splits among workers. 
- * - * @param inputSplitPaths Input split paths - * @param inputSplitEvents Input split events - * @param inputSplitsType Type of input splits (for logging purposes) */ - private void coordinateInputSplits(InputSplitPaths inputSplitPaths, - InputSplitEvents inputSplitEvents, - String inputSplitsType) { + private void coordinateInputSplits() { // Coordinate the workers finishing sending their vertices/edges to the // correct workers and signal when everything is done. - String logPrefix = "coordinate" + inputSplitsType + "InputSplits"; - if (!barrierOnWorkerList(inputSplitPaths.getDonePath(), + if (!barrierOnWorkerList(inputSplitsWorkerDonePath, chosenWorkerInfoList, - inputSplitEvents.getDoneStateChanged(), + getInputSplitsWorkerDoneEvent(), false)) { - throw new IllegalStateException(logPrefix + ": Worker failed during " + - "input split (currently not supported)"); + throw new IllegalStateException("coordinateInputSplits: Worker failed " + + "during input split (currently not supported)"); } try { - getZkExt().createExt(inputSplitPaths.getAllDonePath(), + getZkExt().createExt(inputSplitsAllDonePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false); } catch (KeeperException.NodeExistsException e) { LOG.info("coordinateInputSplits: Node " + - inputSplitPaths.getAllDonePath() + " already exists."); + inputSplitsAllDonePath + " already exists."); } catch (KeeperException e) { - throw new IllegalStateException(logPrefix + ": KeeperException", e); + throw new IllegalStateException( + "coordinateInputSplits: KeeperException", e); } catch (InterruptedException e) { - throw new IllegalStateException(logPrefix + ": IllegalStateException", e); + throw new IllegalStateException( + "coordinateInputSplits: IllegalStateException", e); } } @@ -1543,7 +1457,7 @@ public class BspServiceMaster<I extends WritableComparable, */ private void initializeAggregatorInputSuperstep() throws InterruptedException { - globalCommHandler.prepareSuperstep(); + 
globalCommHandler.getAggregatorHandler().prepareSuperstep(); prepareMasterCompute(getSuperstep()); try { @@ -1559,9 +1473,9 @@ public class BspServiceMaster<I extends WritableComparable, "initializeAggregatorInputSuperstep: Failed in access", e); } aggregatorTranslation.postMasterCompute(); - globalCommHandler.finishSuperstep(); + globalCommHandler.getAggregatorHandler().finishSuperstep(); - globalCommHandler.sendDataToOwners(masterClient); + globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient); } /** @@ -1627,7 +1541,7 @@ public class BspServiceMaster<I extends WritableComparable, // We need to finalize aggregators from previous superstep if (getSuperstep() >= 0) { aggregatorTranslation.postMasterCompute(); - globalCommHandler.finishSuperstep(); + globalCommHandler.getAggregatorHandler().finishSuperstep(); } masterClient.openConnections(); @@ -1663,25 +1577,13 @@ public class BspServiceMaster<I extends WritableComparable, // We need to send aggregators to worker owners after new worker assignments if (getSuperstep() >= 0) { - globalCommHandler.sendDataToOwners(masterClient); + globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient); } if (getSuperstep() == INPUT_SUPERSTEP) { // Initialize aggregators before coordinating initializeAggregatorInputSuperstep(); - if (getConfiguration().hasMappingInputFormat()) { - coordinateInputSplits(mappingInputSplitsPaths, mappingInputSplitsEvents, - "Mapping"); - } - // vertex loading and edge loading - if (getConfiguration().hasVertexInputFormat()) { - coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents, - "Vertex"); - } - if (getConfiguration().hasEdgeInputFormat()) { - coordinateInputSplits(edgeInputSplitsPaths, edgeInputSplitsEvents, - "Edge"); - } + coordinateInputSplits(); } String finishedWorkerPath = @@ -1695,7 +1597,7 @@ public class BspServiceMaster<I extends WritableComparable, // Collect aggregator values, then run the master.compute() and // finally save the 
aggregator values - globalCommHandler.prepareSuperstep(); + globalCommHandler.getAggregatorHandler().prepareSuperstep(); aggregatorTranslation.prepareSuperstep(); SuperstepClasses superstepClasses = @@ -1761,7 +1663,8 @@ public class BspServiceMaster<I extends WritableComparable, } else { superstepState = SuperstepState.THIS_SUPERSTEP_DONE; } - globalCommHandler.writeAggregators(getSuperstep(), superstepState); + globalCommHandler.getAggregatorHandler().writeAggregators( + getSuperstep(), superstepState); return superstepState; } @@ -2009,7 +1912,7 @@ public class BspServiceMaster<I extends WritableComparable, failJob(new Exception("Checkpoint and halt requested. " + "Killing this job.")); } - globalCommHandler.close(); + globalCommHandler.getAggregatorHandler().close(); masterClient.closeConnections(); masterServer.close(); } @@ -2122,100 +2025,4 @@ public class BspServiceMaster<I extends WritableComparable, gs.getAggregateSentMessageBytes() .increment(globalStats.getMessageBytesCount()); } - - /** - * Task that writes a given input split to zookeeper. - * Upon failure call() throws an exception. 
- */ - private class WriteInputSplit implements Callable<Void> { - /** Input format */ - private final GiraphInputFormat inputFormat; - /** Input split which we are going to write */ - private final InputSplit inputSplit; - /** Input splits path */ - private final String inputSplitsPath; - /** Index of the input split */ - private final int index; - /** Whether to write locality information */ - private final boolean writeLocations; - - /** - * Constructor - * - * @param inputFormat Input format - * @param inputSplit Input split which we are going to write - * @param inputSplitsPath Input splits path - * @param index Index of the input split - * @param writeLocations whether to write the input split's locations (to - * be used by workers for prioritizing local splits - * when reading) - */ - public WriteInputSplit(GiraphInputFormat inputFormat, - InputSplit inputSplit, - String inputSplitsPath, - int index, - boolean writeLocations) { - this.inputFormat = inputFormat; - this.inputSplit = inputSplit; - this.inputSplitsPath = inputSplitsPath; - this.index = index; - this.writeLocations = writeLocations; - } - - @Override - public Void call() { - String inputSplitPath = null; - try { - ByteArrayOutputStream byteArrayOutputStream = - new ByteArrayOutputStream(); - DataOutput outputStream = - new DataOutputStream(byteArrayOutputStream); - - if (writeLocations) { - String[] splitLocations = inputSplit.getLocations(); - StringBuilder locations = null; - if (splitLocations != null) { - int splitListLength = - Math.min(splitLocations.length, localityLimit); - locations = new StringBuilder(); - for (String location : splitLocations) { - locations.append(location) - .append(--splitListLength > 0 ? "\t" : ""); - } - } - Text.writeString(outputStream, - locations == null ? 
"" : locations.toString()); - } - - inputFormat.writeInputSplit(inputSplit, outputStream); - inputSplitPath = inputSplitsPath + "/" + index; - getZkExt().createExt(inputSplitPath, - byteArrayOutputStream.toByteArray(), - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true); - - if (LOG.isDebugEnabled()) { - LOG.debug("call: Created input split " + - "with index " + index + " serialized as " + - byteArrayOutputStream.toString(Charset.defaultCharset().name())); - } - } catch (KeeperException.NodeExistsException e) { - if (LOG.isInfoEnabled()) { - LOG.info("call: Node " + - inputSplitPath + " already exists."); - } - } catch (KeeperException e) { - throw new IllegalStateException( - "call: KeeperException", e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "call: IllegalStateException", e); - } catch (IOException e) { - throw new IllegalStateException( - "call: IOException", e); - } - return null; - } - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java index 5558cee..8ca3d3a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java @@ -42,7 +42,7 @@ import com.google.common.collect.Maps; /** Handler for reduce/broadcast on the master */ public class MasterAggregatorHandler - implements MasterGlobalCommUsage, Writable { + implements MasterGlobalCommUsageAggregators, Writable { /** Class logger */ private static final Logger LOG = Logger.getLogger(MasterAggregatorHandler.class); 
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java new file mode 100644 index 0000000..717a24d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.master; + +import org.apache.giraph.master.input.MasterInputSplitsHandler; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.hadoop.io.Writable; + +/** + * Handler for all master communications + */ +public class MasterGlobalCommHandler implements MasterGlobalCommUsage { + /** Aggregator handler */ + private final MasterAggregatorHandler aggregatorHandler; + /** Input splits handler*/ + private final MasterInputSplitsHandler inputSplitsHandler; + + /** + * Constructor + * + * @param aggregatorHandler Aggregator handler + * @param inputSplitsHandler Input splits handler + */ + public MasterGlobalCommHandler( + MasterAggregatorHandler aggregatorHandler, + MasterInputSplitsHandler inputSplitsHandler) { + this.aggregatorHandler = aggregatorHandler; + this.inputSplitsHandler = inputSplitsHandler; + } + + public MasterAggregatorHandler getAggregatorHandler() { + return aggregatorHandler; + } + + public MasterInputSplitsHandler getInputSplitsHandler() { + return inputSplitsHandler; + } + + @Override + public <S, R extends Writable> void registerReducer(String name, + ReduceOperation<S, R> reduceOp) { + aggregatorHandler.registerReducer(name, reduceOp); + } + + @Override + public <S, R extends Writable> void registerReducer(String name, + ReduceOperation<S, R> reduceOp, R globalInitialValue) { + aggregatorHandler.registerReducer(name, reduceOp, globalInitialValue); + } + + @Override + public <R extends Writable> R getReduced(String name) { + return aggregatorHandler.getReduced(name); + } + + @Override + public void broadcast(String name, Writable value) { + aggregatorHandler.broadcast(name, value); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java 
b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java index 7ee9048..60b1809 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java @@ -17,52 +17,9 @@ */ package org.apache.giraph.master; -import org.apache.giraph.reducers.ReduceOperation; -import org.apache.hadoop.io.Writable; - /** - * Master compute can access reduce and broadcast methods - * through this interface, from masterCompute method. + * All global master communication */ -public interface MasterGlobalCommUsage { - /** - * Register reducer to be reduced in the next worker computation, - * using given name and operations. - * @param name Name of the reducer - * @param reduceOp Reduce operations - * @param <S> Single value type - * @param <R> Reduced value type - */ - <S, R extends Writable> void registerReducer( - String name, ReduceOperation<S, R> reduceOp); - - /** - * Register reducer to be reduced in the next worker computation, using - * given name and operations, starting globally from globalInitialValue. - * (globalInitialValue is reduced only once, each worker will still start - * from neutral initial value) - * - * @param name Name of the reducer - * @param reduceOp Reduce operations - * @param globalInitialValue Global initial value - * @param <S> Single value type - * @param <R> Reduced value type - */ - <S, R extends Writable> void registerReducer( - String name, ReduceOperation<S, R> reduceOp, R globalInitialValue); - - /** - * Get reduced value from previous worker computation. - * @param name Name of the reducer - * @return Reduced value - * @param <R> Reduced value type - */ - <R extends Writable> R getReduced(String name); - - /** - * Broadcast given value to all workers for next computation. 
- * @param name Name of the broadcast object - * @param value Value - */ - void broadcast(String name, Writable value); +public interface MasterGlobalCommUsage + extends MasterGlobalCommUsageAggregators { } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java new file mode 100644 index 0000000..62c1f3f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.master; + +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.hadoop.io.Writable; + +/** + * Master compute can access reduce and broadcast methods + * through this interface, from masterCompute method. 
+ */ +public interface MasterGlobalCommUsageAggregators { + /** + * Register reducer to be reduced in the next worker computation, + * using given name and operations. + * @param name Name of the reducer + * @param reduceOp Reduce operations + * @param <S> Single value type + * @param <R> Reduced value type + */ + <S, R extends Writable> void registerReducer( + String name, ReduceOperation<S, R> reduceOp); + + /** + * Register reducer to be reduced in the next worker computation, using + * given name and operations, starting globally from globalInitialValue. + * (globalInitialValue is reduced only once, each worker will still start + * from neutral initial value) + * + * @param name Name of the reducer + * @param reduceOp Reduce operations + * @param globalInitialValue Global initial value + * @param <S> Single value type + * @param <R> Reduced value type + */ + <S, R extends Writable> void registerReducer( + String name, ReduceOperation<S, R> reduceOp, R globalInitialValue); + + /** + * Get reduced value from previous worker computation. + * @param name Name of the reducer + * @return Reduced value + * @param <R> Reduced value type + */ + <R extends Writable> R getReduced(String name); + + /** + * Broadcast given value to all workers for next computation. 
+ * @param name Name of the broadcast object + * @param value Value + */ + void broadcast(String name, Writable value); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java new file mode 100644 index 0000000..5168e32 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.master.input; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * Input splits organizer for vertex and edge input splits on master, which + * doesn't use locality information + */ +public class BasicInputSplitsMasterOrganizer + implements InputSplitsMasterOrganizer { + /** Available splits queue */ + private final ConcurrentLinkedQueue<byte[]> splits; + + /** + * Constructor + * + * @param serializedSplits Splits + */ + public BasicInputSplitsMasterOrganizer(List<byte[]> serializedSplits) { + splits = new ConcurrentLinkedQueue<>(serializedSplits); + } + + @Override + public byte[] getSerializedSplitFor(int workerTaskId) { + return splits.poll(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java new file mode 100644 index 0000000..d5a0131 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.master.input; + +/** + * Interface for different input split organizers on master + */ +public interface InputSplitsMasterOrganizer { + /** + * @param workerTaskId Id of worker requesting split + * + * @return Get next split for the worker, or null if all splits were taken + * already + */ + byte[] getSerializedSplitFor(int workerTaskId); +}
