GIRAPH-732

Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ae01f039
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ae01f039
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ae01f039

Branch: refs/heads/trunk
Commit: ae01f0399cae6baab045b2ece0d71096aebe8ca3
Parents: fa6b754
Author: Claudio Martella <[email protected]>
Authored: Mon Aug 26 22:15:18 2013 +0200
Committer: Claudio Martella <[email protected]>
Committed: Mon Aug 26 22:15:18 2013 +0200

----------------------------------------------------------------------
 CHANGELOG                                       |    2 +
 .../java/org/apache/giraph/GiraphRunner.java    |    2 +-
 .../org/apache/giraph/conf/GiraphClasses.java   |   25 +
 .../apache/giraph/conf/GiraphConfiguration.java |   57 +
 .../org/apache/giraph/conf/GiraphConstants.java |   23 +
 .../ImmutableClassesGiraphConfiguration.java    |   45 +
 .../org/apache/giraph/io/EdgeOutputFormat.java  |   82 +
 .../java/org/apache/giraph/io/EdgeWriter.java   |   74 +
 .../io/formats/GiraphTextOutputFormat.java      |   90 +
 .../io/formats/IdWithValueTextOutputFormat.java |   19 +-
 .../SrcIdDstIdEdgeValueTextOutputFormat.java    |   91 ++
 .../giraph/io/formats/TextEdgeOutputFormat.java |  165 ++
 .../io/formats/TextVertexOutputFormat.java      |   15 +-
 .../io/internal/WrappedEdgeOutputFormat.java    |  169 ++
 .../apache/giraph/utils/ConfigurationUtils.java |   53 +-
 .../apache/giraph/worker/BspServiceWorker.java  |  124 +-
 .../giraph/worker/BspServiceWorker.java.orig    | 1535 ++++++++++++++++++
 ...TestSrcIdDstIdEdgeValueTextOutputFormat.java |  114 ++
 18 files changed, 2659 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index af43ef8..deca52b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-732: EdgeOutputFormat (aarmax00 via claudio)
