Updated Branches: refs/heads/trunk 185b9ffc8 -> 621a022c0
GIRAPH-615: Add support for multithreaded output (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/621a022c Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/621a022c Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/621a022c Branch: refs/heads/trunk Commit: 621a022c0e910f38d961ec50a29e2686d7ae49a0 Parents: 185b9ff Author: Maja Kabiljo <[email protected]> Authored: Wed Apr 10 11:28:17 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Wed Apr 10 11:36:50 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/conf/GiraphConfiguration.java | 24 +++ .../org/apache/giraph/conf/GiraphConstants.java | 3 + .../java/org/apache/giraph/edge/EdgeStore.java | 86 +++++------ .../org/apache/giraph/graph/ComputeCallable.java | 3 +- .../org/apache/giraph/graph/GraphTaskManager.java | 70 ++++----- .../org/apache/giraph/utils/CallableFactory.java | 36 +++++ .../org/apache/giraph/utils/ProgressableUtils.java | 39 +++++ .../org/apache/giraph/worker/BspServiceWorker.java | 122 +++++++-------- .../worker/EdgeInputSplitsCallableFactory.java | 4 +- .../giraph/worker/InputSplitsCallableFactory.java | 41 ----- .../worker/VertexInputSplitsCallableFactory.java | 4 +- 12 files changed, 240 insertions(+), 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index e79c763..d9c88ec 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-615: Add support for multithreaded output (majakabiljo) + GIRAPH-612: Improve website for upcoming release (aching) GIRAPH-527: readVertexInputSplit is always reporting 0 vertices and 0 edges (nitay) 
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 90b05e3..01f22da 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -302,6 +302,30 @@ public class GiraphConfiguration extends Configuration } /** + * Get the number of threads to use for writing output at the end of the + * application. If output format is not thread safe, returns 1. + * + * @return Number of output threads + */ + public final int getNumOutputThreads() { + if (!vertexOutputFormatThreadSafe()) { + return 1; + } else { + return NUM_OUTPUT_THREADS.get(this); + } + } + + /** + * Set the number of threads to use for writing output at the end of the + * application. Used only if {@link #vertexOutputFormatThreadSafe} is true.
+ * + * @param numOutputThreads Number of output threads + */ + public void setNumOutputThreads(int numOutputThreads) { + NUM_OUTPUT_THREADS.set(this, numOutputThreads); + } + + /** * Set the vertex combiner class (optional) * * @param vertexCombinerClass Determines how vertex messages are combined http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 95c9862..21e094d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -140,6 +140,9 @@ public interface GiraphConstants { */ BooleanConfOption VERTEX_OUTPUT_FORMAT_THREAD_SAFE = new BooleanConfOption("giraph.vertexOutputFormatThreadSafe", false); + /** Number of threads for writing output in the end of the application */ + IntConfOption NUM_OUTPUT_THREADS = + new IntConfOption("giraph.numOutputThreads", 1); /** conf key for comma-separated list of jars to export to YARN workers */ StrConfOption GIRAPH_YARN_LIBJARS = http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java index 01a67dd..e8cb620 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java @@ -19,19 +19,16 @@ package org.apache.giraph.edge; import com.google.common.collect.MapMaker; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import 
java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Vertex; import org.apache.giraph.partition.Partition; import org.apache.giraph.utils.ByteArrayVertexIdEdges; -import org.apache.giraph.utils.LogStacktraceCallable; +import org.apache.giraph.utils.CallableFactory; import org.apache.giraph.utils.ProgressableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -178,53 +175,50 @@ public class EdgeStore<I extends WritableComparable, new ArrayBlockingQueue<Integer>(transientEdges.size()); partitionIdQueue.addAll(transientEdges.keySet()); int numThreads = configuration.getNumInputSplitsThreads(); - ExecutorService movePartitionExecutor = - Executors.newFixedThreadPool(numThreads, - new ThreadFactoryBuilder().setNameFormat("move-edges-%d").build()); - for (int i = 0; i < numThreads; ++i) { - Callable moveCallable = new Callable<Void>() { - @Override - public Void call() throws Exception { - Integer partitionId; - while ((partitionId = partitionIdQueue.poll()) != null) { - Partition<I, V, E, M> partition = - service.getPartitionStore().getPartition(partitionId); - ConcurrentMap<I, VertexEdges<I, E>> partitionEdges = - transientEdges.remove(partitionId); - for (I vertexId : partitionEdges.keySet()) { - VertexEdges<I, E> vertexEdges = convertInputToComputeEdges( - partitionEdges.remove(vertexId)); - Vertex<I, V, E, M> vertex = partition.getVertex(vertexId); - // If the source vertex doesn't exist, create it. Otherwise, - // just set the edges. 
- if (vertex == null) { - vertex = configuration.createVertex(); - vertex.initialize(vertexId, configuration.createVertexValue(), - vertexEdges); - partition.putVertex(vertex); - } else { - vertex.setEdges(vertexEdges); - // Some Partition implementations (e.g. ByteArrayPartition) - // require us to put back the vertex after modifying it. - partition.saveVertex(vertex); + CallableFactory<Void> callableFactory = new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + Integer partitionId; + while ((partitionId = partitionIdQueue.poll()) != null) { + Partition<I, V, E, M> partition = + service.getPartitionStore().getPartition(partitionId); + ConcurrentMap<I, VertexEdges<I, E>> partitionEdges = + transientEdges.remove(partitionId); + for (I vertexId : partitionEdges.keySet()) { + VertexEdges<I, E> vertexEdges = convertInputToComputeEdges( + partitionEdges.remove(vertexId)); + Vertex<I, V, E, M> vertex = partition.getVertex(vertexId); + // If the source vertex doesn't exist, create it. Otherwise, + // just set the edges. + if (vertex == null) { + vertex = configuration.createVertex(); + vertex.initialize(vertexId, configuration.createVertexValue(), + vertexEdges); + partition.putVertex(vertex); + } else { + vertex.setEdges(vertexEdges); + // Some Partition implementations (e.g. ByteArrayPartition) + // require us to put back the vertex after modifying it. + partition.saveVertex(vertex); + } } + // Some PartitionStore implementations + // (e.g. DiskBackedPartitionStore) require us to put back the + // partition after modifying it. + service.getPartitionStore().putPartition(partition); } - // Some PartitionStore implementations - // (e.g. DiskBackedPartitionStore) require us to put back the - // partition after modifying it. 
- service.getPartitionStore().putPartition(partition); + return null; } - return null; - } - }; - movePartitionExecutor.submit( - new LogStacktraceCallable<Void>(moveCallable)); - } + }; + } + }; + ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads, + "move-edges-%d", progressable); - movePartitionExecutor.shutdown(); - ProgressableUtils.awaitExecutorTermination(movePartitionExecutor, - progressable); transientEdges.clear(); if (LOG.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index 51ed4f6..0fc5fdf 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -68,7 +68,8 @@ import java.util.concurrent.Callable; * @param <M> Message data */ public class ComputeCallable<I extends WritableComparable, V extends Writable, - E extends Writable, M extends Writable> implements Callable { + E extends Writable, M extends Writable> + implements Callable<Collection<PartitionStats>> { /** Class logger */ private static final Logger LOG = Logger.getLogger(ComputeCallable.class); /** Class time object */ http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index abca4c4..97cf55d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -37,7 +37,7 @@ import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; -import org.apache.giraph.utils.LogStacktraceCallable; +import org.apache.giraph.utils.CallableFactory; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.ProgressableUtils; import org.apache.giraph.utils.ReflectionUtils; @@ -56,9 +56,6 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import java.io.IOException; import java.lang.reflect.Type; import java.net.URL; @@ -69,9 +66,7 @@ import java.util.Enumeration; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS; @@ -725,13 +720,13 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, * @param numPartitions the number of data partitions (vertices) to process * @param numThreads number of concurrent threads to do processing */ - private void processGraphPartitions(Mapper<?, ?, ?, ?>.Context context, - List<PartitionStats> partitionStatsList, GraphState<I, V, E, M> graphState, - MessageStoreByPartition<I, M> messageStore, int numPartitions, - int numThreads) { - List<Future<Collection<PartitionStats>>> partitionFutures = - Lists.newArrayListWithCapacity(numPartitions); - BlockingQueue<Integer> computePartitionIdQueue = + private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context, + 
List<PartitionStats> partitionStatsList, + final GraphState<I, V, E, M> graphState, + final MessageStoreByPartition<I, M> messageStore, + int numPartitions, + int numThreads) { + final BlockingQueue<Integer> computePartitionIdQueue = new ArrayBlockingQueue<Integer>(numPartitions); for (Integer partitionId : serviceWorker.getPartitionStore().getPartitionIds()) { @@ -741,32 +736,27 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, GiraphTimerContext computeAllTimerContext = computeAll.time(); timeToFirstMessageTimerContext = timeToFirstMessage.time(); - ExecutorService partitionExecutor = - Executors.newFixedThreadPool(numThreads, - new ThreadFactoryBuilder().setNameFormat("compute-%d").build()); - for (int i = 0; i < numThreads; ++i) { - ComputeCallable<I, V, E, M> computeCallable = - new ComputeCallable<I, V, E, M>( - context, - graphState, - messageStore, - computePartitionIdQueue, - conf, - serviceWorker); - LogStacktraceCallable<Collection<PartitionStats>> wrapped = - new LogStacktraceCallable<Collection<PartitionStats>>( - computeCallable); - partitionFutures.add(partitionExecutor.submit(wrapped)); - } - - // Wait until all the threads are done to wait on all requests - for (Future<Collection<PartitionStats>> partitionFuture : - partitionFutures) { - Collection<PartitionStats> stats = - ProgressableUtils.getFutureResult(partitionFuture, context); - partitionStatsList.addAll(stats); - } - partitionExecutor.shutdown(); + CallableFactory<Collection<PartitionStats>> callableFactory = + new CallableFactory<Collection<PartitionStats>>() { + @Override + public Callable<Collection<PartitionStats>> newCallable( + int callableId) { + return new ComputeCallable<I, V, E, M>( + context, + graphState, + messageStore, + computePartitionIdQueue, + conf, + serviceWorker); + } + }; + List<Collection<PartitionStats>> results = + ProgressableUtils.getResultsWithNCallables(callableFactory, + numThreads, "compute-%d", context); + for 
(Collection<PartitionStats> result : results) { + partitionStatsList.addAll(result); + } + computeAllTimerContext.stop(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/utils/CallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/CallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/utils/CallableFactory.java new file mode 100644 index 0000000..72d5b15 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/CallableFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.utils; + +import java.util.concurrent.Callable; + +/** + * Factory for creating {@link Callable}s + * + * @param <R> Callable result type + */ +public interface CallableFactory<R> { + /** + * Create new callable + * + * @param callableId Id of the callable + * @return Callable + */ + Callable<R> newCallable(int callableId); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java index 77eb49a..3b06604 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java @@ -23,8 +23,14 @@ import org.apache.log4j.Logger; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.group.ChannelGroupFuture; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -157,6 +163,39 @@ public class ProgressableUtils { } /** + * Create {@code numThreads} callables from {@code callableFactory}, + * execute them and gather results.
+ * + * @param callableFactory Factory for Callables + * @param numThreads Number of threads to use + * @param threadNameFormat Format for thread name + * @param progressable Progressable for reporting progress + * @param <R> Type of Callable's results + * @return List of results from Callables + */ + public static <R> List<R> getResultsWithNCallables( + CallableFactory<R> callableFactory, int numThreads, + String threadNameFormat, Progressable progressable) { + ExecutorService executorService = + Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build()); + List<Future<R>> futures = Lists.newArrayListWithCapacity(numThreads); + for (int i = 0; i < numThreads; i++) { + Callable<R> callable = callableFactory.newCallable(i); + Future<R> future = executorService.submit( + new LogStacktraceCallable<R>(callable)); + futures.add(future); + } + executorService.shutdown(); + List<R> futureResults = Lists.newArrayListWithCapacity(numThreads); + for (Future<R> future : futures) { + R result = ProgressableUtils.getFutureResult(future, progressable); + futureResults.add(result); + } + return futureResults; + } + + /** * Interface for waiting on a result from some operation. * * @param <T> Result type. 
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index c20d06e..037cdfc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -42,7 +42,7 @@ import org.apache.giraph.graph.FinishedSuperstepStats; import org.apache.giraph.graph.AddressesAndPartitionsWritable; import org.apache.giraph.graph.GlobalStats; import org.apache.giraph.io.superstep_output.SuperstepOutput; -import org.apache.giraph.utils.LogStacktraceCallable; +import org.apache.giraph.utils.CallableFactory; import org.apache.giraph.utils.JMapHistoDumper; import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.VertexOutputFormat; @@ -85,7 +85,6 @@ import org.json.JSONException; import org.json.JSONObject; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import net.iharder.Base64; import java.io.ByteArrayOutputStream; @@ -102,9 +101,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** @@ -263,7 +259,7 @@ public class BspServiceWorker<I extends WritableComparable, */ private VertexEdgeCount loadInputSplits( List<String> inputSplitPathList, - InputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory) + CallableFactory<VertexEdgeCount> inputSplitsCallableFactory) throws KeeperException, InterruptedException { VertexEdgeCount vertexEdgeCount = new VertexEdgeCount(); // Determine how many threads 
to use based on the number of input splits @@ -271,35 +267,20 @@ public class BspServiceWorker<I extends WritableComparable, getConfiguration().getMaxWorkers() + 1; int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(), maxInputSplitThreads); - ExecutorService inputSplitsExecutor = - Executors.newFixedThreadPool(numThreads, - new ThreadFactoryBuilder().setNameFormat("load-%d").build()); - List<Future<VertexEdgeCount>> threadsFutures = - Lists.newArrayListWithCapacity(numThreads); if (LOG.isInfoEnabled()) { LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " + "originally " + getConfiguration().getNumInputSplitsThreads() + " threads(s) for " + inputSplitPathList.size() + " total splits."); } - for (int i = 0; i < numThreads; ++i) { - Callable<VertexEdgeCount> inputSplitsCallable = - inputSplitsCallableFactory.newCallable(i); - LogStacktraceCallable<VertexEdgeCount> wrapped = - new LogStacktraceCallable<VertexEdgeCount>( - inputSplitsCallable); - threadsFutures.add(inputSplitsExecutor.submit(wrapped)); - } - // Wait until all the threads are done to wait on all requests - for (Future<VertexEdgeCount> threadFuture : threadsFutures) { - VertexEdgeCount threadVertexEdgeCount = - ProgressableUtils.getFutureResult(threadFuture, getContext()); - vertexEdgeCount = - vertexEdgeCount.incrVertexEdgeCount(threadVertexEdgeCount); + List<VertexEdgeCount> results = + ProgressableUtils.getResultsWithNCallables(inputSplitsCallableFactory, + numThreads, "load-%d", getContext()); + for (VertexEdgeCount result : results) { + vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(result); } workerClient.waitAllRequests(); - inputSplitsExecutor.shutdown(); return vertexEdgeCount; } @@ -946,47 +927,60 @@ else[HADOOP_NON_SECURE]*/ return; } + int numThreads = Math.min(getConfiguration().getNumOutputThreads(), + getPartitionStore().getNumPartitions()); LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, - "saveVertices: Starting to save " + 
numLocalVertices + " vertices"); - VertexOutputFormat<I, V, E> vertexOutputFormat = + "saveVertices: Starting to save " + numLocalVertices + " vertices " + + "using " + numThreads + " threads"); + final VertexOutputFormat<I, V, E> vertexOutputFormat = getConfiguration().createVertexOutputFormat(); - VertexWriter<I, V, E> vertexWriter = - vertexOutputFormat.createVertexWriter(getContext()); - vertexWriter.setConf( - (ImmutableClassesGiraphConfiguration<I, V, E, Writable>) - getConfiguration()); - vertexWriter.initialize(getContext()); - long verticesWritten = 0; - long nextPrintVertices = 0; - long nextPrintMsecs = System.currentTimeMillis() + 15000; - int partitionIndex = 0; - int numPartitions = getPartitionStore().getNumPartitions(); - for (Integer partitionId : getPartitionStore().getPartitionIds()) { - Partition<I, V, E, M> partition = - getPartitionStore().getPartition(partitionId); - for (Vertex<I, V, E, M> vertex : partition) { - getContext().progress(); - vertexWriter.writeVertex(vertex); - ++verticesWritten; - - // Update status at most every 250k vertices or 15 seconds - if (verticesWritten > nextPrintVertices && - System.currentTimeMillis() > nextPrintMsecs) { - LoggerUtils.setStatusAndLog( - getContext(), LOG, Level.INFO, - "saveVertices: Saved " + - verticesWritten + " out of " + numLocalVertices + - " vertices, on partition " + partitionIndex + " out of " + - numPartitions); - nextPrintMsecs = System.currentTimeMillis() + 15000; - nextPrintVertices = verticesWritten + 250000; - } + CallableFactory<Void> callableFactory = new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + VertexWriter<I, V, E> vertexWriter = + vertexOutputFormat.createVertexWriter(getContext()); + vertexWriter.setConf( + (ImmutableClassesGiraphConfiguration<I, V, E, Writable>) + getConfiguration()); + vertexWriter.initialize(getContext()); + long 
verticesWritten = 0; + long nextPrintVertices = 0; + long nextPrintMsecs = System.currentTimeMillis() + 15000; + int partitionIndex = 0; + int numPartitions = getPartitionStore().getNumPartitions(); + for (Integer partitionId : getPartitionStore().getPartitionIds()) { + Partition<I, V, E, M> partition = + getPartitionStore().getPartition(partitionId); + for (Vertex<I, V, E, M> vertex : partition) { + vertexWriter.writeVertex(vertex); + ++verticesWritten; + + // Update status at most every 250k vertices or 15 seconds + if (verticesWritten > nextPrintVertices && + System.currentTimeMillis() > nextPrintMsecs) { + LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, + "saveVertices: Saved " + verticesWritten + " out of " + + partition.getVertexCount() + " partition vertices, " + + "on partition " + partitionIndex + + " out of " + numPartitions); + nextPrintMsecs = System.currentTimeMillis() + 15000; + nextPrintVertices = verticesWritten + 250000; + } + } + ++partitionIndex; + } + vertexWriter.close(getContext()); // the temp results are saved now + return null; + } + }; } - getPartitionStore().putPartition(partition); - getContext().progress(); - ++partitionIndex; - } - vertexWriter.close(getContext()); // the temp results are saved now + }; + ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads, + "save-vertices-%d", getContext()); + LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "saveVertices: Done saving vertices."); // YARN: must complete the commit the "task" output, Hadoop isn't there. 
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java index 9297ac1..4a1705b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java @@ -20,6 +20,8 @@ package org.apache.giraph.worker; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.VertexEdgeCount; +import org.apache.giraph.utils.CallableFactory; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -35,7 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper; */ public class EdgeInputSplitsCallableFactory<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> - implements InputSplitsCallableFactory<I, V, E, M> { + implements CallableFactory<VertexEdgeCount> { /** Mapper context. */ private final Mapper<?, ?, ?, ?>.Context context; /** Graph state. 
*/ http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java deleted file mode 100644 index cdc6543..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.worker; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Factory class for creating {@link InputSplitsCallable}s. - * - * @param <I> Vertex id - * @param <V> Vertex value - * @param <E> Edge value - * @param <M> Message data - */ -public interface InputSplitsCallableFactory<I extends WritableComparable, - V extends Writable, E extends Writable, M extends Writable> { - /** - * Return a newly-created {@link InputSplitsCallable}. 
- * - * @param threadId Id of input split thread - * @return A new {@link InputSplitsCallable} - */ - InputSplitsCallable<I, V, E, M> newCallable(int threadId); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java index aebca81..4eff3b8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java @@ -20,6 +20,8 @@ package org.apache.giraph.worker; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.VertexEdgeCount; +import org.apache.giraph.utils.CallableFactory; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -35,7 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper; */ public class VertexInputSplitsCallableFactory<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> - implements InputSplitsCallableFactory<I, V, E, M> { + implements CallableFactory<VertexEdgeCount> { /** Mapper context. */ private final Mapper<?, ?, ?, ?>.Context context; /** Graph state. */
