Repository: giraph Updated Branches: refs/heads/trunk f0b6cddd3 -> 9cedc7d76
GIRAPH-873 : Specialized edge stores Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9cedc7d7 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9cedc7d7 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9cedc7d7 Branch: refs/heads/trunk Commit: 9cedc7d76f2bbe52b3d1cc4caf8024e730266f83 Parents: f0b6cdd Author: Pavan Kumar <[email protected]> Authored: Sun Jun 1 13:29:38 2014 -0700 Committer: Pavan Kumar <[email protected]> Committed: Sun Jun 1 13:31:42 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../java/org/apache/giraph/comm/ServerData.java | 5 +- .../org/apache/giraph/conf/GiraphConstants.java | 9 + .../ImmutableClassesGiraphConfiguration.java | 12 + .../apache/giraph/edge/AbstractEdgeStore.java | 276 +++++++++++++++++++ .../java/org/apache/giraph/edge/EdgeStore.java | 205 +------------- .../apache/giraph/edge/EdgeStoreFactory.java | 54 ++++ .../giraph/edge/InMemoryEdgeStoreFactory.java | 79 ++++++ .../org/apache/giraph/edge/SimpleEdgeStore.java | 123 +++++++++ .../giraph/edge/primitives/IntEdgeStore.java | 133 +++++++++ .../giraph/edge/primitives/LongEdgeStore.java | 134 +++++++++ .../giraph/edge/primitives/package-info.java | 21 ++ 12 files changed, 851 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index d1663c4..36af911 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-873: Specialized edge stores + GIRAPH-898: Remove giraph-accumulo from Facebook profile (edunov via majakabiljo) GIRAPH-896: Fix memory leak in SuperstepMetricsRegistry (edunov via pavanka) 
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index 5a217d4..f0ecca2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -26,6 +26,7 @@ import org.apache.giraph.comm.messages.MessageStoreFactory; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.EdgeStore; +import org.apache.giraph.edge.EdgeStoreFactory; import org.apache.giraph.graph.VertexMutations; import org.apache.giraph.partition.DiskBackedPartitionStore; import org.apache.giraph.partition.PartitionStore; @@ -108,7 +109,9 @@ public class ServerData<I extends WritableComparable, partitionStore = new SimplePartitionStore<I, V, E>(conf, context); } - edgeStore = new EdgeStore<I, V, E>(service, conf, context); + EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory(); + edgeStoreFactory.initialize(service, conf, context); + edgeStore = edgeStoreFactory.newStore(); ownerAggregatorData = new OwnerAggregatorServerData(context, conf); allAggregatorData = new AllAggregatorServerData(context, conf); } http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 7f1317f..6b36418 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -23,6 +23,8 @@ import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory; import org.apache.giraph.comm.messages.MessageStoreFactory; import org.apache.giraph.edge.ByteArrayEdges; +import org.apache.giraph.edge.EdgeStoreFactory; +import org.apache.giraph.edge.InMemoryEdgeStoreFactory; import org.apache.giraph.edge.OutEdges; import org.apache.giraph.factories.ComputationFactory; import org.apache.giraph.factories.DefaultComputationFactory; @@ -94,6 +96,13 @@ public interface GiraphConstants { TypesHolder.class, "TypesHolder, used if Computation not set - optional"); + /** Edge Store Factory */ + ClassConfOption<EdgeStoreFactory> EDGE_STORE_FACTORY_CLASS = + ClassConfOption.create("giraph.edgeStoreFactoryClass", + InMemoryEdgeStoreFactory.class, + EdgeStoreFactory.class, + "Edge Store Factory class to use for creating edgeStore"); + /** Message Store Factory */ ClassConfOption<MessageStoreFactory> MESSAGE_STORE_FACTORY_CLASS = ClassConfOption.create("giraph.messageStoreFactoryClass", http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index 2e8c935..95e029d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -22,6 +22,7 @@ import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.EdgeFactory; +import 
org.apache.giraph.edge.EdgeStoreFactory; import org.apache.giraph.edge.OutEdges; import org.apache.giraph.edge.ReusableEdge; import org.apache.giraph.factories.ComputationFactory; @@ -768,6 +769,17 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Create edge store factory + * + * @return edge store factory + */ + public EdgeStoreFactory<I, V, E> createEdgeStoreFactory() { + Class<? extends EdgeStoreFactory> edgeStoreFactoryClass = + EDGE_STORE_FACTORY_CLASS.get(this); + return ReflectionUtils.newInstance(edgeStoreFactoryClass); + } + + /** * Get the user's subclassed incoming message value class. * * @param <M> Message data http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java new file mode 100644 index 0000000..80e909d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.edge; + +import com.google.common.collect.MapMaker; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.partition.Partition; +import org.apache.giraph.utils.ByteArrayVertexIdEdges; +import org.apache.giraph.utils.CallableFactory; +import org.apache.giraph.utils.ProgressableUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.Progressable; +import org.apache.log4j.Logger; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; + +/** + * Basic implementation of an edge store; extend this to easily define simple + * and primitive edge stores + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + * @param <K> Key corresponding to Vertex id + * @param <Et> Entry type + */ +public abstract class AbstractEdgeStore<I extends WritableComparable, + V extends Writable, E extends Writable, K, Et> + extends DefaultImmutableClassesGiraphConfigurable<I, V, E> + implements EdgeStore<I, V, E> { + /** Class logger */ + private static final Logger LOG = Logger.getLogger(AbstractEdgeStore.class); + /** Service worker. */ + protected CentralizedServiceWorker<I, V, E> service; + /** Giraph configuration. */ + protected ImmutableClassesGiraphConfiguration<I, V, E> configuration; + /** Progressable to report progress. */ + protected Progressable progressable; + /** Map used to temporarily store incoming edges. 
*/ + protected ConcurrentMap<Integer, Map<K, OutEdges<I, E>>> transientEdges; + /** + * Whether the chosen {@link OutEdges} implementation allows for Edge + * reuse. + */ + protected boolean reuseEdgeObjects; + /** + * Whether the {@link OutEdges} class used during input is different + * from the one used during computation. + */ + protected boolean useInputOutEdges; + + /** + * Constructor. + * + * @param service Service worker + * @param configuration Configuration + * @param progressable Progressable + */ + public AbstractEdgeStore( + CentralizedServiceWorker<I, V, E> service, + ImmutableClassesGiraphConfiguration<I, V, E> configuration, + Progressable progressable) { + this.service = service; + this.configuration = configuration; + this.progressable = progressable; + transientEdges = new MapMaker().concurrencyLevel( + configuration.getNettyServerExecutionConcurrency()).makeMap(); + reuseEdgeObjects = configuration.reuseEdgeObjects(); + useInputOutEdges = configuration.useInputOutEdges(); + } + + /** + * Get vertexId for a given key + * + * @param entry for vertexId key + * @param representativeVertexId representativeVertexId + * @return vertex Id + */ + protected abstract I getVertexId(Et entry, I representativeVertexId); + + /** + * Create vertexId from a given key + * + * @param entry for vertexId key + * @return new vertexId + */ + protected abstract I createVertexId(Et entry); + + /** + * Get OutEdges for a given partition + * + * @param partitionId id of partition + * @return OutEdges for the partition + */ + protected abstract Map<K, OutEdges<I, E>> getPartitionEdges(int partitionId); + + /** + * Remove and return the OutEdges for a given partition + * + * @param entry for vertexId key + * @param partitionEdges map of out-edges for vertices in a partition + * @return out edges + */ + protected abstract OutEdges<I, E> removePartitionEdges(Et entry, + Map<K, OutEdges<I, E>> partitionEdges); + + /** + * Get iterator for partition edges + * + * @param 
partitionEdges map of out-edges for vertices in a partition + * @return iterator + */ + protected abstract Iterator<Et> + getPartitionEdgesIterator(Map<K, OutEdges<I, E>> partitionEdges); + + /** + * Get out-edges for a given vertex + * + * @param vertexIdEdgeIterator vertex Id Edge iterator + * @param partitionEdgesIn map of out-edges for vertices in a partition + * @return out-edges for the vertex + */ + protected abstract OutEdges<I, E> getVertexOutEdges( + ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator, + Map<K, OutEdges<I, E>> partitionEdgesIn); + + @Override + public void addPartitionEdges( + int partitionId, ByteArrayVertexIdEdges<I, E> edges) { + Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId); + + ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator = + edges.getVertexIdEdgeIterator(); + while (vertexIdEdgeIterator.hasNext()) { + vertexIdEdgeIterator.next(); + Edge<I, E> edge = reuseEdgeObjects ? + vertexIdEdgeIterator.getCurrentEdge() : + vertexIdEdgeIterator.releaseCurrentEdge(); + OutEdges<I, E> outEdges = getVertexOutEdges(vertexIdEdgeIterator, + partitionEdges); + synchronized (outEdges) { + outEdges.add(edge); + } + } + } + + /** + * Convert the input edges to the {@link OutEdges} data structure used + * for computation (if different). 
+ * + * @param inputEdges Input edges + * @return Compute edges + */ + private OutEdges<I, E> convertInputToComputeEdges( + OutEdges<I, E> inputEdges) { + if (!useInputOutEdges) { + return inputEdges; + } else { + return configuration.createAndInitializeOutEdges(inputEdges); + } + } + + @Override + public void moveEdgesToVertices() { + final boolean createSourceVertex = configuration.getCreateSourceVertex(); + if (transientEdges.isEmpty()) { + if (LOG.isInfoEnabled()) { + LOG.info("moveEdgesToVertices: No edges to move"); + } + return; + } + + if (LOG.isInfoEnabled()) { + LOG.info("moveEdgesToVertices: Moving incoming edges to vertices."); + } + + final BlockingQueue<Integer> partitionIdQueue = + new ArrayBlockingQueue<>(transientEdges.size()); + partitionIdQueue.addAll(transientEdges.keySet()); + int numThreads = configuration.getNumInputSplitsThreads(); + + CallableFactory<Void> callableFactory = new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + Integer partitionId; + I representativeVertexId = configuration.createVertexId(); + while ((partitionId = partitionIdQueue.poll()) != null) { + Partition<I, V, E> partition = + service.getPartitionStore().getOrCreatePartition(partitionId); + Map<K, OutEdges<I, E>> partitionEdges = + transientEdges.remove(partitionId); + Iterator<Et> iterator = + getPartitionEdgesIterator(partitionEdges); + // process all vertices in given partition + while (iterator.hasNext()) { + Et entry = iterator.next(); + I vertexId = getVertexId(entry, + representativeVertexId); + OutEdges<I, E> outEdges = convertInputToComputeEdges( + removePartitionEdges(entry, partitionEdges)); + Vertex<I, V, E> vertex = partition.getVertex(vertexId); + // If the source vertex doesn't exist, create it. Otherwise, + // just set the edges. 
+ if (vertex == null) { + if (createSourceVertex) { + // createVertex only if it is allowed by configuration + vertex = configuration.createVertex(); + vertex.initialize(createVertexId(entry), + configuration.createVertexValue(), outEdges); + partition.putVertex(vertex); + } + } else { + // A vertex may exist with or without edges initially + // and optimize the case of no initial edges + if (vertex.getNumEdges() == 0) { + vertex.setEdges(outEdges); + } else { + for (Edge<I, E> edge : outEdges) { + vertex.addEdge(edge); + } + } + // Some Partition implementations (e.g. ByteArrayPartition) + // require us to put back the vertex after modifying it. + partition.saveVertex(vertex); + } + } + // Some PartitionStore implementations + // (e.g. DiskBackedPartitionStore) require us to put back the + // partition after modifying it. + service.getPartitionStore().putPartition(partition); + } + return null; + } + }; + } + }; + ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads, + "move-edges-%d", progressable); + + // remove all entries + transientEdges.clear(); + + if (LOG.isInfoEnabled()) { + LOG.info("moveEdgesToVertices: Finished moving incoming edges to " + + "vertices."); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java index 57ad387..1150eaf 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java @@ -18,25 +18,9 @@ package org.apache.giraph.edge; -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.graph.Vertex; -import org.apache.giraph.partition.Partition; import 
org.apache.giraph.utils.ByteArrayVertexIdEdges; -import org.apache.giraph.utils.CallableFactory; -import org.apache.giraph.utils.ProgressableUtils; -import org.apache.giraph.utils.Trimmable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.util.Progressable; -import org.apache.log4j.Logger; - -import com.google.common.collect.MapMaker; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentMap; /** * Collects incoming edges for vertices owned by this worker. @@ -45,50 +29,8 @@ import java.util.concurrent.ConcurrentMap; * @param <V> Vertex value * @param <E> Edge value */ -public class EdgeStore<I extends WritableComparable, - V extends Writable, E extends Writable> { - /** Class logger */ - private static final Logger LOG = Logger.getLogger(EdgeStore.class); - /** Service worker. */ - private CentralizedServiceWorker<I, V, E> service; - /** Giraph configuration. */ - private ImmutableClassesGiraphConfiguration<I, V, E> configuration; - /** Progressable to report progress. */ - private Progressable progressable; - /** Map used to temporarily store incoming edges. */ - private ConcurrentMap<Integer, - ConcurrentMap<I, OutEdges<I, E>>> transientEdges; - /** - * Whether the chosen {@link OutEdges} implementation allows for Edge - * reuse. - */ - private boolean reuseEdgeObjects; - /** - * Whether the {@link OutEdges} class used during input is different - * from the one used during computation. - */ - private boolean useInputOutEdges; - - /** - * Constructor. 
- * - * @param service Service worker - * @param configuration Configuration - * @param progressable Progressable - */ - public EdgeStore( - CentralizedServiceWorker<I, V, E> service, - ImmutableClassesGiraphConfiguration<I, V, E> configuration, - Progressable progressable) { - this.service = service; - this.configuration = configuration; - this.progressable = progressable; - transientEdges = new MapMaker().concurrencyLevel( - configuration.getNettyServerExecutionConcurrency()).makeMap(); - reuseEdgeObjects = configuration.reuseEdgeObjects(); - useInputOutEdges = configuration.useInputOutEdges(); - } - +public interface EdgeStore<I extends WritableComparable, + V extends Writable, E extends Writable> { /** * Add edges belonging to a given partition on this worker. * Note: This method is thread-safe. @@ -96,150 +38,11 @@ public class EdgeStore<I extends WritableComparable, * @param partitionId Partition id for the incoming edges. * @param edges Incoming edges */ - public void addPartitionEdges( - int partitionId, ByteArrayVertexIdEdges<I, E> edges) { - ConcurrentMap<I, OutEdges<I, E>> partitionEdges = - transientEdges.get(partitionId); - if (partitionEdges == null) { - ConcurrentMap<I, OutEdges<I, E>> newPartitionEdges = - new MapMaker().concurrencyLevel( - configuration.getNettyServerExecutionConcurrency()).makeMap(); - partitionEdges = transientEdges.putIfAbsent(partitionId, - newPartitionEdges); - if (partitionEdges == null) { - partitionEdges = newPartitionEdges; - } - } - ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator = - edges.getVertexIdEdgeIterator(); - while (vertexIdEdgeIterator.hasNext()) { - vertexIdEdgeIterator.next(); - I vertexId = vertexIdEdgeIterator.getCurrentVertexId(); - Edge<I, E> edge = reuseEdgeObjects ? 
- vertexIdEdgeIterator.getCurrentEdge() : - vertexIdEdgeIterator.releaseCurrentEdge(); - OutEdges<I, E> outEdges = partitionEdges.get(vertexId); - if (outEdges == null) { - OutEdges<I, E> newOutEdges = - configuration.createAndInitializeInputOutEdges(); - outEdges = partitionEdges.putIfAbsent(vertexId, newOutEdges); - if (outEdges == null) { - outEdges = newOutEdges; - // Since we had to use the vertex id as a new key in the map, - // we need to release the object. - vertexIdEdgeIterator.releaseCurrentVertexId(); - } - } - synchronized (outEdges) { - outEdges.add(edge); - } - } - } - - /** - * Convert the input edges to the {@link OutEdges} data structure used - * for computation (if different). - * - * @param inputEdges Input edges - * @return Compute edges - */ - private OutEdges<I, E> convertInputToComputeEdges( - OutEdges<I, E> inputEdges) { - if (!useInputOutEdges) { - return inputEdges; - } else { - return configuration.createAndInitializeOutEdges(inputEdges); - } - } + void addPartitionEdges(int partitionId, ByteArrayVertexIdEdges<I, E> edges); /** * Move all edges from temporary storage to their source vertices. * Note: this method is not thread-safe. */ - public void moveEdgesToVertices() { - final boolean createSourceVertex = configuration. 
- getCreateSourceVertex(); - if (transientEdges.isEmpty()) { - if (LOG.isInfoEnabled()) { - LOG.info("moveEdgesToVertices: No edges to move"); - } - return; - } - - if (LOG.isInfoEnabled()) { - LOG.info("moveEdgesToVertices: Moving incoming edges to vertices."); - } - - final BlockingQueue<Integer> partitionIdQueue = - new ArrayBlockingQueue<Integer>(transientEdges.size()); - partitionIdQueue.addAll(transientEdges.keySet()); - int numThreads = configuration.getNumInputSplitsThreads(); - - CallableFactory<Void> callableFactory = new CallableFactory<Void>() { - @Override - public Callable<Void> newCallable(int callableId) { - return new Callable<Void>() { - @Override - public Void call() throws Exception { - Integer partitionId; - while ((partitionId = partitionIdQueue.poll()) != null) { - Partition<I, V, E> partition = - service.getPartitionStore().getOrCreatePartition(partitionId); - ConcurrentMap<I, OutEdges<I, E>> partitionEdges = - transientEdges.remove(partitionId); - for (I vertexId : partitionEdges.keySet()) { - OutEdges<I, E> outEdges = convertInputToComputeEdges( - partitionEdges.remove(vertexId)); - Vertex<I, V, E> vertex = partition.getVertex(vertexId); - // If the source vertex doesn't exist, create it. Otherwise, - // just set the edges. - if (vertex == null) { - if (createSourceVertex) { - // createVertex only if it is allowed by configuration - vertex = configuration.createVertex(); - vertex.initialize(vertexId, - configuration.createVertexValue(), outEdges); - if (vertex instanceof Trimmable) { - ((Trimmable) vertex).trim(); - } - partition.putVertex(vertex); - } - } else { - // A vertex may exist with or without edges initially - // and optimize the case of no initial edges - if (vertex.getNumEdges() == 0) { - vertex.setEdges(outEdges); - } else { - for (Edge<I, E> edge : outEdges) { - vertex.addEdge(edge); - } - } - if (vertex instanceof Trimmable) { - ((Trimmable) vertex).trim(); - } - // Some Partition implementations (e.g. 
ByteArrayPartition) - // require us to put back the vertex after modifying it. - partition.saveVertex(vertex); - } - } - // Some PartitionStore implementations - // (e.g. DiskBackedPartitionStore) require us to put back the - // partition after modifying it. - service.getPartitionStore().putPartition(partition); - } - return null; - } - }; - } - }; - ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads, - "move-edges-%d", progressable); - - transientEdges.clear(); - - if (LOG.isInfoEnabled()) { - LOG.info("moveEdgesToVertices: Finished moving incoming edges to " + - "vertices."); - } - } + void moveEdgesToVertices(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStoreFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStoreFactory.java new file mode 100644 index 0000000..cb47fd0 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStoreFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.edge; + +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.Progressable; + +/** + * Factory to create a new Edge Store + * @param <I> vertex id + * @param <V> vertex value + * @param <E> edge value + */ +public interface EdgeStoreFactory<I extends WritableComparable, + V extends Writable, E extends Writable> { + + /** + * Creates new edge store. + * + * @return edge store + */ + EdgeStore<I, V, E> newStore(); + + /** + * Implementation class should use this method of initialization + * of any required internal state. + * + * @param service Service to get partition mappings + * @param conf Configuration + * @param progressable Progressable + */ + void initialize(CentralizedServiceWorker<I, V, E> service, + ImmutableClassesGiraphConfiguration<I, V, E> conf, + Progressable progressable); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/InMemoryEdgeStoreFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/InMemoryEdgeStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/edge/InMemoryEdgeStoreFactory.java new file mode 100644 index 0000000..d3d6997 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/edge/InMemoryEdgeStoreFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.edge; + +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.primitives.IntEdgeStore; +import org.apache.giraph.edge.primitives.LongEdgeStore; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.Progressable; + +/** + * Edge store factory which produces edge stores which hold all + * edges in memory. It creates primitive edge stores when vertex id is + * IntWritable or LongWritable + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +@SuppressWarnings("unchecked") +public class InMemoryEdgeStoreFactory<I extends WritableComparable, + V extends Writable, E extends Writable> + implements EdgeStoreFactory<I, V, E> { + /** Service worker. */ + protected CentralizedServiceWorker<I, V, E> service; + /** Giraph configuration. */ + protected ImmutableClassesGiraphConfiguration<I, V, E> conf; + /** Progressable to report progress. 
*/ + protected Progressable progressable; + + @Override + public EdgeStore<I, V, E> newStore() { + Class<I> vertexIdClass = conf.getVertexIdClass(); + EdgeStore<I, V, E> edgeStore; + if (vertexIdClass.equals(IntWritable.class)) { + edgeStore = (EdgeStore<I, V, E>) new IntEdgeStore<>( + (CentralizedServiceWorker<IntWritable, V, E>) service, + (ImmutableClassesGiraphConfiguration<IntWritable, V, E>) conf, + progressable); + } else if (vertexIdClass.equals(LongWritable.class)) { + edgeStore = (EdgeStore<I, V, E>) new LongEdgeStore<>( + (CentralizedServiceWorker<LongWritable, V, E>) service, + (ImmutableClassesGiraphConfiguration<LongWritable, V, E>) conf, + progressable); + } else { + edgeStore = new SimpleEdgeStore<>(service, conf, progressable); + } + return edgeStore; + } + + @Override + public void initialize(CentralizedServiceWorker<I, V, E> service, + ImmutableClassesGiraphConfiguration<I, V, E> conf, + Progressable progressable) { + this.service = service; + this.conf = conf; + this.progressable = progressable; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java new file mode 100644 index 0000000..6e2a74f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.collect.MapMaker;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Simple in memory edge store which supports any type of ids.
+ * Edges of each partition are kept in a concurrent map keyed by the
+ * vertex id object itself.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class SimpleEdgeStore<I extends WritableComparable,
+  V extends Writable, E extends Writable>
+  extends AbstractEdgeStore<I, V, E, I,
+  Map.Entry<I, OutEdges<I, E>>> {
+
+  /**
+   * Constructor.
+   *
+   * @param service Service worker
+   * @param configuration Configuration
+   * @param progressable Progressable
+   */
+  public SimpleEdgeStore(
+    CentralizedServiceWorker<I, V, E> service,
+    ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+    Progressable progressable) {
+    super(service, configuration, progressable);
+  }
+
+  /**
+   * Returns the vertex id of an entry. The key object stored in the map
+   * is handed back as is; the representative id is not used.
+   *
+   * @param entry Map entry holding the vertex id as its key
+   * @param representativeVertexId Reusable id instance (ignored here)
+   * @return Key of the entry
+   */
+  @Override
+  protected I getVertexId(Map.Entry<I, OutEdges<I, E>> entry,
+    I representativeVertexId) {
+    return entry.getKey();
+  }
+
+  /**
+   * Returns a vertex id for an entry. No new object is allocated, the
+   * key already stored in the map is returned.
+   *
+   * @param entry Map entry holding the vertex id as its key
+   * @return Key of the entry
+   */
+  @Override
+  protected I createVertexId(Map.Entry<I, OutEdges<I, E>> entry) {
+    return entry.getKey();
+  }
+
+  /**
+   * Returns the edge map of a partition, creating and registering an
+   * empty one when the partition has none yet. When several threads
+   * race on the same partition, putIfAbsent makes all of them end up
+   * with the same map instance.
+   *
+   * @param partitionId Id of the partition
+   * @return Map from vertex id to out-edges for that partition
+   */
+  @Override
+  protected ConcurrentMap<I, OutEdges<I, E>> getPartitionEdges(
+    int partitionId) {
+    ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
+        (ConcurrentMap<I, OutEdges<I, E>>) transientEdges.get(partitionId);
+    if (partitionEdges == null) {
+      ConcurrentMap<I, OutEdges<I, E>> newPartitionEdges =
+          new MapMaker().concurrencyLevel(
+              configuration.getNettyServerExecutionConcurrency()).makeMap();
+      partitionEdges = (ConcurrentMap<I, OutEdges<I, E>>)
+          transientEdges.putIfAbsent(partitionId, newPartitionEdges);
+      if (partitionEdges == null) {
+        // Our map won the race and is now the registered one.
+        partitionEdges = newPartitionEdges;
+      }
+    }
+    return partitionEdges;
+  }
+
+  /**
+   * Detaches the out-edges of the entry's vertex from the partition map
+   * and returns them.
+   *
+   * The partition maps of this store are concurrent maps, which reject
+   * null values, so the mapping is removed instead of being overwritten
+   * with null (a put of null would throw a NullPointerException).
+   *
+   * @param entry Entry whose edges should be detached
+   * @param partitionEdges Edge map of the partition
+   * @return Out-edges previously stored for the vertex, or null if the
+   *         vertex had no mapping
+   */
+  @Override
+  protected OutEdges<I, E> removePartitionEdges(
+    Map.Entry<I, OutEdges<I, E>> entry,
+    Map<I, OutEdges<I, E>> partitionEdges) {
+    return partitionEdges.remove(entry.getKey());
+  }
+
+  /**
+   * Returns an iterator over all (vertex id, out-edges) entries of a
+   * partition map.
+   *
+   * @param partitionEdges Edge map of the partition
+   * @return Iterator over the entry set of the map
+   */
+  @Override
+  protected Iterator<Map.Entry<I, OutEdges<I, E>>>
+  getPartitionEdgesIterator(Map<I, OutEdges<I, E>> partitionEdges) {
+    return partitionEdges.entrySet().iterator();
+  }
+
+  /**
+   * Returns the out-edges stored for the iterator's current vertex,
+   * creating and registering an empty instance on first use.
+   *
+   * @param vertexIdEdgeIterator Iterator positioned on the vertex
+   * @param partitionEdgesIn Edge map of the partition; must be one of
+   *                         the concurrent maps created by this store
+   * @return Out-edges registered for the current vertex
+   */
+  @Override
+  protected OutEdges<I, E> getVertexOutEdges(
+    ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator,
+    Map<I, OutEdges<I, E>> partitionEdgesIn) {
+    ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
+        (ConcurrentMap<I, OutEdges<I, E>>) partitionEdgesIn;
+    I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
+    OutEdges<I, E> outEdges = partitionEdges.get(vertexId);
+    if (outEdges == null) {
+      OutEdges<I, E> newOutEdges =
+          configuration.createAndInitializeInputOutEdges();
+      outEdges = partitionEdges.putIfAbsent(vertexId, newOutEdges);
+      if (outEdges == null) {
+        outEdges = newOutEdges;
+        // Since we had to use the vertex id as a new key in the map,
+        // we need to release the object.
+        vertexIdEdgeIterator.releaseCurrentVertexId();
+      }
+    }
+    return outEdges;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
new file mode 100644
index 0000000..c6b5051
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge.primitives;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.AbstractEdgeStore;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Special edge store to be used when ids are IntWritable.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class IntEdgeStore<V extends Writable, E extends Writable>
+  extends AbstractEdgeStore<IntWritable, V, E, Integer,
+  Int2ObjectMap.Entry<OutEdges<IntWritable, E>>> {
+
+  /**
+   * Constructor.
+   *
+   * @param service Service worker
+   * @param configuration Configuration
+   * @param progressable Progressable
+   */
+  public IntEdgeStore(
+    CentralizedServiceWorker<IntWritable, V, E> service,
+    ImmutableClassesGiraphConfiguration<IntWritable, V, E> configuration,
+    Progressable progressable) {
+    super(service, configuration, progressable);
+  }
+
+  /**
+   * Writes the entry's primitive key into the supplied reusable id and
+   * returns that same instance, so no object is allocated.
+   *
+   * @param entry Entry holding the primitive vertex id
+   * @param representativeVertexId Reusable id instance to overwrite
+   * @return The representative id, now set to the entry's key
+   */
+  @Override
+  protected IntWritable getVertexId(
+    Int2ObjectMap.Entry<OutEdges<IntWritable, E>> entry,
+    IntWritable representativeVertexId) {
+    representativeVertexId.set(entry.getIntKey());
+    return representativeVertexId;
+  }
+
+  /**
+   * Allocates a fresh id object for the entry's primitive key.
+   *
+   * @param entry Entry holding the primitive vertex id
+   * @return New IntWritable carrying the entry's key
+   */
+  @Override
+  protected IntWritable createVertexId(
+    Int2ObjectMap.Entry<OutEdges<IntWritable, E>> entry) {
+    return new IntWritable(entry.getIntKey());
+  }
+
+  /**
+   * Detaches the out-edges of the entry's vertex by overwriting the
+   * stored value with null and returns the previous value.
+   *
+   * The map is addressed through its primitive interface so that the
+   * key is not boxed into an Integer for every vertex.
+   *
+   * NOTE(review): the value is nulled rather than the key removed,
+   * presumably so the map is not structurally modified while the caller
+   * iterates over it - confirm against AbstractEdgeStore.
+   *
+   * @param entry Entry whose edges should be detached
+   * @param partitionEdges Edge map of the partition; must be one of the
+   *                       Int2ObjectMap instances created by this store
+   * @return Out-edges previously stored for the vertex
+   */
+  @Override
+  protected OutEdges<IntWritable, E> removePartitionEdges(
+    Int2ObjectMap.Entry<OutEdges<IntWritable, E>> entry,
+    Map<Integer, OutEdges<IntWritable, E>> partitionEdges) {
+    return ((Int2ObjectMap<OutEdges<IntWritable, E>>) partitionEdges)
+        .put(entry.getIntKey(), null);
+  }
+
+  /**
+   * Returns an iterator over the primitive-keyed entries of a partition
+   * map.
+   *
+   * @param partitionEdges Edge map of the partition; must be one of the
+   *                       Int2ObjectMap instances created by this store
+   * @return Iterator over the map's int2ObjectEntrySet
+   */
+  @Override
+  protected Iterator<Int2ObjectMap.Entry<OutEdges<IntWritable, E>>>
+  getPartitionEdgesIterator(
+    Map<Integer, OutEdges<IntWritable, E>> partitionEdges) {
+    return ((Int2ObjectMap<OutEdges<IntWritable, E>>) partitionEdges)
+        .int2ObjectEntrySet()
+        .iterator();
+  }
+
+  /**
+   * Returns the edge map of a partition, creating and registering an
+   * empty synchronized map when the partition has none yet. When several
+   * threads race on the same partition, putIfAbsent makes all of them
+   * end up with the same map instance.
+   *
+   * @param partitionId Id of the partition
+   * @return Map from primitive vertex id to out-edges for that partition
+   */
+  @Override
+  protected Int2ObjectMap<OutEdges<IntWritable, E>> getPartitionEdges(
+    int partitionId) {
+    Int2ObjectMap<OutEdges<IntWritable, E>> partitionEdges =
+        (Int2ObjectMap<OutEdges<IntWritable, E>>)
+            transientEdges.get(partitionId);
+    if (partitionEdges == null) {
+      Int2ObjectMap<OutEdges<IntWritable, E>> newPartitionEdges =
+          Int2ObjectMaps.synchronize(
+              new Int2ObjectOpenHashMap<OutEdges<IntWritable, E>>());
+      partitionEdges = (Int2ObjectMap<OutEdges<IntWritable, E>>)
+          transientEdges.putIfAbsent(partitionId,
+              newPartitionEdges);
+      if (partitionEdges == null) {
+        // Our map won the race and is now the registered one.
+        partitionEdges = newPartitionEdges;
+      }
+    }
+    return partitionEdges;
+  }
+
+  /**
+   * Returns the out-edges stored for the iterator's current vertex,
+   * creating and registering an empty instance on first use.
+   *
+   * @param vertexIdEdgeIterator Iterator positioned on the vertex
+   * @param partitionEdgesIn Edge map of the partition; must be one of
+   *                         the Int2ObjectMap instances created by this
+   *                         store
+   * @return Out-edges registered for the current vertex
+   */
+  @Override
+  protected OutEdges<IntWritable, E> getVertexOutEdges(
+    ByteArrayVertexIdEdges<IntWritable, E>.VertexIdEdgeIterator
+      vertexIdEdgeIterator,
+    Map<Integer, OutEdges<IntWritable, E>> partitionEdgesIn) {
+    Int2ObjectMap<OutEdges<IntWritable, E>> partitionEdges =
+        (Int2ObjectMap<OutEdges<IntWritable, E>>) partitionEdgesIn;
+    // Read the primitive key once; only the int value is needed below.
+    int id = vertexIdEdgeIterator.getCurrentVertexId().get();
+    OutEdges<IntWritable, E> outEdges = partitionEdges.get(id);
+    if (outEdges == null) {
+      // Look again while holding the map's monitor so that two threads
+      // cannot both insert a fresh OutEdges for the same vertex.
+      synchronized (partitionEdges) {
+        outEdges = partitionEdges.get(id);
+        if (outEdges == null) {
+          outEdges = configuration.createAndInitializeInputOutEdges();
+          partitionEdges.put(id, outEdges);
+        }
+      }
+    }
+    return outEdges;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
new file mode 100644
index 0000000..d4c44c7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge.primitives;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.AbstractEdgeStore;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Special edge store to be used when ids are LongWritable.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class LongEdgeStore<V extends Writable, E extends Writable>
+  extends AbstractEdgeStore<LongWritable, V, E, Long,
+  Long2ObjectMap.Entry<OutEdges<LongWritable, E>>> {
+
+  /**
+   * Constructor.
+   *
+   * @param service Service worker
+   * @param configuration Configuration
+   * @param progressable Progressable
+   */
+  public LongEdgeStore(
+    CentralizedServiceWorker<LongWritable, V, E> service,
+    ImmutableClassesGiraphConfiguration<LongWritable, V, E> configuration,
+    Progressable progressable) {
+    super(service, configuration, progressable);
+  }
+
+  /**
+   * Writes the entry's primitive key into the supplied reusable id and
+   * returns that same instance, so no object is allocated.
+   *
+   * @param entry Entry holding the primitive vertex id
+   * @param representativeVertexId Reusable id instance to overwrite
+   * @return The representative id, now set to the entry's key
+   */
+  @Override
+  protected LongWritable getVertexId(
+    Long2ObjectMap.Entry<OutEdges<LongWritable, E>> entry,
+    LongWritable representativeVertexId) {
+    representativeVertexId.set(entry.getLongKey());
+    return representativeVertexId;
+  }
+
+  /**
+   * Allocates a fresh id object for the entry's primitive key.
+   *
+   * @param entry Entry holding the primitive vertex id
+   * @return New LongWritable carrying the entry's key
+   */
+  @Override
+  protected LongWritable createVertexId(
+    Long2ObjectMap.Entry<OutEdges<LongWritable, E>> entry) {
+    return new LongWritable(entry.getLongKey());
+  }
+
+  /**
+   * Detaches the out-edges of the entry's vertex by overwriting the
+   * stored value with null and returns the previous value.
+   *
+   * The map is addressed through its primitive interface so that the
+   * key is not boxed into a Long for every vertex.
+   *
+   * NOTE(review): the value is nulled rather than the key removed,
+   * presumably so the map is not structurally modified while the caller
+   * iterates over it - confirm against AbstractEdgeStore.
+   *
+   * @param entry Entry whose edges should be detached
+   * @param partitionEdges Edge map of the partition; must be one of the
+   *                       Long2ObjectMap instances created by this store
+   * @return Out-edges previously stored for the vertex
+   */
+  @Override
+  protected OutEdges<LongWritable, E> removePartitionEdges(
+    Long2ObjectMap.Entry<OutEdges<LongWritable, E>> entry,
+    Map<Long, OutEdges<LongWritable, E>> partitionEdges) {
+    return ((Long2ObjectMap<OutEdges<LongWritable, E>>) partitionEdges)
+        .put(entry.getLongKey(), null);
+  }
+
+  /**
+   * Returns an iterator over the primitive-keyed entries of a partition
+   * map.
+   *
+   * @param partitionEdges Edge map of the partition; must be one of the
+   *                       Long2ObjectMap instances created by this store
+   * @return Iterator over the map's long2ObjectEntrySet
+   */
+  @Override
+  protected Iterator<Long2ObjectMap.Entry<OutEdges<LongWritable, E>>>
+  getPartitionEdgesIterator(
+    Map<Long, OutEdges<LongWritable, E>> partitionEdges) {
+    return ((Long2ObjectMap<OutEdges<LongWritable, E>>) partitionEdges)
+        .long2ObjectEntrySet()
+        .iterator();
+  }
+
+  /**
+   * Returns the edge map of a partition, creating and registering an
+   * empty synchronized map when the partition has none yet. When several
+   * threads race on the same partition, putIfAbsent makes all of them
+   * end up with the same map instance.
+   *
+   * @param partitionId Id of the partition
+   * @return Map from primitive vertex id to out-edges for that partition
+   */
+  @Override
+  protected Long2ObjectMap<OutEdges<LongWritable, E>> getPartitionEdges(
+    int partitionId) {
+    Long2ObjectMap<OutEdges<LongWritable, E>> partitionEdges =
+        (Long2ObjectMap<OutEdges<LongWritable, E>>)
+            transientEdges.get(partitionId);
+    if (partitionEdges == null) {
+      Long2ObjectMap<OutEdges<LongWritable, E>> newPartitionEdges =
+          Long2ObjectMaps.synchronize(
+              new Long2ObjectOpenHashMap<OutEdges<LongWritable, E>>());
+      partitionEdges = (Long2ObjectMap<OutEdges<LongWritable, E>>)
+          transientEdges.putIfAbsent(partitionId,
+              newPartitionEdges);
+      if (partitionEdges == null) {
+        // Our map won the race and is now the registered one.
+        partitionEdges = newPartitionEdges;
+      }
+    }
+    return partitionEdges;
+  }
+
+  /**
+   * Returns the out-edges stored for the iterator's current vertex,
+   * creating and registering an empty instance on first use.
+   *
+   * @param vertexIdEdgeIterator Iterator positioned on the vertex
+   * @param partitionEdgesIn Edge map of the partition; must be one of
+   *                         the Long2ObjectMap instances created by this
+   *                         store
+   * @return Out-edges registered for the current vertex
+   */
+  @Override
+  protected OutEdges<LongWritable, E> getVertexOutEdges(
+    ByteArrayVertexIdEdges<LongWritable, E>.VertexIdEdgeIterator
+      vertexIdEdgeIterator,
+    Map<Long, OutEdges<LongWritable, E>> partitionEdgesIn) {
+    Long2ObjectMap<OutEdges<LongWritable, E>> partitionEdges =
+        (Long2ObjectMap<OutEdges<LongWritable, E>>) partitionEdgesIn;
+    // Read the primitive key once; only the long value is needed below.
+    long id = vertexIdEdgeIterator.getCurrentVertexId().get();
+    OutEdges<LongWritable, E> outEdges = partitionEdges.get(id);
+    if (outEdges == null) {
+      // Look again while holding the map's monitor so that two threads
+      // cannot both insert a fresh OutEdges for the same vertex.
+      synchronized (partitionEdges) {
+        outEdges = partitionEdges.get(id);
+        if (outEdges == null) {
+          outEdges = configuration.createAndInitializeInputOutEdges();
+          partitionEdges.put(id, outEdges);
+        }
+      }
+    }
+    return outEdges;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9cedc7d7/giraph-core/src/main/java/org/apache/giraph/edge/primitives/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/package-info.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/package-info.java
new file mode 100644
index 0000000..81c5b6c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package of edge stores specialized for certain types of vertex ids. + */ +package org.apache.giraph.edge.primitives;
