Author: ekoontz
Date: Mon Oct  8 19:22:32 2012
New Revision: 1395733

URL: http://svn.apache.org/viewvc?rev=1395733&view=rev
Log:
Keep track of the task id in ChannelRotater to send requests without knowing 
the worker id upfront (aching via ekoontz)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java

Modified: giraph/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct  8 19:22:32 2012
@@ -2,6 +2,10 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-360: Keep track of the task id in ChannelRotater to send
+  requests without knowing the worker id upfront (aching via
+  ekoontz)
+
   GIRAPH-307: InputSplit list can be long with many workers 
   (and locality info) and should not be re-created every time a 
   worker calls reserveInputSplit() (ereisman via majakabiljo)

Modified: 
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java Mon Oct  8 19:22:32 2012
@@ -30,7 +30,22 @@ public class ChannelRotater {
   /** Index of last used channel */
   private int index = 0;
   /** Channel list */
-  private List<Channel> channelList = Lists.newArrayList();
+  private final List<Channel> channelList = Lists.newArrayList();
+  /** Task id of this channel */
+  private final Integer taskId;
+
+  /**
+   * Constructor
+   *
+   * @param taskId Id of the task these channels are associated with
+   */
+  public ChannelRotater(Integer taskId) {
+    this.taskId = taskId;
+  }
+
+  public Integer getTaskId() {
+    return taskId;
+  }
 
   /**
    * Add a channel to the rotation

Modified: 
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Mon Oct  8 19:22:32 2012
@@ -25,7 +25,6 @@ import com.google.common.util.concurrent
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -270,16 +269,21 @@ public class NettyClient {
     private final ChannelFuture future;
     /** Address of the future */
     private final InetSocketAddress address;
+    /** Task id */
+    private final Integer taskId;
 
     /**
      * Constructor.
      *
      * @param future Immutable future
      * @param address Immutable address
+     * @param taskId Immutable taskId
      */
-    ChannelFutureAddress(ChannelFuture future, InetSocketAddress address) {
+    ChannelFutureAddress(
+        ChannelFuture future, InetSocketAddress address, Integer taskId) {
       this.future = future;
       this.address = address;
+      this.taskId = taskId;
     }
   }
 
@@ -288,11 +292,12 @@ public class NettyClient {
    *
    * @param addresses Addresses to connect to (if haven't already connected)
    */
-  public void connectAllAddresses(Set<InetSocketAddress> addresses) {
+  public void connectAllAddresses(Map<InetSocketAddress, Integer> addresses) {
     List<ChannelFutureAddress> waitingConnectionList =
         Lists.newArrayListWithCapacity(addresses.size() * channelsPerServer);
-    for (InetSocketAddress address : addresses) {
+    for (Map.Entry<InetSocketAddress, Integer> entry : addresses.entrySet()) {
       context.progress();
+      InetSocketAddress address = entry.getKey();
       if (address == null || address.getHostName() == null ||
           address.getHostName().isEmpty()) {
         throw new IllegalStateException("connectAllAddresses: Null address " +
@@ -312,7 +317,8 @@ public class NettyClient {
         ChannelFuture connectionFuture = bootstrap.connect(address);
 
         waitingConnectionList.add(
-            new ChannelFutureAddress(connectionFuture, address));
+            new ChannelFutureAddress(
+                connectionFuture, address, entry.getValue()));
       }
     }
 
@@ -333,7 +339,7 @@ public class NettyClient {
           ChannelFuture connectionFuture =
               bootstrap.connect(waitingConnection.address);
           nextCheckFutures.add(new ChannelFutureAddress(connectionFuture,
-              waitingConnection.address));
+              waitingConnection.address, waitingConnection.taskId));
           ++failures;
         } else {
           Channel channel = future.getChannel();
@@ -350,7 +356,7 @@ public class NettyClient {
           ChannelRotater rotater =
               addressChannelMap.get(waitingConnection.address);
           if (rotater == null) {
-            rotater = new ChannelRotater();
+            rotater = new ChannelRotater(waitingConnection.taskId);
             addressChannelMap.put(waitingConnection.address, rotater);
           }
           rotater.addChannel(future.getChannel());

Modified: 
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java Mon Oct  8 19:22:32 2012
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.comm.netty;
 
+import com.google.common.collect.Maps;
+import java.util.Map;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.graph.WorkerInfo;
@@ -25,11 +27,9 @@ import org.apache.hadoop.mapreduce.Mappe
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 import java.net.InetSocketAddress;
 import java.util.Collection;
-import java.util.Set;
 
 /**
  * Netty implementation of {@link MasterClient}
@@ -56,10 +56,10 @@ public class NettyMasterClient implement
   public void fixWorkerAddresses(Iterable<WorkerInfo> workers) {
     this.workers.clear();
     Iterables.addAll(this.workers, workers);
-    Set<InetSocketAddress> addresses =
-        Sets.newHashSetWithExpectedSize(this.workers.size());
+    Map<InetSocketAddress, Integer> addresses =
+        Maps.newHashMapWithExpectedSize(this.workers.size());
     for (WorkerInfo worker : workers) {
-      addresses.add(worker.getInetSocketAddress());
+      addresses.put(worker.getInetSocketAddress(), worker.getTaskId());
     }
     nettyClient.connectAllAddresses(addresses);
   }

Modified: 
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Mon Oct  8 19:22:32 2012
@@ -195,7 +195,7 @@ public class NettyServer {
       }
     });
 
-    int taskId = conf.getInt("mapred.task.partition", -1);
+    int taskId = conf.getTaskPartition();
     int numTasks = conf.getInt("mapred.map.tasks", 1);
     // number of workers + 1 for master
     int numServers = conf.getInt(GiraphConfiguration.MAX_WORKERS, numTasks) + 1;

Modified: 
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Mon Oct  8 19:22:32 2012
@@ -18,8 +18,6 @@
 
 package org.apache.giraph.comm.netty;
 
-import com.google.common.collect.Sets;
-import java.util.Set;
 import java.util.Iterator;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
@@ -137,8 +135,8 @@ public class NettyWorkerClient<I extends
   public void fixPartitionIdToSocketAddrMap() {
     // 1. Fix all the cached inet addresses (remove all changed entries)
     // 2. Connect to any new RPC servers
-    Set<InetSocketAddress> addresses =
-        Sets.newHashSetWithExpectedSize(service.getPartitionOwners().size());
+    Map<InetSocketAddress, Integer> addressTaskIdMap =
+        Maps.newHashMapWithExpectedSize(service.getPartitionOwners().size());
     for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
       InetSocketAddress address =
           partitionIndexAddressMap.get(
@@ -161,16 +159,19 @@ public class NettyWorkerClient<I extends
       // No need to connect to myself
       if (service.getWorkerInfo().getTaskId() !=
           partitionOwner.getWorkerInfo().getTaskId()) {
-        addresses.add(getInetSocketAddress(partitionOwner.getWorkerInfo(),
-            partitionOwner.getPartitionId()));
+        addressTaskIdMap.put(
+            getInetSocketAddress(partitionOwner.getWorkerInfo(),
+                partitionOwner.getPartitionId()),
+            partitionOwner.getWorkerInfo().getTaskId());
       }
     }
     boolean useNetty = conf.getBoolean(GiraphConfiguration.USE_NETTY,
         GiraphConfiguration.USE_NETTY_DEFAULT);
     if (useNetty) {
-      addresses.add(service.getMasterInfo().getInetSocketAddress());
+      addressTaskIdMap.put(service.getMasterInfo().getInetSocketAddress(),
+                           null);
     }
-    nettyClient.connectAllAddresses(addresses);
+    nettyClient.connectAllAddresses(addressTaskIdMap);
   }
 
   /**
@@ -226,7 +227,7 @@ public class NettyWorkerClient<I extends
    * When doing the request, short circuit if it is local
    *
    * @param workerInfo Worker info
-   * @param remoteServerAddress Remote server address (checked against local)
+   * @param remoteServerAddress Remote server address
    * @param writableRequest Request to either submit or run locally
    */
   private void doRequest(WorkerInfo workerInfo,

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Mon Oct  8 19:22:32 2012
@@ -18,8 +18,8 @@
 
 package org.apache.giraph.comm;
 
-import com.google.common.collect.Sets;
-import java.util.Set;
+import com.google.common.collect.Maps;
+import java.util.Map;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -85,7 +85,9 @@ public class ConnectionTest {
     server.start();
 
     NettyClient client = new NettyClient(context, conf);
-    client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+    addressIdMap.put(server.getMyAddress(), -1);
+    client.connectAllAddresses(addressIdMap);
 
     client.stop();
     server.stop();
@@ -119,11 +121,11 @@ public class ConnectionTest {
     server3.start();
 
     NettyClient client = new NettyClient(context, conf);
-    Set<InetSocketAddress> serverAddresses = Sets.newHashSet();
-    serverAddresses.add(server1.getMyAddress());
-    serverAddresses.add(server2.getMyAddress());
-    serverAddresses.add(server3.getMyAddress());
-    client.connectAllAddresses(serverAddresses);
+    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+    addressIdMap.put(server1.getMyAddress(), -1);
+    addressIdMap.put(server2.getMyAddress(), -1);
+    addressIdMap.put(server3.getMyAddress(), -1);
+    client.connectAllAddresses(addressIdMap);
 
     client.stop();
     server1.stop();
@@ -152,12 +154,14 @@ public class ConnectionTest {
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();
 
+    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+    addressIdMap.put(server.getMyAddress(), -1);
     NettyClient client1 = new NettyClient(context, conf);
-    client1.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+    client1.connectAllAddresses(addressIdMap);
     NettyClient client2 = new NettyClient(context, conf);
-    client2.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+    client2.connectAllAddresses(addressIdMap);
     NettyClient client3 = new NettyClient(context, conf);
-    client3.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+    client3.connectAllAddresses(addressIdMap);
 
     client1.stop();
     client2.stop();

Modified: 
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Mon Oct  8 19:22:32 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm;
 
+import java.net.InetSocketAddress;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -140,7 +141,9 @@ public class RequestFailureTest {
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();
     client = new NettyClient(context, conf);
-    client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+    addressIdMap.put(server.getMyAddress(), -1);
+    client.connectAllAddresses(addressIdMap);
 
     // Send the request 2x
     WritableRequest request1 = getRequest();
@@ -176,7 +179,9 @@ public class RequestFailureTest {
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();
     client = new NettyClient(context, conf);
-    client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+    addressIdMap.put(server.getMyAddress(), -1);
+    client.connectAllAddresses(addressIdMap);
 
     // Send the request 2x, but should only be processed once
     WritableRequest request1 = getRequest();
@@ -211,7 +216,9 @@ public class RequestFailureTest {
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();
     client = new NettyClient(context, conf);
-    client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+    addressIdMap.put(server.getMyAddress(), -1);
+    client.connectAllAddresses(addressIdMap);
 
     // Send the request 2x, but should only be processed once
     WritableRequest request1 = getRequest();

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1395733&r1=1395732&r2=1395733&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Mon Oct  8 19:22:32 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm;
 
+import java.net.InetSocketAddress;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -101,7 +102,9 @@ public class RequestTest {
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();
     client = new NettyClient(context, conf);
-    client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+    addressIdMap.put(server.getMyAddress(), -1);
+    client.connectAllAddresses(addressIdMap);
   }
 
   @Test


Reply via email to