Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeInputFormat;
+import org.apache.giraph.graph.EdgeReader;
+import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Abstract class that users should subclass to use their own text based
+ * edge input format.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextEdgeInputFormat<I extends WritableComparable,
+    E extends Writable> extends EdgeInputFormat<I, E> {
+  /** Underlying GiraphTextInputFormat. */
+  protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
+
+  @Override
+  public List<InputSplit> getSplits(
+      JobContext context, int numWorkers) throws IOException,
+      InterruptedException {
+    // Ignore the hint of numWorkers here since we are using
+    // GiraphTextInputFormat to do this for us
+    return textInputFormat.getEdgeSplits(context);
+  }
+
+  @Override
+  public abstract TextEdgeReader createEdgeReader(
+      InputSplit split, TaskAttemptContext context) throws IOException;
+
+  /**
+   * {@link EdgeReader} for {@link TextEdgeInputFormat}.
+   */
+  protected abstract class TextEdgeReader implements EdgeReader<I, E> {
+    /** Internal line record reader */
+    private RecordReader<LongWritable, Text> lineRecordReader;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+    /**
+     * Cached configuration. We don't care about vertex value and message type.
+     */
+    private ImmutableClassesGiraphConfiguration<I, NullWritable, E,
+        NullWritable> conf;
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      this.context = context;
+      conf = new ImmutableClassesGiraphConfiguration<I, NullWritable, E,
+          NullWritable>(context.getConfiguration());
+      lineRecordReader = createLineRecordReader(inputSplit, context);
+      lineRecordReader.initialize(inputSplit, context);
+    }
+
+    /**
+     * Create the line record reader. Override this to use a different
+     * underlying record reader (useful for testing).
+     *
+     * @param inputSplit
+     *          the split to read
+     * @param context
+     *          the context passed to initialize
+     * @return
+     *         the record reader to be used
+     * @throws IOException
+     *           exception that can be thrown during creation
+     * @throws InterruptedException
+     *           exception that can be thrown during creation
+     */
+    protected RecordReader<LongWritable, Text>
+    createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      return textInputFormat.createRecordReader(inputSplit, context);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // initialize() may not have been called (or may have failed before the
+      // line reader was created), in which case there is nothing to close.
+      if (lineRecordReader != null) {
+        lineRecordReader.close();
+      }
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return lineRecordReader.getProgress();
+    }
+
+    /**
+     * Get the line record reader.
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected RecordReader<LongWritable, Text> getRecordReader() {
+      return lineRecordReader;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+
+    /**
+     * Get the configuration.
+     *
+     * @return Configuration for this reader
+     */
+    protected ImmutableClassesGiraphConfiguration<I, NullWritable, E,
+        NullWritable> getConf() {
+      return conf;
+    }
+  }
+
+  /**
+   * Abstract class to be implemented by the user to read an edge from each
+   * text line.
+   */
+  protected abstract class TextEdgeReaderFromEachLine extends TextEdgeReader {
+    @Override
+    public final EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+        InterruptedException {
+      Text line = getRecordReader().getCurrentValue();
+      I sourceVertexId = getSourceVertexId(line);
+      I targetVertexId = getTargetVertexId(line);
+      E edgeValue = getValue(line);
+      return new EdgeWithSource<I, E>(sourceVertexId,
+          new Edge<I, E>(targetVertexId, edgeValue));
+    }
+
+    @Override
+    public final boolean nextEdge() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    /**
+     * Reads source vertex id from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the source vertex id corresponding to the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getSourceVertexId(Text line) throws IOException;
+
+
+    /**
+     * Reads target vertex id from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the target vertex id corresponding to the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getTargetVertexId(Text line) throws IOException;
+
+    /**
+     * Reads edge value from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the edge value corresponding to the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract E getValue(Text line) throws IOException;
+  }
+
+  /**
+   * Abstract class to be implemented by the user to read an edge from each
+   * text line after preprocessing it.
+   *
+   * @param <T>
+   *          The resulting type of preprocessing.
+   */
+  protected abstract class TextEdgeReaderFromEachLineProcessed<T> extends
+      TextEdgeReader {
+    @Override
+    public final EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+        InterruptedException {
+      Text line = getRecordReader().getCurrentValue();
+      // Preprocess the line once and derive every edge field from the result
+      T processed = preprocessLine(line);
+      I sourceVertexId = getSourceVertexId(processed);
+      I targetVertexId = getTargetVertexId(processed);
+      E edgeValue = getValue(processed);
+      return new EdgeWithSource<I, E>(sourceVertexId,
+          new Edge<I, E>(targetVertexId, edgeValue));
+    }
+
+    @Override
+    public final boolean nextEdge() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    /**
+     * Preprocess the line so other methods can easily read necessary
+     * information for creating edge
+     *
+     * @param line
+     *          the current line to be read
+     * @return
+     *         the preprocessed object
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract T preprocessLine(Text line) throws IOException;
+
+    /**
+     * Reads source vertex id from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the source vertex id
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getSourceVertexId(T line) throws IOException;
+
+    /**
+     * Reads target vertex id from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the target vertex id
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getTargetVertexId(T line) throws IOException;
+
+    /**
+     * Reads edge value from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the edge value
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract E getValue(T line) throws IOException;
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java Fri Nov  2 21:37:30 2012
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
 import java.io.IOException;
 import java.util.List;
@@ -39,7 +38,7 @@ import java.util.Map;
 
 /**
  * Abstract class that users should subclass to use their own text based
- * vertex output format.
+ * vertex input format.
  *
  * @param <I> Vertex index value
  * @param <V> Vertex value
@@ -51,15 +50,15 @@ public abstract class TextVertexInputFor
     V extends Writable, E extends Writable, M extends Writable>
     extends VertexInputFormat<I, V, E, M> {
 
-  /** Uses the TextInputFormat to do everything */
-  protected TextInputFormat textInputFormat = new TextInputFormat();
+  /** Uses the GiraphTextInputFormat to do everything */
+  protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
 
   @Override
   public List<InputSplit> getSplits(JobContext context, int numWorkers)
     throws IOException, InterruptedException {
-    // Ignore the hint of numWorkers here since we are using TextInputFormat
-    // to do this for us
-    return textInputFormat.getSplits(context);
+    // Ignore the hint of numWorkers here since we are using
+    // GiraphTextInputFormat to do this for us
+    return textInputFormat.getVertexSplits(context);
   }
 
   /**

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.VertexValueInputFormat;
+import org.apache.giraph.graph.VertexValueReader;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Abstract class that users should subclass to use their own text based
+ * vertex value input format.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexValueInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends VertexValueInputFormat<I, V, E, M> {
+  /** Uses the GiraphTextInputFormat to do everything */
+  protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    // Ignore the hint of numWorkers here since we are using
+    // GiraphTextInputFormat to do this for us
+    return textInputFormat.getVertexSplits(context);
+  }
+
+  @Override
+  public abstract TextVertexValueReader createVertexValueReader(
+      InputSplit split, TaskAttemptContext context) throws IOException;
+
+  /**
+   * {@link VertexValueReader} for {@link TextVertexValueInputFormat}.
+   */
+  protected abstract class TextVertexValueReader extends
+      VertexValueReader<I, V, E, M> {
+    /** Internal line record reader */
+    private RecordReader<LongWritable, Text> lineRecordReader;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      super.initialize(inputSplit, context);
+      this.context = context;
+      lineRecordReader = createLineRecordReader(inputSplit, context);
+      lineRecordReader.initialize(inputSplit, context);
+    }
+
+    /**
+     * Create the line record reader. Override this to use a different
+     * underlying record reader (useful for testing).
+     *
+     * @param inputSplit
+     *          the split to read
+     * @param context
+     *          the context passed to initialize
+     * @return
+     *         the record reader to be used
+     * @throws IOException
+     *           exception that can be thrown during creation
+     * @throws InterruptedException
+     *           exception that can be thrown during creation
+     */
+    protected RecordReader<LongWritable, Text>
+    createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      return textInputFormat.createRecordReader(inputSplit, context);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // initialize() may not have been called (or may have failed before the
+      // line reader was created), in which case there is nothing to close.
+      if (lineRecordReader != null) {
+        lineRecordReader.close();
+      }
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return lineRecordReader.getProgress();
+    }
+
+    /**
+     * Get the line record reader.
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected RecordReader<LongWritable, Text> getRecordReader() {
+      return lineRecordReader;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  /**
+   * Abstract class to be implemented by the user to read a vertex value from
+   * each text line.
+   */
+  protected abstract class TextVertexValueReaderFromEachLine extends
+      TextVertexValueReader {
+    @Override
+    public final I getCurrentVertexId() throws IOException,
+        InterruptedException {
+      return getId(getRecordReader().getCurrentValue());
+    }
+
+    @Override
+    public final V getCurrentVertexValue() throws IOException,
+        InterruptedException {
+      return getValue(getRecordReader().getCurrentValue());
+    }
+
+    @Override
+    public final boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    /**
+     * Reads vertex id from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the vertex id corresponding to the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getId(Text line) throws IOException;
+
+    /**
+     * Reads vertex value from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the vertex value corresponding to the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract V getValue(Text line) throws IOException;
+  }
+
+  /**
+   * Abstract class to be implemented by the user to read a vertex value from
+   * each text line after preprocessing it.
+   *
+   * @param <T>
+   *          The resulting type of preprocessing.
+   */
+  protected abstract class TextVertexValueReaderFromEachLineProcessed<T>
+      extends TextVertexValueReader {
+    /** Last preprocessed line. */
+    private T processedLine = null;
+
+    /** Get last preprocessed line. Generate it if missing.
+     *
+     * @return The last preprocessed line
+     * @throws IOException exception that can be thrown while preprocessing
+     * @throws InterruptedException exception that can be thrown while reading
+     */
+    private T getProcessedLine() throws IOException, InterruptedException {
+      if (processedLine == null) {
+        processedLine = preprocessLine(getRecordReader().getCurrentValue());
+      }
+      return processedLine;
+    }
+
+    @Override
+    public I getCurrentVertexId() throws IOException,
+        InterruptedException {
+      return getId(getProcessedLine());
+    }
+
+    @Override
+    public V getCurrentVertexValue() throws IOException,
+        InterruptedException {
+      return getValue(getProcessedLine());
+    }
+
+    @Override
+    public final boolean nextVertex() throws IOException, InterruptedException {
+      // Invalidate the cached preprocessing of the previous line
+      processedLine = null;
+      return getRecordReader().nextKeyValue();
+    }
+
+    /**
+     * Preprocess the line so other methods can easily read necessary
+     * information for creating vertex.
+     *
+     * @param line
+     *          the current line to be read
+     * @return
+     *         the preprocessed object
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract T preprocessLine(Text line) throws IOException;
+
+    /**
+     * Reads vertex id from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the vertex id
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getId(T line) throws IOException;
+
+    /**
+     * Reads vertex value from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the vertex value
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract V getValue(T line) throws IOException;
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FileUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FileUtils.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FileUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FileUtils.java Fri Nov  2 21:37:30 2012
@@ -18,13 +18,14 @@
 
 package org.apache.giraph.utils;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Closeables;
-import com.google.common.io.Files;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.base.Charsets;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
@@ -114,7 +115,7 @@ public class FileUtils {
    * @param lines Strings written to the file
    * @throws IOException
    */
-  public static void writeLines(File file, String... lines)
+  public static void writeLines(File file, String[] lines)
     throws IOException {
     Writer writer = Files.newWriter(file, Charsets.UTF_8);
     try {

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/IntPair.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/IntPair.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/IntPair.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/IntPair.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+/**
+ * A pair of integers.
+ */
+public class IntPair {
+  /** First element. */
+  private int first;
+  /** Second element. */
+  private int second;
+
+  /** Constructor.
+   *
+   * @param fst First element
+   * @param snd Second element
+   */
+  public IntPair(int fst, int snd) {
+    first = fst;
+    second = snd;
+  }
+
+  /**
+   * Get the first element.
+   *
+   * @return The first element
+   */
+  public int getFirst() {
+    return first;
+  }
+
+  /**
+   * Set the first element.
+   *
+   * @param first The first element
+   */
+  public void setFirst(int first) {
+    this.first = first;
+  }
+
+  /**
+   * Get the second element.
+   *
+   * @return The second element
+   */
+  public int getSecond() {
+    return second;
+  }
+
+  /**
+   * Set the second element.
+   *
+   * @param second The second element
+   */
+  public void setSecond(int second) {
+    this.second = second;
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Fri Nov  2 21:37:30 2012
@@ -18,14 +18,8 @@
 
 package org.apache.giraph.utils;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.graph.EdgeInputFormat;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.MasterCompute;
 import org.apache.giraph.graph.Vertex;
@@ -33,9 +27,9 @@ import org.apache.giraph.graph.VertexCom
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexOutputFormat;
 import org.apache.giraph.graph.WorkerContext;
+import org.apache.giraph.io.GiraphFileInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
@@ -44,6 +38,13 @@ import org.apache.zookeeper.server.quoru
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /**
  * A base class for running internal tests on a vertex
  *
@@ -81,7 +82,8 @@ public class InternalVertexRunner {
       Class<? extends Vertex> vertexClass,
       Class<? extends VertexInputFormat> vertexInputFormatClass,
       Class<? extends VertexOutputFormat> vertexOutputFormatClass,
-      Map<String, String> params, String... data) throws Exception {
+      Map<String, String> params,
+      String[] data) throws Exception {
     return run(vertexClass, null, vertexInputFormatClass,
         vertexOutputFormatClass, params, data);
   }
@@ -107,7 +109,7 @@ public class InternalVertexRunner {
       Class<? extends VertexInputFormat> vertexInputFormatClass,
       Class<? extends VertexOutputFormat> vertexOutputFormatClass,
       Map<String, String> params,
-      String... data) throws Exception {
+      String[] data) throws Exception {
     return InternalVertexRunner.run(vertexClass, vertexCombinerClass,
         vertexInputFormatClass, vertexOutputFormatClass, null, null, params,
         data);
@@ -129,32 +131,90 @@ public class InternalVertexRunner {
    * @throws Exception
    */
   @SuppressWarnings("rawtypes")
+  public static Iterable<String> run(Class<? extends Vertex> vertexClass,
+      Class<? extends VertexCombiner> vertexCombinerClass,
+      Class<? extends VertexInputFormat> vertexInputFormatClass,
+      Class<? extends VertexOutputFormat> vertexOutputFormatClass,
+      Class<? extends WorkerContext> workerContextClass,
+      Class<? extends MasterCompute> masterComputeClass,
+      Map<String, String> params, String[] data) throws Exception {
+    // Vertex input only: pass no edge input format and no edge input data.
+    Class<? extends EdgeInputFormat> noEdgeInputFormat = null;
+    return InternalVertexRunner.run(vertexClass, vertexCombinerClass,
+        vertexInputFormatClass, noEdgeInputFormat, vertexOutputFormatClass,
+        workerContextClass, masterComputeClass, params, data, null);
+  }
+
+  // CHECKSTYLE: stop ParameterNumberCheck
+  /**
+   * Attempts to run the vertex internally in the current JVM, reading from and
+   * writing to a temporary folder on local disk. Will start its own zookeeper
+   * instance.
+   * @param vertexClass the vertex class to instantiate
+   * @param vertexCombinerClass the vertex combiner to use (or null)
+   * @param vertexInputFormatClass the vertex inputformat to use
+   * @param edgeInputFormatClass the edge inputformat to use
+   * @param vertexOutputFormatClass the outputformat to use
+   * @param workerContextClass the worker context to use
+   * @param masterComputeClass the master compute class to use
+   * @param params a map of parameters to add to the hadoop configuration
+   * @param vertexInputData linewise vertex input data
+   * @param edgeInputData linewise edge input data
+   * @return linewise output data
+   * @throws Exception
+   */
+  @SuppressWarnings("rawtypes")
   public static Iterable<String> run(Class<? extends Vertex> vertexClass,
       Class<? extends VertexCombiner> vertexCombinerClass,
       Class<? extends VertexInputFormat> vertexInputFormatClass,
+      Class<? extends EdgeInputFormat> edgeInputFormatClass,
       Class<? extends VertexOutputFormat> vertexOutputFormatClass,
       Class<? extends WorkerContext> workerContextClass,
       Class<? extends MasterCompute> masterComputeClass,
-      Map<String, String> params, String... data) throws Exception {
+      Map<String, String> params,
+      String[] vertexInputData,
+      String[] edgeInputData) throws Exception {
+
+    boolean useVertexInputFormat = vertexInputFormatClass != null;
+    boolean useEdgeInputFormat = edgeInputFormatClass != null;
 
     File tmpDir = null;
     try {
       // Prepare input file, output folder and temporary folders
       tmpDir = FileUtils.createTestDir(vertexClass);
-      File inputFile = FileUtils.createTempFile(tmpDir, "graph.txt");
+
+      File vertexInputFile = null;
+      File edgeInputFile = null;
+      if (useVertexInputFormat) {
+        vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt");
+      }
+      if (useEdgeInputFormat) {
+        edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt");
+      }
+
       File outputDir = FileUtils.createTempDir(tmpDir, "output");
       File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
       File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
       File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints");
 
       // Write input data to disk
-      FileUtils.writeLines(inputFile, data);
+      if (useVertexInputFormat) {
+        FileUtils.writeLines(vertexInputFile, vertexInputData);
+      }
+      if (useEdgeInputFormat) {
+        FileUtils.writeLines(edgeInputFile, edgeInputData);
+      }
 
       // Create and configure the job to run the vertex
       GiraphJob job = new GiraphJob(vertexClass.getName());
       job.getConfiguration().setVertexClass(vertexClass);
-      job.getConfiguration().setVertexInputFormatClass(
-          vertexInputFormatClass);
+      if (useVertexInputFormat) {
+        job.getConfiguration().setVertexInputFormatClass(
+            vertexInputFormatClass);
+      }
+      if (useEdgeInputFormat) {
+        job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
+      }
       job.getConfiguration().setVertexOutputFormatClass(
           vertexOutputFormatClass);
       if (workerContextClass != null) {
@@ -185,8 +245,14 @@ public class InternalVertexRunner {
         conf.set(param.getKey(), param.getValue());
       }
 
-      FileInputFormat.addInputPath(job.getInternalJob(),
-                                   new Path(inputFile.toString()));
+      if (useVertexInputFormat) {
+        GiraphFileInputFormat.addVertexInputPath(job.getInternalJob(),
+            new Path(vertexInputFile.toString()));
+      }
+      if (useEdgeInputFormat) {
+        GiraphFileInputFormat.addEdgeInputPath(job.getInternalJob(),
+            new Path(edgeInputFile.toString()));
+      }
       FileOutputFormat.setOutputPath(job.getInternalJob(),
                                      new Path(outputDir.toString()));
 
@@ -235,6 +301,7 @@ public class InternalVertexRunner {
       FileUtils.delete(tmpDir);
     }
   }
+  // CHECKSTYLE: resume ParameterNumberCheck
 
   /**
    * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java Fri Nov  2 
21:37:30 2012
@@ -18,15 +18,8 @@
 
 package org.apache.giraph;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.List;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.io.Closeables;
 import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.graph.EdgeInputFormat;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.MasterCompute;
 import org.apache.giraph.graph.Vertex;
@@ -36,13 +29,26 @@ import org.apache.giraph.graph.WorkerCon
 import org.apache.giraph.utils.FileUtils;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.junit.After;
 import org.junit.Before;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+
 /**
  * Extended TestCase for making setting up Bsp testing.
  */
@@ -211,17 +217,49 @@ public class BspCase implements Watcher 
       Class<? extends VertexInputFormat> vertexInputFormatClass,
       Class<? extends VertexOutputFormat> vertexOutputFormatClass,
       Path outputPath) throws IOException {
+    return prepareJob(name, vertexClass, workerContextClass,
+        masterComputeClass, vertexInputFormatClass, null,
+        vertexOutputFormatClass, outputPath);
+  }
+
+  /**
+   * Prepare a GiraphJob for test purposes
+   *
+   * @param name  identifying name for the job
+   * @param vertexClass class of the vertex to run
+   * @param workerContextClass class of the workercontext to use
+   * @param masterComputeClass class of mastercompute to use
+   * @param vertexInputFormatClass  vertex inputformat to use
+   * @param edgeInputFormatClass  edge inputformat to use
+   * @param vertexOutputFormatClass outputformat to use
+   * @param outputPath  destination path for the output
+   * @return  fully configured job instance
+   * @throws IOException
+   */
+  protected GiraphJob prepareJob(
+      String name,
+      Class<? extends Vertex> vertexClass,
+      Class<? extends WorkerContext> workerContextClass,
+      Class<? extends MasterCompute> masterComputeClass,
+      Class<? extends VertexInputFormat> vertexInputFormatClass,
+      Class<? extends EdgeInputFormat> edgeInputFormatClass,
+      Class<? extends VertexOutputFormat> vertexOutputFormatClass,
+      Path outputPath) throws IOException {
     GiraphJob job = new GiraphJob(name);
     setupConfiguration(job);
     job.getConfiguration().setVertexClass(vertexClass);
-    job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
-
     if (workerContextClass != null) {
       job.getConfiguration().setWorkerContextClass(workerContextClass);
     }
     if (masterComputeClass != null) {
       job.getConfiguration().setMasterComputeClass(masterComputeClass);
     }
+    if (vertexInputFormatClass != null) {
+      job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
+    }
+    if (edgeInputFormatClass != null) {
+      job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
+    }
     if (vertexOutputFormatClass != null) {
       job.getConfiguration().setVertexOutputFormatClass(
           vertexOutputFormatClass);

Added: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestEdgeInput.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestEdgeInput.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestEdgeInput.java 
(added)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestEdgeInput.java Fri 
Nov  2 21:37:30 2012
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.io.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.IntIntTextVertexValueInputFormat;
+import org.apache.giraph.io.IntNullTextEdgeInputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A test case to ensure that loading a graph from a list of edges works as
+ * expected.
+ */
+public class TestEdgeInput extends BspCase {
+  public TestEdgeInput() {
+    super(TestEdgeInput.class.getName());
+  }
+
+  // It should be able to build a graph starting from the edges only.
+  // Vertices should be implicitly created with default values.
+  @Test
+  public void testEdgesOnly() throws Exception {
+    String[] edges = new String[] {
+        "1 2",
+        "2 3",
+        "2 4",
+        "4 1"
+    };
+
+    Iterable<String> results = InternalVertexRunner.run(
+        TestVertexWithNumEdges.class,
+        null,
+        null,
+        IntNullTextEdgeInputFormat.class,
+        IdWithValueTextOutputFormat.class,
+        null,
+        null,
+        Collections.<String, String>emptyMap(),
+        null,
+        edges);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Check that all vertices with outgoing edges have been created
+    assertEquals(3, values.size());
+    // Check the number of edges for each vertex
+    assertEquals(1, (int) values.get(1));
+    assertEquals(2, (int) values.get(2));
+    assertEquals(1, (int) values.get(4));
+  }
+
+  // It should be able to build a graph by specifying vertex data and edges
+  // as separate input formats.
+  @Test
+  public void testMixedFormat() throws Exception {
+    String[] vertices = new String[] {
+        "1 75",
+        "2 34",
+        "3 13",
+        "4 32"
+    };
+    String[] edges = new String[] {
+        "1 2",
+        "2 3",
+        "2 4",
+        "4 1",
+        "5 3"
+    };
+
+    // Run a job with a vertex that does nothing
+    Iterable<String> results = InternalVertexRunner.run(
+        TestVertexDoNothing.class,
+        null,
+        IntIntTextVertexValueInputFormat.class,
+        IntNullTextEdgeInputFormat.class,
+        IdWithValueTextOutputFormat.class,
+        null,
+        null,
+        Collections.<String, String>emptyMap(),
+        vertices,
+        edges);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Check that all vertices with either initial values or outgoing edges
+    // have been created
+    assertEquals(5, values.size());
+    // Check that the vertices have been created with correct values
+    assertEquals(75, (int) values.get(1));
+    assertEquals(34, (int) values.get(2));
+    assertEquals(13, (int) values.get(3));
+    assertEquals(32, (int) values.get(4));
+    // A vertex with edges but no initial value should have the default value
+    assertEquals(0, (int) values.get(5));
+
+    // Run a job with a vertex that counts outgoing edges
+    results = InternalVertexRunner.run(
+        TestVertexWithNumEdges.class,
+        null,
+        IntIntTextVertexValueInputFormat.class,
+        IntNullTextEdgeInputFormat.class,
+        IdWithValueTextOutputFormat.class,
+        null,
+        null,
+        Collections.<String, String>emptyMap(),
+        vertices,
+        edges);
+
+    values = parseResults(results);
+
+    // Check the number of edges for each vertex
+    assertEquals(1, (int) values.get(1));
+    assertEquals(2, (int) values.get(2));
+    assertEquals(0, (int) values.get(3));
+    assertEquals(1, (int) values.get(4));
+    assertEquals(1, (int) values.get(5));
+  }
+
+  public static class TestVertexWithNumEdges extends 
EdgeListVertex<IntWritable,
+      IntWritable, NullWritable, NullWritable> {
+    @Override
+    public void compute(Iterable<NullWritable> messages) throws IOException {
+      setValue(new IntWritable(getNumEdges()));
+      voteToHalt();
+    }
+  }
+
+  public static class TestVertexDoNothing extends EdgeListVertex<IntWritable,
+      IntWritable, NullWritable, NullWritable> {
+    @Override
+    public void compute(Iterable<NullWritable> messages) throws IOException {
+      voteToHalt();
+    }
+  }
+
+  private static Map<Integer, Integer> parseResults(Iterable<String> results) {
+    Map<Integer, Integer> values = Maps.newHashMap();
+    for (String line : results) {
+      String[] tokens = line.split("\\s+");
+      int id = Integer.valueOf(tokens[0]);
+      int value = Integer.valueOf(tokens[1]);
+      values.put(id, value);
+    }
+    return values;
+  }
+}

Modified: 
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestJsonBase64Format.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestJsonBase64Format.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestJsonBase64Format.java 
(original)
+++ 
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestJsonBase64Format.java 
Fri Nov  2 21:37:30 2012
@@ -17,22 +17,22 @@
  */
 
 package org.apache.giraph;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
 import org.apache.giraph.benchmark.EdgeListVertexPageRankBenchmark;
 import org.apache.giraph.benchmark.PageRankComputation;
-import org.apache.giraph.io.PseudoRandomVertexInputFormat;
 import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.io.GiraphFileInputFormat;
 import org.apache.giraph.io.JsonBase64VertexInputFormat;
 import org.apache.giraph.io.JsonBase64VertexOutputFormat;
+import org.apache.giraph.io.PseudoRandomVertexInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
 /**
  * Test out the JsonBase64 format.
  */
@@ -78,7 +78,7 @@ public class TestJsonBase64Format extend
         JsonBase64VertexOutputFormat.class,
         outputPath2);
     job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 3);
-    FileInputFormat.setInputPaths(job.getInternalJob(), outputPath);
+    GiraphFileInputFormat.addVertexInputPath(job.getInternalJob(), outputPath);
     assertTrue(job.run(true));
 
     Path outputPath3 = getTempPath(getCallingMethodName() + "3");

Modified: 
giraph/trunk/giraph/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
 (original)
+++ 
giraph/trunk/giraph/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
 Fri Nov  2 21:37:30 2012
@@ -168,7 +168,7 @@ public class TestTextDoubleDoubleAdjacen
 
   @Test
   public void testHappyPath() throws Exception {
-    String input = "Hi\t0\tCiao\t1.123\tBomdia\t2.234\tOla\t3.345";
+    String input = 
"Hi\t0\tMyEdgeInputFormat\t1.123\tBomdia\t2.234\tOla\t3.345";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
     TextVertexReader vr = createVertexReader(rr);
@@ -179,7 +179,7 @@ public class TestTextDoubleDoubleAdjacen
         vr.getCurrentVertex();
     setGraphState(vertex, graphState);
     assertValidVertex(conf, graphState, vertex, new Text("Hi"), new 
DoubleWritable(0),
-        new Edge<Text, DoubleWritable>(new Text("Ciao"), new 
DoubleWritable(1.123d)),
+        new Edge<Text, DoubleWritable>(new Text("MyEdgeInputFormat"), new 
DoubleWritable(1.123d)),
         new Edge<Text, DoubleWritable>(new Text("Bomdia"), new 
DoubleWritable(2.234d)),
         new Edge<Text, DoubleWritable>(new Text("Ola"), new 
DoubleWritable(3.345d)));
     assertEquals(vertex.getNumEdges(), 3);
@@ -187,7 +187,7 @@ public class TestTextDoubleDoubleAdjacen
 
   @Test
   public void testLineSanitizer() throws Exception {
-    String input = "Bye\t0.01\tCiao\t1.001\tTchau\t2.0001\tAdios\t3.00001";
+    String input = 
"Bye\t0.01\tMyEdgeInputFormat\t1.001\tTchau\t2.0001\tAdios\t3.00001";
 
     AdjacencyListTextVertexInputFormat.LineSanitizer toUpper =
         new AdjacencyListTextVertexInputFormat.LineSanitizer() {


Reply via email to