Updated Branches: refs/heads/trunk 75b311266 -> c1b88405c
GIRAPH-624: ByteArrayPartition reports 0 aggregate edges when used with DiskBackedPartitionStore (claudio) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c1b88405 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c1b88405 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c1b88405 Branch: refs/heads/trunk Commit: c1b88405c96db8203641c2baae8ff30527bb2b46 Parents: 75b3112 Author: Claudio Martella <[email protected]> Authored: Fri Apr 19 18:33:54 2013 +0200 Committer: Claudio Martella <[email protected]> Committed: Fri Apr 19 18:33:54 2013 +0200 ---------------------------------------------------------------------- CHANGELOG | 3 + .../giraph/partition/DiskBackedPartitionStore.java | 143 ++++++++++----- .../giraph/partition/TestPartitionStores.java | 37 +++- 3 files changed, 128 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/c1b88405/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index d7679a2..5a980cf 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 1.0.1 - unreleased + GIRAPH-624: ByteArrayPartition reports 0 aggregate edges when used with + DiskBackedPartitionStore (claudio) + GIRAPH-636: Initialize compute OutEdges directly from input OutEdges (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/c1b88405/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java index 2d30bf9..a4739f1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java @@ -143,6 +143,10 @@ public class DiskBackedPartitionStore<I extends WritableComparable, for (String path : userPaths) { basePaths[i++] = path + "/" + conf.get("mapred.job.id", "Unknown Job"); } + if (LOG.isInfoEnabled()) { + LOG.info("DiskBackedPartitionStore with maxInMemoryPartitions=" + + maxInMemoryPartitions + ", isStaticGraph=" + conf.isStaticGraph()); + } } @Override @@ -358,15 +362,13 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } /** - * Writes vertex data (Id, Vertex Value and halted state) to stream. + * Writes vertex data (Id, value and halted state) to stream. * * @param output The output stream * @param vertex The vertex to serialize * @throws IOException */ - private void writeVertexData( - DataOutput output, - Vertex<I, V, E, M> vertex) + private void writeVertexData(DataOutput output, Vertex<I, V, E, M> vertex) throws IOException { vertex.getId().write(output); vertex.getValue().write(output); @@ -374,16 +376,14 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } /** - * Writes vertex edges (Id, Edges) to stream. + * Writes vertex edges (Id, edges) to stream. * * @param output The output stream * @param vertex The vertex to serialize * @throws IOException */ @SuppressWarnings("unchecked") - private void writeOutEdges( - DataOutput output, - Vertex<I, V, E, M> vertex) + private void writeOutEdges(DataOutput output, Vertex<I, V, E, M> vertex) throws IOException { vertex.getId().write(output); ((OutEdges<I, E>) vertex.getEdges()).write(output); @@ -402,7 +402,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable, id.readFields(in); V value = conf.createVertexValue(); value.readFields(in); - vertex.initialize(id, value); + OutEdges<I, E> edges = conf.createOutEdges(); + vertex.initialize(id, value, edges); if (in.readBoolean()) { vertex.voteToHalt(); } else { @@ -417,16 +418,16 @@ public class DiskBackedPartitionStore<I extends WritableComparable, * @param partition The partition owning the vertex * @throws IOException */ + @SuppressWarnings("unchecked") private void readOutEdges(DataInput in, Partition<I, V, E, M> partition) throws IOException { I id = conf.createVertexId(); id.readFields(in); Vertex<I, V, E, M> v = partition.getVertex(id); - OutEdges<I, E> edges = conf.createOutEdges(); - edges.readFields(in); - v.setEdges(edges); + ((OutEdges<I, E>) v.getEdges()).readFields(in); } + /** * Load a partition from disk. It deletes the files after the load, * except for the edges, if the graph is static. @@ -441,22 +442,42 @@ public class DiskBackedPartitionStore<I extends WritableComparable, Partition<I, V, E, M> partition = conf.createPartition(id, context); File file = new File(getVerticesPath(id)); - DataInputStream inputStream = new DataInputStream( - new BufferedInputStream(new FileInputStream(file))); - for (int i = 0; i < numVertices; ++i) { - Vertex<I, V , E, M> vertex = conf.createVertex(); - readVertexData(inputStream, vertex); - partition.putVertex(vertex); - } - inputStream.close(); + if (LOG.isDebugEnabled()) { + LOG.debug("loadPartition: loading partition vertices " + + partition.getId() + " from " + file.getAbsolutePath()); + } + DataInputStream inputStream = null; + try { + inputStream = new DataInputStream( + new BufferedInputStream(new FileInputStream(file))); + for (int i = 0; i < numVertices; ++i) { + Vertex<I, V , E, M> vertex = conf.createVertex(); + readVertexData(inputStream, vertex); + partition.putVertex(vertex); + } + } finally { + if (inputStream != null) { + inputStream.close(); + inputStream = null; + } + } file.delete(); file = new File(getEdgesPath(id)); - inputStream = new DataInputStream( - new BufferedInputStream(new FileInputStream(file))); - for (int i = 0; i < numVertices; ++i) { - readOutEdges(inputStream, partition); + if (LOG.isDebugEnabled()) { + LOG.debug("loadPartition: loading partition edges " + + partition.getId() + " from " + file.getAbsolutePath()); + } + try { + inputStream = new DataInputStream( + new BufferedInputStream(new FileInputStream(file))); + for (int i = 0; i < numVertices; ++i) { + readOutEdges(inputStream, partition); + } + } finally { + if (inputStream != null) { + inputStream.close(); + } } - inputStream.close(); /* * If the graph is static, keep the file around. */ @@ -477,16 +498,23 @@ public class DiskBackedPartitionStore<I extends WritableComparable, File file = new File(getVerticesPath(partition.getId())); file.getParentFile().mkdirs(); file.createNewFile(); - if (LOG.isInfoEnabled()) { - LOG.info("offloadPartition: writing partition vertices " + + if (LOG.isDebugEnabled()) { + LOG.debug("offloadPartition: writing partition vertices " + partition.getId() + " to " + file.getAbsolutePath()); } - DataOutputStream outputStream = new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(file))); - for (Vertex<I, V, E, M> vertex : partition) { - writeVertexData(outputStream, vertex); + DataOutputStream outputStream = null; + try { + outputStream = new DataOutputStream( + new BufferedOutputStream(new FileOutputStream(file))); + for (Vertex<I, V, E, M> vertex : partition) { + writeVertexData(outputStream, vertex); + } + } finally { + if (outputStream != null) { + outputStream.close(); + outputStream = null; + } } - outputStream.close(); file = new File(getEdgesPath(partition.getId())); /* * Avoid writing back edges if we have already written them once and @@ -494,16 +522,21 @@ public class DiskBackedPartitionStore<I extends WritableComparable, */ if (!conf.isStaticGraph() || !file.exists()) { file.createNewFile(); - if (LOG.isInfoEnabled()) { - LOG.info("offloadPartition: writing partition edges " + + if (LOG.isDebugEnabled()) { + LOG.debug("offloadPartition: writing partition edges " + partition.getId() + " to " + file.getAbsolutePath()); } - outputStream = new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(file))); - for (Vertex<I, V, E, M> vertex : partition) { - writeOutEdges(outputStream, vertex); + try { + outputStream = new DataOutputStream( + new BufferedOutputStream(new FileOutputStream(file))); + for (Vertex<I, V, E, M> vertex : partition) { + writeOutEdges(outputStream, vertex); + } + } finally { + if (outputStream != null) { + outputStream.close(); + } } - outputStream.close(); } } @@ -520,19 +553,31 @@ public class DiskBackedPartitionStore<I extends WritableComparable, Integer count = onDisk.get(id); onDisk.put(id, count + (int) partition.getVertexCount()); File file = new File(getVerticesPath(id)); - DataOutputStream outputStream = new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(file, true))); - for (Vertex<I, V, E, M> vertex : partition) { - writeVertexData(outputStream, vertex); + DataOutputStream outputStream = null; + try { + outputStream = new DataOutputStream( + new BufferedOutputStream(new FileOutputStream(file, true))); + for (Vertex<I, V, E, M> vertex : partition) { + writeVertexData(outputStream, vertex); + } + } finally { + if (outputStream != null) { + outputStream.close(); + outputStream = null; + } } - outputStream.close(); file = new File(getEdgesPath(id)); - outputStream = new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(file, true))); - for (Vertex<I, V, E, M> vertex : partition) { - writeOutEdges(outputStream, vertex); + try { + outputStream = new DataOutputStream( + new BufferedOutputStream(new FileOutputStream(file, true))); + for (Vertex<I, V, E, M> vertex : partition) { + writeOutEdges(outputStream, vertex); + } + } finally { + if (outputStream != null) { + outputStream.close(); + } } - outputStream.close(); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/c1b88405/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java index 9bb8f71..5a93d41 100644 --- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java @@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.EdgeFactory; import org.apache.giraph.graph.Vertex; import org.apache.giraph.utils.UnsafeByteArrayInputStream; import org.apache.giraph.utils.UnsafeByteArrayOutputStream; @@ -135,6 +136,23 @@ public class TestPartitionStores { assertEquals(0, deserializatedPartition.getEdgeCount()); assertEquals(7, deserializatedPartition.getVertexCount()); } + + @Test + public void testDiskBackedPartitionStoreWithByteArrayPartition() throws IOException { + File directory = Files.createTempDir(); + GiraphConstants.PARTITIONS_DIRECTORY.set( + conf, new File(directory, "giraph_partitions").toString()); + GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); + GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1); + conf.setPartitionClass(ByteArrayPartition.class); + + PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable> + partitionStore = new DiskBackedPartitionStore<IntWritable, + IntWritable, NullWritable, IntWritable>(conf, context); + testReadWrite(partitionStore, conf); + partitionStore.shutdown(); + FileUtils.deleteDirectory(directory); + } @Test public void testDiskBackedPartitionStore() throws IOException { @@ -190,6 +208,8 @@ public class TestPartitionStores { Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v7 = conf.createVertex(); v7.initialize(new IntWritable(7), new IntWritable(7)); + v7.addEdge(EdgeFactory.create(new IntWritable(1))); + v7.addEdge(EdgeFactory.create(new IntWritable(2))); partitionStore.addPartition(createPartition(conf, 1, v1, v2)); partitionStore.addPartition(createPartition(conf, 2, v3)); @@ -219,18 +239,23 @@ public class TestPartitionStores { partitionStore.putPartition(p); partitionsNumber++; } + Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition; assertEquals(3, partitionsNumber); assertTrue(partitionStore.hasPartition(1)); assertTrue(partitionStore.hasPartition(2)); assertFalse(partitionStore.hasPartition(3)); assertTrue(partitionStore.hasPartition(4)); - assertEquals(3, partition1.getVertexCount()); - assertEquals(2, partition2.getVertexCount()); - assertEquals(1, partition3.getVertexCount()); - assertEquals(1, partition4.getVertexCount()); - + partition = partitionStore.getPartition(1); + assertEquals(3, partition.getVertexCount()); + partitionStore.putPartition(partition); + partition = partitionStore.getPartition(2); + assertEquals(2, partition.getVertexCount()); + partitionStore.putPartition(partition); + partition = partitionStore.getPartition(4); + assertEquals(1, partition.getVertexCount()); + assertEquals(2, partition.getEdgeCount()); + partitionStore.putPartition(partition); partitionStore.deletePartition(2); - assertEquals(2, partitionStore.getNumPartitions()); } }
