Updated Branches: refs/heads/trunk ff8d98e20 -> 39f359136
GIRAPH-616: Decouple vertices and edges in DiskBackedPartitionStore and avoid writing back edges when the algorithm does not change topology. Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/228edbbd Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/228edbbd Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/228edbbd Branch: refs/heads/trunk Commit: 228edbbd798f7718a5a7ccbcfd35c22e812be761 Parents: ff8d98e Author: Claudio Martella <[email protected]> Authored: Thu Apr 11 23:47:00 2013 +0200 Committer: Claudio Martella <[email protected]> Committed: Thu Apr 11 23:47:00 2013 +0200 ---------------------------------------------------------------------- .../apache/giraph/conf/GiraphConfiguration.java | 9 + .../org/apache/giraph/conf/GiraphConstants.java | 7 + .../giraph/partition/DiskBackedPartitionStore.java | 167 +++++++++++++-- 3 files changed, 166 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/228edbbd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 01f22da..0aeec40 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -885,4 +885,13 @@ public class GiraphConfiguration extends Configuration public int getMaxNumberOfSupersteps() { return MAX_NUMBER_OF_SUPERSTEPS.get(this); } + + /** + * Whether the application will change the graph topology or not. + * + * @return true if the graph is static, false otherwise. 
+ */ + public boolean isStaticGraph() { + return STATIC_GRAPH.isTrue(this); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/228edbbd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 21e094d..a55a2a2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -650,5 +650,12 @@ public interface GiraphConstants { */ IntConfOption MAX_NUMBER_OF_SUPERSTEPS = new IntConfOption("giraph.maxNumberOfSupersteps", 1); + + /** + * The application will not mutate the graph topology (the edges). It is used + * to optimise out-of-core graph storage by not writing back edges every time. + */ + BooleanConfOption STATIC_GRAPH = + new BooleanConfOption("giraph.isStaticGraph", false); } // CHECKSTYLE: resume InterfaceIsTypeCheck http://git-wip-us.apache.org/repos/asf/giraph/blob/228edbbd/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java index 11e0a90..53b9dd4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java @@ -19,8 +19,8 @@ package org.apache.giraph.partition; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.VertexEdges; import org.apache.giraph.graph.Vertex; -import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; import 
org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; @@ -35,7 +35,9 @@ import com.google.common.hash.Hashing; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.DataInput; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; @@ -291,7 +293,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } } finally { for (Integer id : onDisk.values()) { - deletePartitionFile(id); + deletePartitionFiles(id); } } } @@ -356,7 +358,78 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } /** - * Load a partition from disk. It deletes the file after the load. + * Writes vertex data (Id, Vertex Value and halted state) to stream. + * + * @param output The output stream + * @param vertex The vertex to serialize + * @throws IOException + */ + private void writeVertexData( + DataOutput output, + Vertex<I, V, E, M> vertex) + throws IOException { + vertex.getId().write(output); + vertex.getValue().write(output); + output.writeBoolean(vertex.isHalted()); + } + + /** + * Writes vertex edges (Id, Edges) to stream. + * + * @param output The output stream + * @param vertex The vertex to serialize + * @throws IOException + */ + @SuppressWarnings("unchecked") + private void writeVertexEdges( + DataOutput output, + Vertex<I, V, E, M> vertex) + throws IOException { + vertex.getId().write(output); + ((VertexEdges<I, E>) vertex.getEdges()).write(output); + } + + /** + * Read vertex data from an input and initialize the vertex. 
+ * + * @param in The input stream + * @param vertex The vertex to initialize + * @throws IOException + */ + private void readVertexData(DataInput in, Vertex<I, V, E, M> vertex) + throws IOException { + I id = conf.createVertexId(); + id.readFields(in); + V value = conf.createVertexValue(); + value.readFields(in); + vertex.initialize(id, value); + if (in.readBoolean()) { + vertex.voteToHalt(); + } else { + vertex.wakeUp(); + } + } + + /** + * Read vertex edges from an input and set them to the vertex. + * + * @param in The input stream + * @param partition The partition owning the vertex + * @throws IOException + */ + private void readVertexEdges(DataInput in, Partition<I, V, E, M> partition) + throws IOException { + I id = conf.createVertexId(); + id.readFields(in); + Vertex<I, V, E, M> v = partition.getVertex(id); + VertexEdges<I, E> edges = conf.createVertexEdges(); + edges.readFields(in); + v.setEdges(edges); + } + + /** + * Load a partition from disk. It deletes the files after the load, + * except for the edges, if the graph is static. 
* * @param id The id of the partition to load * @param numVertices The number of vertices contained on disk @@ -367,16 +440,29 @@ public class DiskBackedPartitionStore<I extends WritableComparable, throws IOException { Partition<I, V, E, M> partition = conf.createPartition(id, context); - File file = new File(getPartitionPath(id)); + File file = new File(getVerticesPath(id)); DataInputStream inputStream = new DataInputStream( new BufferedInputStream(new FileInputStream(file))); for (int i = 0; i < numVertices; ++i) { - Vertex<I, V , E, M> vertex = - WritableUtils.readVertexFromDataInput(inputStream, conf); + Vertex<I, V , E, M> vertex = conf.createVertex(); + readVertexData(inputStream, vertex); partition.putVertex(vertex); } inputStream.close(); file.delete(); + file = new File(getEdgesPath(id)); + inputStream = new DataInputStream( + new BufferedInputStream(new FileInputStream(file))); + for (int i = 0; i < numVertices; ++i) { + readVertexEdges(inputStream, partition); + } + inputStream.close(); + /* + * If the graph is static, keep the file around. 
+ */ + if (!conf.isStaticGraph()) { + file.delete(); + } return partition; } @@ -388,19 +474,37 @@ public class DiskBackedPartitionStore<I extends WritableComparable, */ private void offloadPartition(Partition<I, V, E, M> partition) throws IOException { - File file = new File(getPartitionPath(partition.getId())); + File file = new File(getVerticesPath(partition.getId())); file.getParentFile().mkdirs(); file.createNewFile(); if (LOG.isInfoEnabled()) { - LOG.info("offloadPartition: writing partition " + partition.getId() + - " to " + file.getAbsolutePath()); + LOG.info("offloadPartition: writing partition vertices " + + partition.getId() + " to " + file.getAbsolutePath()); } DataOutputStream outputStream = new DataOutputStream( new BufferedOutputStream(new FileOutputStream(file))); for (Vertex<I, V, E, M> vertex : partition) { - WritableUtils.writeVertexToDataOutput(outputStream, vertex, conf); + writeVertexData(outputStream, vertex); } outputStream.close(); + file = new File(getEdgesPath(partition.getId())); + /* + * Avoid writing back edges if we have already written them once and + * the graph is not changing. 
+ */ + if (!conf.isStaticGraph() || !file.exists()) { + file.createNewFile(); + if (LOG.isInfoEnabled()) { + LOG.info("offloadPartition: writing partition edges " + + partition.getId() + " to " + file.getAbsolutePath()); + } + outputStream = new DataOutputStream( + new BufferedOutputStream(new FileOutputStream(file))); + for (Vertex<I, V, E, M> vertex : partition) { + writeVertexEdges(outputStream, vertex); + } + outputStream.close(); + } } /** @@ -415,27 +519,36 @@ public class DiskBackedPartitionStore<I extends WritableComparable, Integer id = partition.getId(); Integer count = onDisk.get(id); onDisk.put(id, count + (int) partition.getVertexCount()); - File file = new File(getPartitionPath(id)); + File file = new File(getVerticesPath(id)); DataOutputStream outputStream = new DataOutputStream( new BufferedOutputStream(new FileOutputStream(file, true))); for (Vertex<I, V, E, M> vertex : partition) { - WritableUtils.writeVertexToDataOutput(outputStream, vertex, conf); + writeVertexData(outputStream, vertex); + } + outputStream.close(); + file = new File(getEdgesPath(id)); + outputStream = new DataOutputStream( + new BufferedOutputStream(new FileOutputStream(file, true))); + for (Vertex<I, V, E, M> vertex : partition) { + writeVertexEdges(outputStream, vertex); } outputStream.close(); } /** - * Delete a partition file + * Delete a partition's files. * * @param id The id of the partition owning the file. */ - public void deletePartitionFile(Integer id) { - File file = new File(getPartitionPath(id)); + public void deletePartitionFiles(Integer id) { + File file = new File(getVerticesPath(id)); + file.delete(); + file = new File(getEdgesPath(id)); file.delete(); } /** - * Get the path to the file where a partition is stored. + * Get the path and basename of the storage files. 
* * @param partitionId The partition * @return The path to the given partition @@ -447,6 +560,26 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } /** + * Get the path to the file where vertices are stored. + * + * @param partitionId The partition + * @return The path to the vertices file + */ + private String getVerticesPath(Integer partitionId) { + return getPartitionPath(partitionId) + "_vertices"; + } + + /** + * Get the path to the file where edges are stored. + * + * @param partitionId The partition + * @return The path to the edges file + */ + private String getEdgesPath(Integer partitionId) { + return getPartitionPath(partitionId) + "_edges"; + } + + /** * Task that gets a partition from the store */ private class GetPartition implements Callable<Partition<I, V, E, M>> { @@ -707,7 +840,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, switch (pState) { case ONDISK: onDisk.remove(id); - deletePartitionFile(id); + deletePartitionFiles(id); done = true; break; case INACTIVE:
