Updated Branches:
  refs/heads/trunk a3a6e9a8f -> 1cd1f347f

GIRAPH-816: ByteArrayPartition not combining vertex edges (cmuchins via majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/1cd1f347
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/1cd1f347
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/1cd1f347

Branch: refs/heads/trunk
Commit: 1cd1f347f0e3d126f6568f0a246c0dce1de20122
Parents: a3a6e9a
Author: Maja Kabiljo <[email protected]>
Authored: Tue Feb 11 10:37:13 2014 -0800
Committer: Maja Kabiljo <[email protected]>
Committed: Tue Feb 11 10:37:13 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |  1 +
 .../giraph/partition/ByteArrayPartition.java    | 34 ++++++++------
 .../giraph/partition/TestPartitionStores.java   | 48 ++++++++++++++++++++
 3 files changed, 69 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/1cd1f347/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index feb9193..3e1acc2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,7 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-816: ByteArrayPartition not combining vertex edges (cmuchins via majakabiljo)
 
   GIRAPH-844: TextInputFormat for SimpleShortestPaths (ssc)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1cd1f347/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index cef39cd..048fca2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -151,17 +151,7 @@ public class ByteArrayPartition<I extends WritableComparable,
             representativeVertex, useUnsafeSerialization, getConf());
         WritableUtils.reinitializeVertexFromByteArray(entry.getValue(),
             representativeCombinerVertex, useUnsafeSerialization, getConf());
-        getVertexValueCombiner().combine(representativeVertex.getValue(),
-            representativeCombinerVertex.getValue());
-
-        // Add the edges to the representative vertex
-        for (Edge<I, E> edge : representativeCombinerVertex.getEdges()) {
-          representativeVertex.addEdge(edge);
-        }
-
-        byte[] vertexData = WritableUtils.writeVertexToByteArray(
-            representativeCombinerVertex, useUnsafeSerialization, getConf());
-        vertexMap.put(entry.getKey(), vertexData);
+        combine(representativeVertex, representativeCombinerVertex);
       }
     }
   }
@@ -179,12 +169,28 @@ public class ByteArrayPartition<I extends WritableComparable,
 
     WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
         representativeVertex, useUnsafeSerialization, getConf());
+    combine(representativeVertex, vertex);
+    return false;
+  }
+
+  /**
+   * Combine two vertices together and store the serialized bytes
+   * in the vertex map.
+   *
+   * @param representativeVertex existing vertex
+   * @param representativeCombinerVertex new vertex to combine
+   */
+  private void combine(Vertex<I, V, E> representativeVertex,
+      Vertex<I, V, E> representativeCombinerVertex) {
     getVertexValueCombiner().combine(representativeVertex.getValue(),
-        vertex.getValue());
-    vertexMap.put(vertex.getId(),
+        representativeCombinerVertex.getValue());
+    // Add the edges to the representative vertex
+    for (Edge<I, E> edge : representativeCombinerVertex.getEdges()) {
+      representativeVertex.addEdge(edge);
+    }
+    vertexMap.put(representativeCombinerVertex.getId(),
         WritableUtils.writeVertexToByteArray(
             representativeVertex, useUnsafeSerialization, getConf()));
-    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/1cd1f347/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 0dd9b9c..08f4544 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -238,4 +238,52 @@ public class TestPartitionStores {
     partitionStore.deletePartition(2);
     assertEquals(2, partitionStore.getNumPartitions());
   }
+  
+  @Test
+  public void testEdgeCombineWithSimplePartition() throws IOException {
+    testEdgeCombine(SimplePartition.class);
+  }
+  
+  @Test
+  public void testEdgeCombineWithByteArrayPartition() throws IOException {
+    testEdgeCombine(ByteArrayPartition.class);
+  }
+  
+  private void testEdgeCombine(Class<? extends Partition> partitionClass)
+      throws IOException {
+    Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex();
+    v1.initialize(new IntWritable(1), new IntWritable(1));
+    Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
+    v2.initialize(new IntWritable(2), new IntWritable(2));
+    Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
+    v3.initialize(new IntWritable(3), new IntWritable(3));
+    Vertex<IntWritable, IntWritable, NullWritable> v1e2 = conf.createVertex();
+    v1e2.initialize(new IntWritable(1), new IntWritable(1));
+    v1e2.addEdge(EdgeFactory.create(new IntWritable(2)));
+    Vertex<IntWritable, IntWritable, NullWritable> v1e3 = conf.createVertex();
+    v1e3.initialize(new IntWritable(1), new IntWritable(1));
+    v1e3.addEdge(EdgeFactory.create(new IntWritable(3)));
+
+    GiraphConfiguration newconf = new GiraphConfiguration(conf);
+    newconf.setPartitionClass(partitionClass);
+    Partition<IntWritable, IntWritable, NullWritable> partition =
+        (new ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
+            NullWritable>(newconf)).createPartition(1, context);
+    assertEquals(partitionClass, partition.getClass());
+    partition.putVertex(v1);
+    partition.putVertex(v2);
+    partition.putVertex(v3);
+    assertEquals(3, partition.getVertexCount());
+    assertEquals(0, partition.getEdgeCount());
+    partition.putOrCombine(v1e2);
+    assertEquals(3, partition.getVertexCount());
+    assertEquals(1, partition.getEdgeCount());
+    partition.putOrCombine(v1e3);
+    assertEquals(3, partition.getVertexCount());
+    assertEquals(2, partition.getEdgeCount());
+    v1 = partition.getVertex(new IntWritable(1));
+    assertEquals(new IntWritable(1), v1.getId());
+    assertEquals(new IntWritable(1), v1.getValue());
+    assertEquals(2, v1.getNumEdges());
+  }
 }

Reply via email to