Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java?rev=1405175&view=auto ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java (added) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java Fri Nov 2 21:37:30 2012 @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.io; + +import org.apache.giraph.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.Edge; +import org.apache.giraph.graph.EdgeInputFormat; +import org.apache.giraph.graph.EdgeReader; +import org.apache.giraph.graph.EdgeWithSource; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +/** + * Abstract class that users should subclass to use their own text based + * edge input format. + * + * @param <I> Vertex id + * @param <E> Edge data + */ +@SuppressWarnings("rawtypes") +public abstract class TextEdgeInputFormat<I extends WritableComparable, + E extends Writable> extends EdgeInputFormat<I, E> { + /** Underlying GiraphTextInputFormat. */ + protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat(); + + @Override + public List<InputSplit> getSplits( + JobContext context, int numWorkers) throws IOException, + InterruptedException { + // Ignore the hint of numWorkers here since we are using + // GiraphTextInputFormat to do this for us + return textInputFormat.getEdgeSplits(context); + } + + @Override + public abstract TextEdgeReader createEdgeReader( + InputSplit split, TaskAttemptContext context) throws IOException; + + /** + * {@link EdgeReader} for {@link TextEdgeInputFormat}. + */ + protected abstract class TextEdgeReader implements EdgeReader<I, E> { + /** Internal line record reader */ + private RecordReader<LongWritable, Text> lineRecordReader; + /** Context passed to initialize */ + private TaskAttemptContext context; + /** + * Cached configuration.
We don't care about vertex value and message type. + */ + private ImmutableClassesGiraphConfiguration<I, NullWritable, E, + NullWritable> conf; + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + this.context = context; + conf = new ImmutableClassesGiraphConfiguration<I, NullWritable, E, + NullWritable>(context.getConfiguration()); + lineRecordReader = createLineRecordReader(inputSplit, context); + lineRecordReader.initialize(inputSplit, context); + } + + /** + * Create the line record reader. Override this to use a different + * underlying record reader (useful for testing). + * + * @param inputSplit + * the split to read + * @param context + * the context passed to initialize + * @return + * the record reader to be used + * @throws IOException + * exception that can be thrown during creation + * @throws InterruptedException + * exception that can be thrown during creation + */ + protected RecordReader<LongWritable, Text> + createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + return textInputFormat.createRecordReader(inputSplit, context); + } + + @Override + public void close() throws IOException { + lineRecordReader.close(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return lineRecordReader.getProgress(); + } + + /** + * Get the line record reader. + * + * @return Record reader to be used for reading. + */ + protected RecordReader<LongWritable, Text> getRecordReader() { + return lineRecordReader; + } + + /** + * Get the context. + * + * @return Context passed to initialize. + */ + protected TaskAttemptContext getContext() { + return context; + } + + /** + * Get the configuration. 
+ * + * @return Configuration for this reader + */ + protected ImmutableClassesGiraphConfiguration<I, NullWritable, E, + NullWritable> getConf() { + return conf; + } + } + + /** + * Abstract class to be implemented by the user to read an edge from each + * text line. + */ + protected abstract class TextEdgeReaderFromEachLine extends TextEdgeReader { + @Override + public final EdgeWithSource<I, E> getCurrentEdge() throws IOException, + InterruptedException { + Text line = getRecordReader().getCurrentValue(); + I sourceVertexId = getSourceVertexId(line); + I targetVertexId = getTargetVertexId(line); + E edgeValue = getValue(line); + return new EdgeWithSource<I, E>(sourceVertexId, + new Edge<I, E>(targetVertexId, edgeValue)); + } + + @Override + public final boolean nextEdge() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + + /** + * Reads source vertex id from the current line. + * + * @param line + * the current line + * @return + * the source vertex id corresponding to the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getSourceVertexId(Text line) throws IOException; + + + /** + * Reads target vertex id from the current line. + * + * @param line + * the current line + * @return + * the target vertex id corresponding to the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getTargetVertexId(Text line) throws IOException; + + /** + * Reads edge value from the current line. + * + * @param line + * the current line + * @return + * the edge value corresponding to the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract E getValue(Text line) throws IOException; + } + + /** + * Abstract class to be implemented by the user to read an edge from each + * text line after preprocessing it. + * + * @param <T> + * The resulting type of preprocessing. 
+ */ + protected abstract class TextEdgeReaderFromEachLineProcessed<T> extends + TextEdgeReader { + @Override + public final EdgeWithSource<I, E> getCurrentEdge() throws IOException, + InterruptedException { + Text line = getRecordReader().getCurrentValue(); + T processed = preprocessLine(line); + I sourceVertexId = getSourceVertexId(processed); + I targetVertexId = getTargetVertexId(processed); + E edgeValue = getValue(processed); + return new EdgeWithSource<I, E>(sourceVertexId, + new Edge<I, E>(targetVertexId, edgeValue)); + } + + @Override + public final boolean nextEdge() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + + /** + * Preprocess the line so other methods can easily read necessary + * information for creating an edge. + * + * @param line + * the current line to be read + * @return + * the preprocessed object + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract T preprocessLine(Text line) throws IOException; + + /** + * Reads target vertex id from the preprocessed line. + * + * @param line + * the object obtained by preprocessing the line + * @return + * the target vertex id + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getTargetVertexId(T line) throws IOException; + + /** + * Reads source vertex id from the preprocessed line. + * + * @param line + * the object obtained by preprocessing the line + * @return + * the source vertex id + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getSourceVertexId(T line) throws IOException; + + /** + * Reads edge value from the preprocessed line. + * + * @param line + * the object obtained by preprocessing the line + * @return + * the edge value + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract E getValue(T line) throws IOException; + } +}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java?rev=1405175&r1=1405174&r2=1405175&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java Fri Nov 2 21:37:30 2012 @@ -31,7 +31,6 @@ import org.apache.hadoop.mapreduce.Input import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import java.io.IOException; import java.util.List; @@ -39,7 +38,7 @@ import java.util.Map; /** * Abstract class that users should subclass to use their own text based - * vertex output format. + * vertex input format. 
* * @param <I> Vertex index value * @param <V> Vertex value @@ -51,15 +50,15 @@ public abstract class TextVertexInputFor V extends Writable, E extends Writable, M extends Writable> extends VertexInputFormat<I, V, E, M> { - /** Uses the TextInputFormat to do everything */ - protected TextInputFormat textInputFormat = new TextInputFormat(); + /** Uses the GiraphTextInputFormat to do everything */ + protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat(); @Override public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException { - // Ignore the hint of numWorkers here since we are using TextInputFormat - // to do this for us - return textInputFormat.getSplits(context); + // Ignore the hint of numWorkers here since we are using + // GiraphTextInputFormat to do this for us + return textInputFormat.getVertexSplits(context); } /** Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java?rev=1405175&view=auto ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java (added) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java Fri Nov 2 21:37:30 2012 @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import org.apache.giraph.graph.VertexValueInputFormat; +import org.apache.giraph.graph.VertexValueReader; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +/** + * Abstract class that users should subclass to use their own text based + * vertex value input format. 
+ * + * @param <I> Vertex index value + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message value + */ +@SuppressWarnings("rawtypes") +public abstract class TextVertexValueInputFormat<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + extends VertexValueInputFormat<I, V, E, M> { + /** Uses the GiraphTextInputFormat to do everything */ + protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat(); + + @Override + public List<InputSplit> getSplits(JobContext context, int numWorkers) + throws IOException, InterruptedException { + // Ignore the hint of numWorkers here since we are using + // GiraphTextInputFormat to do this for us + return textInputFormat.getVertexSplits(context); + } + + @Override + public abstract TextVertexValueReader createVertexValueReader( + InputSplit split, TaskAttemptContext context) throws IOException; + + /** + * {@link VertexValueReader} for {@link TextVertexValueInputFormat}. + */ + protected abstract class TextVertexValueReader extends + VertexValueReader<I, V, E, M> { + /** Internal line record reader */ + private RecordReader<LongWritable, Text> lineRecordReader; + /** Context passed to initialize */ + private TaskAttemptContext context; + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + super.initialize(inputSplit, context); + this.context = context; + lineRecordReader = createLineRecordReader(inputSplit, context); + lineRecordReader.initialize(inputSplit, context); + } + + /** + * Create the line record reader. Override this to use a different + * underlying record reader (useful for testing).
+ * + * @param inputSplit + * the split to read + * @param context + * the context passed to initialize + * @return + * the record reader to be used + * @throws IOException + * exception that can be thrown during creation + * @throws InterruptedException + * exception that can be thrown during creation + */ + protected RecordReader<LongWritable, Text> + createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + return textInputFormat.createRecordReader(inputSplit, context); + } + + @Override + public void close() throws IOException { + lineRecordReader.close(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return lineRecordReader.getProgress(); + } + + /** + * Get the line record reader. + * + * @return Record reader to be used for reading. + */ + protected RecordReader<LongWritable, Text> getRecordReader() { + return lineRecordReader; + } + + /** + * Get the context. + * + * @return Context passed to initialize. + */ + protected TaskAttemptContext getContext() { + return context; + } + } + + /** + * Abstract class to be implemented by the user to read a vertex value from + * each text line. + */ + protected abstract class TextVertexValueReaderFromEachLine extends + TextVertexValueReader { + @Override + public final I getCurrentVertexId() throws IOException, + InterruptedException { + return getId(getRecordReader().getCurrentValue()); + } + + @Override + public final V getCurrentVertexValue() throws IOException, + InterruptedException { + return getValue(getRecordReader().getCurrentValue()); + } + + @Override + public final boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + + /** + * Reads vertex id from the current line. 
+ * + * @param line + * the current line + * @return + * the vertex id corresponding to the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getId(Text line) throws IOException; + + /** + * Reads vertex value from the current line. + * + * @param line + * the current line + * @return + * the vertex value corresponding to the line + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract V getValue(Text line) throws IOException; + } + + /** + * Abstract class to be implemented by the user to read a vertex value from + * each text line after preprocessing it. + * + * @param <T> + * The resulting type of preprocessing. + */ + protected abstract class TextVertexValueReaderFromEachLineProcessed<T> + extends TextVertexValueReader { + /** Last preprocessed line. */ + private T processedLine = null; + + /** Get last preprocessed line. Generate it if missing. + * + * @return The last preprocessed line + * @throws IOException + * @throws InterruptedException + */ + private T getProcessedLine() throws IOException, InterruptedException { + if (processedLine == null) { + processedLine = preprocessLine(getRecordReader().getCurrentValue()); + } + return processedLine; + } + + @Override + public I getCurrentVertexId() throws IOException, + InterruptedException { + return getId(getProcessedLine()); + } + + @Override + public V getCurrentVertexValue() throws IOException, + InterruptedException { + return getValue(getProcessedLine()); + } + + @Override + public final boolean nextVertex() throws IOException, InterruptedException { + processedLine = null; + return getRecordReader().nextKeyValue(); + } + + /** + * Preprocess the line so other methods can easily read necessary + * information for creating vertex. 
+ * + * @param line + * the current line to be read + * @return + * the preprocessed object + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract T preprocessLine(Text line) throws IOException; + + /** + * Reads vertex id from the preprocessed line. + * + * @param line + * the object obtained by preprocessing the line + * @return + * the vertex id + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract I getId(T line) throws IOException; + + /** + * Reads vertex value from the preprocessed line. + * + * @param line + * the object obtained by preprocessing the line + * @return + * the vertex value + * @throws IOException + * exception that can be thrown while reading + */ + protected abstract V getValue(T line) throws IOException; + } +} Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FileUtils.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FileUtils.java?rev=1405175&r1=1405174&r2=1405175&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FileUtils.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FileUtils.java Fri Nov 2 21:37:30 2012 @@ -18,13 +18,14 @@ package org.apache.giraph.utils; -import com.google.common.base.Charsets; -import com.google.common.io.Closeables; -import com.google.common.io.Files; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import com.google.common.base.Charsets; +import com.google.common.io.Closeables; +import com.google.common.io.Files; + import java.io.File; import java.io.FileFilter; import java.io.IOException; @@ -114,7 +115,7 @@ public class FileUtils { * @param lines Strings written to the file * @throws IOException */ - public static void writeLines(File file, String... 
lines) + public static void writeLines(File file, String[] lines) throws IOException { Writer writer = Files.newWriter(file, Charsets.UTF_8); try { Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/IntPair.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/IntPair.java?rev=1405175&view=auto ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/IntPair.java (added) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/IntPair.java Fri Nov 2 21:37:30 2012 @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +/** + * A pair of integers. + */ +public class IntPair { + /** First element. */ + private int first; + /** Second element. */ + private int second; + + /** Constructor. + * + * @param fst First element + * @param snd Second element + */ + public IntPair(int fst, int snd) { + first = fst; + second = snd; + } + + /** + * Get the first element. + * + * @return The first element + */ + public int getFirst() { + return first; + } + + /** + * Set the first element. 
+ * + * @param first The first element + */ + public void setFirst(int first) { + this.first = first; + } + + /** + * Get the second element. + * + * @return The second element + */ + public int getSecond() { + return second; + } + + /** + * Set the second element. + * + * @param second The second element + */ + public void setSecond(int second) { + this.second = second; + } +} Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1405175&r1=1405174&r2=1405175&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Fri Nov 2 21:37:30 2012 @@ -18,14 +18,8 @@ package org.apache.giraph.utils; -import java.io.File; -import java.io.IOException; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import org.apache.giraph.GiraphConfiguration; +import org.apache.giraph.graph.EdgeInputFormat; import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.MasterCompute; import org.apache.giraph.graph.Vertex; @@ -33,9 +27,9 @@ import org.apache.giraph.graph.VertexCom import org.apache.giraph.graph.VertexInputFormat; import org.apache.giraph.graph.VertexOutputFormat; import org.apache.giraph.graph.WorkerContext; +import org.apache.giraph.io.GiraphFileInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZooKeeperServerMain; @@ -44,6 +38,13 @@ import 
org.apache.zookeeper.server.quoru import com.google.common.base.Charsets; import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + /** * A base class for running internal tests on a vertex * @@ -81,7 +82,8 @@ public class InternalVertexRunner { Class<? extends Vertex> vertexClass, Class<? extends VertexInputFormat> vertexInputFormatClass, Class<? extends VertexOutputFormat> vertexOutputFormatClass, - Map<String, String> params, String... data) throws Exception { + Map<String, String> params, + String[] data) throws Exception { return run(vertexClass, null, vertexInputFormatClass, vertexOutputFormatClass, params, data); } @@ -107,7 +109,7 @@ public class InternalVertexRunner { Class<? extends VertexInputFormat> vertexInputFormatClass, Class<? extends VertexOutputFormat> vertexOutputFormatClass, Map<String, String> params, - String... data) throws Exception { + String[] data) throws Exception { return InternalVertexRunner.run(vertexClass, vertexCombinerClass, vertexInputFormatClass, vertexOutputFormatClass, null, null, params, data); @@ -129,32 +131,90 @@ public class InternalVertexRunner { * @throws Exception */ @SuppressWarnings("rawtypes") + public static Iterable<String> run( + Class<? extends Vertex> vertexClass, + Class<? extends VertexCombiner> vertexCombinerClass, + Class<? extends VertexInputFormat> vertexInputFormatClass, + Class<? extends VertexOutputFormat> vertexOutputFormatClass, + Class<? extends WorkerContext> workerContextClass, + Class<? 
extends MasterCompute> masterComputeClass, + Map<String, String> params, + String[] data) throws Exception { + return run(vertexClass, vertexCombinerClass, vertexInputFormatClass, null, + vertexOutputFormatClass, workerContextClass, masterComputeClass, + params, data, null); + } + + // CHECKSTYLE: stop ParameterNumberCheck + /** + * Attempts to run the vertex internally in the current JVM, reading from and + * writing to a temporary folder on local disk. Will start its own zookeeper + * instance. + * @param vertexClass the vertex class to instantiate + * @param vertexCombinerClass the vertex combiner to use (or null) + * @param vertexInputFormatClass the vertex inputformat to use + * @param edgeInputFormatClass the edge inputformat to use + * @param vertexOutputFormatClass the outputformat to use + * @param workerContextClass the worker context to use + * @param masterComputeClass the master compute class to use + * @param params a map of parameters to add to the hadoop configuration + * @param vertexInputData linewise vertex input data + * @param edgeInputData linewise edge input data + * @return linewise output data + * @throws Exception + */ + @SuppressWarnings("rawtypes") public static Iterable<String> run(Class<? extends Vertex> vertexClass, Class<? extends VertexCombiner> vertexCombinerClass, Class<? extends VertexInputFormat> vertexInputFormatClass, + Class<? extends EdgeInputFormat> edgeInputFormatClass, Class<? extends VertexOutputFormat> vertexOutputFormatClass, Class<? extends WorkerContext> workerContextClass, Class<? extends MasterCompute> masterComputeClass, - Map<String, String> params, String... 
data) throws Exception { + Map<String, String> params, + String[] vertexInputData, + String[] edgeInputData) throws Exception { + + boolean useVertexInputFormat = vertexInputFormatClass != null; + boolean useEdgeInputFormat = edgeInputFormatClass != null; File tmpDir = null; try { // Prepare input file, output folder and temporary folders tmpDir = FileUtils.createTestDir(vertexClass); - File inputFile = FileUtils.createTempFile(tmpDir, "graph.txt"); + + File vertexInputFile = null; + File edgeInputFile = null; + if (useVertexInputFormat) { + vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt"); + } + if (useEdgeInputFormat) { + edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt"); + } + File outputDir = FileUtils.createTempDir(tmpDir, "output"); File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper"); File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir"); File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints"); // Write input data to disk - FileUtils.writeLines(inputFile, data); + if (useVertexInputFormat) { + FileUtils.writeLines(vertexInputFile, vertexInputData); + } + if (useEdgeInputFormat) { + FileUtils.writeLines(edgeInputFile, edgeInputData); + } // Create and configure the job to run the vertex GiraphJob job = new GiraphJob(vertexClass.getName()); job.getConfiguration().setVertexClass(vertexClass); - job.getConfiguration().setVertexInputFormatClass( - vertexInputFormatClass); + if (useVertexInputFormat) { + job.getConfiguration().setVertexInputFormatClass( + vertexInputFormatClass); + } + if (useEdgeInputFormat) { + job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass); + } job.getConfiguration().setVertexOutputFormatClass( vertexOutputFormatClass); if (workerContextClass != null) { @@ -185,8 +245,14 @@ public class InternalVertexRunner { conf.set(param.getKey(), param.getValue()); } - FileInputFormat.addInputPath(job.getInternalJob(), - new Path(inputFile.toString())); + if 
(useVertexInputFormat) { + GiraphFileInputFormat.addVertexInputPath(job.getInternalJob(), + new Path(vertexInputFile.toString())); + } + if (useEdgeInputFormat) { + GiraphFileInputFormat.addEdgeInputPath(job.getInternalJob(), + new Path(edgeInputFile.toString())); + } FileOutputFormat.setOutputPath(job.getInternalJob(), new Path(outputDir.toString())); @@ -235,6 +301,7 @@ public class InternalVertexRunner { FileUtils.delete(tmpDir); } } + // CHECKSTYLE: resume ParameterNumberCheck /** * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java?rev=1405175&r1=1405174&r2=1405175&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java Fri Nov 2 21:37:30 2012 @@ -18,15 +18,8 @@ package org.apache.giraph; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.List; - -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.io.Closeables; import org.apache.giraph.examples.GeneratedVertexReader; +import org.apache.giraph.graph.EdgeInputFormat; import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.MasterCompute; import org.apache.giraph.graph.Vertex; @@ -36,13 +29,26 @@ import org.apache.giraph.graph.WorkerCon import org.apache.giraph.utils.FileUtils; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.junit.After; import org.junit.Before; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.io.Closeables; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; + /** * Extended TestCase for making setting up Bsp testing. */ @@ -211,17 +217,49 @@ public class BspCase implements Watcher Class<? extends VertexInputFormat> vertexInputFormatClass, Class<? extends VertexOutputFormat> vertexOutputFormatClass, Path outputPath) throws IOException { + return prepareJob(name, vertexClass, workerContextClass, + masterComputeClass, vertexInputFormatClass, null, + vertexOutputFormatClass, outputPath); + } + + /** + * Prepare a GiraphJob for test purposes + * + * @param name identifying name for the job + * @param vertexClass class of the vertex to run + * @param workerContextClass class of the workercontext to use + * @param masterComputeClass class of mastercompute to use + * @param vertexInputFormatClass vertex inputformat to use + * @param edgeInputFormatClass edge inputformat to use + * @param vertexOutputFormatClass outputformat to use + * @param outputPath destination path for the output + * @return fully configured job instance + * @throws IOException + */ + protected GiraphJob prepareJob( + String name, + Class<? extends Vertex> vertexClass, + Class<? extends WorkerContext> workerContextClass, + Class<? extends MasterCompute> masterComputeClass, + Class<? extends VertexInputFormat> vertexInputFormatClass, + Class<? extends EdgeInputFormat> edgeInputFormatClass, + Class<? 
extends VertexOutputFormat> vertexOutputFormatClass, + Path outputPath) throws IOException { GiraphJob job = new GiraphJob(name); setupConfiguration(job); job.getConfiguration().setVertexClass(vertexClass); - job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass); - if (workerContextClass != null) { job.getConfiguration().setWorkerContextClass(workerContextClass); } if (masterComputeClass != null) { job.getConfiguration().setMasterComputeClass(masterComputeClass); } + if (vertexInputFormatClass != null) { + job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass); + } + if (edgeInputFormatClass != null) { + job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass); + } if (vertexOutputFormatClass != null) { job.getConfiguration().setVertexOutputFormatClass( vertexOutputFormatClass); Added: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestEdgeInput.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestEdgeInput.java?rev=1405175&view=auto ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestEdgeInput.java (added) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestEdgeInput.java Fri Nov 2 21:37:30 2012 @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph; + +import org.apache.giraph.graph.EdgeListVertex; +import org.apache.giraph.io.IdWithValueTextOutputFormat; +import org.apache.giraph.io.IntIntTextVertexValueInputFormat; +import org.apache.giraph.io.IntNullTextEdgeInputFormat; +import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.Maps; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +/** + * A test case to ensure that loading a graph from a list of edges works as + * expected. + */ +public class TestEdgeInput extends BspCase { + public TestEdgeInput() { + super(TestEdgeInput.class.getName()); + } + + // It should be able to build a graph starting from the edges only. + // Vertices should be implicitly created with default values. 
+ @Test + public void testEdgesOnly() throws Exception { + String[] edges = new String[] { + "1 2", + "2 3", + "2 4", + "4 1" + }; + + Iterable<String> results = InternalVertexRunner.run( + TestVertexWithNumEdges.class, + null, + null, + IntNullTextEdgeInputFormat.class, + IdWithValueTextOutputFormat.class, + null, + null, + Collections.<String, String>emptyMap(), + null, + edges); + + Map<Integer, Integer> values = parseResults(results); + + // Check that all vertices with outgoing edges have been created + assertEquals(3, values.size()); + // Check the number of edges for each vertex + assertEquals(1, (int) values.get(1)); + assertEquals(2, (int) values.get(2)); + assertEquals(1, (int) values.get(4)); + } + + // It should be able to build a graph by specifying vertex data and edges + // as separate input formats. + @Test + public void testMixedFormat() throws Exception { + String[] vertices = new String[] { + "1 75", + "2 34", + "3 13", + "4 32" + }; + String[] edges = new String[] { + "1 2", + "2 3", + "2 4", + "4 1", + "5 3" + }; + + // Run a job with a vertex that does nothing + Iterable<String> results = InternalVertexRunner.run( + TestVertexDoNothing.class, + null, + IntIntTextVertexValueInputFormat.class, + IntNullTextEdgeInputFormat.class, + IdWithValueTextOutputFormat.class, + null, + null, + Collections.<String, String>emptyMap(), + vertices, + edges); + + Map<Integer, Integer> values = parseResults(results); + + // Check that all vertices with either initial values or outgoing edges + // have been created + assertEquals(5, values.size()); + // Check that the vertices have been created with correct values + assertEquals(75, (int) values.get(1)); + assertEquals(34, (int) values.get(2)); + assertEquals(13, (int) values.get(3)); + assertEquals(32, (int) values.get(4)); + // A vertex with edges but no initial value should have the default value + assertEquals(0, (int) values.get(5)); + + // Run a job with a vertex that counts outgoing edges + results = 
InternalVertexRunner.run( + TestVertexWithNumEdges.class, + null, + IntIntTextVertexValueInputFormat.class, + IntNullTextEdgeInputFormat.class, + IdWithValueTextOutputFormat.class, + null, + null, + Collections.<String, String>emptyMap(), + vertices, + edges); + + values = parseResults(results); + + // Check the number of edges for each vertex + assertEquals(1, (int) values.get(1)); + assertEquals(2, (int) values.get(2)); + assertEquals(0, (int) values.get(3)); + assertEquals(1, (int) values.get(4)); + assertEquals(1, (int) values.get(5)); + } + + public static class TestVertexWithNumEdges extends EdgeListVertex<IntWritable, + IntWritable, NullWritable, NullWritable> { + @Override + public void compute(Iterable<NullWritable> messages) throws IOException { + setValue(new IntWritable(getNumEdges())); + voteToHalt(); + } + } + + public static class TestVertexDoNothing extends EdgeListVertex<IntWritable, + IntWritable, NullWritable, NullWritable> { + @Override + public void compute(Iterable<NullWritable> messages) throws IOException { + voteToHalt(); + } + } + + private static Map<Integer, Integer> parseResults(Iterable<String> results) { + Map<Integer, Integer> values = Maps.newHashMap(); + for (String line : results) { + String[] tokens = line.split("\\s+"); + int id = Integer.valueOf(tokens[0]); + int value = Integer.valueOf(tokens[1]); + values.put(id, value); + } + return values; + } +} Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestJsonBase64Format.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestJsonBase64Format.java?rev=1405175&r1=1405174&r2=1405175&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestJsonBase64Format.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestJsonBase64Format.java Fri Nov 2 21:37:30 2012 @@ -17,22 +17,22 @@ */ package org.apache.giraph; -import static 
org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; - import org.apache.giraph.benchmark.EdgeListVertexPageRankBenchmark; import org.apache.giraph.benchmark.PageRankComputation; -import org.apache.giraph.io.PseudoRandomVertexInputFormat; import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.io.GiraphFileInputFormat; import org.apache.giraph.io.JsonBase64VertexInputFormat; import org.apache.giraph.io.JsonBase64VertexOutputFormat; +import org.apache.giraph.io.PseudoRandomVertexInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + /** * Test out the JsonBase64 format. */ @@ -78,7 +78,7 @@ public class TestJsonBase64Format extend JsonBase64VertexOutputFormat.class, outputPath2); job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 3); - FileInputFormat.setInputPaths(job.getInternalJob(), outputPath); + GiraphFileInputFormat.addVertexInputPath(job.getInternalJob(), outputPath); assertTrue(job.run(true)); Path outputPath3 = getTempPath(getCallingMethodName() + "3"); Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1405175&r1=1405174&r2=1405175&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java Fri Nov 2 21:37:30 2012 @@ -168,7 +168,7 @@ public class 
TestTextDoubleDoubleAdjacen @Test public void testHappyPath() throws Exception { - String input = "Hi\t0\tCiao\t1.123\tBomdia\t2.234\tOla\t3.345"; + String input = "Hi\t0\tMyEdgeInputFormat\t1.123\tBomdia\t2.234\tOla\t3.345"; when(rr.getCurrentValue()).thenReturn(new Text(input)); TextVertexReader vr = createVertexReader(rr); @@ -179,7 +179,7 @@ public class TestTextDoubleDoubleAdjacen vr.getCurrentVertex(); setGraphState(vertex, graphState); assertValidVertex(conf, graphState, vertex, new Text("Hi"), new DoubleWritable(0), - new Edge<Text, DoubleWritable>(new Text("Ciao"), new DoubleWritable(1.123d)), + new Edge<Text, DoubleWritable>(new Text("MyEdgeInputFormat"), new DoubleWritable(1.123d)), new Edge<Text, DoubleWritable>(new Text("Bomdia"), new DoubleWritable(2.234d)), new Edge<Text, DoubleWritable>(new Text("Ola"), new DoubleWritable(3.345d))); assertEquals(vertex.getNumEdges(), 3); @@ -187,7 +187,7 @@ public class TestTextDoubleDoubleAdjacen @Test public void testLineSanitizer() throws Exception { - String input = "Bye\t0.01\tCiao\t1.001\tTchau\t2.0001\tAdios\t3.00001"; + String input = "Bye\t0.01\tMyEdgeInputFormat\t1.001\tTchau\t2.0001\tAdios\t3.00001"; AdjacencyListTextVertexInputFormat.LineSanitizer toUpper = new AdjacencyListTextVertexInputFormat.LineSanitizer() {
