Updated Branches: refs/heads/trunk 8a8c9378c -> f0453d05d
GIRAPH-456: Log where master is on every host (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f0453d05 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f0453d05 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f0453d05 Branch: refs/heads/trunk Commit: f0453d05dea972aa772c0056c19fc0870f64839b Parents: 8a8c937 Author: Nitay Joffe <[email protected]> Authored: Tue Dec 18 22:30:29 2012 -0800 Committer: Nitay Joffe <[email protected]> Committed: Wed Dec 19 14:56:44 2012 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/graph/BspServiceMaster.java | 1 + .../org/apache/giraph/graph/BspServiceWorker.java | 4 +- .../java/org/apache/giraph/graph/MasterInfo.java | 7 +-- .../java/org/apache/giraph/graph/TaskInfo.java | 36 ++++++++++- .../java/org/apache/giraph/graph/WorkerInfo.java | 50 +-------------- .../org/apache/giraph/comm/ConnectionTest.java | 13 +++-- .../org/apache/giraph/comm/RequestFailureTest.java | 2 +- .../java/org/apache/giraph/comm/RequestTest.java | 2 +- .../org/apache/giraph/comm/SaslConnectionTest.java | 3 +- 10 files changed, 54 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/f0453d05/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 99e2c77..256c53a 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-456: Log where master is on every host (nitay) + GIRAPH-141: Multigraph support in Giraph (apresta) GIRAPH-452: Fix hcatalog jar (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/f0453d05/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java b/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java index 5638039..ee64a46 100644 --- a/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java +++ b/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java @@ -805,6 +805,7 @@ public class BspServiceMaster<I extends WritableComparable, masterServer = new NettyMasterServer(getConfiguration(), this, getContext()); masterInfo.setInetSocketAddress(masterServer.getMyAddress()); + masterInfo.setTaskId(getTaskPartition()); masterClient = new NettyMasterClient(getContext(), getConfiguration(), this); http://git-wip-us.apache.org/repos/asf/giraph/blob/f0453d05/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java b/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java index 7bbf0a4..6e97e6c 100644 --- a/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java +++ b/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java @@ -161,10 +161,11 @@ public class BspServiceWorker<I extends WritableComparable, registerBspEvent(partitionExchangeChildrenChanged); workerGraphPartitioner = getGraphPartitionerFactory().createWorkerGraphPartitioner(); - workerInfo = new WorkerInfo(getTaskPartition()); + workerInfo = new WorkerInfo(); workerServer = new NettyWorkerServer<I, V, E, M>(getConfiguration(), this, context); workerInfo.setInetSocketAddress(workerServer.getMyAddress()); + workerInfo.setTaskId(getTaskPartition()); workerClient = new NettyWorkerClient<I, V, E, M>(context, getConfiguration(), this); @@ -673,6 +674,7 @@ else[HADOOP_NON_SECURE]*/ masterInfo = addressesAndPartitions.getMasterInfo(); if (LOG.isInfoEnabled()) { + LOG.info("startSuperstep: " + masterInfo); LOG.info("startSuperstep: Ready for computation on superstep " + getSuperstep() + " since worker " + "selection and vertex range assignments are done in " + http://git-wip-us.apache.org/repos/asf/giraph/blob/f0453d05/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java ---------------------------------------------------------------------- diff --git a/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java b/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java index d38e52a..50ad6aa 100644 --- a/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java +++ b/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java @@ -29,12 +29,7 @@ public class MasterInfo extends TaskInfo { } @Override - public int getTaskId() { - return -1; - } - - @Override public String toString() { - return "Master(hostname=" + getHostname() + ", port=" + getPort() + ")"; + return "Master(" + super.toString() + ")"; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f0453d05/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java ---------------------------------------------------------------------- diff --git a/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java b/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java index 316e180..536af0f 100644 --- a/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java +++ b/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java @@ -33,6 +33,8 @@ public abstract class TaskInfo implements Writable { private String hostname; /** Port that the IPC server is using */ private int port; + /** Task partition id */ + private int taskId = -1; /** * Constructor @@ -78,11 +80,31 @@ public abstract class TaskInfo implements Writable { } /** + * Set task partition id of this task + * + * @param taskId partition id + */ + public void setTaskId(int taskId) { + this.taskId = taskId; + } + + /** * Get task partition id of this task * * @return Task partition id of this task */ - public abstract int getTaskId(); + public int getTaskId() { + return taskId; + } + + /** + * Get hostname and task id + * + * @return Hostname and task id + */ + public String getHostnameId() { + return getHostname() + "_" + getTaskId(); + } @Override public boolean equals(Object other) { @@ -90,7 +112,8 @@ public abstract class TaskInfo implements Writable { TaskInfo taskInfo = (TaskInfo) other; if (hostname.equals(taskInfo.getHostname()) && (getTaskId() == taskInfo.getTaskId()) && - (port == taskInfo.getPort())) { + (port == taskInfo.getPort() && + (taskId == taskInfo.getTaskId()))) { return true; } } @@ -98,15 +121,24 @@ public abstract class TaskInfo implements Writable { } @Override + public String toString() { + return "hostname=" + getHostname() + + ", MRtaskID=" + getTaskId() + + ", port=" + getPort(); + } + + @Override public void readFields(DataInput input) throws IOException { hostname = input.readUTF(); port = input.readInt(); + taskId = input.readInt(); } @Override public void write(DataOutput output) throws IOException { output.writeUTF(hostname); output.writeInt(port); + output.writeInt(taskId); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/f0453d05/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java ---------------------------------------------------------------------- diff --git a/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java b/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java index f51fbac..c9571ba 100644 --- a/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java +++ b/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java @@ -18,66 +18,18 @@ package org.apache.giraph.graph; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.net.InetSocketAddress; - /** * Information about a worker that is sent to the master and other workers. */ public class WorkerInfo extends TaskInfo { - /** Task Partition (Worker) ID of this task */ - private int taskId; - /** Hostname + "_" + id for easier debugging */ - private String hostnameId; - /** * Constructor for reflection */ public WorkerInfo() { } - /** - * Constructor with parameters. - * - * @param taskId the task partition for this worker - */ - public WorkerInfo(int taskId) { - this.taskId = taskId; - } - - @Override - public int getTaskId() { - return taskId; - } - - @Override - public void setInetSocketAddress(InetSocketAddress address) { - super.setInetSocketAddress(address); - hostnameId = getHostname() + "_" + getTaskId(); - } - - public String getHostnameId() { - return hostnameId; - } - @Override public String toString() { - return "Worker(hostname=" + getHostname() + ", MRtaskID=" + - getTaskId() + ", port=" + getPort() + ")"; - } - - @Override - public void readFields(DataInput input) throws IOException { - super.readFields(input); - taskId = input.readInt(); - hostnameId = getHostname() + "_" + getTaskId(); - } - - @Override - public void write(DataOutput output) throws IOException { - super.write(output); - output.writeInt(taskId); + return "Worker(" + super.toString() + ")"; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f0453d05/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java ---------------------------------------------------------------------- diff --git a/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java b/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java index 4d47c94..14e590c 100644 --- a/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java +++ b/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java @@ -74,7 +74,7 @@ public class ConnectionTest { ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData = MockUtils.createNewServerData(conf, context); - WorkerInfo workerInfo = new WorkerInfo(-1); + WorkerInfo workerInfo = new WorkerInfo(); NettyServer server = new NettyServer(conf, new WorkerRequestServerHandler.Factory(serverData), workerInfo, @@ -106,20 +106,23 @@ public class ConnectionTest { RequestServerHandler.Factory requestServerHandlerFactory = new WorkerRequestServerHandler.Factory(serverData); - WorkerInfo workerInfo1 = new WorkerInfo(1); + WorkerInfo workerInfo1 = new WorkerInfo(); + workerInfo1.setTaskId(1); NettyServer server1 = new NettyServer(conf, requestServerHandlerFactory, workerInfo1, context); server1.start(); workerInfo1.setInetSocketAddress(server1.getMyAddress()); - WorkerInfo workerInfo2 = new WorkerInfo(2); + WorkerInfo workerInfo2 = new WorkerInfo(); + workerInfo1.setTaskId(2); NettyServer server2 = new NettyServer(conf, requestServerHandlerFactory, workerInfo2, context); server2.start(); workerInfo2.setInetSocketAddress(server2.getMyAddress()); - WorkerInfo workerInfo3 = new WorkerInfo(3); + WorkerInfo workerInfo3 = new WorkerInfo(); + workerInfo1.setTaskId(3); NettyServer server3 = new NettyServer(conf, requestServerHandlerFactory, workerInfo3, context); @@ -150,7 +153,7 @@ public class ConnectionTest { ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData = MockUtils.createNewServerData(conf, context); - WorkerInfo workerInfo = new WorkerInfo(-1); + WorkerInfo workerInfo = new WorkerInfo(); NettyServer server = new NettyServer(conf, new WorkerRequestServerHandler.Factory(serverData), workerInfo, context); http://git-wip-us.apache.org/repos/asf/giraph/blob/f0453d05/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java ---------------------------------------------------------------------- diff --git a/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java index 963f00d..4b41f63 100644 --- a/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java +++ b/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java @@ -162,7 +162,7 @@ public class RequestFailureTest { private void checkSendingTwoRequests() throws IOException { // Start the service serverData = MockUtils.createNewServerData(conf, context); - WorkerInfo workerInfo = new WorkerInfo(-1); + WorkerInfo workerInfo = new WorkerInfo(); server = new NettyServer(conf, new WorkerRequestServerHandler.Factory(serverData), workerInfo, context); http://git-wip-us.apache.org/repos/asf/giraph/blob/f0453d05/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java ---------------------------------------------------------------------- diff --git a/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java index f3d3e35..d3bf7c3 100644 --- a/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -95,7 +95,7 @@ public class RequestTest { // Start the service serverData = MockUtils.createNewServerData(conf, context); - workerInfo = new WorkerInfo(-1); + workerInfo = new WorkerInfo(); server = new NettyServer(conf, new WorkerRequestServerHandler.Factory(serverData), workerInfo, context); http://git-wip-us.apache.org/repos/asf/giraph/blob/f0453d05/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java ---------------------------------------------------------------------- diff --git a/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java b/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java index 8b3873a..0f9c64e 100644 --- a/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java +++ b/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java @@ -85,7 +85,8 @@ public class SaslConnectionTest { when(mockedSaslServerFactory.newHandler(conf)). thenReturn(mockedSaslServerHandler); - WorkerInfo workerInfo = new WorkerInfo(-1); + WorkerInfo workerInfo = new WorkerInfo(); + workerInfo.setTaskId(-1); NettyServer server = new NettyServer(conf, new WorkerRequestServerHandler.Factory(serverData),
