Updated Branches:
  refs/heads/trunk 90f3116e3 -> 7fb9b5f3a

GIRAPH-681: Graphviz Output Format (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7fb9b5f3
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7fb9b5f3
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7fb9b5f3

Branch: refs/heads/trunk
Commit: 7fb9b5f3a2d4fe377c014d16f54d06a626228526
Parents: 90f3116
Author: Nitay Joffe <[email protected]>
Authored: Tue Jun 11 00:41:25 2013 -0400
Committer: Nitay Joffe <[email protected]>
Committed: Tue Jun 11 00:41:25 2013 -0400

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../giraph/io/formats/GraphvizOutputFormat.java | 244 +++++++++++++++++++
 2 files changed, 246 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/7fb9b5f3/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index e66ec42..f929feb 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-681: Graphviz Output Format (nitay)
+
   GIRAPH-468: Make Vertex an Interface (nitay)
 
   GIRAPH-667: all workers suspended at 'saveVertices' when use 

http://git-wip-us.apache.org/repos/asf/giraph/blob/7fb9b5f3/giraph-core/src/main/java/org/apache/giraph/io/formats/GraphvizOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/GraphvizOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/GraphvizOutputFormat.java
new file mode 100644
index 0000000..6f10e2e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/GraphvizOutputFormat.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * Writes graph to a dot file (graphviz format). This writes it out in parts. 
At
+ * the end of the job you can use the following to get a single graphviz file:
+ *
+ * hadoop fs -getmerge /hadoop/output/path data.txt
+ */
+public class GraphvizOutputFormat extends TextVertexOutputFormat<
+    WritableComparable, Writable, Writable> {
+  /** Color of node text */
+  private static final String NODE_TEXT_COLOR = "blue:orange";
+
+  @Override
+  public TextVertexWriter createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new VertexWriter();
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new GraphvizOutputCommitter(super.getOutputCommitter(context));
+  }
+
+  /**
+   * Get output directory job is using
+   * @param context job context
+   * @return Path for output directory
+   */
+
+  private static Path getOutputDir(JobContext context) {
+    return FileOutputFormat.getOutputPath(context);
+  }
+
+  /**
+   * Get path which sorts at the beginning of a directory
+   * @param context job context
+   * @return Path at beginning
+   */
+
+  private static Path getPathAtBeginning(JobContext context) {
+    return new Path(getOutputDir(context), "____" + 
System.currentTimeMillis());
+  }
+
+  /**
+   * Get path which sorts at the end of a directory
+   * @param context job context
+   * @return Path at end
+   */
+  private static Path getPathAtEnd(JobContext context) {
+    return new Path(getOutputDir(context), "zzz_" + 
System.currentTimeMillis());
+  }
+
+  /**
+   * Write start of graph data
+   * @param context task attempt context
+   * @throws IOException I/O errors
+   */
+  private static void writeStart(JobContext context) throws IOException {
+    Path path = getPathAtBeginning(context);
+    FileSystem fs = path.getFileSystem(context.getConfiguration());
+    FSDataOutputStream file = fs.create(path, false);
+    file.writeBytes("digraph g {\n");
+    file.close();
+  }
+
+  /**
+   * Write end of graph data
+   * @param context job attempt context
+   * @throws IOException I/O errors
+   */
+  private static void writeEnd(JobContext context) throws IOException {
+    Path path = getPathAtEnd(context);
+    FileSystem fs = path.getFileSystem(context.getConfiguration());
+    FSDataOutputStream file = fs.create(path, false);
+    file.writeBytes("}\n");
+    file.close();
+  }
+
+  /**
+   * Add node information to output
+   * @param vertex node
+   * @param sb string builder
+   */
+  private static void addNodeInfo(
+      Vertex<WritableComparable, Writable, Writable> vertex, StringBuilder sb) 
{
+    sb.append('"').append(vertex.getId()).append('"');
+    sb.append(" [").append("label=").append('"').append("<id> ");
+    sb.append(vertex.getId());
+    if (!(vertex.getValue() instanceof NullWritable)) {
+      sb.append("|").append(vertex.getValue());
+    }
+    sb.append('"').append(",shape=record,fillcolor=")
+        .append('"').append(NODE_TEXT_COLOR).append('"')
+        .append("];");
+  }
+
+  /**
+   * Write an edge
+   * @param sb string builder
+   * @param sourceID source vertex ID
+   * @param edge the edge
+   */
+  private static void addEdge(StringBuilder sb, Writable sourceID,
+      Edge<WritableComparable, Writable> edge) {
+    sb.append(sourceID).append(":id")
+        .append(" -> ")
+        .append(edge.getTargetVertexId()).append(":id");
+    addEdgeInfo(sb, edge);
+    sb.append("\n");
+  }
+
+  /**
+   * Add edge information to output
+   * @param sb string builder
+   * @param edge the edge
+   */
+  private static void addEdgeInfo(StringBuilder sb,
+    Edge<WritableComparable, Writable> edge) {
+    if (!(edge.getValue() instanceof NullWritable)) {
+      sb.append(" [label=").append(edge.getValue()).append(" ];");
+    }
+  }
+
+  /**
+   * Wrapper around output committer which writes our begin/end files.
+   */
+  private static class GraphvizOutputCommitter extends OutputCommitter {
+    /** delegate committer */
+    private final OutputCommitter delegate;
+
+    /**
+     * Constructor with delegate
+     * @param delegate committer to use
+     */
+    private GraphvizOutputCommitter(OutputCommitter delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override public boolean equals(Object o) {
+      return delegate.equals(o);
+    }
+
+    @Override public String toString() {
+      return delegate.toString();
+    }
+
+    @Override public int hashCode() {
+      return delegate.hashCode();
+    }
+
+    @Override public void abortJob(JobContext jobContext, JobStatus.State 
state)
+      throws IOException {
+      delegate.abortJob(jobContext, state);
+    }
+
+    @Override public void abortTask(TaskAttemptContext taskContext)
+      throws IOException {
+      delegate.abortTask(taskContext);
+    }
+
+    @Override @Deprecated public void cleanupJob(JobContext context)
+      throws IOException {
+      delegate.cleanupJob(context);
+    }
+
+    @Override public void commitJob(JobContext jobContext) throws IOException {
+      writeEnd(jobContext);
+      delegate.commitJob(jobContext);
+    }
+
+    @Override public void commitTask(TaskAttemptContext taskContext)
+      throws IOException {
+      delegate.commitTask(taskContext);
+    }
+
+    @Override public boolean needsTaskCommit(TaskAttemptContext taskContext)
+      throws IOException {
+      return delegate.needsTaskCommit(taskContext);
+    }
+
+    @Override public void setupJob(JobContext jobContext) throws IOException {
+      delegate.setupJob(jobContext);
+      writeStart(jobContext);
+    }
+
+    @Override public void setupTask(TaskAttemptContext taskContext)
+      throws IOException {
+      delegate.setupTask(taskContext);
+    }
+  }
+
+  /**
+   * Writes vertices to graphviz files.
+   */
+  private class VertexWriter extends TextVertexWriter {
+    @Override
+    public void writeVertex(
+      Vertex<WritableComparable, Writable, Writable> vertex)
+      throws IOException, InterruptedException {
+      StringBuilder sb = new StringBuilder(vertex.getNumEdges() * 10);
+      for (Edge<WritableComparable, Writable> edge : vertex.getEdges()) {
+        addEdge(sb, vertex.getId(), edge);
+      }
+      addNodeInfo(vertex, sb);
+      getRecordWriter().write(new Text(sb.toString()), null);
+    }
+  }
+}

Reply via email to