Modified: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java Thu Sep 20 23:00:27 2012 @@ -18,8 +18,7 @@ package org.apache.giraph.io; -import java.io.IOException; - +import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexOutputFormat; import org.apache.giraph.graph.VertexWriter; import org.apache.hadoop.io.Text; @@ -31,6 +30,8 @@ import org.apache.hadoop.mapreduce.Recor import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import java.io.IOException; + /** * Abstract class that users should subclass to use their own text based * vertex output format. @@ -43,38 +44,70 @@ import org.apache.hadoop.mapreduce.lib.o public abstract class TextVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> extends VertexOutputFormat<I, V, E> { + /** Uses the TextOutputFormat to do everything */ protected TextOutputFormat<Text, Text> textOutputFormat = new TextOutputFormat<Text, Text>(); + @Override + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + textOutputFormat.checkOutputSpecs(context); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return textOutputFormat.getOutputCommitter(context); + } + + /** + * The factory method which produces the {@link TextVertexWriter} used by this + * output format. + * + * @param context + * the information about the task + * @return + * the text vertex writer to be used + */ + @Override + public abstract TextVertexWriter createVertexWriter(TaskAttemptContext + context) throws IOException, InterruptedException; + /** * Abstract class to be implemented by the user based on their specific * vertex output. Easiest to ignore the key value separator and only use * key instead. - * - * @param <I> Vertex index value - * @param <V> Vertex value - * @param <E> Edge value */ - public abstract static class TextVertexWriter<I extends WritableComparable, - V extends Writable, E extends Writable> implements VertexWriter<I, V, E> { + protected abstract class TextVertexWriter implements VertexWriter<I, V, E> { + /** Internal line record writer */ + private RecordWriter<Text, Text> lineRecordWriter; /** Context passed to initialize */ private TaskAttemptContext context; - /** Internal line record writer */ - private final RecordWriter<Text, Text> lineRecordWriter; + + @Override + public void initialize(TaskAttemptContext context) throws IOException, + InterruptedException { + lineRecordWriter = createLineRecordWriter(context); + this.context = context; + } /** - * Initialize with the LineRecordWriter. + * Create the line record writer. Override this to use a different + * underlying record writer (useful for testing). * - * @param lineRecordWriter Line record writer from TextOutputFormat + * @param context + * the context passed to initialize + * @return + * the record writer to be used + * @throws IOException + * exception that can be thrown during creation + * @throws InterruptedException + * exception that can be thrown during creation */ - public TextVertexWriter(RecordWriter<Text, Text> lineRecordWriter) { - this.lineRecordWriter = lineRecordWriter; - } - - @Override - public void initialize(TaskAttemptContext context) throws IOException { - this.context = context; + protected RecordWriter<Text, Text> createLineRecordWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + return textOutputFormat.getRecordWriter(context); } @Override @@ -102,15 +135,31 @@ public abstract class TextVertexOutputFo } } - @Override - public void checkOutputSpecs(JobContext context) - throws IOException, InterruptedException { - textOutputFormat.checkOutputSpecs(context); - } + /** + * Abstract class to be implemented by the user to write a line for each + * vertex. + */ + protected abstract class TextVertexWriterToEachLine extends TextVertexWriter { - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) - throws IOException, InterruptedException { - return textOutputFormat.getOutputCommitter(context); + @SuppressWarnings("unchecked") + @Override + public final void writeVertex(Vertex vertex) throws + IOException, InterruptedException { + // Note we are writing line as key with null value + getRecordWriter().write(convertVertexToLine(vertex), null); + } + + /** + * Writes a line for the given vertex. + * + * @param vertex + * the current vertex for writing + * @return the text line to be written + * @throws IOException + * exception that can be thrown while writing + */ + protected abstract Text convertVertexToLine(Vertex<I, V, E, ?> vertex) + throws IOException; } + }
Modified: giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff ============================================================================== --- giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java (original) +++ giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java Thu Sep 20 23:00:27 2012 @@ -19,7 +19,6 @@ package org.apache.giraph.io; import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.Vertex; -import org.apache.giraph.io.AdjacencyListTextVertexOutputFormat.AdjacencyListVertexWriter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; @@ -38,7 +37,20 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -public class TestAdjacencyListTextVertexOutputFormat { +public class TestAdjacencyListTextVertexOutputFormat extends + AdjacencyListTextVertexOutputFormat<Text, DoubleWritable, DoubleWritable> { + + protected AdjacencyListTextVertexWriter createVertexWriter( + final RecordWriter<Text, Text> tw) { + AdjacencyListTextVertexWriter writer = new AdjacencyListTextVertexWriter() { + @Override + protected RecordWriter<Text, Text> createLineRecordWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + return tw; + } + }; + return writer; + } @Test public void testVertexWithNoEdges() throws IOException, InterruptedException { @@ -53,7 +65,7 @@ public class TestAdjacencyListTextVertex when(vertex.getEdges()).thenReturn(new ArrayList<Text>()); RecordWriter<Text, Text> tw = mock(RecordWriter.class); - AdjacencyListVertexWriter writer = new AdjacencyListVertexWriter(tw); + AdjacencyListTextVertexWriter writer = createVertexWriter(tw); writer.initialize(tac); writer.writeVertex(vertex); @@ -84,7 +96,7 @@ public class TestAdjacencyListTextVertex when(vertex.getEdges()).thenReturn(cities); RecordWriter<Text,Text> tw = mock(RecordWriter.class); - AdjacencyListVertexWriter writer = new AdjacencyListVertexWriter(tw); + AdjacencyListTextVertexWriter writer = createVertexWriter(tw); writer.initialize(tac); writer.writeVertex(vertex); @@ -97,7 +109,7 @@ public class TestAdjacencyListTextVertex @Test public void testWithDifferentDelimiter() throws IOException, InterruptedException { Configuration conf = new Configuration(); - conf.set(AdjacencyListVertexWriter.LINE_TOKENIZE_VALUE, ":::"); + conf.set(AdjacencyListTextVertexOutputFormat.LINE_TOKENIZE_VALUE, ":::"); TaskAttemptContext tac = mock(TaskAttemptContext.class); when(tac.getConfiguration()).thenReturn(conf); @@ -116,7 +128,7 @@ public class TestAdjacencyListTextVertex when(vertex.getEdges()).thenReturn(cities); RecordWriter<Text,Text> tw = mock(RecordWriter.class); - AdjacencyListVertexWriter writer = new AdjacencyListVertexWriter(tw); + AdjacencyListTextVertexWriter writer = createVertexWriter(tw); writer.initialize(tac); writer.writeVertex(vertex); @@ -125,4 +137,5 @@ public class TestAdjacencyListTextVertex verify(tw).write(expected, null); verify(vertex, times(1)).getEdges(); } + } Modified: giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff ============================================================================== --- giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java (original) +++ giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java Thu Sep 20 23:00:27 2012 @@ -19,18 +19,16 @@ package org.apache.giraph.io; import org.apache.giraph.graph.Vertex; -import org.apache.giraph.io.IdWithValueTextOutputFormat.IdWithValueVertexWriter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.junit.Test; import org.mockito.Matchers; -import static org.apache.giraph.io.IdWithValueTextOutputFormat.IdWithValueVertexWriter.LINE_TOKENIZE_VALUE; -import static org.apache.giraph.io.IdWithValueTextOutputFormat.IdWithValueVertexWriter.REVERSE_ID_AND_VALUE; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -39,7 +37,8 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; -public class TestIdWithValueTextOutputFormat { +public class TestIdWithValueTextOutputFormat extends + IdWithValueTextOutputFormat<Text, DoubleWritable, Writable> { @Test public void testHappyPath() throws IOException, InterruptedException { Configuration conf = new Configuration(); @@ -79,8 +78,14 @@ public class TestIdWithValueTextOutputFo // Create empty iterator == no edges when(vertex.getEdges()).thenReturn(new ArrayList<Text>()); - RecordWriter<Text, Text> tw = mock(RecordWriter.class); - IdWithValueVertexWriter writer = new IdWithValueVertexWriter(tw); + final RecordWriter<Text, Text> tw = mock(RecordWriter.class); + IdWithValueVertexWriter writer = new IdWithValueVertexWriter() { + @Override + protected RecordWriter<Text, Text> createLineRecordWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + return tw; + } + }; writer.initialize(tac); writer.writeVertex(vertex); Modified: giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff ============================================================================== --- giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java (original) +++ giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java Thu Sep 20 23:00:27 2012 @@ -29,6 +29,7 @@ import org.apache.hadoop.io.DoubleWritab import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.junit.Before; @@ -44,7 +45,8 @@ import static org.mockito.Mockito.when; import java.io.IOException; -public class TestLongDoubleDoubleAdjacencyListVertexInputFormat { +public class TestLongDoubleDoubleAdjacencyListVertexInputFormat extends + LongDoubleDoubleAdjacencyListVertexInputFormat<BooleanWritable> { private RecordReader<LongWritable, Text> rr; private Configuration conf; @@ -64,13 +66,30 @@ public class TestLongDoubleDoubleAdjacen when(tac.getConfiguration()).thenReturn(conf); } + protected TextVertexReader createVertexReader( + RecordReader<LongWritable, Text> rr) { + return createVertexReader(rr, null); + } + + protected TextVertexReader createVertexReader( + final RecordReader<LongWritable, Text> rr, LineSanitizer lineSanitizer) { + return new LongDoubleDoubleAdjacencyListVertexReader(lineSanitizer) { + @Override + protected RecordReader<LongWritable, Text> createLineRecordReader( + InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + return rr; + } + }; + } + @Test public void testIndexMustHaveValue() throws IOException, InterruptedException { String input = "123"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = - new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); + TextVertexReader vr = createVertexReader(rr); + vr.initialize(null, tac); @@ -88,8 +107,8 @@ public class TestLongDoubleDoubleAdjacen String input = "99\t55.2\t100"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr = - new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr); + TextVertexReader vr = createVertexReader(rr); + vr.initialize(null, tac); @@ -107,8 +126,8 @@ public class TestLongDoubleDoubleAdjacen String input = "42\t0.1\t99\t0.2\t2000\t0.3\t4000\t0.4"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = - new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); + TextVertexReader vr = createVertexReader(rr); + vr.initialize(null, tac); @@ -129,9 +148,8 @@ public class TestLongDoubleDoubleAdjacen String input = "12345:42.42:9999999:99.9"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":"); - LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = - new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); + conf.set(AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE, ":"); + TextVertexReader vr = createVertexReader(rr); vr.initialize(null, tac); assertTrue("Should have been able to read vertex", vr.nextVertex()); Modified: giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff ============================================================================== --- giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java (original) +++ giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java Thu Sep 20 23:00:27 2012 @@ -30,6 +30,7 @@ import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.junit.Before; @@ -51,7 +52,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; -public class TestTextDoubleDoubleAdjacencyListVertexInputFormat { +public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends + TextDoubleDoubleAdjacencyListVertexInputFormat<BooleanWritable> { private RecordReader<LongWritable, Text> rr; private Configuration conf; @@ -71,13 +73,29 @@ public class TestTextDoubleDoubleAdjacen when(tac.getConfiguration()).thenReturn(conf); } + protected TextVertexReader createVertexReader( + RecordReader<LongWritable, Text> rr) { + return createVertexReader(rr, null); + } + + protected TextVertexReader createVertexReader( + final RecordReader<LongWritable, Text> rr, LineSanitizer lineSanitizer) { + return new TextDoubleDoubleAdjacencyListVertexReader(lineSanitizer) { + @Override + protected RecordReader<LongWritable, Text> createLineRecordReader( + InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + return rr; + } + }; + } + @Test public void testIndexMustHaveValue() throws IOException, InterruptedException { String input = "hi"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = - new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); + TextVertexReader vr = createVertexReader(rr); vr.initialize(null, tac); @@ -95,8 +113,7 @@ public class TestTextDoubleDoubleAdjacen String input = "index\t55.66\tindex2"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = - new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); + TextVertexReader vr = createVertexReader(rr); vr.initialize(null, tac); try { vr.nextVertex(); @@ -153,8 +170,7 @@ public class TestTextDoubleDoubleAdjacen String input = "Hi\t0\tCiao\t1.123\tBomdia\t2.234\tOla\t3.345"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = - new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); + TextVertexReader vr = createVertexReader(rr); vr.initialize(null, tac); assertTrue("Should have been able to add a vertex", vr.nextVertex()); @@ -172,8 +188,8 @@ public class TestTextDoubleDoubleAdjacen public void testLineSanitizer() throws Exception { String input = "Bye\t0.01\tCiao\t1.001\tTchau\t2.0001\tAdios\t3.00001"; - AdjacencyListVertexReader.LineSanitizer toUpper = - new AdjacencyListVertexReader.LineSanitizer() { + AdjacencyListTextVertexInputFormat.LineSanitizer toUpper = + new AdjacencyListTextVertexInputFormat.LineSanitizer() { @Override public String sanitize(String s) { return s.toUpperCase(); @@ -181,8 +197,7 @@ public class TestTextDoubleDoubleAdjacen }; when(rr.getCurrentValue()).thenReturn(new Text(input)); - TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = - new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr, toUpper); + TextVertexReader vr = createVertexReader(rr, toUpper); vr.initialize(null, tac); assertTrue("Should have been able to read vertex", vr.nextVertex()); @@ -203,9 +218,8 @@ public class TestTextDoubleDoubleAdjacen String input = "alpha:42:beta:99"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":"); - TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = - new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); + conf.set(AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE, ":"); + TextVertexReader vr = createVertexReader(rr); vr.initialize(null, tac); assertTrue("Should have been able to read vertex", vr.nextVertex());
