Added: 
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java?rev=1409973&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java
 (added)
+++ 
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java
 Thu Nov 15 20:17:38 2012
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.DynamicChannelBufferInputStream;
+import org.apache.giraph.utils.DynamicChannelBufferOutputStream;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test all the mutable vertices
+ */
+public class TestMutableVertex {
+  /** Number of repetitions */
+  public static final int REPS = 100;
+  /** Vertex classes to be tested filled in from setup() */
+  private Collection<
+      Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+      LongWritable>>> vertexClasses = Lists.newArrayList();
+
+  /**
+   * {@link HashMapVertex} subclass with an empty compute(); setUp() registers
+   * it as one of the vertex classes exercised by every test.
+   */
+  public static class IFDLHashMapVertex extends
+      HashMapVertex<IntWritable, FloatWritable, DoubleWritable,
+          LongWritable> {
+    @Override
+    public void compute(Iterable<LongWritable> messages) throws IOException { }
+  }
+
+  /**
+   * {@link EdgeListVertex} subclass with an empty compute(); setUp() registers
+   * it as one of the vertex classes exercised by every test.
+   */
+  public static class IFDLEdgeListVertex extends
+      EdgeListVertex<IntWritable, FloatWritable, DoubleWritable,
+      LongWritable> {
+    @Override
+    public void compute(Iterable<LongWritable> messages) throws IOException { }
+  }
+
+  /**
+   * {@link RepresentativeVertex} subclass with an empty compute(); setUp()
+   * registers it as one of the vertex classes exercised by every test.
+   */
+  public static class IFDLRepresentativeVertex extends
+      RepresentativeVertex<IntWritable, FloatWritable, DoubleWritable,
+                LongWritable> {
+    @Override
+    public void compute(Iterable<LongWritable> messages) throws IOException { }
+  }
+
+  @Before
+  public void setUp() { // runs before each @Test method
+    vertexClasses.add(IFDLHashMapVertex.class); // one entry per vertex type
+    vertexClasses.add(IFDLEdgeListVertex.class);
+    vertexClasses.add(IFDLRepresentativeVertex.class);
+  }
+
+  @Test
+  public void testInstantiate() throws IOException {
+    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable>> vertexClass : vertexClasses) {
+      testInstantiateVertexClass(vertexClass);
+    }
+  }
+
+  /**
+   * Create a vertex of the given class through the Giraph configuration
+   *
+   * @param vertexClass Vertex class to set on the configuration and create
+   * @return Non-null vertex from createVertex(), cast to MutableVertex
+   */
+  private MutableVertex<IntWritable, FloatWritable, DoubleWritable,
+      LongWritable> testInstantiateVertexClass(
+      Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+      LongWritable>> vertexClass) {
+    GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
+    giraphConfiguration.setVertexClass(vertexClass);
+    ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
+        new ImmutableClassesGiraphConfiguration(giraphConfiguration);
+    MutableVertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable> vertex =
+        (MutableVertex<IntWritable, FloatWritable,
+            DoubleWritable, LongWritable>)
+            immutableClassesGiraphConfiguration.createVertex();
+    assertNotNull(vertex); // fail here rather than with an NPE in the caller
+    return vertex;
+  }
+
+  @Test
+  public void testEdges() {
+    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable>> vertexClass : vertexClasses) {
+      testEdgesVertexClass(vertexClass);
+    }
+  }
+
+  /**
+   * Check bulk edge initialization, iteration and removal for a vertex class
+   *
+   * @param vertexClass Vertex class to check
+   */
+  private void testEdgesVertexClass(Class<? extends Vertex<IntWritable,
+      FloatWritable, DoubleWritable, LongWritable>> vertexClass) {
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> vertex =
+        testInstantiateVertexClass(vertexClass);
+
+    Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap();
+    for (int i = 1000; i > 0; --i) { // edge to vertex i carries value 2 * i
+      edgeMap.put(new IntWritable(i), new DoubleWritable(i * 2.0));
+    }
+    vertex.initialize(null, null, edgeMap);
+    assertEquals(1000, vertex.getNumEdges()); // JUnit order: expected, actual
+    for (Edge<IntWritable, DoubleWritable> edge : vertex.getEdges()) {
+      assertEquals(edge.getTargetVertexId().get() * 2.0d,
+          edge.getValue().get(), 0d);
+    }
+    assertEquals(new DoubleWritable(1000),
+        vertex.removeEdge(new IntWritable(500)));
+    assertEquals(999, vertex.getNumEdges());
+  }
+
+  @Test
+  public void testGetEdges() {
+    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable>> vertexClass : vertexClasses) {
+      testGetEdgesVertexClass(vertexClass);
+    }
+  }
+
+  /**
+   * Check edge value lookup before and after removing an edge
+   *
+   * @param vertexClass Vertex class to check
+   */
+  private void testGetEdgesVertexClass(Class<? extends Vertex<IntWritable,
+      FloatWritable, DoubleWritable, LongWritable>> vertexClass) {
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> vertex =
+        testInstantiateVertexClass(vertexClass);
+
+    Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap();
+    for (int i = 1000; i > 0; --i) { // edge to vertex i carries value 3 * i
+      edgeMap.put(new IntWritable(i), new DoubleWritable(i * 3.0));
+    }
+    vertex.initialize(null, null, edgeMap);
+    assertEquals(1000, vertex.getNumEdges()); // JUnit order: expected, actual
+    assertEquals(new DoubleWritable(600 * 3.0),
+        vertex.getEdgeValue(new IntWritable(600)));
+    assertEquals(new DoubleWritable(600 * 3.0),
+        vertex.removeEdge(new IntWritable(600)));
+    assertEquals(999, vertex.getNumEdges());
+    assertEquals(new DoubleWritable(500 * 3.0),
+        vertex.getEdgeValue(new IntWritable(500)));
+    assertEquals(new DoubleWritable(700 * 3.0),
+        vertex.getEdgeValue(new IntWritable(700)));
+  }
+
+  @Test
+  public void testAddRemoveEdges() {
+    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable>> vertexClass : vertexClasses) {
+      testAddRemoveEdgesVertexClass(vertexClass);
+    }
+  }
+
+  /**
+   * Check adding edges one at a time to an empty vertex, then removing them
+   *
+   * @param vertexClass Vertex class to check
+   */
+  private void testAddRemoveEdgesVertexClass(Class<? extends
+      Vertex<IntWritable, FloatWritable, DoubleWritable,
+          LongWritable>> vertexClass) {
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> vertex =
+        testInstantiateVertexClass(vertexClass);
+
+    Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap();
+    vertex.initialize(null, null, edgeMap);
+    assertEquals(0, vertex.getNumEdges()); // JUnit order: expected, actual
+    assertTrue(vertex.addEdge(new IntWritable(2),
+        new DoubleWritable(2.0)));
+    assertEquals(1, vertex.getNumEdges());
+    assertEquals(new DoubleWritable(2.0),
+        vertex.getEdgeValue(new IntWritable(2)));
+    assertTrue(vertex.addEdge(new IntWritable(4),
+        new DoubleWritable(4.0)));
+    assertTrue(vertex.addEdge(new IntWritable(3),
+        new DoubleWritable(3.0)));
+    assertTrue(vertex.addEdge(new IntWritable(1),
+        new DoubleWritable(1.0)));
+    assertEquals(4, vertex.getNumEdges());
+    assertNull(vertex.getEdgeValue(new IntWritable(5))); // ids never added
+    assertNull(vertex.getEdgeValue(new IntWritable(0)));
+    for (Edge<IntWritable, DoubleWritable> edge : vertex.getEdges()) {
+      assertEquals(edge.getTargetVertexId().get() * 1.0d,
+          edge.getValue().get(), 0d);
+    }
+    assertNotNull(vertex.removeEdge(new IntWritable(1)));
+    assertEquals(3, vertex.getNumEdges());
+    assertNotNull(vertex.removeEdge(new IntWritable(3)));
+    assertEquals(2, vertex.getNumEdges());
+    assertNotNull(vertex.removeEdge(new IntWritable(2)));
+    assertEquals(1, vertex.getNumEdges());
+    assertNotNull(vertex.removeEdge(new IntWritable(4)));
+    assertEquals(0, vertex.getNumEdges());
+  }
+
+  @Test
+  public void testSerialized() throws IOException {
+    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable>> vertexClass : vertexClasses) {
+      testSerializeVertexClass(vertexClass); // WritableUtils byte arrays
+      testDynamicChannelBufferSerializeVertexClass(vertexClass);
+      testUnsafeSerializeVertexClass(vertexClass); // Unsafe byte streams
+    }
+  }
+
+  /**
+   * Create a vertex pre-populated with data for the serialization tests
+   *
+   * @param vertexClass Vertex class to instantiate and fill
+   * @return Vertex with id 2, value 3.0 and 200 edges (edge i has value 2 * i)
+   */
+  private MutableVertex<IntWritable,
+      FloatWritable, DoubleWritable, LongWritable> buildVertex(Class<? extends
+      Vertex<IntWritable, FloatWritable, DoubleWritable,
+          LongWritable>> vertexClass) {
+    final int numEdges = 200;
+    Map<IntWritable, DoubleWritable> edges = Maps.newHashMap();
+    for (int target = numEdges; target > 0; --target) {
+      edges.put(new IntWritable(target), new DoubleWritable(target * 2.0));
+    }
+
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> built =
+        testInstantiateVertexClass(vertexClass);
+    built.initialize(new IntWritable(2), new FloatWritable(3.0f), edges);
+    return built;
+  }
+
+  /**
+   * Round-trip a built vertex through WritableUtils byte arrays, timing both
+   *
+   * @param vertexClass Vertex class to build, serialize and read back
+   */
+  private void testSerializeVertexClass(Class<? extends
+      Vertex<IntWritable, FloatWritable, DoubleWritable,
+          LongWritable>> vertexClass) {
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> vertex =
+        buildVertex(vertexClass);
+
+    long serializeNanosStart = 0;
+    long serializeNanos = 0;
+    byte[] byteArray = null;
+    for (int i = 0; i < REPS; ++i) {
+      serializeNanosStart = SystemTime.getInstance().getNanoseconds();
+      byteArray = WritableUtils.writeToByteArray(vertex);
+      serializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(),
+          serializeNanosStart);
+    }
+    serializeNanos /= REPS; // average over the repetitions
+    System.out.println("testSerialize: Serializing took " +
+        serializeNanos +
+        " ns for " + byteArray.length + " bytes " +
+        (byteArray.length * 1f * Time.NS_PER_SECOND / serializeNanos) +
+        " bytes / sec for " + vertexClass.getName());
+
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> readVertex =
+        testInstantiateVertexClass(vertexClass);
+
+    long deserializeNanosStart = 0;
+    long deserializeNanos = 0;
+    for (int i = 0; i < REPS; ++i) { // same readVertex is refilled each time
+      deserializeNanosStart = SystemTime.getInstance().getNanoseconds();
+      WritableUtils.readFieldsFromByteArray(byteArray, readVertex);
+      deserializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(),
+          deserializeNanosStart);
+    }
+    deserializeNanos /= REPS; // average over the repetitions
+    System.out.println("testSerialize: " +
+        "Deserializing " +
+        "took " +
+        deserializeNanos +
+        " ns for " + byteArray.length + " bytes " +
+        (byteArray.length * 1f * Time.NS_PER_SECOND / deserializeNanos) +
+        " bytes / sec for " + vertexClass.getName());
+
+    assertEquals(vertex.getId(), readVertex.getId());
+    assertEquals(vertex.getValue(), readVertex.getValue());
+    assertEquals(Lists.newArrayList(vertex.getEdges()),
+        Lists.newArrayList(readVertex.getEdges()));
+  }
+
+  /**
+   * Round-trip a built vertex through DynamicChannelBuffer streams, timing both
+   *
+   * @param vertexClass Vertex class to build, serialize and read back
+   */
+  private void testDynamicChannelBufferSerializeVertexClass(Class<? extends
+      Vertex<IntWritable, FloatWritable, DoubleWritable,
+          LongWritable>> vertexClass) throws IOException {
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> vertex =
+        buildVertex(vertexClass);
+
+    long serializeNanosStart = 0;
+    long serializeNanos = 0;
+    DynamicChannelBufferOutputStream outputStream = null;
+    for (int i = 0; i <
+        REPS; ++i) {
+      serializeNanosStart = SystemTime.getInstance().getNanoseconds();
+      outputStream =
+          new DynamicChannelBufferOutputStream(32);
+      vertex.write(outputStream);
+      serializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(),
+          serializeNanosStart);
+    }
+    serializeNanos /= REPS; // average over the repetitions
+    System.out.println("testDynamicChannelBufferSerializeVertexClass: " +
+        "Serializing took " +
+        serializeNanos +
+        " ns for " + outputStream.getDynamicChannelBuffer().writerIndex()
+        + " bytes " +
+        (outputStream.getDynamicChannelBuffer().writerIndex() * 1f *
+            Time.NS_PER_SECOND / serializeNanos) +
+        " bytes / sec for " + vertexClass.getName());
+
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> readVertex =
+        testInstantiateVertexClass(vertexClass);
+
+    long deserializeNanosStart = 0;
+    long deserializeNanos = 0;
+    for (int i = 0; i < REPS; ++i) { // same readVertex is refilled each time
+      deserializeNanosStart = SystemTime.getInstance().getNanoseconds();
+      DynamicChannelBufferInputStream inputStream = new
+          DynamicChannelBufferInputStream(
+          outputStream.getDynamicChannelBuffer());
+      readVertex.readFields(inputStream);
+      deserializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(),
+          deserializeNanosStart);
+      outputStream.getDynamicChannelBuffer().readerIndex(0); // rewind
+    }
+    deserializeNanos /= REPS; // average over the repetitions
+    System.out.println("testDynamicChannelBufferSerializeVertexClass: " +
+        "Deserializing took " +
+        deserializeNanos +
+        " ns for " + outputStream.getDynamicChannelBuffer().writerIndex() +
+        " bytes " +
+        (outputStream.getDynamicChannelBuffer().writerIndex() * 1f *
+            Time.NS_PER_SECOND / deserializeNanos) +
+        " bytes / sec for " + vertexClass.getName());
+
+    assertEquals(vertex.getId(), readVertex.getId());
+    assertEquals(vertex.getValue(), readVertex.getValue());
+    assertEquals(Lists.newArrayList(vertex.getEdges()),
+        Lists.newArrayList(readVertex.getEdges()));
+  }
+
+
+  /**
+   * Round-trip a built vertex through UnsafeByteArray(Input/Output)Stream,
+   * timing both directions
+   *
+   * @param vertexClass Vertex class to build, serialize and read back
+   */
+  private void testUnsafeSerializeVertexClass(Class<? extends
+      Vertex<IntWritable, FloatWritable, DoubleWritable,
+          LongWritable>> vertexClass) throws IOException {
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> vertex =
+        buildVertex(vertexClass);
+
+    long serializeNanosStart = 0;
+    long serializeNanos = 0;
+    UnsafeByteArrayOutputStream outputStream = null;
+    for (int i = 0; i <
+        REPS; ++i) {
+      serializeNanosStart = SystemTime.getInstance().getNanoseconds();
+      outputStream =
+          new UnsafeByteArrayOutputStream(32);
+      vertex.write(outputStream);
+      serializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(),
+          serializeNanosStart);
+    }
+    serializeNanos /= REPS; // average over the repetitions
+    System.out.println("testUnsafeSerializeVertexClass: " +
+        "Serializing took " +
+        serializeNanos +
+        " ns for " + outputStream.getPos()
+        + " bytes " +
+        (outputStream.getPos() * 1f *
+            Time.NS_PER_SECOND / serializeNanos) +
+        " bytes / sec for " + vertexClass.getName());
+
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> readVertex =
+        testInstantiateVertexClass(vertexClass);
+
+    long deserializeNanosStart = 0;
+    long deserializeNanos = 0;
+    for (int i = 0; i < REPS; ++i) { // same readVertex is refilled each time
+      deserializeNanosStart = SystemTime.getInstance().getNanoseconds();
+      UnsafeByteArrayInputStream inputStream = new
+          UnsafeByteArrayInputStream(
+          outputStream.getByteArray(), 0, outputStream.getPos());
+      readVertex.readFields(inputStream);
+      deserializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(),
+          deserializeNanosStart);
+    }
+    deserializeNanos /= REPS; // average over the repetitions
+    System.out.println("testUnsafeSerializeVertexClass: " +
+        "Deserializing took " +
+        deserializeNanos +
+        " ns for " + outputStream.getPos() +
+        " bytes " +
+        (outputStream.getPos() * 1f *
+            Time.NS_PER_SECOND / deserializeNanos) +
+        " bytes / sec for " + vertexClass.getName());
+
+    assertEquals(vertex.getId(), readVertex.getId());
+    assertEquals(vertex.getValue(), readVertex.getValue());
+    assertEquals(Lists.newArrayList(vertex.getEdges()),
+        Lists.newArrayList(readVertex.getEdges()));
+  }
+}