+
   GIRAPH-512: JavaDoc warnings (tdn120 via nitay)
 
   GIRAPH-736: Bring back FindBugs (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java 
b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
index 1bd79b5..9af50e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
@@ -102,7 +102,7 @@ public class GiraphRunner implements Tool {
    */
   private void prepareHadoopMRJob(final GiraphJob job, final CommandLine cmd)
     throws Exception {
-    if (cmd.hasOption("of")) {
+    if (cmd.hasOption("vof") || cmd.hasOption("eof")) {
       if (cmd.hasOption("op")) {
         FileOutputFormat.setOutputPath(job.getInternalJob(),
           new Path(cmd.getOptionValue("op")));

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 71fe885..f97446f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -28,6 +28,7 @@ import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -85,6 +86,9 @@ public class GiraphClasses<I extends WritableComparable,
   /** Edge input format class - cached for fast access */
   protected Class<? extends EdgeInputFormat<I, E>>
   edgeInputFormatClass;
+  /** Edge output format class - cached for fast access */
+  protected Class<? extends EdgeOutputFormat<I, V, E>>
+  edgeOutputFormatClass;
 
   /** Aggregator writer class - cached for fast access */
   protected Class<? extends AggregatorWriter> aggregatorWriterClass;
@@ -168,6 +172,8 @@ public class GiraphClasses<I extends WritableComparable,
         VERTEX_OUTPUT_FORMAT_CLASS.get(conf);
     edgeInputFormatClass = (Class<? extends EdgeInputFormat<I, E>>)
         EDGE_INPUT_FORMAT_CLASS.get(conf);
+    edgeOutputFormatClass = (Class<? extends EdgeOutputFormat<I, V, E>>)
+        EDGE_OUTPUT_FORMAT_CLASS.get(conf);
 
     aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
     combinerClass = (Class<? extends Combiner<I, ? extends Writable>>)
@@ -347,6 +353,25 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
+   * Check if EdgeOutputFormat is set
+   *
+   * @return true if EdgeOutputFormat is set
+   */
+  public boolean hasEdgeOutputFormat() {
+    return edgeOutputFormatClass != null;
+  }
+
+  /**
+   * Get EdgeOutputFormat set
+   *
+   * @return EdgeOutputFormat
+   */
+  public Class<? extends EdgeOutputFormat<I, V, E>>
+  getEdgeOutputFormatClass() {
+    return edgeOutputFormatClass;
+  }
+
+  /**
    * Check if AggregatorWriter is set
    *
    * @return true if AggregatorWriter is set

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 23bcd32..15ff861 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.EdgeInputFilter;
@@ -345,6 +346,25 @@ public class GiraphConfiguration extends Configuration
     VERTEX_OUTPUT_FORMAT_CLASS.set(this, vertexOutputFormatClass);
   }
 
+
+  /**
+   * Does the job have a {@link VertexOutputFormat} subdir?
+   *
+   * @return True iff a {@link VertexOutputFormat} subdir has been specified.
+   */
+  public boolean hasVertexOutputFormatSubdir() {
+    return !VERTEX_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
+  }
+
+  /**
+   * Set the vertex output format path
+   *
+   * @param path path where the vertices will be written
+   */
+  public final void setVertexOutputFormatSubdir(String path) {
+    VERTEX_OUTPUT_FORMAT_SUBDIR.set(this, path);
+  }
+
   /**
    * Check if output should be done during computation
    *
@@ -386,6 +406,43 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Does the job have a {@link EdgeOutputFormat}?
+   *
+   * @return True iff a {@link EdgeOutputFormat} has been specified.
+   */
+  public boolean hasEdgeOutputFormat() {
+    return EDGE_OUTPUT_FORMAT_CLASS.get(this) != null;
+  }
+
+  /**
+   * Set the edge output format class (optional)
+   *
+   * @param edgeOutputFormatClass Determines how graph is output
+   */
+  public final void setEdgeOutputFormatClass(
+      Class<? extends EdgeOutputFormat> edgeOutputFormatClass) {
+    EDGE_OUTPUT_FORMAT_CLASS.set(this, edgeOutputFormatClass);
+  }
+
+  /**
+   * Does the job have a {@link EdgeOutputFormat} subdir?
+   *
+   * @return True iff a {@link EdgeOutputFormat} subdir has been specified.
+   */
+  public boolean hasEdgeOutputFormatSubdir() {
+    return !EDGE_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
+  }
+
+  /**
+   * Set the edge output format path
+   *
+   * @param path path where the edges will be written
+   */
+  public final void setEdgeOutputFormatSubdir(String path) {
+    EDGE_OUTPUT_FORMAT_SUBDIR.set(this, path);
+  }
+
+  /**
    * Get the number of threads to use for writing output in the end of the
    * application. If output format is not thread safe, returns 1.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c276c2a..604729a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -38,6 +38,7 @@ import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.Language;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -210,6 +211,28 @@ public interface GiraphConstants {
   ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS =
       ClassConfOption.create("giraph.vertexOutputFormatClass", null,
           VertexOutputFormat.class, "VertexOutputFormat class");
+  /** VertexOutputFormat sub-directory */
+  StrConfOption VERTEX_OUTPUT_FORMAT_SUBDIR =
+    new StrConfOption("giraph.vertex.output.subdir", "",
+                      "VertexOutputFormat sub-directory");
+  /** EdgeOutputFormat class */
+  ClassConfOption<EdgeOutputFormat> EDGE_OUTPUT_FORMAT_CLASS =
+      ClassConfOption.create("giraph.edgeOutputFormatClass", null,
+          EdgeOutputFormat.class, "EdgeOutputFormat class");
+  /** EdgeOutputFormat sub-directory */
+  StrConfOption EDGE_OUTPUT_FORMAT_SUBDIR =
+    new StrConfOption("giraph.edge.output.subdir", "edges",
+                      "EdgeOutputFormat sub-directory");
+
+  /** GiraphTextOutputFormat Separator */
+  StrConfOption GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR =
+    new StrConfOption("giraph.textoutputformat.separator", "\t",
+                      "GiraphTextOuputFormat Separator");
+  /** Reverse values in the output */
+  BooleanConfOption GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE =
+      new BooleanConfOption("giraph.textoutputformat.reverse", false,
+                            "Reverse values in the output");
+
   /**
    * If you use this option, instead of having saving vertices in the end of
    * application, saveVertex will be called right after each vertex.compute()

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
 
b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 49a2ebc..2506c21 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -36,11 +36,13 @@ import org.apache.giraph.graph.Language;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.EdgeInputFilter;
 import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
+import org.apache.giraph.io.internal.WrappedEdgeOutputFormat;
 import org.apache.giraph.io.internal.WrappedVertexInputFormat;
 import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
 import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
@@ -289,6 +291,49 @@ public class ImmutableClassesGiraphConfiguration<I extends 
WritableComparable,
     return wrappedVertexOutputFormat;
   }
 
+  @Override
+  public boolean hasEdgeOutputFormat() {
+    return classes.hasEdgeOutputFormat();
+  }
+
+  /**
+   * Get the user's subclassed
+   * {@link org.apache.giraph.io.EdgeOutputFormat}.
+   *
+   * @return User's edge output format class
+   */
+  public Class<? extends EdgeOutputFormat<I, V, E>>
+  getEdgeOutputFormatClass() {
+    return classes.getEdgeOutputFormatClass();
+  }
+
+  /**
+   * Create a user edge output format class.
+   * Note: Giraph should only use WrappedEdgeOutputFormat,
+   * which makes sure that Configuration parameters are set properly.
+   *
+   * @return Instantiated user edge output format class
+   */
+  private EdgeOutputFormat<I, V, E> createEdgeOutputFormat() {
+    Class<? extends EdgeOutputFormat<I, V, E>> klass =
+        getEdgeOutputFormatClass();
+    return ReflectionUtils.newInstance(klass, this);
+  }
+
+  /**
+   * Create a wrapper for user edge output format,
+   * which makes sure that Configuration parameters are set properly in all
+   * methods related to this format.
+   *
+   * @return Wrapper around user edge output format
+   */
+  public WrappedEdgeOutputFormat<I, V, E> createWrappedEdgeOutputFormat() {
+    WrappedEdgeOutputFormat<I, V, E> wrappedEdgeOutputFormat =
+        new WrappedEdgeOutputFormat<I, V, E>(createEdgeOutputFormat());
+    configureIfPossible(wrappedEdgeOutputFormat);
+    return wrappedEdgeOutputFormat;
+  }
+
   /**
    * Create the proper superstep output, based on the configuration settings.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java 
b/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
new file mode 100644
index 0000000..ac4c6ce
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * abstract class which can only write edges
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class EdgeOutputFormat<
+    I extends WritableComparable, V extends Writable,
+    E extends Writable> extends
+    DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+  /**
+   * Create an edge writer for a given split. The framework will call
+   * {@link EdgeWriter#initialize(TaskAttemptContext)} before
+   * the split is used.
+   *
+   * @param context the information about the task
+   * @return a new edge writer
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract EdgeWriter<I, V, E> createEdgeWriter(
+    TaskAttemptContext context) throws IOException, InterruptedException;
+
+  /**
+   * Check for validity of the output-specification for the job.
+   * (Copied from Hadoop OutputFormat)
+   *
+   * <p>This is to validate the output specification for the job when the
+   * job is submitted.  Typically checks that it does not already exist,
+   * throwing an exception when it already exists, so that output is not
+   * overwritten.</p>
+   *
+   * @param  context information about the job
+   * @throws IOException when output should not be attempted
+   */
+  public abstract void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException;
+
+  /**
+   * Get the output committer for this output format. This is responsible
+   * for ensuring the output is committed correctly.
+   * (Copied from Hadoop OutputFormat)
+   *
+   * @param context the task context
+   * @return an output committer
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract OutputCommitter getOutputCommitter(
+    TaskAttemptContext context) throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java 
b/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java
new file mode 100644
index 0000000..e5a78c2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Abstract writer that emits the edges of a graph, one edge per call.
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class EdgeWriter<
+    I extends WritableComparable, V extends Writable,
+    E extends Writable> extends
+    DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+
+  /**
+   * Writes the next edge and associated data
+   *
+   * @param   sourceId    the vertex ID from which the edge originates
+   * @param   sourceValue the vertex value; the vertex is the one from which
+   *                      the edge originates
+   * @param   edge        edge to be written
+   * @throws  IOException
+   * @throws  InterruptedException
+   */
+  public abstract void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
+    throws IOException, InterruptedException;
+
+  /**
+   * Use the context to setup writing the edges.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param  context Context used to write the edges.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract void initialize(TaskAttemptContext context)
+    throws IOException, InterruptedException;
+
+  /**
+   * Close this {@link EdgeWriter} to future operations.
+   *
+   * @param  context the context of the task
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract void close(TaskAttemptContext context)
+    throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java
 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java
new file mode 100644
index 0000000..582dea2
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * The text output format used for Giraph text writing.
+ */
+public abstract class GiraphTextOutputFormat
+  extends TextOutputFormat<Text, Text> {
+
+  @Override
+  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
+    throws IOException, InterruptedException {
+    String extension = "";
+    CompressionCodec codec = null;
+    Configuration conf = job.getConfiguration();
+    boolean isCompressed = getCompressOutput(job);
+
+    if (isCompressed) {
+      Class<? extends CompressionCodec> codecClass =
+        getOutputCompressorClass(job, GzipCodec.class);
+      codec =
+        (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
+      extension = codec.getDefaultExtension();
+    }
+    Path file = getDefaultWorkFile(job, extension);
+
+    /* adjust the path */
+    FSDataOutputStream fileOut;
+    FileSystem fs = file.getFileSystem(conf);
+    String subdir = getSubdir();
+    if (!subdir.isEmpty()) {
+      Path subdirPath = new Path(subdir);
+      Path subdirAbsPath = new Path(file.getParent(), subdirPath);
+      Path vertexFile = new Path(subdirAbsPath, file.getName());
+      fileOut = fs.create(vertexFile, false);
+    } else {
+      fileOut = fs.create(file, false);
+    }
+
+    String separator = "\t";
+
+    if (!isCompressed) {
+      return new LineRecordWriter<Text, Text>(fileOut, separator);
+    } else {
+      DataOutputStream out =
+        new DataOutputStream(codec.createOutputStream(fileOut));
+      return new LineRecordWriter<Text, Text>(out, separator);
+    }
+  }
+
+  /**
+   * This function is used to provide an additional path level to keep
+   * different text outputs into different directories.
+   *
+   * @return  the subdirectory to be created under the output path
+   */
+  protected abstract String getSubdir();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
index bd69586..e886059 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
@@ -77,19 +77,18 @@ public class IdWithValueTextOutputFormat<I extends 
WritableComparable,
     @Override
     protected Text convertVertexToLine(Vertex<I, V, E> vertex)
       throws IOException {
-      String first;
-      String second;
+
+      StringBuilder str = new StringBuilder();
       if (reverseOutput) {
-        first = vertex.getValue().toString();
-        second = vertex.getId().toString();
+        str.append(vertex.getValue().toString());
+        str.append(delimiter);
+        str.append(vertex.getId().toString());
       } else {
-        first = vertex.getId().toString();
-        second = vertex.getValue().toString();
+        str.append(vertex.getId().toString());
+        str.append(delimiter);
+        str.append(vertex.getValue().toString());
       }
-      Text line = new Text(first + delimiter + second);
-      return line;
+      return new Text(str.toString());
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java
 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java
new file mode 100644
index 0000000..1d7478f
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import static 
org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR;
+import static 
org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE;
+
+/**
+ * Write out Edge Value with Source and Destination ID, but not the vertex
+ * value.
+ * This is a demonstration output format to show the possibility to separately
+ * output edges from vertices.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class SrcIdDstIdEdgeValueTextOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends TextEdgeOutputFormat<I, V, E> {
+
+  @Override
+  public TextEdgeWriter createEdgeWriter(TaskAttemptContext context) {
+    return new SrcIdDstIdEdgeValueEdgeWriter();
+  }
+
+  /**
+   * Edge writer used with {@link SrcIdDstIdEdgeValueTextOutputFormat}.
+   */
+  protected class SrcIdDstIdEdgeValueEdgeWriter
+    extends TextEdgeWriterToEachLine {
+
+    /** Saved delimiter */
+    private String delimiter;
+    /** Cached reverse option */
+    private boolean reverseOutput;
+
+    @Override
+    public void initialize(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      super.initialize(context);
+      delimiter = GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR.get(getConf());
+      reverseOutput = GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE.get(getConf());
+    }
+
+    @Override
+    protected Text convertEdgeToLine(I sourceId, V sourceValue, Edge<I, E> 
edge)
+      throws IOException {
+      StringBuilder msg = new StringBuilder();
+      if (reverseOutput) {
+        msg.append(edge.getValue().toString());
+        msg.append(delimiter);
+        msg.append(edge.getTargetVertexId().toString());
+        msg.append(delimiter);
+        msg.append(sourceId.toString());
+      } else {
+        msg.append(sourceId.toString());
+        msg.append(delimiter);
+        msg.append(edge.getTargetVertexId().toString());
+        msg.append(delimiter);
+        msg.append(edge.getValue().toString());
+      }
+      return new Text(msg.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java
 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java
new file mode 100644
index 0000000..1b20c57
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import static org.apache.giraph.conf.GiraphConstants.EDGE_OUTPUT_FORMAT_SUBDIR;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.EdgeWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Abstract class that users should subclass to use their own text based
+ * edge output format.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextEdgeOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends EdgeOutputFormat<I, V, E> {
+  /**
+   * Delegate text output format that does the actual work; its subdir is
+   * taken from EDGE_OUTPUT_FORMAT_SUBDIR in the configuration.
+   */
+  protected GiraphTextOutputFormat textOutputFormat =
+    new GiraphTextOutputFormat() {
+      @Override
+      protected String getSubdir() {
+        return EDGE_OUTPUT_FORMAT_SUBDIR.get(getConf());
+      }
+    };
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    textOutputFormat.checkOutputSpecs(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return textOutputFormat.getOutputCommitter(context);
+  }
+
+  /**
+   * The factory method which produces the {@link TextEdgeWriter} used by this
+   * output format.
+   *
+   * @param context  the information about the task
+   * @return         the text edge writer to be used
+   * @throws IOException          declared for implementations to throw
+   * @throws InterruptedException declared for implementations to throw
+   */
+  @Override
+  public abstract TextEdgeWriter createEdgeWriter(TaskAttemptContext
+      context) throws IOException, InterruptedException;
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * edge output.  Easiest to ignore the key value separator and only use
+   * key instead.
+   */
+  protected abstract class TextEdgeWriter
+      extends EdgeWriter<I, V, E> {
+    /** Internal line record writer, created in initialize() */
+    private RecordWriter<Text, Text> lineRecordWriter;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException,
+           InterruptedException {
+      lineRecordWriter = createLineRecordWriter(context);
+      this.context = context;
+    }
+
+    /**
+     * Create the line record writer. Override this to use a different
+     * underlying record writer (useful for testing).
+     *
+     * @param  context the context passed to initialize
+     * @return the record writer to be used
+     * @throws IOException          exception that can be thrown during
+     *                              creation
+     * @throws InterruptedException exception that can be thrown during
+     *                              creation
+     */
+    protected RecordWriter<Text, Text> createLineRecordWriter(
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      return textOutputFormat.getRecordWriter(context);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      lineRecordWriter.close(context);
+    }
+
+    /**
+     * Get the line record writer.
+     *
+     * @return Record writer to be used for writing.
+     */
+    public RecordWriter<Text, Text> getRecordWriter() {
+      return lineRecordWriter;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    public TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  /**
+   * Abstract class to be implemented by the user to write a line for each
+   * edge.
+   */
+  protected abstract class TextEdgeWriterToEachLine extends TextEdgeWriter {
+
+    @Override
+    public final void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
+      throws IOException, InterruptedException {
+
+      // Note we are writing line as key with null value
+      getRecordWriter().write(
+          convertEdgeToLine(sourceId, sourceValue, edge), null);
+    }
+
+    /**
+     * Converts the given edge to the text line that is written for it.
+     *
+     * @param sourceId    the current id of the source vertex
+     * @param sourceValue the current value of the source vertex
+     * @param edge        the edge to be written
+     * @return the text line to be written
+     * @throws IOException exception that can be thrown while writing
+     */
+    protected abstract Text convertEdgeToLine(I sourceId,
+      V sourceValue, Edge<I, E> edge) throws IOException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
index c91d543..c57ecd7 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.io.formats;
 
 import java.io.IOException;
+
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
@@ -29,7 +30,8 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import static 
org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_SUBDIR;
 
 /**
  * Abstract class that users should subclass to use their own text based
@@ -43,10 +45,14 @@ import 
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 public abstract class TextVertexOutputFormat<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends VertexOutputFormat<I, V, E> {
-
   /** Uses the TextOutputFormat to do everything */
-  protected TextOutputFormat<Text, Text> textOutputFormat =
-      new TextOutputFormat<Text, Text>();
+  protected GiraphTextOutputFormat textOutputFormat =
+    new GiraphTextOutputFormat() {
+      @Override
+      protected String getSubdir() {
+        return VERTEX_OUTPUT_FORMAT_SUBDIR.get(getConf());
+      }
+    };
 
   @Override
   public void checkOutputSpecs(JobContext context)
@@ -161,5 +167,4 @@ public abstract class TextVertexOutputFormat<I extends 
WritableComparable,
     protected abstract Text convertVertexToLine(Vertex<I, V, E> vertex)
       throws IOException;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
 
b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
new file mode 100644
index 0000000..2222255
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.EdgeWriter;
+import org.apache.giraph.job.HadoopUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * For internal use only.
+ *
+ * Wraps the user-set {@link EdgeOutputFormat} and forwards every call to
+ * it, replacing each Hadoop context argument with one built by
+ * {@link HadoopUtils} from getConf() and the incoming context. The intent
+ * is that parameters the user sets in the configuration are available in
+ * all methods related to the wrapped format.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public class WrappedEdgeOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends EdgeOutputFormat<I, V, E> {
+
+  /** {@link EdgeOutputFormat} which is wrapped */
+  private final EdgeOutputFormat<I, V, E> originalOutputFormat;
+
+  /**
+   * Constructor
+   *
+   * @param edgeOutputFormat Edge output format to wrap
+   */
+  public WrappedEdgeOutputFormat(
+      EdgeOutputFormat<I, V, E> edgeOutputFormat) {
+    originalOutputFormat = edgeOutputFormat;
+  }
+
+  @Override
+  public EdgeWriter<I, V, E> createEdgeWriter(
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    final EdgeWriter<I, V, E> edgeWriter =
+        originalOutputFormat.createEdgeWriter(
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+    // Return a delegating writer so the user's writer also gets wrapped
+    // contexts in initialize() and close()
+    return new EdgeWriter<I, V, E>() {
+      @Override
+      public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+        // Keep the configuration here and pass it on to the wrapped writer
+        super.setConf(conf);
+        edgeWriter.setConf(conf);
+      }
+
+      @Override
+      public void initialize(TaskAttemptContext context)
+        throws IOException, InterruptedException {
+        edgeWriter.initialize(
+          HadoopUtils.makeTaskAttemptContext(getConf(), context));
+      }
+
+      @Override
+      public void close(
+          TaskAttemptContext context) throws IOException, InterruptedException 
{
+        edgeWriter.close(
+          HadoopUtils.makeTaskAttemptContext(getConf(), context));
+      }
+
+      @Override
+      public void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
+        throws IOException, InterruptedException {
+        edgeWriter.writeEdge(sourceId, sourceValue, edge);
+      }
+    };
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    originalOutputFormat.checkOutputSpecs(
+        HadoopUtils.makeJobContext(getConf(), context));
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+
+    final OutputCommitter outputCommitter =
+        originalOutputFormat.getOutputCommitter(
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+
+    // Delegating committer: every call goes to the wrapped format's
+    // committer with a context rebuilt from getConf()
+    return new OutputCommitter() {
+      @Override
+      public void setupJob(JobContext context) throws IOException {
+        outputCommitter.setupJob(
+            HadoopUtils.makeJobContext(getConf(), context));
+      }
+
+      @Override
+      public void setupTask(TaskAttemptContext context) throws IOException {
+        outputCommitter.setupTask(
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+      }
+
+      @Override
+      public boolean needsTaskCommit(
+          TaskAttemptContext context) throws IOException {
+        return outputCommitter.needsTaskCommit(
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+      }
+
+      @Override
+      public void commitTask(TaskAttemptContext context) throws IOException {
+        outputCommitter.commitTask(
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+      }
+
+      @Override
+      public void abortTask(TaskAttemptContext context) throws IOException {
+        outputCommitter.abortTask(
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+      }
+
+      @Override
+      public void cleanupJob(JobContext context) throws IOException {
+        outputCommitter.cleanupJob(
+            HadoopUtils.makeJobContext(getConf(), context));
+      }
+
+      /*if_not[HADOOP_NON_COMMIT_JOB]*/
+      @Override
+      public void commitJob(JobContext context) throws IOException {
+        outputCommitter.commitJob(
+            HadoopUtils.makeJobContext(getConf(), context));
+      }
+
+      @Override
+      public void abortJob(JobContext context,
+          JobStatus.State state) throws IOException {
+        outputCommitter.abortJob(
+            HadoopUtils.makeJobContext(getConf(), context), state);
+      }
+      /*end[HADOOP_NON_COMMIT_JOB]*/
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 745764b..4bc4f4d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -36,6 +36,7 @@ import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.Language;
 import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
@@ -97,10 +98,16 @@ public final class ConfigurationUtils {
     OPTIONS.addOption("w", "workers", true, "Number of workers");
     OPTIONS.addOption("vif", "vertexInputFormat", true, "Vertex input format");
     OPTIONS.addOption("eif", "edgeInputFormat", true, "Edge input format");
-    OPTIONS.addOption("of", "outputFormat", true, "Vertex output format");
+    OPTIONS.addOption("vof", "vertexOutputFormat", true,
+        "Vertex output format");
+    OPTIONS.addOption("eof", "edgeOutputFormat", true, "Edge output format");
     OPTIONS.addOption("vip", "vertexInputPath", true, "Vertex input path");
     OPTIONS.addOption("eip", "edgeInputPath", true, "Edge input path");
-    OPTIONS.addOption("op", "outputPath", true, "Vertex output path");
+    OPTIONS.addOption("op",  "outputPath", true, "Output path");
+    OPTIONS.addOption("vsd",  "vertexSubDir", true, "subdirectory to be used " 
+
+        "for the vertex output");
+    OPTIONS.addOption("esd",  "edgeSubDir", true, "subdirectory to be used " +
+        "for the edge output");
     OPTIONS.addOption("c", "combiner", true, "Combiner class");
     OPTIONS.addOption("ve", "outEdges", true, "Vertex edges class");
     OPTIONS.addOption("wc", "workerContext", true, "WorkerContext class");
@@ -316,14 +323,46 @@ public final class ConfigurationUtils {
           "InputFormat does not require one.");
       }
     }
-    if (cmd.hasOption("of")) {
+    if (cmd.hasOption("vof")) {
       conf.setVertexOutputFormatClass(
           (Class<? extends VertexOutputFormat>) Class
-              .forName(cmd.getOptionValue("of")));
+              .forName(cmd.getOptionValue("vof")));
     } else {
       if (LOG.isInfoEnabled()) {
-        LOG.info("No output format specified. Ensure your OutputFormat " +
-          "does not require one.");
+        LOG.info("No vertex output format specified. Ensure your " +
+          "OutputFormat does not require one.");
+      }
+    }
+    if (cmd.hasOption("vof")) {
+      if (cmd.hasOption("vsd")) {
+        conf.setVertexOutputFormatSubdir(cmd.getOptionValue("vsd"));
+      }
+    }
+    if (cmd.hasOption("eof")) {
+      conf.setEdgeOutputFormatClass(
+          (Class<? extends EdgeOutputFormat>) Class
+              .forName(cmd.getOptionValue("eof")));
+    } else {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("No edge output format specified. Ensure your " +
+          "OutputFormat does not require one.");
+      }
+    }
+    if (cmd.hasOption("eof")) {
+      if (cmd.hasOption("esd")) {
+        conf.setEdgeOutputFormatSubdir(cmd.getOptionValue("esd"));
+      }
+    }
+    /*
+     * Check for path clashes: when vertex and edge output both go under the
+     * same output path, each needs its own subdirectory. The subdirectories
+     * may come from -vsd/-esd or may already be present in the configuration.
+     */
+    if (cmd.hasOption("vof") && cmd.hasOption("eof") && cmd.hasOption("op")) {
+      // Consult the configuration unless BOTH subdirs were given as options
+      if (!cmd.hasOption("vsd") || !cmd.hasOption("esd")) {
+        if (!conf.hasEdgeOutputFormatSubdir() ||
+            !conf.hasVertexOutputFormatSubdir()) {
+
+          throw new IllegalArgumentException("If VertexOutputFormat and " +
+              "EdgeOutputFormat are both set, it is mandatory to provide " +
+              "both vertex subdirectory as well as edge subdirectory");
+        }
+      }
+    }
     if (cmd.hasOption("pc")) {
@@ -385,7 +424,7 @@ public final class ConfigurationUtils {
           Integer.parseInt(cmd.getOptionValue("yh")));
     }
     /*if[PURE_YARN]
-    if (cmd.hasOption("of")) {
+    if (cmd.hasOption("vof") || cmd.hasOption("eof")) {
       if (cmd.hasOption("op")) {
         // For YARN conf to get the out dir we need w/o a Job obj
         Path outputDir =

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 9311fbd..112b76d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -32,6 +32,7 @@ import 
org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerServer;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.GlobalStats;
@@ -40,6 +41,8 @@ import org.apache.giraph.graph.InputSplitEvents;
 import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.EdgeWriter;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
@@ -919,13 +922,15 @@ public class BspServiceWorker<I extends 
WritableComparable,
    */
   private void saveVertices(long numLocalVertices) throws IOException,
       InterruptedException {
-    if (getConfiguration().getVertexOutputFormatClass() == null) {
+    ImmutableClassesGiraphConfiguration<I, V, E>  conf = getConfiguration();
+
+    if (conf.getVertexOutputFormatClass() == null) {
       LOG.warn("saveVertices: " +
           GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
           " not specified -- there will be no saved output");
       return;
     }
-    if (getConfiguration().doOutputDuringComputation()) {
+    if (conf.doOutputDuringComputation()) {
       if (LOG.isInfoEnabled()) {
         LOG.info("saveVertices: The option for doing output during " +
             "computation is selected, so there will be no saving of the " +
@@ -1024,12 +1029,126 @@ public class BspServiceWorker<I extends 
WritableComparable,
     }
   }
 
+  /**
+   * Save the edges of every partition in the partition store using the
+   * user-defined EdgeOutputFormat. Only logs a warning and returns when no
+   * edge output format class is configured.
+   *
+   * @throws IOException on I/O failure while saving the edges
+   * @throws InterruptedException if the save is interrupted
+   */
+  private void saveEdges() throws IOException, InterruptedException {
+    final ImmutableClassesGiraphConfiguration<I, V, E>  conf =
+      getConfiguration();
+
+    if (conf.getEdgeOutputFormatClass() == null) {
+      LOG.warn("saveEdges: " +
+               GiraphConstants.EDGE_OUTPUT_FORMAT_CLASS +
+               " not specified -- make sure that the EdgeOutputFormat " +
+               "is not required.");
+      return;
+    }
+
+    final int numPartitions = getPartitionStore().getNumPartitions();
+    int numThreads = Math.min(conf.getNumOutputThreads(),
+        numPartitions);
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "saveEdges: Starting to save the edges using " +
+        numThreads + " threads");
+    final EdgeOutputFormat<I, V, E> edgeOutputFormat =
+        conf.createWrappedEdgeOutputFormat();
+
+    // ArrayBlockingQueue does not accept a capacity of 0
+    final Queue<Integer> partitionIdQueue =
+        (numPartitions == 0) ? new LinkedList<Integer>() :
+            new ArrayBlockingQueue<Integer>(numPartitions);
+    Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
+
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            // Each callable gets its own writer
+            EdgeWriter<I, V, E>  edgeWriter =
+                edgeOutputFormat.createEdgeWriter(getContext());
+            edgeWriter.setConf(conf);
+            edgeWriter.initialize(getContext());
+
+            long nextPrintVertices = 0;
+            long nextPrintMsecs = System.currentTimeMillis() + 15000;
+            int partitionIndex = 0;
+            // The queue is shared between the callables, so poll() may
+            // still return null after isEmpty() reported false
+            while (!partitionIdQueue.isEmpty()) {
+              Integer partitionId = partitionIdQueue.poll();
+              if (partitionId == null) {
+                break;
+              }
+
+              Partition<I, V, E> partition =
+                  getPartitionStore().getPartition(partitionId);
+              long vertices = 0;
+              long edges = 0;
+              long partitionEdgeCount = partition.getEdgeCount();
+              for (Vertex<I, V, E> vertex : partition) {
+                for (Edge<I, E> edge : vertex.getEdges()) {
+                  edgeWriter.writeEdge(vertex.getId(), vertex.getValue(), edge);
+                  ++edges;
+                }
+                ++vertices;
+
+                // Update status only once both the vertex threshold (250k
+                // more vertices) and the time threshold (15 seconds) passed
+                if (vertices > nextPrintVertices &&
+                    System.currentTimeMillis() > nextPrintMsecs) {
+                  LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+                      "saveEdges: Saved " + edges +
+                      " edges out of " + partitionEdgeCount +
+                      " partition edges, on partition " + partitionIndex +
+                      " out of " + numPartitions);
+                  nextPrintMsecs = System.currentTimeMillis() + 15000;
+                  nextPrintVertices = vertices + 250000;
+                }
+              }
+              getPartitionStore().putPartition(partition);
+              ++partitionIndex;
+            }
+            edgeWriter.close(getContext()); // the temp results are saved now
+            return null;
+          }
+        };
+      }
+    };
+    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+        "save-edges-%d", getContext());
+
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+      "saveEdges: Done saving edges.");
+    // YARN: must complete the commit the "task" output, Hadoop isn't there.
+    // The guard must look at the EDGE output format: edges have to be
+    // committed even when no vertex output format is configured.
+    if (conf.isPureYarnJob() &&
+      conf.getEdgeOutputFormatClass() != null) {
+      try {
+        OutputCommitter outputCommitter =
+          edgeOutputFormat.getOutputCommitter(getContext());
+        if (outputCommitter.needsTaskCommit(getContext())) {
+          LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+            "OutputCommitter: committing task output.");
+          // transfer from temp dirs to "task commit" dirs to prep for
+          // the master's OutputCommitter#commitJob(context) call to finish.
+          outputCommitter.commitTask(getContext());
+        }
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted while attempting to obtain " +
+          "OutputCommitter.", ie);
+      } catch (IOException ioe) {
+        LOG.error("Master task's attempt to commit output has " +
+          "FAILED.", ioe);
+      }
+    }
+  }
+
   @Override
   public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
     throws IOException, InterruptedException {
     workerClient.closeConnections();
     setCachedSuperstep(getSuperstep() - 1);
     saveVertices(finishedSuperstepStats.getLocalVertexCount());
+    saveEdges();
     getPartitionStore().shutdown();
     // All worker processes should denote they are done by adding special
     // znode.  Once the number of znodes equals the number of partitions
@@ -1331,7 +1450,6 @@ else[HADOOP_NON_SECURE]*/
       }
     }
 
-
     try {
       workerClientRequestProcessor.flush();
       workerClient.waitAllRequests();

Reply via email to