Author: aching
Date: Mon Jul 16 22:42:24 2012
New Revision: 1362298
URL: http://svn.apache.org/viewvc?rev=1362298&view=rev
Log:
GIRAPH-224: Netty server-side combiner (apresta via aching).
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1362298&r1=1362297&r2=1362298&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Jul 16 22:42:24 2012
@@ -2,6 +2,8 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-224: Netty server-side combiner (apresta via aching).
+
GIRAPH-251: Allow to access the distributed cache from Vertexes and
WorkerContext (Gianmarco De Francisci Morales via aching).
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java?rev=1362298&r1=1362297&r2=1362298&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java
Mon Jul 16 22:42:24 2012
@@ -18,11 +18,6 @@
package org.apache.giraph.comm;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.BspUtils;
@@ -38,6 +33,11 @@ import org.apache.log4j.Logger;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
/**
* Netty worker server that implement {@link WorkerServer} and contains
* the actual {@link ServerData}.
@@ -61,8 +61,7 @@ public class NettyWorkerServer<I extends
/** Netty server that does that actual I/O */
private final NettyServer<I, V, E, M> nettyServer;
/** Server data storage */
- private final ServerData<I, V, E, M> serverData =
- new ServerData<I, V, E, M>();
+ private final ServerData<I, V, E, M> serverData;
/**
* Constructor to start the server.
@@ -74,6 +73,7 @@ public class NettyWorkerServer<I extends
CentralizedServiceWorker<I, V, E, M> service) {
this.conf = conf;
this.service = service;
+ serverData = new ServerData<I, V, E, M>(conf);
nettyServer = new NettyServer<I, V, E, M>(conf, serverData);
nettyServer.start();
}
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java?rev=1362298&r1=1362297&r2=1362298&view=diff
==============================================================================
---
giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
(original)
+++
giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
Mon Jul 16 22:42:24 2012
@@ -18,15 +18,6 @@
package org.apache.giraph.comm;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.giraph.comm.RequestRegistry.Type;
import org.apache.giraph.graph.BspUtils;
import org.apache.hadoop.conf.Configuration;
@@ -37,6 +28,15 @@ import org.apache.log4j.Logger;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* Send a collection of vertex messages for a partition.
*
@@ -132,6 +132,17 @@ public class SendPartitionMessagesReques
}
synchronized (messages) {
messages.addAll(entry.getValue());
+ if (serverData.getCombiner() != null) {
+ try {
+ messages = Lists.newArrayList(
+ serverData.getCombiner().combine(entry.getKey(), messages));
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "doRequest: Combiner failed to combine messages " + messages,
+ e);
+ }
+ transientMessages.put(entry.getKey(), messages);
+ }
}
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java?rev=1362298&r1=1362297&r2=1362298&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java Mon Jul
16 22:42:24 2012
@@ -18,14 +18,17 @@
package org.apache.giraph.comm;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.VertexCombiner;
import org.apache.giraph.graph.VertexMutations;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* Anything that the server stores
*
@@ -37,6 +40,8 @@ import org.apache.hadoop.io.WritableComp
@SuppressWarnings("rawtypes")
public class ServerData<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> {
+ /** Combiner instance, can be null */
+ private VertexCombiner<I, M> combiner;
/**
* Map of partition ids to incoming vertices from other workers.
* (Synchronized on values)
@@ -61,6 +66,18 @@ public class ServerData<I extends Writab
vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E, M>>();
/**
+ * Constructor.
+ * @param conf Configuration (used to instantiate the combiner).
+ */
+ public ServerData(Configuration conf) {
+ if (BspUtils.getVertexCombinerClass(conf) == null) {
+ combiner = null;
+ } else {
+ combiner = BspUtils.createVertexCombiner(conf);
+ }
+ }
+
+ /**
* Get the partition vertices (synchronize on the values)
*
* @return Partition vertices
@@ -88,4 +105,12 @@ public class ServerData<I extends Writab
getVertexMutations() {
return vertexMutations;
}
+
+ /**
+ * Get the combiner instance.
+ * @return The combiner.
+ */
+ public VertexCombiner<I, M> getCombiner() {
+ return combiner;
+ }
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1362298&r1=1362297&r2=1362298&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Mon
Jul 16 22:42:24 2012
@@ -18,16 +18,18 @@
package org.apache.giraph.comm;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.junit.Test;
+
import static org.mockito.Mockito.mock;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.junit.Test;
/**
* Test the netty connections
@@ -45,7 +47,7 @@ public class ConnectionTest {
Configuration conf = new Configuration();
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
- new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+ new ServerData<IntWritable, IntWritable, IntWritable,
IntWritable>(conf);
NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
conf, serverData);
@@ -72,7 +74,7 @@ public class ConnectionTest {
Configuration conf = new Configuration();
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
- new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+ new ServerData<IntWritable, IntWritable, IntWritable,
IntWritable>(conf);
NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server1 =
new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
@@ -112,7 +114,8 @@ public class ConnectionTest {
Configuration conf = new Configuration();
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
- new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+ new ServerData<IntWritable, IntWritable, IntWritable,
+ IntWritable>(conf);
NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
conf, serverData);
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1362298&r1=1362297&r2=1362298&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Mon Jul
16 22:42:24 2012
@@ -18,18 +18,6 @@
package org.apache.giraph.comm;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeListVertex;
@@ -43,9 +31,22 @@ import org.apache.hadoop.mapreduce.Mappe
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* Test all the different netty requests.
*/
@@ -91,7 +92,7 @@ public class RequestTest {
// Start the service
serverData =
- new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+ new ServerData<IntWritable, IntWritable, IntWritable,
IntWritable>(conf);
server =
new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
conf, serverData);