Added: 
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java?rev=1409973&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java
 (added)
+++ 
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java
 Thu Nov 15 20:17:38 2012
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph.partition;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.GiraphTransferRegulator;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test the GiraphTransferRegulator.
+ */
+public class TestGiraphTransferRegulator {
+  /** Job created by setUp() */
+  private GiraphJob job;
+  /** Vertex under test, created at field initialization */
+  private IFDLEdgeListVertex vertex = new IFDLEdgeListVertex();
+
+  /**
+   * Simple instantiable class that extends
+   * {@link org.apache.giraph.graph.EdgeListVertex}.
+   */
+  public static class IFDLEdgeListVertex extends
+      EdgeListVertex<IntWritable, FloatWritable, DoubleWritable, LongWritable> {
+    @Override
+    public void compute(Iterable<LongWritable> messages) throws IOException { }
+  }
+
+  @Before
+  public void setUp() {
+    try {
+      job = new GiraphJob("TestGiraphTransferRegulator");
+    } catch (IOException e) {
+      throw new RuntimeException("setUp: Failed", e); // keep the cause
+    }
+    job.getConfiguration().setVertexClass(IFDLEdgeListVertex.class);
+  }
+
+  @Test
+  public void testGiraphTransferRegulator() {
+    job.getConfiguration()
+        .setInt(GiraphTransferRegulator.MAX_VERTICES_PER_TRANSFER, 1);
+    job.getConfiguration()
+        .setInt(GiraphTransferRegulator.MAX_EDGES_PER_TRANSFER, 3);
+    Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap();
+    edgeMap.put(new IntWritable(2), new DoubleWritable(22));
+    edgeMap.put(new IntWritable(3), new DoubleWritable(33));
+    edgeMap.put(new IntWritable(4), new DoubleWritable(44));
+    vertex.initialize(null, null, edgeMap); // one vertex with three edges
+    GiraphTransferRegulator gtr =
+        new GiraphTransferRegulator(job.getConfiguration());
+    PartitionOwner owner = mock(PartitionOwner.class);
+    when(owner.getPartitionId()).thenReturn(57);
+    assertFalse(gtr.transferThisPartition(owner)); // nothing counted yet
+    gtr.incrementCounters(owner, vertex);
+    assertTrue(gtr.transferThisPartition(owner)); // after counting the vertex
+  }
+
+}

