Updated Branches: refs/heads/trunk da2a68708 -> 67f5f7475
GIRAPH-600: Create an option to do output during computation (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/67f5f747 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/67f5f747 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/67f5f747 Branch: refs/heads/trunk Commit: 67f5f747578b0e67944da2fea9e4a3d3a22c4e09 Parents: da2a687 Author: Maja Kabiljo <[email protected]> Authored: Sat Mar 30 01:13:29 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Sat Mar 30 01:13:29 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../giraph/bsp/CentralizedServiceWorker.java | 8 + .../apache/giraph/conf/GiraphConfiguration.java | 40 +++++ .../org/apache/giraph/conf/GiraphConstants.java | 16 ++ .../conf/ImmutableClassesGiraphConfiguration.java | 24 +++ .../org/apache/giraph/graph/ComputeCallable.java | 15 ++- .../org/apache/giraph/graph/GraphTaskManager.java | 3 +- .../org/apache/giraph/io/SimpleVertexWriter.java | 45 ++++++ .../java/org/apache/giraph/io/VertexWriter.java | 13 +-- .../MultiThreadedSuperstepOutput.java | 119 +++++++++++++++ .../io/superstep_output/NoOpSuperstepOutput.java | 57 +++++++ .../io/superstep_output/SuperstepOutput.java | 57 +++++++ .../SynchronizedSuperstepOutput.java | 95 ++++++++++++ .../giraph/io/superstep_output/package-info.java | 21 +++ .../org/apache/giraph/worker/BspServiceWorker.java | 21 +++- 15 files changed, 521 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 8263026..e0a82a6 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-600: Create an option to do output during computation (majakabiljo) + 
GIRAPH-599: Hive IO dependency issues with some Hadoop profiles (nitay via majakabiljo) GIRAPH-577: Create a testing framework that doesn't require I/O formats (ves via apresta) http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java index 56b5d03..1c7bde4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java @@ -24,6 +24,7 @@ import org.apache.giraph.graph.FinishedSuperstepStats; import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.GraphTaskManager; import org.apache.giraph.graph.VertexEdgeCount; +import org.apache.giraph.io.superstep_output.SuperstepOutput; import org.apache.giraph.master.MasterInfo; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.partition.PartitionStats; @@ -223,6 +224,13 @@ public interface CentralizedServiceWorker<I extends WritableComparable, void prepareSuperstep(); /** + * Get the superstep output class + * + * @return SuperstepOutput + */ + SuperstepOutput<I, V, E> getSuperstepOutput(); + + /** * Clean up the service (no calls may be issued after this) * * @param finishedSuperstepStats Finished supestep stats http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 963b82a..040c26f 100644 --- 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -262,6 +262,46 @@ public class GiraphConfiguration extends Configuration } /** + * Check if output should be done during computation + * + * @return True iff output should be done during computation + */ + public final boolean doOutputDuringComputation() { + return DO_OUTPUT_DURING_COMPUTATION.get(this); + } + + /** + * Set whether or not we should do output during computation + * + * @param doOutputDuringComputation True iff we want output to happen + * during computation + */ + public final void setDoOutputDuringComputation( + boolean doOutputDuringComputation) { + DO_OUTPUT_DURING_COMPUTATION.set(this, doOutputDuringComputation); + } + + /** + * Check if VertexOutputFormat is thread-safe + * + * @return True iff VertexOutputFormat is thread-safe + */ + public final boolean vertexOutputFormatThreadSafe() { + return VERTEX_OUTPUT_FORMAT_THREAD_SAFE.get(this); + } + + /** + * Set whether or not selected VertexOutputFormat is thread-safe + * + * @param vertexOutputFormatThreadSafe True iff selected VertexOutputFormat + * is thread-safe + */ + public final void setVertexOutputFormatThreadSafe( + boolean vertexOutputFormatThreadSafe) { + VERTEX_OUTPUT_FORMAT_THREAD_SAFE.set(this, vertexOutputFormatThreadSafe); + } + + /** * Set the vertex combiner class (optional) * * @param vertexCombinerClass Determines how vertex messages are combined http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index c5b9b93..eaa8363 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -124,6 +124,22 @@ public interface GiraphConstants { ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS = ClassConfOption.create("giraph.vertexOutputFormatClass", null, VertexOutputFormat.class); + /** + * If you use this option, instead of saving vertices at the end of the + * application, writeVertex will be called right after each vertex.compute() + * is called. + * NOTE: This feature doesn't work well with checkpointing - if you restart + * from a checkpoint you won't have any output from previous supersteps. + */ + BooleanConfOption DO_OUTPUT_DURING_COMPUTATION = + new BooleanConfOption("giraph.doOutputDuringComputation", false); + /** + * Vertex output format thread-safe - if your VertexOutputFormat allows + * several vertexWriters to be created and written to in parallel, + * you should set this to true. + */ + BooleanConfOption VERTEX_OUTPUT_FORMAT_THREAD_SAFE = + new BooleanConfOption("giraph.vertexOutputFormatThreadSafe", false); /** Output Format Path (for Giraph-on-YARN) */ String GIRAPH_OUTPUT_DIR = "giraph.output.dir"; http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index 76f6105..e290c57 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -31,6 +31,10 @@ import org.apache.giraph.graph.VertexValueFactory; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; +import
org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput; +import org.apache.giraph.io.superstep_output.NoOpSuperstepOutput; +import org.apache.giraph.io.superstep_output.SuperstepOutput; +import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput; import org.apache.giraph.job.GiraphJobObserver; import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterObserver; @@ -50,6 +54,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.Progressable; import static org.apache.giraph.conf.GiraphConstants.USE_UNSAFE_SERIALIZATION; @@ -201,6 +206,25 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Create the proper superstep output, based on the configuration settings. + * + * @param context Mapper context + * @return SuperstepOutput + */ + public SuperstepOutput<I, V, E> createSuperstepOutput( + Mapper<?, ?, ?, ?>.Context context) { + if (doOutputDuringComputation()) { + if (vertexOutputFormatThreadSafe()) { + return new MultiThreadedSuperstepOutput<I, V, E>(this, context); + } else { + return new SynchronizedSuperstepOutput<I, V, E>(this, context); + } + } else { + return new NoOpSuperstepOutput<I, V, E>(); + } + } + + /** * Does the job have an {@link EdgeInputFormat}? * * @return True iff an {@link EdgeInputFormat} has been specified. 
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index 4840471..51ed4f6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -22,6 +22,7 @@ import org.apache.giraph.comm.WorkerClientRequestProcessor; import org.apache.giraph.comm.messages.MessageStoreByPartition; import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.io.SimpleVertexWriter; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.MetricNames; import org.apache.giraph.metrics.SuperstepMetricsRegistry; @@ -89,6 +90,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, /** Sends the messages (unique per Callable) */ private WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor; + /** VertexWriter for this ComputeCallable */ + private SimpleVertexWriter<I, V, E> vertexWriter; /** Get the start time in nanos */ private final long startNanos = TIME.getNanoseconds(); @@ -143,6 +146,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, context, graphState.getGraphTaskManager(), workerClientRequestProcessor, aggregatorUsage); + vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter(); + List<PartitionStats> partitionStatsList = Lists.newArrayList(); while (!partitionIdQueue.isEmpty()) { Integer partitionId = partitionIdQueue.poll(); @@ -165,11 +170,17 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, } catch (IOException e) { throw new 
IllegalStateException("call: Caught unexpected IOException," + " failing.", e); + } catch (InterruptedException e) { + throw new IllegalStateException("call: Caught unexpected " + + "InterruptedException, failing.", e); } finally { serviceWorker.getPartitionStore().putPartition(partition); } } + // Return VertexWriter after the usage + serviceWorker.getSuperstepOutput().returnVertexWriter(vertexWriter); + if (LOG.isInfoEnabled()) { float seconds = Times.getNanosSince(TIME, startNanos) / Time.NS_PER_SECOND_AS_FLOAT; @@ -193,7 +204,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, * @return Partition stats for this computed partition */ private PartitionStats computePartition(Partition<I, V, E, M> partition) - throws IOException { + throws IOException, InterruptedException { PartitionStats partitionStats = new PartitionStats(partition.getId(), 0, 0, 0, 0); // Make sure this is thread-safe across runs @@ -225,6 +236,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, } // Need to unwrap the mutated edges (possibly) vertex.unwrapMutableEdges(); + // Write vertex to superstep output (no-op if it is not used) + vertexWriter.writeVertex(vertex); // Need to save the vertex changes (possibly) partition.saveVertex(vertex); } http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 57f7dff..3ae5ed3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -288,9 +288,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, /** * Handle post-application callbacks. 
*/ - private void postApplication() { + private void postApplication() throws IOException, InterruptedException { GiraphTimerContext postAppTimerContext = wcPostAppTimer.time(); serviceWorker.getWorkerContext().postApplication(); + serviceWorker.getSuperstepOutput().postApplication(); postAppTimerContext.stop(); context.progress(); http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java new file mode 100644 index 0000000..e4c3496 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.io; + +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.IOException; + +/** + * Interface which can only write vertices + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +public interface SimpleVertexWriter<I extends WritableComparable, + V extends Writable, E extends Writable> { + /** + * Writes the next vertex and associated data + * + * @param vertex set the properties of this vertex + * @throws IOException + * @throws InterruptedException + */ + void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, + InterruptedException; +} http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java index 38c5548..2aa3a71 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java @@ -20,7 +20,6 @@ package org.apache.giraph.io; import java.io.IOException; -import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -34,7 +33,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; */ @SuppressWarnings("rawtypes") public interface VertexWriter<I extends WritableComparable, V extends Writable, - E extends Writable> { + E extends Writable> extends SimpleVertexWriter<I, V, E> { /** * Use the context to setup writing the vertices. * Guaranteed to be called prior to any other function. 
@@ -47,16 +46,6 @@ public interface VertexWriter<I extends WritableComparable, V extends Writable, InterruptedException; /** - * Writes the next vertex and associated data - * - * @param vertex set the properties of this vertex - * @throws IOException - * @throws InterruptedException - */ - void writeVertex(Vertex<I, V, E, ?> vertex) - throws IOException, InterruptedException; - - /** * Close this {@link VertexWriter} to future operations. * * @param context the context of the task http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java new file mode 100644 index 0000000..a09f915 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.io.superstep_output; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.io.SimpleVertexWriter; +import org.apache.giraph.io.VertexOutputFormat; +import org.apache.giraph.io.VertexWriter; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +/** + * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is + * thread-safe. + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +public class MultiThreadedSuperstepOutput<I extends WritableComparable, + V extends Writable, E extends Writable> implements + SuperstepOutput<I, V, E> { + /** Mapper context */ + private final Mapper<?, ?, ?, ?>.Context context; + /** Vertex output format, used to get new vertex writers */ + private final VertexOutputFormat<I, V, E> vertexOutputFormat; + /** + * List of returned vertex writers, these can be reused and will all be + * closed at the end of the application + */ + private final List<VertexWriter<I, V, E>> availableVertexWriters; + /** Vertex writers which were created by this class and are currently used */ + private final Set<VertexWriter<I, V, E>> occupiedVertexWriters; + + /** + * Constructor + * + * @param conf Configuration + * @param context Mapper context + */ + public MultiThreadedSuperstepOutput( + ImmutableClassesGiraphConfiguration<I, V, E, ?> conf, + Mapper<?, ?, ?, ?>.Context context) { + vertexOutputFormat = conf.createVertexOutputFormat(); + this.context = context; + availableVertexWriters = Lists.newArrayList(); + occupiedVertexWriters = Sets.newHashSet(); + } + + @Override + public synchronized SimpleVertexWriter<I, V, E> getVertexWriter() { + 
VertexWriter<I, V, E> vertexWriter; + if
(availableVertexWriters.isEmpty()) { + try { + vertexWriter = vertexOutputFormat.createVertexWriter(context); + vertexWriter.initialize(context); + } catch (IOException e) { + throw new IllegalStateException("getVertexWriter: " + + "IOException occurred", e); + } catch (InterruptedException e) { + throw new IllegalStateException("getVertexWriter: " + + "InterruptedException occurred", e); + } + } else { + vertexWriter = + availableVertexWriters.remove(availableVertexWriters.size() - 1); + } + occupiedVertexWriters.add(vertexWriter); + return vertexWriter; + } + + @Override + public synchronized void returnVertexWriter( + SimpleVertexWriter<I, V, E> vertexWriter) { + VertexWriter<I, V, E> returnedWriter = (VertexWriter<I, V, E>) vertexWriter; + if (!occupiedVertexWriters.remove(returnedWriter)) { + throw new IllegalStateException("returnVertexWriter: " + + "Returned vertex writer which is not currently occupied!"); + } + availableVertexWriters.add(returnedWriter); + } + + @Override + public synchronized void postApplication() throws IOException, + InterruptedException { + if (!occupiedVertexWriters.isEmpty()) { + throw new IllegalStateException("postApplication: " + + occupiedVertexWriters.size() + + " vertex writers were not returned!"); + } + for (VertexWriter<I, V, E> vertexWriter : availableVertexWriters) { + vertexWriter.close(context); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java new file mode 100644 index 0000000..82684b2 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.superstep_output; + +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.SimpleVertexWriter; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.IOException; + +/** + * Class to use as {@link SuperstepOutput} when we don't have output during + * computation. All the methods are no-ops. 
+ * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +public class NoOpSuperstepOutput<I extends WritableComparable, + V extends Writable, E extends Writable> implements + SuperstepOutput<I, V, E> { + @Override + public SimpleVertexWriter<I, V, E> getVertexWriter() { + return new SimpleVertexWriter<I, V, E>() { + @Override + public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, + InterruptedException { + } + }; + } + + @Override + public void returnVertexWriter( + SimpleVertexWriter<I, V, E> vertexWriter) { + } + + @Override + public void postApplication() { + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SuperstepOutput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SuperstepOutput.java new file mode 100644 index 0000000..107039a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SuperstepOutput.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.superstep_output; + +import org.apache.giraph.io.SimpleVertexWriter; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.IOException; + +/** + * Interface for outputting data during the computation. + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +public interface SuperstepOutput<I extends WritableComparable, + V extends Writable, E extends Writable> { + + /** + * Get the Writer. You have to return it after usage in order for it to be + * properly closed. + * + * @return SimpleVertexWriter + */ + SimpleVertexWriter<I, V, E> getVertexWriter(); + + /** + * Return the Writer after usage, which you got by calling + * {@link #getVertexWriter()} + * + * @param vertexWriter SimpleVertexWriter which you are returning + */ + void returnVertexWriter(SimpleVertexWriter<I, V, E> vertexWriter); + + /** + * Finalize this output at the end of the application + */ + void postApplication() throws IOException, InterruptedException; +} http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java new file mode 100644 index 0000000..9617b24 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.superstep_output; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.SimpleVertexWriter; +import org.apache.giraph.io.VertexWriter; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; + +import java.io.IOException; + +/** + * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is + * not thread-safe. + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +public class SynchronizedSuperstepOutput<I extends WritableComparable, + V extends Writable, E extends Writable> implements + SuperstepOutput<I, V, E> { + /** Mapper context */ + private final Mapper<?, ?, ?, ?>.Context context; + /** Main vertex writer */ + private final VertexWriter<I, V, E> vertexWriter; + /** + * Simple vertex writer, wrapper for {@link #vertexWriter}. + * Call to writeVertex is thread-safe. 
+ */ + private final SimpleVertexWriter<I, V, E> simpleVertexWriter; + + /** + * Constructor + * + * @param conf Configuration + * @param context Mapper context + */ + public SynchronizedSuperstepOutput( + ImmutableClassesGiraphConfiguration<I, V, E, ?> conf, + Mapper<?, ?, ?, ?>.Context context) { + this.context = context; + try { + vertexWriter = + conf.createVertexOutputFormat().createVertexWriter(context); + vertexWriter.initialize(context); + } catch (IOException e) { + throw new IllegalStateException("SynchronizedSuperstepOutput: " + + "IOException occurred", e); + } catch (InterruptedException e) { + throw new IllegalStateException("SynchronizedSuperstepOutput: " + + "InterruptedException occurred", e); + } + simpleVertexWriter = new SimpleVertexWriter<I, V, E>() { + @Override + public synchronized void writeVertex( + Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException { + vertexWriter.writeVertex(vertex); + } + }; + } + + @Override + public SimpleVertexWriter<I, V, E> getVertexWriter() { + return simpleVertexWriter; + } + + @Override + public void returnVertexWriter(SimpleVertexWriter<I, V, E> vertexWriter) { + } + + @Override + public void postApplication() throws IOException, InterruptedException { + vertexWriter.close(context); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/package-info.java new file mode 100644 index 0000000..6523c74 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Classes related to output during computation + */ +package org.apache.giraph.io.superstep_output; http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 74c1f87..35db999 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -41,6 +41,7 @@ import org.apache.giraph.graph.InputSplitEvents; import org.apache.giraph.graph.FinishedSuperstepStats; import org.apache.giraph.graph.AddressesAndPartitionsWritable; import org.apache.giraph.graph.GlobalStats; +import org.apache.giraph.io.superstep_output.SuperstepOutput; import org.apache.giraph.utils.JMapHistoDumper; import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.VertexOutputFormat; @@ -149,6 +150,9 @@ public class BspServiceWorker<I extends WritableComparable, /** Handler for aggregators */ private final WorkerAggregatorHandler aggregatorHandler; + /** Superstep output */ + private 
SuperstepOutput<I, V, E> superstepOutput; + /** array of observers to call back to */ private final WorkerObserver[] observers; @@ -175,7 +179,7 @@ public class BspServiceWorker<I extends WritableComparable, GraphTaskManager<I, V, E, M> graphTaskManager) throws IOException, InterruptedException { super(serverPortList, sessionMsecTimeout, context, graphTaskManager); - ImmutableClassesGiraphConfiguration conf = getConfiguration(); + ImmutableClassesGiraphConfiguration<I, V, E, M> conf = getConfiguration(); partitionExchangeChildrenChanged = new PredicateLock(context); registerBspEvent(partitionExchangeChildrenChanged); workerGraphPartitioner = @@ -193,6 +197,8 @@ public class BspServiceWorker<I extends WritableComparable, aggregatorHandler = new WorkerAggregatorHandler(this, conf, context); + superstepOutput = conf.createSuperstepOutput(context); + if (conf.isJMapHistogramDumpEnabled()) { conf.addWorkerObserverClass(JMapHistoDumper.class); } @@ -926,6 +932,14 @@ else[HADOOP_NON_SECURE]*/ " not specified -- there will be no saved output"); return; } + if (getConfiguration().doOutputDuringComputation()) { + if (LOG.isInfoEnabled()) { + LOG.info("saveVertices: The option for doing output during " + + "computation is selected, so there will be no saving of the " + + "output in the end of application"); + } + return; + } LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "saveVertices: Starting to save " + numLocalVertices + " vertices"); @@ -1464,4 +1478,9 @@ else[HADOOP_NON_SECURE]*/ aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor); } } + + @Override + public SuperstepOutput<I, V, E> getSuperstepOutput() { + return superstepOutput; + } }
