Added: giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java?rev=1409973&view=auto ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java (added) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java Thu Nov 15 20:17:38 2012 @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.graph; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import org.apache.giraph.GiraphConfiguration; +import org.apache.giraph.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.utils.DynamicChannelBufferInputStream; +import org.apache.giraph.utils.DynamicChannelBufferOutputStream; +import org.apache.giraph.utils.SystemTime; +import org.apache.giraph.utils.Time; +import org.apache.giraph.utils.Times; +import org.apache.giraph.utils.UnsafeByteArrayInputStream; +import org.apache.giraph.utils.UnsafeByteArrayOutputStream; +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * Test all the mutable vertices + */ +public class TestMutableVertex { + /** Number of repetitions */ + public static final int REPS = 100; + /** Vertex classes to be tested filled in from setup() */ + private Collection< + Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable>>> vertexClasses = Lists.newArrayList(); + + /** + * Simple instantiable class that extends + * {@link HashMapVertex}. + */ + public static class IFDLHashMapVertex extends + HashMapVertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable> { + @Override + public void compute(Iterable<LongWritable> messages) throws IOException { } + } + + /** + * Simple instantiable class that extends + * {@link EdgeListVertex}. + */ + public static class IFDLEdgeListVertex extends + EdgeListVertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable> { + @Override + public void compute(Iterable<LongWritable> messages) throws IOException { } + } + + /** + * Simple instantiable class that extends + * {@link RepresentativeVertex}. + */ + public static class IFDLRepresentativeVertex extends + RepresentativeVertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable> { + @Override + public void compute(Iterable<LongWritable> messages) throws IOException { } + } + + @Before + public void setUp() { + vertexClasses.add(IFDLHashMapVertex.class); + vertexClasses.add(IFDLEdgeListVertex.class); + vertexClasses.add(IFDLRepresentativeVertex.class); + } + + @Test + public void testInstantiate() throws IOException { + for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable>> vertexClass : vertexClasses) { + testInstantiateVertexClass(vertexClass); + } + } + + /** + * Test a vertex class for instantiation + * + * @param vertexClass Vertex class to check + * @return Instantiated mutable vertex + */ + private MutableVertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable> testInstantiateVertexClass( + Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable>> vertexClass) { + GiraphConfiguration giraphConfiguration = new GiraphConfiguration(); + giraphConfiguration.setVertexClass(vertexClass); + ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration = + new ImmutableClassesGiraphConfiguration(giraphConfiguration); + MutableVertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable> vertex = + (MutableVertex<IntWritable, FloatWritable, + DoubleWritable, LongWritable>) + immutableClassesGiraphConfiguration.createVertex(); + assertNotNull(vertex); + return vertex; + } + + @Test + public void testEdges() { + for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable>> vertexClass : vertexClasses) { + testEdgesVertexClass(vertexClass); + } + } + + /** + * Test a vertex class for edges + * + * @param vertexClass Vertex class to check + */ + private void testEdgesVertexClass(Class<? extends Vertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable>> vertexClass) { + MutableVertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable> vertex = + testInstantiateVertexClass(vertexClass); + + Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap(); + for (int i = 1000; i > 0; --i) { + edgeMap.put(new IntWritable(i), new DoubleWritable(i * 2.0)); + } + vertex.initialize(null, null, edgeMap); + assertEquals(vertex.getNumEdges(), 1000); + for (Edge<IntWritable, DoubleWritable> edge : vertex.getEdges()) { + assertEquals(edge.getValue().get(), + edge.getTargetVertexId().get() * 2.0d, 0d); + } + assertEquals(vertex.removeEdge(new IntWritable(500)), + new DoubleWritable(1000)); + assertEquals(vertex.getNumEdges(), 999); + } + + @Test + public void testGetEdges() { + for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable>> vertexClass : vertexClasses) { + testGetEdgesVertexClass(vertexClass); + } + } + + /** + * Test a vertex class for getting edges + * + * @param vertexClass Vertex class to check + */ + private void testGetEdgesVertexClass(Class<? extends Vertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable>> vertexClass) { + MutableVertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable> vertex = + testInstantiateVertexClass(vertexClass); + + Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap(); + for (int i = 1000; i > 0; --i) { + edgeMap.put(new IntWritable(i), new DoubleWritable(i * 3.0)); + } + vertex.initialize(null, null, edgeMap); + assertEquals(vertex.getNumEdges(), 1000); + assertEquals(vertex.getEdgeValue(new IntWritable(600)), + new DoubleWritable(600 * 3.0)); + assertEquals(vertex.removeEdge(new IntWritable(600)), + new DoubleWritable(600 * 3.0)); + assertEquals(vertex.getNumEdges(), 999); + assertEquals(vertex.getEdgeValue(new IntWritable(500)), + new DoubleWritable(500 * 3.0)); + assertEquals(vertex.getEdgeValue(new IntWritable(700)), + new DoubleWritable(700 * 3.0)); + } + + @Test + public void testAddRemoveEdges() { + for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable>> vertexClass : vertexClasses) { + testAddRemoveEdgesVertexClass(vertexClass); + } + } + + /** + * Test a vertex class for adding/removing edges + * + * @param vertexClass Vertex class to check + */ + private void testAddRemoveEdgesVertexClass(Class<? extends + Vertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable>> vertexClass) { + MutableVertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable> vertex = + testInstantiateVertexClass(vertexClass); + + Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap(); + vertex.initialize(null, null, edgeMap); + assertEquals(vertex.getNumEdges(), 0); + assertTrue(vertex.addEdge(new IntWritable(2), + new DoubleWritable(2.0))); + assertEquals(vertex.getNumEdges(), 1); + assertEquals(vertex.getEdgeValue(new IntWritable(2)), + new DoubleWritable(2.0)); + assertTrue(vertex.addEdge(new IntWritable(4), + new DoubleWritable(4.0))); + assertTrue(vertex.addEdge(new IntWritable(3), + new DoubleWritable(3.0))); + assertTrue(vertex.addEdge(new IntWritable(1), + new DoubleWritable(1.0))); + assertEquals(vertex.getNumEdges(), 4); + assertNull(vertex.getEdgeValue(new IntWritable(5))); + assertNull(vertex.getEdgeValue(new IntWritable(0))); + for (Edge<IntWritable, DoubleWritable> edge : vertex.getEdges()) { + assertEquals(edge.getTargetVertexId().get() * 1.0d, + edge.getValue().get(), 0d); + } + assertNotNull(vertex.removeEdge(new IntWritable(1))); + assertEquals(vertex.getNumEdges(), 3); + assertNotNull(vertex.removeEdge(new IntWritable(3))); + assertEquals(vertex.getNumEdges(), 2); + assertNotNull(vertex.removeEdge(new IntWritable(2))); + assertEquals(vertex.getNumEdges(), 1); + assertNotNull(vertex.removeEdge(new IntWritable(4))); + assertEquals(vertex.getNumEdges(), 0); + } + + @Test + public void testSerialized() throws IOException { + for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable>> vertexClass : vertexClasses) { + testSerializeVertexClass(vertexClass); + testDynamicChannelBufferSerializeVertexClass(vertexClass); + testUnsafeSerializeVertexClass(vertexClass); + } + } + + /** + * Build a vertex for testing + * + * @param vertexClass Vertex class to use for testing + * @return Vertex that has some initial data + */ + private MutableVertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable> buildVertex(Class<? extends + Vertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable>> vertexClass) { + MutableVertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable> vertex = + testInstantiateVertexClass(vertexClass); + + final int edgesCount = 200; + Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap(); + for (int i = edgesCount; i > 0; --i) { + edgeMap.put(new IntWritable(i), new DoubleWritable(i * 2.0)); + } + vertex.initialize(new IntWritable(2), new FloatWritable(3.0f), edgeMap); + return vertex; + } + + /** + * Test a vertex class for serializing + * + * @param vertexClass Vertex class to check + */ + private void testSerializeVertexClass(Class<? extends + Vertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable>> vertexClass) { + MutableVertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable> vertex = + buildVertex(vertexClass); + + long serializeNanosStart = 0; + long serializeNanos = 0; + byte[] byteArray = null; + for (int i = 0; i < REPS; ++i) { + serializeNanosStart = SystemTime.getInstance().getNanoseconds(); + byteArray = WritableUtils.writeToByteArray(vertex); + serializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(), + serializeNanosStart); + } + serializeNanos /= REPS; + System.out.println("testSerialize: Serializing took " + + serializeNanos + + " ns for " + byteArray.length + " bytes " + + (byteArray.length * 1f * Time.NS_PER_SECOND / serializeNanos) + + " bytes / sec for " + vertexClass.getName()); + + MutableVertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable> readVertex = + testInstantiateVertexClass(vertexClass); + + long deserializeNanosStart = 0; + long deserializeNanos = 0; + for (int i = 0; i < REPS; ++i) { + deserializeNanosStart = SystemTime.getInstance().getNanoseconds(); + WritableUtils.readFieldsFromByteArray(byteArray, readVertex); + deserializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(), + deserializeNanosStart); + } + deserializeNanos /= REPS; + System.out.println("testSerialize: " + + "Deserializing " + + "took " + + deserializeNanos + + " ns for " + byteArray.length + " bytes " + + (byteArray.length * 1f * Time.NS_PER_SECOND / deserializeNanos) + + " bytes / sec for " + vertexClass.getName()); + + assertEquals(vertex.getId(), readVertex.getId()); + assertEquals(vertex.getValue(), readVertex.getValue()); + assertEquals(Lists.newArrayList(vertex.getEdges()), + Lists.newArrayList(readVertex.getEdges())); + } + + /** + * Test a vertex class for serializing with DynamicChannelBuffers + * + * @param vertexClass Vertex class to check + */ + private void testDynamicChannelBufferSerializeVertexClass(Class<? extends + Vertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable>> vertexClass) throws IOException { + MutableVertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable> vertex = + buildVertex(vertexClass); + + long serializeNanosStart = 0; + long serializeNanos = 0; + DynamicChannelBufferOutputStream outputStream = null; + for (int i = 0; i < + REPS; ++i) { + serializeNanosStart = SystemTime.getInstance().getNanoseconds(); + outputStream = + new DynamicChannelBufferOutputStream(32); + vertex.write(outputStream); + serializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(), + serializeNanosStart); + } + serializeNanos /= REPS; + System.out.println("testDynamicChannelBufferSerializeVertexClass: " + + "Serializing took " + + serializeNanos + + " ns for " + outputStream.getDynamicChannelBuffer().writerIndex() + + " bytes " + + (outputStream.getDynamicChannelBuffer().writerIndex() * 1f * + Time.NS_PER_SECOND / serializeNanos) + + " bytes / sec for " + vertexClass.getName()); + + MutableVertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable> readVertex = + testInstantiateVertexClass(vertexClass); + + long deserializeNanosStart = 0; + long deserializeNanos = 0; + for (int i = 0; i < REPS; ++i) { + deserializeNanosStart = SystemTime.getInstance().getNanoseconds(); + DynamicChannelBufferInputStream inputStream = new + DynamicChannelBufferInputStream( + outputStream.getDynamicChannelBuffer()); + readVertex.readFields(inputStream); + deserializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(), + deserializeNanosStart); + outputStream.getDynamicChannelBuffer().readerIndex(0); + } + deserializeNanos /= REPS; + System.out.println("testDynamicChannelBufferSerializeVertexClass: " + + "Deserializing took " + + deserializeNanos + + " ns for " + outputStream.getDynamicChannelBuffer().writerIndex() + + " bytes " + + (outputStream.getDynamicChannelBuffer().writerIndex() * 1f * + Time.NS_PER_SECOND / deserializeNanos) + + " bytes / sec for " + vertexClass.getName()); + + assertEquals(vertex.getId(), readVertex.getId()); + assertEquals(vertex.getValue(), readVertex.getValue()); + assertEquals(Lists.newArrayList(vertex.getEdges()), + Lists.newArrayList(readVertex.getEdges())); + } + + + /** + * Test a vertex class for serializing with UnsafeByteArray(Input/Output) + * Stream + * + * @param vertexClass Vertex class to check + */ + private void testUnsafeSerializeVertexClass(Class<? extends + Vertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable>> vertexClass) throws IOException { + MutableVertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable> vertex = + buildVertex(vertexClass); + + long serializeNanosStart = 0; + long serializeNanos = 0; + UnsafeByteArrayOutputStream outputStream = null; + for (int i = 0; i < + REPS; ++i) { + serializeNanosStart = SystemTime.getInstance().getNanoseconds(); + outputStream = + new UnsafeByteArrayOutputStream(32); + vertex.write(outputStream); + serializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(), + serializeNanosStart); + } + serializeNanos /= REPS; + System.out.println("testUnsafeSerializeVertexClass: " + + "Serializing took " + + serializeNanos + + " ns for " + outputStream.getPos() + + " bytes " + + (outputStream.getPos() * 1f * + Time.NS_PER_SECOND / serializeNanos) + + " bytes / sec for " + vertexClass.getName()); + + MutableVertex<IntWritable, + FloatWritable, DoubleWritable, LongWritable> readVertex = + testInstantiateVertexClass(vertexClass); + + long deserializeNanosStart = 0; + long deserializeNanos = 0; + for (int i = 0; i < REPS; ++i) { + deserializeNanosStart = SystemTime.getInstance().getNanoseconds(); + UnsafeByteArrayInputStream inputStream = new + UnsafeByteArrayInputStream( + outputStream.getByteArray(), 0, outputStream.getPos()); + readVertex.readFields(inputStream); + deserializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(), + deserializeNanosStart); + } + deserializeNanos /= REPS; + System.out.println("testUnsafeSerializeVertexClass: " + + "Deserializing took " + + deserializeNanos + + " ns for " + outputStream.getPos() + + " bytes " + + (outputStream.getPos() * 1f * + Time.NS_PER_SECOND / deserializeNanos) + + " bytes / sec for " + vertexClass.getName()); + + assertEquals(vertex.getId(), readVertex.getId()); + assertEquals(vertex.getValue(), readVertex.getValue()); + assertEquals(Lists.newArrayList(vertex.getEdges()), + Lists.newArrayList(readVertex.getEdges())); + } +}
Added: giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java?rev=1409973&view=auto ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java (added) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java Thu Nov 15 20:17:38 2012 @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.graph.partition; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; +import org.apache.giraph.graph.EdgeListVertex; +import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.graph.GiraphTransferRegulator; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test the GiraphTransferRegulator. + */ +public class TestGiraphTransferRegulator { + /** Job filled in by setup() */ + private GiraphJob job; + /** Instantiated vertex filled in from setup() */ + private IFDLEdgeListVertex vertex = new IFDLEdgeListVertex(); + + /** + * Simple instantiable class that extends + * {@link org.apache.giraph.graph.EdgeListVertex}. + */ + public static class IFDLEdgeListVertex extends + EdgeListVertex<IntWritable, FloatWritable, DoubleWritable, LongWritable> { + @Override + public void compute(Iterable<LongWritable> messages) throws IOException { } + } + + @Before + public void setUp() { + try { + job = new GiraphJob("TestGiraphTransferRegulator"); + } catch (IOException e) { + throw new RuntimeException("setUp: Failed", e); + } + job.getConfiguration().setVertexClass(IFDLEdgeListVertex.class); + } + + @Test + public void testGiraphTransferRegulator() { + job.getConfiguration() + .setInt(GiraphTransferRegulator.MAX_VERTICES_PER_TRANSFER, 1); + job.getConfiguration() + .setInt(GiraphTransferRegulator.MAX_EDGES_PER_TRANSFER, 3); + Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap(); + edgeMap.put(new IntWritable(2), new DoubleWritable(22)); + edgeMap.put(new IntWritable(3), new DoubleWritable(33)); + edgeMap.put(new IntWritable(4), new DoubleWritable(44)); + vertex.initialize(null, null, edgeMap); + GiraphTransferRegulator gtr = + new GiraphTransferRegulator(job.getConfiguration()); + PartitionOwner owner = mock(PartitionOwner.class); + when(owner.getPartitionId()).thenReturn(57); + assertFalse(gtr.transferThisPartition(owner)); + gtr.incrementCounters(owner, vertex); + assertTrue(gtr.transferThisPartition(owner)); + } + +} Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java?rev=1409973&r1=1409972&r2=1409973&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java Thu Nov 15 20:17:38 2012 @@ -22,6 +22,8 @@ import org.apache.giraph.GiraphConfigura import org.apache.giraph.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.IntIntNullIntVertex; import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.UnsafeByteArrayInputStream; +import org.apache.giraph.utils.UnsafeByteArrayOutputStream; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Mapper; @@ -56,8 +58,7 @@ public class TestPartitionStores { Vertex<IntWritable, IntWritable, NullWritable, IntWritable>... vertices) { Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition = - new Partition<IntWritable, IntWritable, NullWritable, - IntWritable>(conf, id, context); + conf.createPartition(id, context); for (Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v : vertices) { partition.putVertex(v); @@ -82,6 +83,52 @@ public class TestPartitionStores { } @Test + public void testUnsafePartitionSerializationClass() throws IOException { + conf.setPartitionClass(ByteArrayPartition.class); + Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v1 = + new MyVertex(); + v1.initialize(new IntWritable(1), new IntWritable(1)); + Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v2 = + new MyVertex(); + v2.initialize(new IntWritable(2), new IntWritable(2)); + Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v3 = + new MyVertex(); + v3.initialize(new IntWritable(3), new IntWritable(3)); + Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v4 = + new MyVertex(); + v4.initialize(new IntWritable(4), new IntWritable(4)); + Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v5 = + new MyVertex(); + v5.initialize(new IntWritable(5), new IntWritable(5)); + Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v6 = + new MyVertex(); + v6.initialize(new IntWritable(6), new IntWritable(6)); + Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v7 = + new MyVertex(); + v7.initialize(new IntWritable(7), new IntWritable(7)); + + Partition<IntWritable, IntWritable, NullWritable, + IntWritable> partition = + createPartition(conf, 3, v1, v2, v3, v4, v5, v6, v7); + assertEquals(3, partition.getId()); + assertEquals(0, partition.getEdgeCount()); + assertEquals(7, partition.getVertexCount()); + UnsafeByteArrayOutputStream outputStream = new + UnsafeByteArrayOutputStream(); + partition.write(outputStream); + UnsafeByteArrayInputStream inputStream = new UnsafeByteArrayInputStream( + outputStream.getByteArray(), 0, outputStream.getPos()); + Partition<IntWritable, IntWritable, NullWritable, + IntWritable> deserializatedPartition = conf.createPartition(-1, + context); + deserializatedPartition.readFields(inputStream); + + assertEquals(3, deserializatedPartition.getId()); + assertEquals(0, deserializatedPartition.getEdgeCount()); + assertEquals(7, deserializatedPartition.getVertexCount()); + } + + @Test public void testDiskBackedPartitionStore() { conf.setBoolean(GiraphConfiguration.USE_OUT_OF_CORE_GRAPH, true); conf.setInt(GiraphConfiguration.MAX_PARTITIONS_IN_MEMORY, 1); @@ -97,6 +144,12 @@ public class TestPartitionStores { testReadWrite(partitionStore, conf); } + /** + * Test reading/writing to/from a partition store + * + * @param partitionStore Partition store to test + * @param conf Configuration to use + */ public void testReadWrite( PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable> partitionStore, @@ -125,10 +178,10 @@ public class TestPartitionStores { partitionStore.addPartition(createPartition(conf, 1, v1, v2)); partitionStore.addPartition(createPartition(conf, 2, v3)); - partitionStore.addPartitionVertices(2, Lists.newArrayList(v4)); + partitionStore.addPartition(createPartition(conf, 2, v4)); partitionStore.addPartition(createPartition(conf, 3, v5)); - partitionStore.addPartitionVertices(1, Lists.newArrayList(v6)); - partitionStore.addPartitionVertices(4, Lists.newArrayList(v7)); + partitionStore.addPartition(createPartition(conf, 1, v6)); + partitionStore.addPartition(createPartition(conf, 4, v7)); Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition1 = partitionStore.getPartition(1); @@ -146,10 +199,10 @@ public class TestPartitionStores { assertTrue(partitionStore.hasPartition(2)); assertFalse(partitionStore.hasPartition(3)); assertTrue(partitionStore.hasPartition(4)); - assertEquals(3, partition1.getVertices().size()); - assertEquals(2, partition2.getVertices().size()); - assertEquals(1, partition3.getVertices().size()); - assertEquals(1, partition4.getVertices().size()); + assertEquals(3, partition1.getVertexCount()); + assertEquals(2, partition2.getVertexCount()); + assertEquals(1, partition3.getVertexCount()); + assertEquals(1, partition4.getVertexCount()); partitionStore.deletePartition(2);