Modified: 
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
 (original)
+++ 
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
 Thu Nov 15 20:17:38 2012
@@ -22,6 +22,8 @@ import org.apache.giraph.GiraphConfigura
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.IntIntNullIntVertex;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -56,8 +58,7 @@ public class TestPartitionStores {
                                    Vertex<IntWritable, IntWritable,
                                        NullWritable, IntWritable>... vertices) {
     Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition =
-        new Partition<IntWritable, IntWritable, NullWritable,
-            IntWritable>(conf, id, context);
+        conf.createPartition(id, context);
     for (Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v :
         vertices) {
       partition.putVertex(v);
@@ -82,6 +83,52 @@ public class TestPartitionStores {
   }
 
   @Test
+  public void testUnsafePartitionSerializationClass() throws IOException {
+    conf.setPartitionClass(ByteArrayPartition.class);
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v1 =
+        new MyVertex();
+    v1.initialize(new IntWritable(1), new IntWritable(1));
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v2 =
+        new MyVertex();
+    v2.initialize(new IntWritable(2), new IntWritable(2));
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v3 =
+        new MyVertex();
+    v3.initialize(new IntWritable(3), new IntWritable(3));
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v4 =
+        new MyVertex();
+    v4.initialize(new IntWritable(4), new IntWritable(4));
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v5 =
+        new MyVertex();
+    v5.initialize(new IntWritable(5), new IntWritable(5));
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v6 =
+        new MyVertex();
+    v6.initialize(new IntWritable(6), new IntWritable(6));
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v7 =
+        new MyVertex();
+    v7.initialize(new IntWritable(7), new IntWritable(7));
+
+    Partition<IntWritable, IntWritable, NullWritable,
+        IntWritable> partition =
+        createPartition(conf, 3, v1, v2, v3, v4, v5, v6, v7);
+    assertEquals(3, partition.getId());
+    assertEquals(0, partition.getEdgeCount());
+    assertEquals(7, partition.getVertexCount());
+    UnsafeByteArrayOutputStream outputStream = new
+        UnsafeByteArrayOutputStream();
+    partition.write(outputStream);
+    UnsafeByteArrayInputStream inputStream = new UnsafeByteArrayInputStream(
+        outputStream.getByteArray(), 0, outputStream.getPos());
+    Partition<IntWritable, IntWritable, NullWritable,
+        IntWritable> deserializatedPartition = conf.createPartition(-1,
+        context);
+    deserializatedPartition.readFields(inputStream);
+
+    assertEquals(3, deserializatedPartition.getId());
+    assertEquals(0, deserializatedPartition.getEdgeCount());
+    assertEquals(7, deserializatedPartition.getVertexCount());
+  }
+
+  @Test
   public void testDiskBackedPartitionStore() {
     conf.setBoolean(GiraphConfiguration.USE_OUT_OF_CORE_GRAPH, true);
     conf.setInt(GiraphConfiguration.MAX_PARTITIONS_IN_MEMORY, 1);
@@ -97,6 +144,12 @@ public class TestPartitionStores {
     testReadWrite(partitionStore, conf);
   }
 
+  /**
+   * Test reading/writing to/from a partition store
+   *
+   * @param partitionStore Partition store to test
+   * @param conf Configuration to use
+   */
   public void testReadWrite(
       PartitionStore<IntWritable, IntWritable,
           NullWritable, IntWritable> partitionStore,
@@ -125,10 +178,10 @@ public class TestPartitionStores {
 
     partitionStore.addPartition(createPartition(conf, 1, v1, v2));
     partitionStore.addPartition(createPartition(conf, 2, v3));
-    partitionStore.addPartitionVertices(2, Lists.newArrayList(v4));
+    partitionStore.addPartition(createPartition(conf, 2, v4));
     partitionStore.addPartition(createPartition(conf, 3, v5));
-    partitionStore.addPartitionVertices(1, Lists.newArrayList(v6));
-    partitionStore.addPartitionVertices(4, Lists.newArrayList(v7));
+    partitionStore.addPartition(createPartition(conf, 1, v6));
+    partitionStore.addPartition(createPartition(conf, 4, v7));
 
     Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition1 =
         partitionStore.getPartition(1);
@@ -146,10 +199,10 @@ public class TestPartitionStores {
     assertTrue(partitionStore.hasPartition(2));
     assertFalse(partitionStore.hasPartition(3));
     assertTrue(partitionStore.hasPartition(4));
-    assertEquals(3, partition1.getVertices().size());
-    assertEquals(2, partition2.getVertices().size());
-    assertEquals(1, partition3.getVertices().size());
-    assertEquals(1, partition4.getVertices().size());
+    assertEquals(3, partition1.getVertexCount());
+    assertEquals(2, partition2.getVertexCount());
+    assertEquals(1, partition3.getVertexCount());
+    assertEquals(1, partition4.getVertexCount());
 
     partitionStore.deletePartition(2);
 


Reply via email to