Updated Branches: refs/heads/trunk a3a6e9a8f -> 1cd1f347f
GIRAPH-816: ByteArrayPartition not combining vertex edges (cmuchins via majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/1cd1f347 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/1cd1f347 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/1cd1f347 Branch: refs/heads/trunk Commit: 1cd1f347f0e3d126f6568f0a246c0dce1de20122 Parents: a3a6e9a Author: Maja Kabiljo <[email protected]> Authored: Tue Feb 11 10:37:13 2014 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Tue Feb 11 10:37:13 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 1 + .../giraph/partition/ByteArrayPartition.java | 34 ++++++++------ .../giraph/partition/TestPartitionStores.java | 48 ++++++++++++++++++++ 3 files changed, 69 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/1cd1f347/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index feb9193..3e1acc2 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,7 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-816: ByteArrayPartition not combining vertex edges (cmuchins via majakabiljo) GIRAPH-844: TextInputFormat for SimpleShortestPaths (ssc) http://git-wip-us.apache.org/repos/asf/giraph/blob/1cd1f347/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java index cef39cd..048fca2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java @@ -151,17 +151,7 @@ public class ByteArrayPartition<I extends WritableComparable, representativeVertex, useUnsafeSerialization, getConf()); WritableUtils.reinitializeVertexFromByteArray(entry.getValue(), representativeCombinerVertex, useUnsafeSerialization, getConf()); - getVertexValueCombiner().combine(representativeVertex.getValue(), - representativeCombinerVertex.getValue()); - - // Add the edges to the representative vertex - for (Edge<I, E> edge : representativeCombinerVertex.getEdges()) { - representativeVertex.addEdge(edge); - } - - byte[] vertexData = WritableUtils.writeVertexToByteArray( - representativeCombinerVertex, useUnsafeSerialization, getConf()); - vertexMap.put(entry.getKey(), vertexData); + combine(representativeVertex, representativeCombinerVertex); } } } @@ -179,12 +169,28 @@ public class ByteArrayPartition<I extends WritableComparable, WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes, representativeVertex, useUnsafeSerialization, getConf()); + combine(representativeVertex, vertex); + return false; + } + + /** + * Combine two vertices together and store the serialized bytes + * in the vertex map. + * + * @param representativeVertex existing vertex + * @param representativeCombinerVertex new vertex to combine + */ + private void combine(Vertex<I, V, E> representativeVertex, + Vertex<I, V, E> representativeCombinerVertex) { getVertexValueCombiner().combine(representativeVertex.getValue(), - vertex.getValue()); - vertexMap.put(vertex.getId(), + representativeCombinerVertex.getValue()); + // Add the edges to the representative vertex + for (Edge<I, E> edge : representativeCombinerVertex.getEdges()) { + representativeVertex.addEdge(edge); + } + vertexMap.put(representativeCombinerVertex.getId(), WritableUtils.writeVertexToByteArray( representativeVertex, useUnsafeSerialization, getConf())); - return false; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/1cd1f347/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java index 0dd9b9c..08f4544 100644 --- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java @@ -238,4 +238,52 @@ public class TestPartitionStores { partitionStore.deletePartition(2); assertEquals(2, partitionStore.getNumPartitions()); } + + @Test + public void testEdgeCombineWithSimplePartition() throws IOException { + testEdgeCombine(SimplePartition.class); + } + + @Test + public void testEdgeCombineWithByteArrayPartition() throws IOException { + testEdgeCombine(ByteArrayPartition.class); + } + + private void testEdgeCombine(Class<? extends Partition> partitionClass) + throws IOException { + Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex(); + v1.initialize(new IntWritable(1), new IntWritable(1)); + Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex(); + v2.initialize(new IntWritable(2), new IntWritable(2)); + Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex(); + v3.initialize(new IntWritable(3), new IntWritable(3)); + Vertex<IntWritable, IntWritable, NullWritable> v1e2 = conf.createVertex(); + v1e2.initialize(new IntWritable(1), new IntWritable(1)); + v1e2.addEdge(EdgeFactory.create(new IntWritable(2))); + Vertex<IntWritable, IntWritable, NullWritable> v1e3 = conf.createVertex(); + v1e3.initialize(new IntWritable(1), new IntWritable(1)); + v1e3.addEdge(EdgeFactory.create(new IntWritable(3))); + + GiraphConfiguration newconf = new GiraphConfiguration(conf); + newconf.setPartitionClass(partitionClass); + Partition<IntWritable, IntWritable, NullWritable> partition = + (new ImmutableClassesGiraphConfiguration<IntWritable, IntWritable, + NullWritable>(newconf)).createPartition(1, context); + assertEquals(partitionClass, partition.getClass()); + partition.putVertex(v1); + partition.putVertex(v2); + partition.putVertex(v3); + assertEquals(3, partition.getVertexCount()); + assertEquals(0, partition.getEdgeCount()); + partition.putOrCombine(v1e2); + assertEquals(3, partition.getVertexCount()); + assertEquals(1, partition.getEdgeCount()); + partition.putOrCombine(v1e3); + assertEquals(3, partition.getVertexCount()); + assertEquals(2, partition.getEdgeCount()); + v1 = partition.getVertex(new IntWritable(1)); + assertEquals(new IntWritable(1), v1.getId()); + assertEquals(new IntWritable(1), v1.getValue()); + assertEquals(2, v1.getNumEdges()); + } }
