Author: ekoontz
Date: Mon Oct 8 19:22:32 2012
New Revision: 1395733
URL: http://svn.apache.org/viewvc?rev=1395733&view=rev
Log:
Keep track of the task id in ChannelRotater to send requests without knowing
the worker id upfront (aching via ekoontz)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct 8 19:22:32 2012
@@ -2,6 +2,10 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-360: Keep track of the task id in ChannelRotater to send
+ requests without knowing the worker id upfront (aching via
+ ekoontz)
+
GIRAPH-307: InputSplit list can be long with many workers
(and locality info) and should not be re-created every time a
worker calls reserveInputSplit() (ereisman via majakabiljo)
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
Mon Oct 8 19:22:32 2012
@@ -30,7 +30,22 @@ public class ChannelRotater {
/** Index of last used channel */
private int index = 0;
/** Channel list */
- private List<Channel> channelList = Lists.newArrayList();
+ private final List<Channel> channelList = Lists.newArrayList();
+ /** Task id associated with the channels in this rotation */
+ private final Integer taskId;
+
+ /**
+ * Constructor
+ *
+ * @param taskId Id of the task these channels are associated with
+ */
+ public ChannelRotater(Integer taskId) {
+ this.taskId = taskId;
+ }
+
+ public Integer getTaskId() {
+ return taskId;
+ }
/**
* Add a channel to the rotation
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
Mon Oct 8 19:22:32 2012
@@ -25,7 +25,6 @@ import com.google.common.util.concurrent
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -270,16 +269,21 @@ public class NettyClient {
private final ChannelFuture future;
/** Address of the future */
private final InetSocketAddress address;
+ /** Task id */
+ private final Integer taskId;
/**
* Constructor.
*
* @param future Immutable future
* @param address Immutable address
+ * @param taskId Immutable taskId
*/
- ChannelFutureAddress(ChannelFuture future, InetSocketAddress address) {
+ ChannelFutureAddress(
+ ChannelFuture future, InetSocketAddress address, Integer taskId) {
this.future = future;
this.address = address;
+ this.taskId = taskId;
}
}
@@ -288,11 +292,12 @@ public class NettyClient {
*
* @param addresses Addresses to connect to (if haven't already connected)
*/
- public void connectAllAddresses(Set<InetSocketAddress> addresses) {
+ public void connectAllAddresses(Map<InetSocketAddress, Integer> addresses) {
List<ChannelFutureAddress> waitingConnectionList =
Lists.newArrayListWithCapacity(addresses.size() * channelsPerServer);
- for (InetSocketAddress address : addresses) {
+ for (Map.Entry<InetSocketAddress, Integer> entry : addresses.entrySet()) {
context.progress();
+ InetSocketAddress address = entry.getKey();
if (address == null || address.getHostName() == null ||
address.getHostName().isEmpty()) {
throw new IllegalStateException("connectAllAddresses: Null address " +
@@ -312,7 +317,8 @@ public class NettyClient {
ChannelFuture connectionFuture = bootstrap.connect(address);
waitingConnectionList.add(
- new ChannelFutureAddress(connectionFuture, address));
+ new ChannelFutureAddress(
+ connectionFuture, address, entry.getValue()));
}
}
@@ -333,7 +339,7 @@ public class NettyClient {
ChannelFuture connectionFuture =
bootstrap.connect(waitingConnection.address);
nextCheckFutures.add(new ChannelFutureAddress(connectionFuture,
- waitingConnection.address));
+ waitingConnection.address, waitingConnection.taskId));
++failures;
} else {
Channel channel = future.getChannel();
@@ -350,7 +356,7 @@ public class NettyClient {
ChannelRotater rotater =
addressChannelMap.get(waitingConnection.address);
if (rotater == null) {
- rotater = new ChannelRotater();
+ rotater = new ChannelRotater(waitingConnection.taskId);
addressChannelMap.put(waitingConnection.address, rotater);
}
rotater.addChannel(future.getChannel());
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
---
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
(original)
+++
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
Mon Oct 8 19:22:32 2012
@@ -18,6 +18,8 @@
package org.apache.giraph.comm.netty;
+import com.google.common.collect.Maps;
+import java.util.Map;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.graph.WorkerInfo;
@@ -25,11 +27,9 @@ import org.apache.hadoop.mapreduce.Mappe
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import java.net.InetSocketAddress;
import java.util.Collection;
-import java.util.Set;
/**
* Netty implementation of {@link MasterClient}
@@ -56,10 +56,10 @@ public class NettyMasterClient implement
public void fixWorkerAddresses(Iterable<WorkerInfo> workers) {
this.workers.clear();
Iterables.addAll(this.workers, workers);
- Set<InetSocketAddress> addresses =
- Sets.newHashSetWithExpectedSize(this.workers.size());
+ Map<InetSocketAddress, Integer> addresses =
+ Maps.newHashMapWithExpectedSize(this.workers.size());
for (WorkerInfo worker : workers) {
- addresses.add(worker.getInetSocketAddress());
+ addresses.put(worker.getInetSocketAddress(), worker.getTaskId());
}
nettyClient.connectAllAddresses(addresses);
}
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
Mon Oct 8 19:22:32 2012
@@ -195,7 +195,7 @@ public class NettyServer {
}
});
- int taskId = conf.getInt("mapred.task.partition", -1);
+ int taskId = conf.getTaskPartition();
int numTasks = conf.getInt("mapred.map.tasks", 1);
// number of workers + 1 for master
int numServers = conf.getInt(GiraphConfiguration.MAX_WORKERS, numTasks) +
1;
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
---
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
(original)
+++
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
Mon Oct 8 19:22:32 2012
@@ -18,8 +18,6 @@
package org.apache.giraph.comm.netty;
-import com.google.common.collect.Sets;
-import java.util.Set;
import java.util.Iterator;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
@@ -137,8 +135,8 @@ public class NettyWorkerClient<I extends
public void fixPartitionIdToSocketAddrMap() {
// 1. Fix all the cached inet addresses (remove all changed entries)
// 2. Connect to any new RPC servers
- Set<InetSocketAddress> addresses =
- Sets.newHashSetWithExpectedSize(service.getPartitionOwners().size());
+ Map<InetSocketAddress, Integer> addressTaskIdMap =
+ Maps.newHashMapWithExpectedSize(service.getPartitionOwners().size());
for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
InetSocketAddress address =
partitionIndexAddressMap.get(
@@ -161,16 +159,19 @@ public class NettyWorkerClient<I extends
// No need to connect to myself
if (service.getWorkerInfo().getTaskId() !=
partitionOwner.getWorkerInfo().getTaskId()) {
- addresses.add(getInetSocketAddress(partitionOwner.getWorkerInfo(),
- partitionOwner.getPartitionId()));
+ addressTaskIdMap.put(
+ getInetSocketAddress(partitionOwner.getWorkerInfo(),
+ partitionOwner.getPartitionId()),
+ partitionOwner.getWorkerInfo().getTaskId());
}
}
boolean useNetty = conf.getBoolean(GiraphConfiguration.USE_NETTY,
GiraphConfiguration.USE_NETTY_DEFAULT);
if (useNetty) {
- addresses.add(service.getMasterInfo().getInetSocketAddress());
+ addressTaskIdMap.put(service.getMasterInfo().getInetSocketAddress(),
+ null);
}
- nettyClient.connectAllAddresses(addresses);
+ nettyClient.connectAllAddresses(addressTaskIdMap);
}
/**
@@ -226,7 +227,7 @@ public class NettyWorkerClient<I extends
* When doing the request, short circuit if it is local
*
* @param workerInfo Worker info
- * @param remoteServerAddress Remote server address (checked against local)
+ * @param remoteServerAddress Remote server address
* @param writableRequest Request to either submit or run locally
*/
private void doRequest(WorkerInfo workerInfo,
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Mon
Oct 8 19:22:32 2012
@@ -18,8 +18,8 @@
package org.apache.giraph.comm;
-import com.google.common.collect.Sets;
-import java.util.Set;
+import com.google.common.collect.Maps;
+import java.util.Map;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -85,7 +85,9 @@ public class ConnectionTest {
server.start();
NettyClient client = new NettyClient(context, conf);
- client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+ Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+ addressIdMap.put(server.getMyAddress(), -1);
+ client.connectAllAddresses(addressIdMap);
client.stop();
server.stop();
@@ -119,11 +121,11 @@ public class ConnectionTest {
server3.start();
NettyClient client = new NettyClient(context, conf);
- Set<InetSocketAddress> serverAddresses = Sets.newHashSet();
- serverAddresses.add(server1.getMyAddress());
- serverAddresses.add(server2.getMyAddress());
- serverAddresses.add(server3.getMyAddress());
- client.connectAllAddresses(serverAddresses);
+ Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+ addressIdMap.put(server1.getMyAddress(), -1);
+ addressIdMap.put(server2.getMyAddress(), -1);
+ addressIdMap.put(server3.getMyAddress(), -1);
+ client.connectAllAddresses(addressIdMap);
client.stop();
server1.stop();
@@ -152,12 +154,14 @@ public class ConnectionTest {
new WorkerRequestServerHandler.Factory(serverData));
server.start();
+ Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+ addressIdMap.put(server.getMyAddress(), -1);
NettyClient client1 = new NettyClient(context, conf);
- client1.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+ client1.connectAllAddresses(addressIdMap);
NettyClient client2 = new NettyClient(context, conf);
- client2.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+ client2.connectAllAddresses(addressIdMap);
NettyClient client3 = new NettyClient(context, conf);
- client3.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+ client3.connectAllAddresses(addressIdMap);
client1.stop();
client2.stop();
Modified:
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
Mon Oct 8 19:22:32 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.comm;
+import java.net.InetSocketAddress;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -140,7 +141,9 @@ public class RequestFailureTest {
new WorkerRequestServerHandler.Factory(serverData));
server.start();
client = new NettyClient(context, conf);
- client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+ Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+ addressIdMap.put(server.getMyAddress(), -1);
+ client.connectAllAddresses(addressIdMap);
// Send the request 2x
WritableRequest request1 = getRequest();
@@ -176,7 +179,9 @@ public class RequestFailureTest {
new WorkerRequestServerHandler.Factory(serverData));
server.start();
client = new NettyClient(context, conf);
- client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+ Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+ addressIdMap.put(server.getMyAddress(), -1);
+ client.connectAllAddresses(addressIdMap);
// Send the request 2x, but should only be processed once
WritableRequest request1 = getRequest();
@@ -211,7 +216,9 @@ public class RequestFailureTest {
new WorkerRequestServerHandler.Factory(serverData));
server.start();
client = new NettyClient(context, conf);
- client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+ Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+ addressIdMap.put(server.getMyAddress(), -1);
+ client.connectAllAddresses(addressIdMap);
// Send the request 2x, but should only be processed once
WritableRequest request1 = getRequest();
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Mon Oct
8 19:22:32 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.comm;
+import java.net.InetSocketAddress;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -101,7 +102,9 @@ public class RequestTest {
new WorkerRequestServerHandler.Factory(serverData));
server.start();
client = new NettyClient(context, conf);
- client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+ Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+ addressIdMap.put(server.getMyAddress(), -1);
+ client.connectAllAddresses(addressIdMap);
}
@Test