Updated Branches: refs/heads/trunk 1684891ec -> 7a04bfd29
GIRAPH-476: SequenceFileVertexOutputFormat (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7a04bfd2 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7a04bfd2 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7a04bfd2 Branch: refs/heads/trunk Commit: 7a04bfd29dff23604f465299e0ef0f262b0da931 Parents: 1684891 Author: Nitay Joffe <[email protected]> Authored: Wed Jan 9 14:39:08 2013 -0500 Committer: Nitay Joffe <[email protected]> Committed: Wed Jan 9 14:42:23 2013 -0500 ---------------------------------------------------------------------- CHANGELOG | 2 + .../io/formats/SequenceFileVertexOutputFormat.java | 124 +++++++++++++++ 2 files changed, 126 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/7a04bfd2/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index bfb9a55..d67df78 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-476: SequenceFileVertexOutputFormat (nitay) + GIRAPH-409: Refactor / cleanups (nitay) GIRAPH-465: MapFunctions cleanup (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/7a04bfd2/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java new file mode 100644 index 0000000..0538db9 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.io.formats;

import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.vertex.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

/**
 * Vertex output format that converts each vertex into a key/value pair of
 * the desired types and writes the pair to a sequence file.
 * Subclasses supply the two conversions by implementing
 * {@link #convertToSequenceFileKey} and {@link #convertToSequenceFileValue}.
 * <p>
 * Only the vertex id and the vertex value are written; edges are not part
 * of the output.
 * <p>
 * Output-spec checking, output committing and record writing are all
 * delegated to a Hadoop {@link SequenceFileOutputFormat}.
 * NOTE(review): that format presumably builds its writer from the job's
 * configured output key/value classes, which would then need to match
 * {@code OK} and {@code OV} — confirm for your job setup.
 *
 * @param <I> Vertex id type
 * @param <V> Vertex value type
 * @param <E> Edge value type
 * @param <OK> Output key data type for a sequence file
 * @param <OV> Output value data type for a sequence file
 */
public abstract class SequenceFileVertexOutputFormat<
    I extends WritableComparable,
    V extends Writable,
    E extends Writable,
    OK extends Writable,
    OV extends Writable>
    extends VertexOutputFormat<I, V, E> {
  /**
   * Output format of a sequence file that stores key-value pairs of the
   * desired types. Every writer created by this format delegates to it;
   * it is never reassigned.
   */
  private final SequenceFileOutputFormat<OK, OV> sequenceFileOutputFormat =
      new SequenceFileOutputFormat<OK, OV>();

  @Override
  public void checkOutputSpecs(JobContext context)
    throws IOException, InterruptedException {
    sequenceFileOutputFormat.checkOutputSpecs(context);
  }

  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException, InterruptedException {
    return sequenceFileOutputFormat.getOutputCommitter(context);
  }

  /**
   * Creates a new, not yet initialized, writer for this format. The
   * caller must invoke {@link VertexWriter#initialize} before writing.
   *
   * @param context Task context (not used here; the writer opens its
   *                sequence file in {@code initialize}).
   * @return A vertex writer typed to this format's id/value/edge types.
   */
  @Override
  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext
      context) throws IOException, InterruptedException {
    return new SequenceFileVertexWriter();
  }

  /**
   * Converts a vertex identifier into a sequence file key.
   * @param vertexId Vertex identifier.
   * @return Sequence file key.
   */
  protected abstract OK convertToSequenceFileKey(I vertexId);

  /**
   * Converts a vertex value into a sequence file value.
   * @param vertexValue Vertex value.
   * @return Sequence file value.
   */
  protected abstract OV convertToSequenceFileValue(V vertexValue);

  /**
   * Vertex writer that converts a vertex into a key-value pair and writes
   * the result into a sequence file for a context. This is an inner (non-static)
   * class because it uses the enclosing format's conversion methods and
   * its delegate {@link SequenceFileOutputFormat}.
   */
  private class SequenceFileVertexWriter implements VertexWriter<I, V, E> {
    /**
     * A record writer that will write into a sequence file initialized for
     * a context. It is {@code null} before {@link #initialize} and after
     * {@link #close}.
     */
    private RecordWriter<OK, OV> recordWriter;

    @Override
    public void initialize(TaskAttemptContext context) throws IOException,
        InterruptedException {
      recordWriter = sequenceFileOutputFormat.getRecordWriter(context);
    }

    @Override
    public final void writeVertex(Vertex<I, V, E, ?> vertex) throws
        IOException, InterruptedException {
      // Fail with a clear message instead of an NPE when used out of order.
      if (recordWriter == null) {
        throw new IllegalStateException(
            "writeVertex: writer is not open; call initialize() first");
      }
      // Convert vertex id to type OK.
      OK outKey = convertToSequenceFileKey(vertex.getId());
      // Convert vertex value to type OV.
      OV outValue = convertToSequenceFileValue(vertex.getValue());
      recordWriter.write(outKey, outValue);
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException,
        InterruptedException {
      // Nothing was opened if initialize() never ran (or close() already
      // ran), so there is nothing to close.
      if (recordWriter == null) {
        return;
      }
      try {
        recordWriter.close(context);
      } finally {
        // Drop the reference so a repeated close() does not close the
        // underlying writer twice.
        recordWriter = null;
      }
    }
  }
}
