Author: aching
Date: Mon Jul 16 22:42:24 2012
New Revision: 1362298

URL: http://svn.apache.org/viewvc?rev=1362298&view=rev
Log:
GIRAPH-224: Netty server-side combiner (apresta via aching).


Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java
    
giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java

Modified: giraph/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1362298&r1=1362297&r2=1362298&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Jul 16 22:42:24 2012
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-224: Netty server-side combiner (apresta via aching).
+
   GIRAPH-251: Allow to access the distributed cache from Vertexes and
   WorkerContext (Gianmarco De Francisci Morales via aching).
 

Modified: 
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java?rev=1362298&r1=1362297&r2=1362298&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java 
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java 
Mon Jul 16 22:42:24 2012
@@ -18,11 +18,6 @@
 
 package org.apache.giraph.comm;
 
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.BspUtils;
@@ -38,6 +33,11 @@ import org.apache.log4j.Logger;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
 /**
  * Netty worker server that implement {@link WorkerServer} and contains
  * the actual {@link ServerData}.
@@ -61,8 +61,7 @@ public class NettyWorkerServer<I extends
   /** Netty server that does that actual I/O */
   private final NettyServer<I, V, E, M> nettyServer;
   /** Server data storage */
-  private final ServerData<I, V, E, M> serverData =
-      new ServerData<I, V, E, M>();
+  private final ServerData<I, V, E, M> serverData;
 
   /**
    * Constructor to start the server.
@@ -74,6 +73,7 @@ public class NettyWorkerServer<I extends
       CentralizedServiceWorker<I, V, E, M> service) {
     this.conf = conf;
     this.service = service;
+    serverData = new ServerData<I, V, E, M>(conf);
     nettyServer = new NettyServer<I, V, E, M>(conf, serverData);
     nettyServer.start();
   }

Modified: 
giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java?rev=1362298&r1=1362297&r2=1362298&view=diff
==============================================================================
--- 
giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
 (original)
+++ 
giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
 Mon Jul 16 22:42:24 2012
@@ -18,15 +18,6 @@
 
 package org.apache.giraph.comm;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.giraph.comm.RequestRegistry.Type;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +28,15 @@ import org.apache.log4j.Logger;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * Send a collection of vertex messages for a partition.
  *
@@ -132,6 +132,17 @@ public class SendPartitionMessagesReques
       }
       synchronized (messages) {
         messages.addAll(entry.getValue());
+        if (serverData.getCombiner() != null) {
+          try {
+            messages = Lists.newArrayList(
+                serverData.getCombiner().combine(entry.getKey(), messages));
+          } catch (IOException e) {
+            throw new IllegalStateException(
+                "doRequest: Combiner failed to combine messages " + messages,
+                e);
+          }
+          transientMessages.put(entry.getKey(), messages);
+        }
       }
     }
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java?rev=1362298&r1=1362297&r2=1362298&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java Mon Jul 
16 22:42:24 2012
@@ -18,14 +18,17 @@
 
 package org.apache.giraph.comm;
 
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.graph.VertexMutations;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * Anything that the server stores
  *
@@ -37,6 +40,8 @@ import org.apache.hadoop.io.WritableComp
 @SuppressWarnings("rawtypes")
 public class ServerData<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> {
+  /** Combiner instance, can be null */
+  private VertexCombiner<I, M> combiner;
   /**
    * Map of partition ids to incoming vertices from other workers.
    * (Synchronized on values)
@@ -61,6 +66,18 @@ public class ServerData<I extends Writab
   vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E, M>>();
 
   /**
+   * Constructor.
+   * @param conf Configuration (used to instantiate the combiner).
+   */
+  public ServerData(Configuration conf) {
+    if (BspUtils.getVertexCombinerClass(conf) == null) {
+      combiner = null;
+    } else {
+      combiner = BspUtils.createVertexCombiner(conf);
+    }
+  }
+
+  /**
    * Get the partition vertices (synchronize on the values)
    *
    * @return Partition vertices
@@ -88,4 +105,12 @@ public class ServerData<I extends Writab
   getVertexMutations() {
     return vertexMutations;
   }
+
+  /**
+   * Get the combiner instance.
+   * @return The combiner.
+   */
+  public VertexCombiner<I, M> getCombiner() {
+    return combiner;
+  }
 }

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1362298&r1=1362297&r2=1362298&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java 
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Mon 
Jul 16 22:42:24 2012
@@ -18,16 +18,18 @@
 
 package org.apache.giraph.comm;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.junit.Test;
+
 import static org.mockito.Mockito.mock;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.junit.Test;
 
 /**
  * Test the netty connections
@@ -45,7 +47,7 @@ public class ConnectionTest {
 
     Configuration conf = new Configuration();
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+        new ServerData<IntWritable, IntWritable, IntWritable, 
IntWritable>(conf);
     NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
             conf, serverData);
@@ -72,7 +74,7 @@ public class ConnectionTest {
 
     Configuration conf = new Configuration();
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+        new ServerData<IntWritable, IntWritable, IntWritable, 
IntWritable>(conf);
 
     NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server1 =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
@@ -112,7 +114,8 @@ public class ConnectionTest {
 
     Configuration conf = new Configuration();
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+        new ServerData<IntWritable, IntWritable, IntWritable,
+            IntWritable>(conf);
     NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
             conf, serverData);

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1362298&r1=1362297&r2=1362298&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java 
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Mon Jul 
16 22:42:24 2012
@@ -18,18 +18,6 @@
 
 package org.apache.giraph.comm;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
@@ -43,9 +31,22 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * Test all the different netty requests.
  */
@@ -91,7 +92,7 @@ public class RequestTest {
 
     // Start the service
     serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+        new ServerData<IntWritable, IntWritable, IntWritable, 
IntWritable>(conf);
     server =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
             conf, serverData);


Reply via email to