GIRAPH-732
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ae01f039 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ae01f039 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ae01f039 Branch: refs/heads/trunk Commit: ae01f0399cae6baab045b2ece0d71096aebe8ca3 Parents: fa6b754 Author: Claudio Martella <[email protected]> Authored: Mon Aug 26 22:15:18 2013 +0200 Committer: Claudio Martella <[email protected]> Committed: Mon Aug 26 22:15:18 2013 +0200 ---------------------------------------------------------------------- CHANGELOG | 2 + .../java/org/apache/giraph/GiraphRunner.java | 2 +- .../org/apache/giraph/conf/GiraphClasses.java | 25 + .../apache/giraph/conf/GiraphConfiguration.java | 57 + .../org/apache/giraph/conf/GiraphConstants.java | 23 + .../ImmutableClassesGiraphConfiguration.java | 45 + .../org/apache/giraph/io/EdgeOutputFormat.java | 82 + .../java/org/apache/giraph/io/EdgeWriter.java | 74 + .../io/formats/GiraphTextOutputFormat.java | 90 + .../io/formats/IdWithValueTextOutputFormat.java | 19 +- .../SrcIdDstIdEdgeValueTextOutputFormat.java | 91 ++ .../giraph/io/formats/TextEdgeOutputFormat.java | 165 ++ .../io/formats/TextVertexOutputFormat.java | 15 +- .../io/internal/WrappedEdgeOutputFormat.java | 169 ++ .../apache/giraph/utils/ConfigurationUtils.java | 53 +- .../apache/giraph/worker/BspServiceWorker.java | 124 +- .../giraph/worker/BspServiceWorker.java.orig | 1535 ++++++++++++++++++ ...TestSrcIdDstIdEdgeValueTextOutputFormat.java | 114 ++ 18 files changed, 2659 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index af43ef8..deca52b 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-732: 
EdgeOutputFormat (aarmax00 via claudio) + GIRAPH-512: JavaDoc warnings (tdn120 via nitay) GIRAPH-736: Bring back FindBugs (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java index 1bd79b5..9af50e1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java +++ b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java @@ -102,7 +102,7 @@ public class GiraphRunner implements Tool { */ private void prepareHadoopMRJob(final GiraphJob job, final CommandLine cmd) throws Exception { - if (cmd.hasOption("of")) { + if (cmd.hasOption("vof") || cmd.hasOption("eof")) { if (cmd.hasOption("op")) { FileOutputFormat.setOutputPath(job.getInternalJob(), new Path(cmd.getOptionValue("op"))); http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java index 71fe885..f97446f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java @@ -28,6 +28,7 @@ import org.apache.giraph.graph.Computation; import org.apache.giraph.graph.DefaultVertexResolver; import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.io.EdgeInputFormat; +import org.apache.giraph.io.EdgeOutputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.filters.DefaultEdgeInputFilter; @@ -85,6 +86,9 @@ public class GiraphClasses<I extends 
WritableComparable, /** Edge input format class - cached for fast access */ protected Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass; + /** Edge output format class - cached for fast access */ + protected Class<? extends EdgeOutputFormat<I, V, E>> + edgeOutputFormatClass; /** Aggregator writer class - cached for fast access */ protected Class<? extends AggregatorWriter> aggregatorWriterClass; @@ -168,6 +172,8 @@ public class GiraphClasses<I extends WritableComparable, VERTEX_OUTPUT_FORMAT_CLASS.get(conf); edgeInputFormatClass = (Class<? extends EdgeInputFormat<I, E>>) EDGE_INPUT_FORMAT_CLASS.get(conf); + edgeOutputFormatClass = (Class<? extends EdgeOutputFormat<I, V, E>>) + EDGE_OUTPUT_FORMAT_CLASS.get(conf); aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf); combinerClass = (Class<? extends Combiner<I, ? extends Writable>>) @@ -347,6 +353,25 @@ public class GiraphClasses<I extends WritableComparable, } /** + * Check if EdgeOutputFormat is set + * + * @return true if EdgeOutputFormat is set + */ + public boolean hasEdgeOutputFormat() { + return edgeOutputFormatClass != null; + } + + /** + * Get EdgeOutputFormat class set + * + * @return EdgeOutputFormat class + */ + public Class<?
extends EdgeOutputFormat<I, V, E>> + getEdgeOutputFormatClass() { + return edgeOutputFormatClass; + } + + /** * Check if AggregatorWriter is set * * @return true if AggregatorWriter is set http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 23bcd32..15ff861 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -27,6 +27,7 @@ import org.apache.giraph.factories.VertexValueFactory; import org.apache.giraph.graph.Computation; import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.io.EdgeInputFormat; +import org.apache.giraph.io.EdgeOutputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.filters.EdgeInputFilter; @@ -345,6 +346,25 @@ public class GiraphConfiguration extends Configuration VERTEX_OUTPUT_FORMAT_CLASS.set(this, vertexOutputFormatClass); } + + /** + * Does the job have a {@link VertexOutputFormat} subdir? + * + * @return True iff a {@link VertexOutputFormat} subdir has been specified. + */ + public boolean hasVertexOutputFormatSubdir() { + return !VERTEX_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty(); + } + + /** + * Set the vertex output format path + * + * @param path path where the vertices will be written + */ + public final void setVertexOutputFormatSubdir(String path) { + VERTEX_OUTPUT_FORMAT_SUBDIR.set(this, path); + } + /** * Check if output should be done during computation * @@ -386,6 +406,43 @@ public class GiraphConfiguration extends Configuration } /** + * Does the job have a {@link EdgeOutputFormat}?
+ * + * @return True iff a {@link EdgeOutputFormat} has been specified. + */ + public boolean hasEdgeOutputFormat() { + return EDGE_OUTPUT_FORMAT_CLASS.get(this) != null; + } + + /** + * Set the edge output format class (optional) + * + * @param edgeOutputFormatClass Determines how graph is output + */ + public final void setEdgeOutputFormatClass( + Class<? extends EdgeOutputFormat> edgeOutputFormatClass) { + EDGE_OUTPUT_FORMAT_CLASS.set(this, edgeOutputFormatClass); + } + + /** + * Does the job have a {@link EdgeOutputFormat} subdir? + * + * @return True iff a {@link EdgeOutputFormat} subdir has been specified. + */ + public boolean hasEdgeOutputFormatSubdir() { + return !EDGE_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty(); + } + + /** + * Set the edge output format path + * + * @param path path where the edges will be written + */ + public final void setEdgeOutputFormatSubdir(String path) { + EDGE_OUTPUT_FORMAT_SUBDIR.set(this, path); + } + + /** * Get the number of threads to use for writing output in the end of the * application. If output format is not thread safe, returns 1. 
* http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index c276c2a..604729a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -38,6 +38,7 @@ import org.apache.giraph.graph.DefaultVertexResolver; import org.apache.giraph.graph.Language; import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.io.EdgeInputFormat; +import org.apache.giraph.io.EdgeOutputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.filters.DefaultEdgeInputFilter; @@ -210,6 +211,28 @@ public interface GiraphConstants { ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS = ClassConfOption.create("giraph.vertexOutputFormatClass", null, VertexOutputFormat.class, "VertexOutputFormat class"); + /** VertexOutputFormat sub-directory */ + StrConfOption VERTEX_OUTPUT_FORMAT_SUBDIR = + new StrConfOption("giraph.vertex.output.subdir", "", + "VertexOutputFormat sub-directory"); + /** EdgeOutputFormat class */ + ClassConfOption<EdgeOutputFormat> EDGE_OUTPUT_FORMAT_CLASS = + ClassConfOption.create("giraph.edgeOutputFormatClass", null, + EdgeOutputFormat.class, "EdgeOutputFormat class"); + /** EdgeOutputFormat sub-directory */ + StrConfOption EDGE_OUTPUT_FORMAT_SUBDIR = + new StrConfOption("giraph.edge.output.subdir", "edges", + "EdgeOutputFormat sub-directory"); + + /** GiraphTextOutputFormat Separator */ + StrConfOption GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR = + new StrConfOption("giraph.textoutputformat.separator", "\t", + "GiraphTextOuputFormat Separator"); + /** Reverse values in the output */ +
BooleanConfOption GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE = + new BooleanConfOption("giraph.textoutputformat.reverse", false, + "Reverse values in the output"); + /** * If you use this option, instead of having saving vertices in the end of * application, saveVertex will be called right after each vertex.compute() http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index 49a2ebc..2506c21 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -36,11 +36,13 @@ import org.apache.giraph.graph.Language; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.io.EdgeInputFormat; +import org.apache.giraph.io.EdgeOutputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.filters.EdgeInputFilter; import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.io.internal.WrappedEdgeInputFormat; +import org.apache.giraph.io.internal.WrappedEdgeOutputFormat; import org.apache.giraph.io.internal.WrappedVertexInputFormat; import org.apache.giraph.io.internal.WrappedVertexOutputFormat; import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput; @@ -289,6 +291,49 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, return wrappedVertexOutputFormat; } + @Override + public boolean hasEdgeOutputFormat() { + return classes.hasEdgeOutputFormat(); + } + + /** + * Get the user's subclassed + * {@link 
org.apache.giraph.io.EdgeOutputFormat}. + * + * @return User's edge output format class + */ + public Class<? extends EdgeOutputFormat<I, V, E>> + getEdgeOutputFormatClass() { + return classes.getEdgeOutputFormatClass(); + } + + /** + * Create a user edge output format class. + * Note: Giraph should only use WrappedEdgeOutputFormat, + * which makes sure that Configuration parameters are set properly. + * + * @return Instantiated user edge output format class + */ + private EdgeOutputFormat<I, V, E> createEdgeOutputFormat() { + Class<? extends EdgeOutputFormat<I, V, E>> klass = + getEdgeOutputFormatClass(); + return ReflectionUtils.newInstance(klass, this); + } + + /** + * Create a wrapper for user edge output format, + * which makes sure that Configuration parameters are set properly in all + * methods related to this format. + * + * @return Wrapper around user edge output format + */ + public WrappedEdgeOutputFormat<I, V, E> createWrappedEdgeOutputFormat() { + WrappedEdgeOutputFormat<I, V, E> wrappedEdgeOutputFormat = + new WrappedEdgeOutputFormat<I, V, E>(createEdgeOutputFormat()); + configureIfPossible(wrappedEdgeOutputFormat); + return wrappedEdgeOutputFormat; + } + /** * Create the proper superstep output, based on the configuration settings. * http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java new file mode 100644 index 0000000..ac4c6ce --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import java.io.IOException; + +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Abstract class which can only write edges + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +@SuppressWarnings("rawtypes") +public abstract class EdgeOutputFormat< + I extends WritableComparable, V extends Writable, + E extends Writable> extends + DefaultImmutableClassesGiraphConfigurable<I, V, E> { + /** + * Create an edge writer for a given split. The framework will call + * {@link EdgeWriter#initialize(TaskAttemptContext)} before + * the split is used. + * + * @param context the information about the task + * @return a new edge writer + * @throws IOException + * @throws InterruptedException + */ + public abstract EdgeWriter<I, V, E> createEdgeWriter( + TaskAttemptContext context) throws IOException, InterruptedException; + + /** + * Check for validity of the output-specification for the job. + * (Copied from Hadoop OutputFormat) + * + * <p>This is to validate the output specification for the job when + * a job is submitted.
Typically checks that it does not already exist, + * throwing an exception when it already exists, so that output is not + * overwritten.</p> + * + * @param context information about the job + * @throws IOException when output should not be attempted + */ + public abstract void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException; + + /** + * Get the output committer for this output format. This is responsible + * for ensuring the output is committed correctly. + * (Copied from Hadoop OutputFormat) + * + * @param context the task context + * @return an output committer + * @throws IOException + * @throws InterruptedException + */ + public abstract OutputCommitter getOutputCommitter( + TaskAttemptContext context) throws IOException, InterruptedException; +} http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java new file mode 100644 index 0000000..e5a78c2 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import java.io.IOException; + +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.giraph.edge.Edge; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Abstract class which writes the edges of the graph. + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +@SuppressWarnings("rawtypes") +public abstract class EdgeWriter< + I extends WritableComparable, V extends Writable, + E extends Writable> extends + DefaultImmutableClassesGiraphConfigurable<I, V, E> { + + /** + * Writes the next edge together with its source vertex data + * + * @param sourceId the vertex ID from which the edge originates + * @param sourceValue the vertex value; the vertex is the one from which + * the edge originates + * @param edge edge to be written + * @throws IOException + * @throws InterruptedException + */ + public abstract void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge) + throws IOException, InterruptedException; + + /** + * Use the context to setup writing the edges. + * Guaranteed to be called prior to any other function. + * + * @param context Context used to write the edges. + * @throws IOException + * @throws InterruptedException + */ + public abstract void initialize(TaskAttemptContext context) + throws IOException, InterruptedException; + + /** + * Close this {@link EdgeWriter} to future operations.
+ * + * @param context the context of the task + * @throws IOException + * @throws InterruptedException + */ + public abstract void close(TaskAttemptContext context) + throws IOException, InterruptedException; +} http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java new file mode 100644 index 0000000..582dea2 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.io.formats; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * The text output format used for Giraph text writing. + */ +public abstract class GiraphTextOutputFormat + extends TextOutputFormat<Text, Text> { + + @Override + public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job) + throws IOException, InterruptedException { + String extension = ""; + CompressionCodec codec = null; + Configuration conf = job.getConfiguration(); + boolean isCompressed = getCompressOutput(job); + + if (isCompressed) { + Class<? 
extends CompressionCodec> codecClass = + getOutputCompressorClass(job, GzipCodec.class); + codec = + (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); + extension = codec.getDefaultExtension(); + } + Path file = getDefaultWorkFile(job, extension); + + /* adjust the path */ + FSDataOutputStream fileOut; + FileSystem fs = file.getFileSystem(conf); + String subdir = getSubdir(); + if (!subdir.isEmpty()) { + Path subdirPath = new Path(subdir); + Path subdirAbsPath = new Path(file.getParent(), subdirPath); + Path vertexFile = new Path(subdirAbsPath, file.getName()); + fileOut = fs.create(vertexFile, false); + } else { + fileOut = fs.create(file, false); + } + + String separator = "\t"; + + if (!isCompressed) { + return new LineRecordWriter<Text, Text>(fileOut, separator); + } else { + DataOutputStream out = + new DataOutputStream(codec.createOutputStream(fileOut)); + return new LineRecordWriter<Text, Text>(out, separator); + } + } + + /** + * This function is used to provide an additional path level to keep + * different text outputs into different directories. 
+ * + * @return the subdirectory to be created under the output path + */ + protected abstract String getSubdir(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java index bd69586..e886059 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java @@ -77,19 +77,18 @@ public class IdWithValueTextOutputFormat<I extends WritableComparable, @Override protected Text convertVertexToLine(Vertex<I, V, E> vertex) throws IOException { - String first; - String second; + + StringBuilder str = new StringBuilder(); if (reverseOutput) { - first = vertex.getValue().toString(); - second = vertex.getId().toString(); + str.append(vertex.getValue().toString()); + str.append(delimiter); + str.append(vertex.getId().toString()); } else { - first = vertex.getId().toString(); - second = vertex.getValue().toString(); + str.append(vertex.getId().toString()); + str.append(delimiter); + str.append(vertex.getValue().toString()); } - Text line = new Text(first + delimiter + second); - return line; + return new Text(str.toString()); } - } - } http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java new file mode 100644 index 
0000000..1d7478f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.formats; + +import java.io.IOException; + +import org.apache.giraph.edge.Edge; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import static org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR; +import static org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE; + +/** + * Write out Edge Value with Source and Destination ID, but not the vertex + * value. + * This is a demonstration output format to show the possibility to separately + * output edges from vertices.
+ * + * @param <I> Vertex index value + * @param <V> Vertex value + * @param <E> Edge value + */ +@SuppressWarnings("rawtypes") +public class SrcIdDstIdEdgeValueTextOutputFormat<I extends WritableComparable, + V extends Writable, E extends Writable> + extends TextEdgeOutputFormat<I, V, E> { + + @Override + public TextEdgeWriter createEdgeWriter(TaskAttemptContext context) { + return new SrcIdDstIdEdgeValueEdgeWriter(); + } + + /** + * Edge writer used with {@link SrcIdDstIdEdgeValueTextOutputFormat}. + */ + protected class SrcIdDstIdEdgeValueEdgeWriter + extends TextEdgeWriterToEachLine { + + /** Saved delimiter */ + private String delimiter; + /** Cached reverse option */ + private boolean reverseOutput; + + @Override + public void initialize(TaskAttemptContext context) + throws IOException, InterruptedException { + super.initialize(context); + delimiter = GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR.get(getConf()); + reverseOutput = GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE.get(getConf()); + } + + @Override + protected Text convertEdgeToLine(I sourceId, V sourceValue, Edge<I, E> edge) + throws IOException { + StringBuilder msg = new StringBuilder(); + if (reverseOutput) { + msg.append(edge.getValue().toString()); + msg.append(delimiter); + msg.append(edge.getTargetVertexId().toString()); + msg.append(delimiter); + msg.append(sourceId.toString()); + } else { + msg.append(sourceId.toString()); + msg.append(delimiter); + msg.append(edge.getTargetVertexId().toString()); + msg.append(delimiter); + msg.append(edge.getValue().toString()); + } + return new Text(msg.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java new file mode
100644 index 0000000..1b20c57 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.formats; + +import static org.apache.giraph.conf.GiraphConstants.EDGE_OUTPUT_FORMAT_SUBDIR; + +import java.io.IOException; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.io.EdgeOutputFormat; +import org.apache.giraph.io.EdgeWriter; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Abstract class that users should subclass to use their own text based + * edge output format.
+ * + * @param <I> Vertex index value + * @param <V> Vertex value + * @param <E> Edge value + */ +@SuppressWarnings("rawtypes") +public abstract class TextEdgeOutputFormat<I extends WritableComparable, + V extends Writable, E extends Writable> + extends EdgeOutputFormat<I, V, E> { + /** Delegates all output to a GiraphTextOutputFormat using the edge subdir */ + protected GiraphTextOutputFormat textOutputFormat = + new GiraphTextOutputFormat() { + @Override + protected String getSubdir() { + return EDGE_OUTPUT_FORMAT_SUBDIR.get(getConf()); + } + }; + + @Override + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + textOutputFormat.checkOutputSpecs(context); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return textOutputFormat.getOutputCommitter(context); + } + + /** + * The factory method which produces the {@link TextEdgeWriter} used by this + * output format. + * + * @param context the information about the task + * @return the text edge writer to be used + */ + @Override + public abstract TextEdgeWriter createEdgeWriter(TaskAttemptContext + context) throws IOException, InterruptedException; + + /** + * Abstract class to be implemented by the user based on their specific + * edge output. Easiest to ignore the key value separator and only use + * key instead. + */ + protected abstract class TextEdgeWriter + extends EdgeWriter<I, V, E> { + /** Internal line record writer */ + private RecordWriter<Text, Text> lineRecordWriter; + /** Context passed to initialize */ + private TaskAttemptContext context; + + @Override + public void initialize(TaskAttemptContext context) throws IOException, + InterruptedException { + lineRecordWriter = createLineRecordWriter(context); + this.context = context; + } + + /** + * Create the line record writer. Override this to use a different + * underlying record writer (useful for testing).
+ * + * @param context the context passed to initialize + * @return the record writer to be used + * @throws IOException exception that can be thrown during creation + * @throws InterruptedException exception that can be thrown during creation + */ + protected RecordWriter<Text, Text> createLineRecordWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + return textOutputFormat.getRecordWriter(context); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + lineRecordWriter.close(context); + } + + /** + * Get the line record writer. + * + * @return Record writer to be used for writing. + */ + public RecordWriter<Text, Text> getRecordWriter() { + return lineRecordWriter; + } + + /** + * Get the context. + * + * @return Context passed to initialize. + */ + public TaskAttemptContext getContext() { + return context; + } + } + + /** + * Abstract class to be implemented by the user to write a line for each + * edge. + */ + protected abstract class TextEdgeWriterToEachLine extends TextEdgeWriter { + + @Override + public final void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge) + throws IOException, InterruptedException { + + // Note we are writing line as key with null value + getRecordWriter().write( + convertEdgeToLine(sourceId, sourceValue, edge), null); + } + + /** + * Converts the given edge into the text line to be written.
+ * + * @param sourceId the current id of the source vertex + * @param sourceValue the current value of the source vertex + * @param edge the current edge to be written + * @return the text line to be written + * @throws IOException exception that can be thrown while writing + */ + protected abstract Text convertEdgeToLine(I sourceId, + V sourceValue, Edge<I, E> edge) throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java index c91d543..c57ecd7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java @@ -19,6 +19,7 @@ package org.apache.giraph.io.formats; import java.io.IOException; + import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.VertexWriter; @@ -29,7 +30,8 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + +import static org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_SUBDIR; /** * Abstract class that users should subclass to use their own text based @@ -43,10 +45,14 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public abstract class TextVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> extends VertexOutputFormat<I, V, E> { - /** Uses the TextOutputFormat to do everything */ - protected TextOutputFormat<Text, Text>
textOutputFormat = - new TextOutputFormat<Text, Text>(); + protected GiraphTextOutputFormat textOutputFormat = + new GiraphTextOutputFormat() { + @Override + protected String getSubdir() { + return VERTEX_OUTPUT_FORMAT_SUBDIR.get(getConf()); + } + }; @Override public void checkOutputSpecs(JobContext context) @@ -161,5 +167,4 @@ public abstract class TextVertexOutputFormat<I extends WritableComparable, protected abstract Text convertVertexToLine(Vertex<I, V, E> vertex) throws IOException; } - } http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java new file mode 100644 index 0000000..2222255 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.giraph.io.internal; + +import java.io.IOException; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.io.EdgeOutputFormat; +import org.apache.giraph.io.EdgeWriter; +import org.apache.giraph.job.HadoopUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * For internal use only. + * + * Wraps a user-set {@link EdgeOutputFormat}. Every call that takes a + * context is forwarded with a context made by {@link HadoopUtils} from + * this format's configuration, so that parameters set in the configuration + * are available in the other methods related to this format. + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + */ +@SuppressWarnings("rawtypes") +public class WrappedEdgeOutputFormat<I extends WritableComparable, + V extends Writable, E extends Writable> + extends EdgeOutputFormat<I, V, E> { + + /** {@link EdgeOutputFormat} which is wrapped */ + private final EdgeOutputFormat<I, V, E> originalOutputFormat; + + /** + * Creates a wrapper that delegates to the given edge output format. + * + * @param edgeOutputFormat Edge output format to wrap + */ + public WrappedEdgeOutputFormat( + EdgeOutputFormat<I, V, E> edgeOutputFormat) { + originalOutputFormat = edgeOutputFormat; + } + + @Override + public EdgeWriter<I, V, E> createEdgeWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + final EdgeWriter<I, V, E> edgeWriter = + originalOutputFormat.createEdgeWriter( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); + return new EdgeWriter<I, V, E>() { + @Override + public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) { + super.setConf(conf); + edgeWriter.setConf(conf); + } + + @Override + public void initialize(TaskAttemptContext context)
+ throws IOException, InterruptedException { + edgeWriter.initialize( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); + } + + @Override + public void close( + TaskAttemptContext context) throws IOException, InterruptedException { + edgeWriter.close( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); + } + + @Override + public void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge) + throws IOException, InterruptedException { + edgeWriter.writeEdge(sourceId, sourceValue, edge); + } + }; + } + + @Override + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + originalOutputFormat.checkOutputSpecs( + HadoopUtils.makeJobContext(getConf(), context)); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + + final OutputCommitter outputCommitter = + originalOutputFormat.getOutputCommitter( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); + + return new OutputCommitter() { + @Override + public void setupJob(JobContext context) throws IOException { + outputCommitter.setupJob( + HadoopUtils.makeJobContext(getConf(), context)); + } + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + outputCommitter.setupTask( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); + } + + @Override + public boolean needsTaskCommit( + TaskAttemptContext context) throws IOException { + return outputCommitter.needsTaskCommit( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + outputCommitter.commitTask( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); + } + + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + outputCommitter.abortTask( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); + } + + @Override + public void cleanupJob(JobContext context)
+ throws IOException { + outputCommitter.cleanupJob( + HadoopUtils.makeJobContext(getConf(), context)); + } + + /*if_not[HADOOP_NON_COMMIT_JOB]*/ + @Override + public void commitJob(JobContext context) throws IOException { + outputCommitter.commitJob( + HadoopUtils.makeJobContext(getConf(), context)); + } + + @Override + public void abortJob(JobContext context, + JobStatus.State state) throws IOException { + outputCommitter.abortJob( + HadoopUtils.makeJobContext(getConf(), context), state); + } + /*end[HADOOP_NON_COMMIT_JOB]*/ + }; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java index 745764b..4bc4f4d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java @@ -36,6 +36,7 @@ import org.apache.giraph.factories.VertexValueFactory; import org.apache.giraph.graph.Computation; import org.apache.giraph.graph.Language; import org.apache.giraph.io.EdgeInputFormat; +import org.apache.giraph.io.EdgeOutputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.formats.GiraphFileInputFormat; @@ -97,10 +98,16 @@ public final class ConfigurationUtils { OPTIONS.addOption("w", "workers", true, "Number of workers"); OPTIONS.addOption("vif", "vertexInputFormat", true, "Vertex input format"); OPTIONS.addOption("eif", "edgeInputFormat", true, "Edge input format"); - OPTIONS.addOption("of", "outputFormat", true, "Vertex output format"); + OPTIONS.addOption("vof", "vertexOutputFormat", true, + "Vertex output format"); + OPTIONS.addOption("eof", "edgeOutputFormat", true, "Edge output
format"); OPTIONS.addOption("vip", "vertexInputPath", true, "Vertex input path"); OPTIONS.addOption("eip", "edgeInputPath", true, "Edge input path"); - OPTIONS.addOption("op", "outputPath", true, "Vertex output path"); + OPTIONS.addOption("op", "outputPath", true, "Output path"); + OPTIONS.addOption("vsd", "vertexSubDir", true, "subdirectory to be used " + + "for the vertex output"); + OPTIONS.addOption("esd", "edgeSubDir", true, "subdirectory to be used " + + "for the edge output"); OPTIONS.addOption("c", "combiner", true, "Combiner class"); OPTIONS.addOption("ve", "outEdges", true, "Vertex edges class"); OPTIONS.addOption("wc", "workerContext", true, "WorkerContext class"); @@ -316,14 +323,46 @@ public final class ConfigurationUtils { "InputFormat does not require one."); } } - if (cmd.hasOption("of")) { + if (cmd.hasOption("vof")) { conf.setVertexOutputFormatClass( (Class<? extends VertexOutputFormat>) Class - .forName(cmd.getOptionValue("of"))); + .forName(cmd.getOptionValue("vof"))); } else { if (LOG.isInfoEnabled()) { - LOG.info("No output format specified. Ensure your OutputFormat " + - "does not require one."); + LOG.info("No vertex output format specified. Ensure your " + + "OutputFormat does not require one."); + } + } + if (cmd.hasOption("vof")) { + if (cmd.hasOption("vsd")) { + conf.setVertexOutputFormatSubdir(cmd.getOptionValue("vsd")); + } + } + if (cmd.hasOption("eof")) { + conf.setEdgeOutputFormatClass( + (Class<? extends EdgeOutputFormat>) Class + .forName(cmd.getOptionValue("eof"))); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("No edge output format specified.
Ensure your " + + "OutputFormat does not require one."); + } + } + if (cmd.hasOption("eof")) { + if (cmd.hasOption("esd")) { + conf.setEdgeOutputFormatSubdir(cmd.getOptionValue("esd")); + } + } + /* both outputs share the -op path, so each needs its own subdir */ + if (cmd.hasOption("vof") && cmd.hasOption("eof") && cmd.hasOption("op")) { + if (!cmd.hasOption("vsd") || !cmd.hasOption("esd")) { + if (!conf.hasEdgeOutputFormatSubdir() || + !conf.hasVertexOutputFormatSubdir()) { + + throw new IllegalArgumentException("If VertexOutputFormat and " + + "EdgeOutputFormat are both set, it is mandatory to provide " + + "both vertex subdirectory as well as edge subdirectory"); + } } } if (cmd.hasOption("pc")) { @@ -385,7 +424,7 @@ public final class ConfigurationUtils { Integer.parseInt(cmd.getOptionValue("yh"))); } /*if[PURE_YARN] - if (cmd.hasOption("of")) { + if (cmd.hasOption("vof") || cmd.hasOption("eof")) { if (cmd.hasOption("op")) { // For YARN conf to get the out dir we need w/o a Job obj Path outputDir = http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 9311fbd..112b76d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -32,6 +32,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; import org.apache.giraph.comm.netty.NettyWorkerServer; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; import org.apache.giraph.graph.AddressesAndPartitionsWritable; import org.apache.giraph.graph.FinishedSuperstepStats; import org.apache.giraph.graph.GlobalStats; @@ -40,6
+41,8 @@ import org.apache.giraph.graph.InputSplitEvents; import org.apache.giraph.graph.InputSplitPaths; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexEdgeCount; +import org.apache.giraph.io.EdgeOutputFormat; +import org.apache.giraph.io.EdgeWriter; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.VertexWriter; import org.apache.giraph.io.superstep_output.SuperstepOutput; @@ -919,13 +922,15 @@ public class BspServiceWorker<I extends WritableComparable, */ private void saveVertices(long numLocalVertices) throws IOException, InterruptedException { - if (getConfiguration().getVertexOutputFormatClass() == null) { + ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration(); + + if (conf.getVertexOutputFormatClass() == null) { LOG.warn("saveVertices: " + GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS + " not specified -- there will be no saved output"); return; } - if (getConfiguration().doOutputDuringComputation()) { + if (conf.doOutputDuringComputation()) { if (LOG.isInfoEnabled()) { LOG.info("saveVertices: The option for doing output during " + "computation is selected, so there will be no saving of the " + @@ -1024,12 +1029,126 @@ public class BspServiceWorker<I extends WritableComparable, } } + /** + * Save the edges of all partitions in the partition store using the + * user-defined EdgeOutputFormat, with one EdgeWriter per output thread.
+ * + * @throws InterruptedException if interrupted while saving the edges + */ + private void saveEdges() throws IOException, InterruptedException { + final ImmutableClassesGiraphConfiguration<I, V, E> conf = + getConfiguration(); + + if (conf.getEdgeOutputFormatClass() == null) { + LOG.warn("saveEdges: " + + GiraphConstants.EDGE_OUTPUT_FORMAT_CLASS + + " not specified -- there will be no saved edges"); + return; + } + + final int numPartitions = getPartitionStore().getNumPartitions(); + int numThreads = Math.min(conf.getNumOutputThreads(), + numPartitions); + LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, + "saveEdges: Starting to save the edges using " + + numThreads + " threads"); + final EdgeOutputFormat<I, V, E> edgeOutputFormat = + conf.createWrappedEdgeOutputFormat(); + + final Queue<Integer> partitionIdQueue = + (numPartitions == 0) ? new LinkedList<Integer>() : + new ArrayBlockingQueue<Integer>(numPartitions); + Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds()); + + CallableFactory<Void> callableFactory = new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + EdgeWriter<I, V, E> edgeWriter = + edgeOutputFormat.createEdgeWriter(getContext()); + edgeWriter.setConf(conf); + edgeWriter.initialize(getContext()); + + long nextPrintVertices = 0; + long nextPrintMsecs = System.currentTimeMillis() + 15000; + int partitionIndex = 0; + int numPartitions = getPartitionStore().getNumPartitions(); + while (!partitionIdQueue.isEmpty()) { + Integer partitionId = partitionIdQueue.poll(); + if (partitionId == null) { + break; + } + + Partition<I, V, E> partition = + getPartitionStore().getPartition(partitionId); + long vertices = 0; + long edges = 0; + long partitionEdgeCount = partition.getEdgeCount(); + for (Vertex<I, V, E> vertex : partition) { + for (Edge<I, E> edge : vertex.getEdges()) { + edgeWriter.writeEdge(vertex.getId(),
vertex.getValue(), edge); + ++edges; + } + ++vertices; + + // Update status at most every 250k vertices or 15 seconds + if (vertices > nextPrintVertices && + System.currentTimeMillis() > nextPrintMsecs) { + LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, + "saveEdges: Saved " + edges + + " edges out of " + partitionEdgeCount + + " partition edges, on partition " + partitionIndex + + " out of " + numPartitions); + nextPrintMsecs = System.currentTimeMillis() + 15000; + nextPrintVertices = vertices + 250000; + } + } + getPartitionStore().putPartition(partition); + ++partitionIndex; + } + edgeWriter.close(getContext()); // the temp results are saved now + return null; + } + }; + } + }; + ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads, + "save-edges-%d", getContext()); + + LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, + "saveEdges: Done saving edges."); + // YARN: we must commit the "task" output ourselves, Hadoop isn't there. + if (conf.isPureYarnJob() && + conf.getEdgeOutputFormatClass() != null) { + try { + OutputCommitter outputCommitter = + edgeOutputFormat.getOutputCommitter(getContext()); + if (outputCommitter.needsTaskCommit(getContext())) { + LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, + "OutputCommitter: committing task output."); + // transfer from temp dirs to "task commit" dirs to prep for + // the master's OutputCommitter#commitJob(context) call to finish.
+ outputCommitter.commitTask(getContext()); + } + } catch (InterruptedException ie) { + LOG.error("Interrupted while attempting to obtain " + + "OutputCommitter.", ie); + } catch (IOException ioe) { + LOG.error("Master task's attempt to commit output has " + + "FAILED.", ioe); + } + } + } + @Override public void cleanup(FinishedSuperstepStats finishedSuperstepStats) throws IOException, InterruptedException { workerClient.closeConnections(); setCachedSuperstep(getSuperstep() - 1); saveVertices(finishedSuperstepStats.getLocalVertexCount()); + saveEdges(); getPartitionStore().shutdown(); // All worker processes should denote they are done by adding special // znode. Once the number of znodes equals the number of partitions @@ -1331,7 +1450,6 @@ else[HADOOP_NON_SECURE]*/ } } - try { workerClientRequestProcessor.flush(); workerClient.waitAllRequests();
