http://git-wip-us.apache.org/repos/asf/giraph/blob/303386f7/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index f2ccb24..d5ad62b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -31,16 +31,16 @@ import org.apache.giraph.comm.netty.NettyWorkerClient; import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; import org.apache.giraph.comm.netty.NettyWorkerServer; import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.graph.AddressesAndPartitionsWritable; +import org.apache.giraph.graph.GraphState; import org.apache.giraph.bsp.BspService; +import org.apache.giraph.graph.GraphTaskManager; +import org.apache.giraph.graph.VertexEdgeCount; +import org.apache.giraph.graph.InputSplitPaths; +import org.apache.giraph.graph.InputSplitEvents; import org.apache.giraph.graph.FinishedSuperstepStats; +import org.apache.giraph.graph.AddressesAndPartitionsWritable; import org.apache.giraph.graph.GlobalStats; -import org.apache.giraph.graph.GraphMapper; -import org.apache.giraph.graph.GraphState; -import org.apache.giraph.graph.InputSplitEvents; -import org.apache.giraph.graph.InputSplitPaths; import org.apache.giraph.vertex.Vertex; -import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.VertexWriter; import org.apache.giraph.partition.Partition; @@ -159,7 +159,7 @@ public class BspServiceWorker<I extends WritableComparable, * @param serverPortList ZooKeeper server port list * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper * @param context Mapper context - * @param graphMapper Graph mapper + * @param graphTaskManager GraphTaskManager for this compute node * @throws IOException * @throws InterruptedException */ @@ -167,9 +167,9 @@ public class BspServiceWorker<I extends WritableComparable, String serverPortList, int sessionMsecTimeout, Mapper<?, ?, ?, ?>.Context context, - GraphMapper<I, V, E, M> graphMapper) + GraphTaskManager<I, V, E, M> graphTaskManager) throws IOException, InterruptedException { - super(serverPortList, sessionMsecTimeout, context, graphMapper); + super(serverPortList, sessionMsecTimeout, context, graphTaskManager); partitionExchangeChildrenChanged = new PredicateLock(context); registerBspEvent(partitionExchangeChildrenChanged); workerGraphPartitioner = @@ -294,7 +294,7 @@ public class BspServiceWorker<I extends WritableComparable, false, false, true); GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>( - INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(), + INPUT_SUPERSTEP, 0, 0, getContext(), getGraphTaskManager(), null, null); VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory = @@ -321,7 +321,7 @@ public class BspServiceWorker<I extends WritableComparable, false, false, true); GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>( - INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(), + INPUT_SUPERSTEP, 0, 0, getContext(), getGraphTaskManager(), null, null); EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory = @@ -461,7 +461,7 @@ public class BspServiceWorker<I extends WritableComparable, // Add the partitions that this worker owns GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0, - getContext(), getGraphMapper(), null, null); + getContext(), getGraphTaskManager(), null, null); Collection<? extends PartitionOwner> masterSetPartitionOwners = startSuperstep(graphState); workerGraphPartitioner.updatePartitionOwners( @@ -695,7 +695,7 @@ else[HADOOP_NON_SECURE]*/ } getContext().setStatus("startSuperstep: " + - getGraphMapper().getGraphFunctions().toString() + + getGraphTaskManager().getGraphFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep()); return addressesAndPartitions.getPartitionOwners(); @@ -719,7 +719,7 @@ else[HADOOP_NON_SECURE]*/ // 6. Wait for the master's global stats, and check if done waitForRequestsToFinish(); - graphState.getGraphMapper().notifyFinishedCommunication(); + graphState.getGraphTaskManager().notifyFinishedCommunication(); long workerSentMessages = 0; for (PartitionStats partitionStats : partitionStatsList) { @@ -747,7 +747,7 @@ else[HADOOP_NON_SECURE]*/ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "finishSuperstep: (waiting for rest " + "of workers) " + - getGraphMapper().getGraphFunctions().toString() + + getGraphTaskManager().getGraphFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep()); @@ -765,7 +765,7 @@ else[HADOOP_NON_SECURE]*/ } incrCachedSuperstep(); getContext().setStatus("finishSuperstep: (all workers done) " + - getGraphMapper().getGraphFunctions().toString() + + getGraphTaskManager().getGraphFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep()); @@ -952,7 +952,7 @@ else[HADOOP_NON_SECURE]*/ public void storeCheckpoint() throws IOException { LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "storeCheckpoint: Starting checkpoint " + - getGraphMapper().getGraphFunctions().toString() + + getGraphTaskManager().getGraphFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep());
http://git-wip-us.apache.org/repos/asf/giraph/blob/303386f7/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java index acd4e2d..d09ca2b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java @@ -131,7 +131,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable, context, configuration, bspServiceWorker); this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(), graphState.getTotalNumVertices(), graphState.getTotalNumEdges(), - context, graphState.getGraphMapper(), workerClientRequestProcessor, + context, graphState.getGraphTaskManager(), workerClientRequestProcessor, null); this.useLocality = configuration.getBoolean( GiraphConstants.USE_INPUT_SPLIT_LOCALITY,
