http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java new file mode 100644 index 0000000..293bd0e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.io.formats; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.Edge; +import org.apache.giraph.io.EdgeInputFormat; +import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.graph.EdgeWithSource; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +/** + * Abstract class that users should subclass to use their own text based + * edge input format. + * + * @param <I> Vertex id + * @param <E> Edge data + */ +@SuppressWarnings("rawtypes") +public abstract class TextEdgeInputFormat<I extends WritableComparable, + E extends Writable> extends EdgeInputFormat<I, E> { + /** Underlying GiraphTextInputFormat. */ + protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat(); + + @Override + public List<InputSplit> getSplits( + JobContext context, int numWorkers) throws IOException, + InterruptedException { + // Ignore the hint of numWorkers here since we are using + // GiraphTextInputFormat to do this for us + return textInputFormat.getEdgeSplits(context); + } + + @Override + public abstract TextEdgeReader createEdgeReader( + InputSplit split, TaskAttemptContext context) throws IOException; + + /** + * {@link EdgeReader} for {@link TextEdgeInputFormat}. + */ + protected abstract class TextEdgeReader implements EdgeReader<I, E> { + /** Internal line record reader */ + private RecordReader<LongWritable, Text> lineRecordReader; + /** Context passed to initialize */ + private TaskAttemptContext context; + /** + * Cached configuration. 
We don't care about vertex value and message type. + */ + private ImmutableClassesGiraphConfiguration<I, NullWritable, E, + NullWritable> conf; + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + this.context = context; + conf = new ImmutableClassesGiraphConfiguration<I, NullWritable, E, + NullWritable>(context.getConfiguration()); + lineRecordReader = createLineRecordReader(inputSplit, context); + lineRecordReader.initialize(inputSplit, context); + } + + /** + * Create the line record reader. Override this to use a different + * underlying record reader (useful for testing). + * + * @param inputSplit + * the split to read + * @param context + * the context passed to initialize + * @return + * the record reader to be used + * @throws IOException + * exception that can be thrown during creation + * @throws InterruptedException + * exception that can be thrown during creation + */ + protected RecordReader<LongWritable, Text> + createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + return textInputFormat.createRecordReader(inputSplit, context); + } + + @Override + public void close() throws IOException { + lineRecordReader.close(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return lineRecordReader.getProgress(); + } + + /** + * Get the line record reader. + * + * @return Record reader to be used for reading. + */ + protected RecordReader<LongWritable, Text> getRecordReader() { + return lineRecordReader; + } + + /** + * Get the context. + * + * @return Context passed to initialize. + */ + protected TaskAttemptContext getContext() { + return context; + } + + /** + * Get the configuration. 
+ * + * @return Configuration for this reader + */ + protected ImmutableClassesGiraphConfiguration<I, NullWritable, E, + NullWritable> getConf() { + return conf; + } + } + + /** + * Abstract class to be implemented by the user to read an edge from each + * text line. + */ + protected abstract class TextEdgeReaderFromEachLine extends TextEdgeReader { + @Override + public final EdgeWithSource<I, E> getCurrentEdge() throws IOException, + InterruptedException { + Text line = getRecordReader().getCurrentValue(); + I sourceVertexId = getSourceVertexId(line); + I targetVertexId = getTargetVertexId(line); + E edgeValue = getValue(line); + return new EdgeWithSource<I, E>(sourceVertexId, + new Edge<I, E>(targetVertexId, edgeValue)); + } + + @Override + public final boolean nextEdge() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + + /** + * Reads source vertex id from the current line. + * + * @param line + * the current line + * @return + * the source vertex id corresponding to the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getSourceVertexId(Text line) throws IOException; + + + /** + * Reads target vertex id from the current line. + * + * @param line + * the current line + * @return + * the target vertex id corresponding to the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getTargetVertexId(Text line) throws IOException; + + /** + * Reads edge value from the current line. + * + * @param line + * the current line + * @return + * the edge value corresponding to the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract E getValue(Text line) throws IOException; + } + + /** + * Abstract class to be implemented by the user to read an edge from each + * text line after preprocessing it. + * + * @param <T> + * The resulting type of preprocessing. 
+ */ + protected abstract class TextEdgeReaderFromEachLineProcessed<T> extends + TextEdgeReader { + @Override + public final EdgeWithSource<I, E> getCurrentEdge() throws IOException, + InterruptedException { + Text line = getRecordReader().getCurrentValue(); + T processed = preprocessLine(line); + I sourceVertexId = getSourceVertexId(processed); + I targetVertexId = getTargetVertexId(processed); + E edgeValue = getValue(processed); + return new EdgeWithSource<I, E>(sourceVertexId, + new Edge<I, E>(targetVertexId, edgeValue)); + } + + @Override + public final boolean nextEdge() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + + /** + * Preprocess the line so other methods can easily read necessary + * information for creating edge + * + * @param line + * the current line to be read + * @return + * the preprocessed object + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract T preprocessLine(Text line) throws IOException; + + /** + * Reads target vertex id from the preprocessed line. + * + * @param line + * the object obtained by preprocessing the line + * @return + * the target vertex id + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getTargetVertexId(T line) throws IOException; + + /** + * Reads source vertex id from the preprocessed line. + * + * @param line + * the object obtained by preprocessing the line + * @return + * the source vertex id + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getSourceVertexId(T line) throws IOException; + + /** + * Reads edge value from the preprocessed line. + * + * @param line + * the object obtained by preprocessing the line + * @return + * the edge value + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract E getValue(T line) throws IOException; + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java new file mode 100644 index 0000000..7beddb8 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.io.formats; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.Edge; +import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.io.VertexInputFormat; +import org.apache.giraph.io.VertexReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +/** + * Abstract class that users should subclass to use their own text based + * vertex input format. + * + * @param <I> Vertex index value + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message value + */ +@SuppressWarnings("rawtypes") +public abstract class TextVertexInputFormat<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + extends VertexInputFormat<I, V, E, M> { + + /** Uses the GiraphTextInputFormat to do everything */ + protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat(); + + @Override + public List<InputSplit> getSplits(JobContext context, int numWorkers) + throws IOException, InterruptedException { + // Ignore the hint of numWorkers here since we are using + // GiraphTextInputFormat to do this for us + return textInputFormat.getVertexSplits(context); + } + + /** + * The factory method which produces the {@link TextVertexReader} used by this + * input format. 
+ * + * @param split + * the split to be read + * @param context + * the information about the task + * @return + * the text vertex reader to be used + */ + @Override + public abstract TextVertexReader createVertexReader(InputSplit split, + TaskAttemptContext context) throws IOException; + + /** + * Abstract class to be implemented by the user based on their specific + * vertex input. Easiest to ignore the key value separator and only use + * key instead. + * + * When reading a vertex from each line, extend + * {@link TextVertexReaderFromEachLine}. If you need to preprocess each line + * first, then extend {@link TextVertexReaderFromEachLineProcessed}. If you + * need common exception handling while preprocessing, then extend + * {@link TextVertexReaderFromEachLineProcessedHandlingExceptions}. + */ + protected abstract class TextVertexReader implements + VertexReader<I, V, E, M> { + /** Internal line record reader */ + private RecordReader<LongWritable, Text> lineRecordReader; + /** Context passed to initialize */ + private TaskAttemptContext context; + /** Cached configuration */ + private ImmutableClassesGiraphConfiguration<I, V, E, M> conf; + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + this.context = context; + conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>( + context.getConfiguration()); + lineRecordReader = createLineRecordReader(inputSplit, context); + lineRecordReader.initialize(inputSplit, context); + } + + /** + * Create the line record reader. Override this to use a different + * underlying record reader (useful for testing). 
+ * + * @param inputSplit + * the split to read + * @param context + * the context passed to initialize + * @return + * the record reader to be used + * @throws IOException + * exception that can be thrown during creation + * @throws InterruptedException + * exception that can be thrown during creation + */ + protected RecordReader<LongWritable, Text> + createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + return textInputFormat.createRecordReader(inputSplit, context); + } + + @Override + public void close() throws IOException { + lineRecordReader.close(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return lineRecordReader.getProgress(); + } + + /** + * Get the line record reader. + * + * @return Record reader to be used for reading. + */ + protected RecordReader<LongWritable, Text> getRecordReader() { + return lineRecordReader; + } + + /** + * Get the context. + * + * @return Context passed to initialize. + */ + protected TaskAttemptContext getContext() { + return context; + } + + /** + * Get the configuration. + * + * @return Configuration for this reader + */ + protected ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() { + return conf; + } + } + + /** + * Abstract class to be implemented by the user to read a vertex from each + * text line. + */ + protected abstract class TextVertexReaderFromEachLine extends + TextVertexReader { + + @Override + public final Vertex<I, V, E, M> getCurrentVertex() throws IOException, + InterruptedException { + Text line = getRecordReader().getCurrentValue(); + Vertex<I, V, E, M> vertex = getConf().createVertex(); + vertex.initialize(getId(line), getValue(line), getEdges(line)); + return vertex; + } + + @Override + public final boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + + /** + * Reads vertex id from the current line. 
+ * + * @param line + * the current line + * @return + * the vertex id corresponding to the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getId(Text line) throws IOException; + + /** + * Reads vertex value from the current line. + * + * @param line + * the current line + * @return + * the vertex value corresponding to the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract V getValue(Text line) throws IOException; + + /** + * Reads edges from the current line. + * + * @param line + * the current line + * @return + * the edges + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract Iterable<Edge<I, E>> getEdges(Text line) throws + IOException; + + } + + /** + * Abstract class to be implemented by the user to read a vertex from each + * text line after preprocessing it. + * + * @param <T> + * The resulting type of preprocessing. + */ + protected abstract class TextVertexReaderFromEachLineProcessed<T> extends + TextVertexReader { + + @Override + public final boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + + @Override + public final Vertex<I, V, E, M> getCurrentVertex() throws IOException, + InterruptedException { + Text line = getRecordReader().getCurrentValue(); + Vertex<I, V, E, M> vertex; + T processed = preprocessLine(line); + vertex = getConf().createVertex(); + vertex.initialize(getId(processed), getValue(processed), + getEdges(processed)); + return vertex; + } + + /** + * Preprocess the line so other methods can easily read necessary + * information for creating vertex. 
+ * + * @param line + * the current line to be read + * @return + * the preprocessed object + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract T preprocessLine(Text line) throws IOException; + + /** + * Reads vertex id from the preprocessed line. + * + * @param line + * the object obtained by preprocessing the line + * @return + * the vertex id + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getId(T line) throws IOException; + + /** + * Reads vertex value from the preprocessed line. + * + * @param line + * the object obtained by preprocessing the line + * @return + * the vertex value + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract V getValue(T line) throws IOException; + + /** + * Reads edges from the preprocessed line. + * + * + * @param line + * the object obtained by preprocessing the line + * @return + * the edges + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract Iterable<Edge<I, E>> getEdges(T line) throws + IOException; + + } + + // CHECKSTYLE: stop RedundantThrows + /** + * Abstract class to be implemented by the user to read a vertex from each + * text line after preprocessing it with exception handling. + * + * @param <T> + * The resulting type of preprocessing. + * @param <X> + * The exception type that can be thrown due to preprocessing. 
+ */ + protected abstract class + TextVertexReaderFromEachLineProcessedHandlingExceptions<T, X extends + Throwable> extends TextVertexReader { + + @Override + public final boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + + @SuppressWarnings("unchecked") + @Override + public final Vertex<I, V, E, M> getCurrentVertex() throws IOException, + InterruptedException { + // Note we are reading from value only since key is the line number + Text line = getRecordReader().getCurrentValue(); + Vertex<I, V, E, M> vertex; + T processed = null; + try { + processed = preprocessLine(line); + // NOTE(review): conf is assigned but never read in this method; the + // vertex is created through getConf() below. Candidate for removal + // (together with the then-unused Configuration import). + Configuration conf = getContext().getConfiguration(); + vertex = getConf().createVertex(); + vertex.initialize(getId(processed), getValue(processed), + getEdges(processed)); + } catch (IOException e) { + throw e; + // CHECKSTYLE: stop IllegalCatch + } catch (Throwable t) { + // Any Throwable other than IOException ends up here. The cast below is + // unchecked (X erases to Throwable), so it cannot fail at runtime and t + // is not guaranteed to actually be an X. + return handleException(line, processed, (X) t); + // CHECKSTYLE: resume IllegalCatch + } + return vertex; + } + + /** + * Preprocess the line so other methods can easily read necessary + * information for creating vertex. + * + * @param line + * the current line to be read + * @return + * the preprocessed object + * @throws X + * exception that can be thrown while preprocessing the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract T preprocessLine(Text line) throws X, IOException; + + /** + * Reads vertex id from the preprocessed line. + * + * @param line + * the object obtained by preprocessing the line + * @return + * the vertex id + * @throws X + * exception that can be thrown while reading the preprocessed + * object + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getId(T line) throws X, IOException; + + /** + * Reads vertex value from the preprocessed line. 
+ * + * @param line + * the object obtained by preprocessing the line + * @return + * the vertex value + * @throws X + * exception that can be thrown while reading the preprocessed + * object + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract V getValue(T line) throws X, IOException; + + /** + * Reads edges from the preprocessed line. + * + * + * @param line + * the object obtained by preprocessing the line + * @return + * the edges + * @throws X + * exception that can be thrown while reading the preprocessed + * object + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract Iterable<Edge<I, E>> getEdges(T line) throws X, + IOException; + + /** + * Handles exceptions while reading vertex from each line. + * + * @param line + * the line that was being read when the exception was thrown + * @param processed + * the object obtained by preprocessing the line. Can be null if + * exception was thrown during preprocessing. + * @param e + * the exception thrown while reading the line + * @return the recovered/alternative vertex to be used + */ + protected Vertex<I, V, E, M> handleException(Text line, T processed, X e) { + throw new IllegalArgumentException(e); + } + + } + // CHECKSTYLE: resume RedundantThrows + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java new file mode 100644 index 0000000..9f1fe1f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.formats; + +import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.io.VertexOutputFormat; +import org.apache.giraph.io.VertexWriter; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + +import java.io.IOException; + +/** + * Abstract class that users should subclass to use their own text based + * vertex output format. 
+ * + * @param <I> Vertex index value + * @param <V> Vertex value + * @param <E> Edge value + */ +@SuppressWarnings("rawtypes") +public abstract class TextVertexOutputFormat<I extends WritableComparable, + V extends Writable, E extends Writable> + extends VertexOutputFormat<I, V, E> { + + /** Uses the TextOutputFormat to do everything */ + protected TextOutputFormat<Text, Text> textOutputFormat = + new TextOutputFormat<Text, Text>(); + + @Override + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + textOutputFormat.checkOutputSpecs(context); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return textOutputFormat.getOutputCommitter(context); + } + + /** + * The factory method which produces the {@link TextVertexWriter} used by this + * output format. + * + * @param context + * the information about the task + * @return + * the text vertex writer to be used + */ + @Override + public abstract TextVertexWriter createVertexWriter(TaskAttemptContext + context) throws IOException, InterruptedException; + + /** + * Abstract class to be implemented by the user based on their specific + * vertex output. Easiest to ignore the key value separator and only use + * key instead. + */ + protected abstract class TextVertexWriter implements VertexWriter<I, V, E> { + /** Internal line record writer */ + private RecordWriter<Text, Text> lineRecordWriter; + /** Context passed to initialize */ + private TaskAttemptContext context; + + @Override + public void initialize(TaskAttemptContext context) throws IOException, + InterruptedException { + lineRecordWriter = createLineRecordWriter(context); + this.context = context; + } + + /** + * Create the line record writer. Override this to use a different + * underlying record writer (useful for testing). 
+ * + * @param context + * the context passed to initialize + * @return + * the record writer to be used + * @throws IOException + * exception that can be thrown during creation + * @throws InterruptedException + * exception that can be thrown during creation + */ + protected RecordWriter<Text, Text> createLineRecordWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + return textOutputFormat.getRecordWriter(context); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + lineRecordWriter.close(context); + } + + /** + * Get the line record writer. + * + * @return Record writer to be used for writing. + */ + public RecordWriter<Text, Text> getRecordWriter() { + return lineRecordWriter; + } + + /** + * Get the context. + * + * @return Context passed to initialize. + */ + public TaskAttemptContext getContext() { + return context; + } + } + + /** + * Abstract class to be implemented by the user to write a line for each + * vertex. + */ + protected abstract class TextVertexWriterToEachLine extends TextVertexWriter { + + @SuppressWarnings("unchecked") + @Override + public final void writeVertex(Vertex vertex) throws + IOException, InterruptedException { + // Note we are writing line as key with null value + getRecordWriter().write(convertVertexToLine(vertex), null); + } + + /** + * Writes a line for the given vertex. 
+ * + * @param vertex + * the current vertex for writing + * @return the text line to be written + * @throws IOException + * exception that can be thrown while writing + */ + protected abstract Text convertVertexToLine(Vertex<I, V, E, ?> vertex) + throws IOException; + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java new file mode 100644 index 0000000..4e607c2 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.io.formats; + +import org.apache.giraph.io.VertexValueInputFormat; +import org.apache.giraph.io.VertexValueReader; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +/** + * Abstract class that users should subclass to use their own text based + * vertex value input format. + * + * @param <I> Vertex index value + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message value + */ +@SuppressWarnings("rawtypes") +public abstract class TextVertexValueInputFormat<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + extends VertexValueInputFormat<I, V, E, M> { + /** Uses the GiraphTextInputFormat to do everything */ + protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat(); + + @Override + public List<InputSplit> getSplits(JobContext context, int numWorkers) + throws IOException, InterruptedException { + // Ignore the hint of numWorkers here since we are using + // GiraphTextInputFormat to do this for us + return textInputFormat.getVertexSplits(context); + } + + @Override + public abstract TextVertexValueReader createVertexValueReader( + InputSplit split, TaskAttemptContext context) throws IOException; + + /** + * {@link VertexValueReader} for {@link TextVertexValueInputFormat}. 
+ */ + protected abstract class TextVertexValueReader extends + VertexValueReader<I, V, E, M> { + /** Internal line record reader */ + private RecordReader<LongWritable, Text> lineRecordReader; + /** Context passed to initialize */ + private TaskAttemptContext context; + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + super.initialize(inputSplit, context); + this.context = context; + lineRecordReader = createLineRecordReader(inputSplit, context); + lineRecordReader.initialize(inputSplit, context); + } + + /** + * Create the line record reader. Override this to use a different + * underlying record reader (useful for testing). + * + * @param inputSplit + * the split to read + * @param context + * the context passed to initialize + * @return + * the record reader to be used + * @throws IOException + * exception that can be thrown during creation + * @throws InterruptedException + * exception that can be thrown during creation + */ + protected RecordReader<LongWritable, Text> + createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + return textInputFormat.createRecordReader(inputSplit, context); + } + + @Override + public void close() throws IOException { + lineRecordReader.close(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return lineRecordReader.getProgress(); + } + + /** + * Get the line record reader. + * + * @return Record reader to be used for reading. + */ + protected RecordReader<LongWritable, Text> getRecordReader() { + return lineRecordReader; + } + + /** + * Get the context. + * + * @return Context passed to initialize. + */ + protected TaskAttemptContext getContext() { + return context; + } + } + + /** + * Abstract class to be implemented by the user to read a vertex value from + * each text line. 
+ */ + protected abstract class TextVertexValueReaderFromEachLine extends + TextVertexValueReader { + @Override + public final I getCurrentVertexId() throws IOException, + InterruptedException { + return getId(getRecordReader().getCurrentValue()); + } + + @Override + public final V getCurrentVertexValue() throws IOException, + InterruptedException { + return getValue(getRecordReader().getCurrentValue()); + } + + @Override + public final boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + + /** + * Reads vertex id from the current line. + * + * @param line + * the current line + * @return + * the vertex id corresponding to the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getId(Text line) throws IOException; + + /** + * Reads vertex value from the current line. + * + * @param line + * the current line + * @return + * the vertex value corresponding to the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract V getValue(Text line) throws IOException; + } + + /** + * Abstract class to be implemented by the user to read a vertex value from + * each text line after preprocessing it. + * + * @param <T> + * The resulting type of preprocessing. + */ + protected abstract class TextVertexValueReaderFromEachLineProcessed<T> + extends TextVertexValueReader { + /** Last preprocessed line. */ + private T processedLine = null; + + /** Get last preprocessed line. Generate it if missing. 
+ * + * @return The last preprocessed line + * @throws IOException + * @throws InterruptedException + */ + private T getProcessedLine() throws IOException, InterruptedException { + if (processedLine == null) { + processedLine = preprocessLine(getRecordReader().getCurrentValue()); + } + return processedLine; + } + + @Override + public I getCurrentVertexId() throws IOException, + InterruptedException { + return getId(getProcessedLine()); + } + + @Override + public V getCurrentVertexValue() throws IOException, + InterruptedException { + return getValue(getProcessedLine()); + } + + @Override + public final boolean nextVertex() throws IOException, InterruptedException { + processedLine = null; + return getRecordReader().nextKeyValue(); + } + + /** + * Preprocess the line so other methods can easily read necessary + * information for creating vertex. + * + * @param line + * the current line to be read + * @return + * the preprocessed object + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract T preprocessLine(Text line) throws IOException; + + /** + * Reads vertex id from the preprocessed line. + * + * @param line + * the object obtained by preprocessing the line + * @return + * the vertex id + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getId(T line) throws IOException; + + /** + * Reads vertex value from the preprocessed line. 
+ * + * @param line + * the object obtained by preprocessing the line + * @return + * the vertex value + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract V getValue(T line) throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/package-info.java new file mode 100644 index 0000000..27df034 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package of reusable library Giraph objects. 
+ */ +package org.apache.giraph.io.formats; http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/package-info.java index 3b0519a..fd631db 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/package-info.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/package-info.java @@ -16,6 +16,6 @@ * limitations under the License. */ /** - * Package of reusable library Giraph objects. + * Input/Output related things. */ package org.apache.giraph.io;
