[FLINK-5913] [gelly] Example drivers

Replace existing and create new algorithm Driver implementations for
each of the library methods.

This closes #3635


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a48357db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a48357db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a48357db

Branch: refs/heads/table-retraction
Commit: a48357db8c4187fd08f3b17880899ebbcb5d3b5e
Parents: ded25be
Author: Greg Hogan <c...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Greg Hogan <c...@greghogan.com>
Committed: Fri Mar 31 11:17:26 2017 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Usage.java |   2 -
 .../apache/flink/graph/drivers/AdamicAdar.java  |  71 ++++
 .../graph/drivers/ClusteringCoefficient.java    | 378 +++++--------------
 .../graph/drivers/ConnectedComponents.java      | 105 ++++++
 .../apache/flink/graph/drivers/EdgeList.java    |  92 +++++
 .../apache/flink/graph/drivers/Graph500.java    | 165 --------
 .../flink/graph/drivers/GraphMetrics.java       | 265 ++++---------
 .../org/apache/flink/graph/drivers/HITS.java    | 188 ++-------
 .../flink/graph/drivers/JaccardIndex.java       | 224 ++---------
 .../apache/flink/graph/drivers/PageRank.java    |  74 ++++
 .../flink/graph/drivers/SimpleDriver.java       |  65 ++++
 .../flink/graph/drivers/TriangleListing.java    | 362 +++++-------------
 .../drivers/parameter/IterationConvergence.java |  89 +++++
 .../graph/examples/ConnectedComponents.java     | 141 -------
 .../examples/GSASingleSourceShortestPaths.java  |   4 +-
 .../parameter/IterationConvergenceTest.java     |  66 ++++
 .../examples/ConnectedComponentsITCase.java     |  72 ----
 .../main/java/org/apache/flink/graph/Graph.java |  16 +-
 .../graph/library/ConnectedComponents.java      |   5 +-
 .../graph/library/GSAConnectedComponents.java   |   8 +-
 .../flink/graph/library/LabelPropagation.java   |   5 +-
 .../clustering/directed/TriangleListing.java    |   2 +-
 .../undirected/LocalClusteringCoefficient.java  |   2 +-
 .../graph/library/link_analysis/PageRank.java   |   8 +-
 .../graph/library/similarity/AdamicAdar.java    |   2 +-
 .../graph/library/similarity/JaccardIndex.java  |   2 +-
 .../apache/flink/graph/utils/GraphUtils.java    |  10 +-
 .../flink/graph/utils/NullValueEdgeMapper.java  |  32 --
 28 files changed, 919 insertions(+), 1536 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
index d923bf0..642fe5b 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
@@ -28,7 +28,6 @@ public class Usage {
 
        private static final Class[] DRIVERS = new Class[]{
                org.apache.flink.graph.drivers.ClusteringCoefficient.class,
-               org.apache.flink.graph.drivers.Graph500.class,
                org.apache.flink.graph.drivers.GraphMetrics.class,
                org.apache.flink.graph.drivers.HITS.class,
                org.apache.flink.graph.drivers.JaccardIndex.class,
@@ -36,7 +35,6 @@ public class Usage {
        };
 
        private static final Class[] EXAMPLES = new Class[]{
-               org.apache.flink.graph.examples.ConnectedComponents.class,
                org.apache.flink.graph.examples.EuclideanGraphWeighing.class,
                
org.apache.flink.graph.examples.GSASingleSourceShortestPaths.class,
                org.apache.flink.graph.examples.IncrementalSSSP.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
new file mode 100644
index 0000000..742c1de
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
@@ -0,0 +1,71 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.graph.drivers;

import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.output.CSV;
import org.apache.flink.graph.drivers.output.Print;
import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
import org.apache.flink.types.CopyableValue;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

/**
 * Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}.
 *
 * <p>Inherits the result {@code DataSet} field from {@link SimpleDriver} and
 * supports CSV and print output of {@link Result} tuples (two vertex IDs and
 * the similarity score).
 */
public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
extends SimpleDriver<Result<K>>
implements Driver<K, VV, EV>, CSV, Print {

	// parallelism forwarded to the library algorithm's setLittleParallelism;
	// presumably throttles its small intermediate operators — defaults to the
	// job-wide parallelism (NOTE(review): confirm against the library class)
	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
		.setDefaultValue(PARALLELISM_DEFAULT);

	@Override
	public String getName() {
		return this.getClass().getSimpleName();
	}

	@Override
	public String getShortDescription() {
		return "similarity score weighted by centerpoint degree";
	}

	@Override
	public String getLongDescription() {
		// wrapped to 80 columns for terminal-friendly usage output
		return WordUtils.wrap(new StrBuilder()
			.appendln("Adamic-Adar measures the similarity between vertex neighborhoods and is " +
				"computed as the sum of the inverse logarithm of centerpoint degree over shared " +
				"neighbors.")
			.appendNewLine()
			.append("The algorithm result contains two vertex IDs and the similarity score.")
			.toString(), 80);
	}

	@Override
	public void plan(Graph<K, VV, EV> graph) throws Exception {
		int lp = littleParallelism.getValue().intValue();

		// store the algorithm output in SimpleDriver's result field; the
		// selected output (CSV/Print) consumes it when the plan executes
		result = graph
			.run(new org.apache.flink.graph.library.similarity.AdamicAdar<K, VV, EV>()
				.setLittleParallelism(lp));
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index 004390d..c463c0a 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -18,333 +18,127 @@
 
 package org.apache.flink.graph.drivers;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import 
org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.types.CopyableValue;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Driver for the library implementations of Global and Local Clustering 
Coefficient.
- *
- * This example reads a simple directed or undirected graph from a CSV file or
- * generates an RMat graph with the given scale and edge factor then calculates
- * the local clustering coefficient for each vertex and the global clustering
- * coefficient for the graph.
+ * Driver for directed and undirected clustering coefficient algorithm and 
analytics.
  *
+ * @see 
org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient
  * @see 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient
  * @see 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient
+ * @see 
org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient
  * @see 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient
  * @see 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
  */
-public class ClusteringCoefficient {
-
-       private static final int DEFAULT_SCALE = 10;
-
-       private static final int DEFAULT_EDGE_FACTOR = 16;
-
-       private static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-       private static String getUsage(String message) {
-               return new StrBuilder()
-                       .appendNewLine()
-                       .appendln(WordUtils.wrap("The local clustering 
coefficient measures the connectedness of each" +
-                               " vertex's neighborhood and the global 
clustering coefficient measures the connectedness of the graph." +
-                               " Scores range from 0.0 (no edges between 
neighbors or vertices) to 1.0 (neighborhood or graph" +
-                               " is a clique).", 80))
-                       .appendNewLine()
-                       .appendln(WordUtils.wrap("This algorithm returns tuples 
containing the vertex ID, the degree of" +
-                               " the vertex, and the number of edges between 
vertex neighbors.", 80))
-                       .appendNewLine()
-                       .appendln("usage: ClusteringCoefficient --directed 
<true | false> --input <csv | rmat> --output <print | hash | csv>")
-                       .appendNewLine()
-                       .appendln("options:")
-                       .appendln("  --input csv --type <integer | string> 
[--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter 
LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
-                       .appendln("  --input rmat [--scale SCALE] 
[--edge_factor EDGE_FACTOR]")
-                       .appendNewLine()
-                       .appendln("  --output print")
-                       .appendln("  --output hash")
-                       .appendln("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]")
-                       .appendNewLine()
-                       .appendln("Usage error: " + message)
-                       .toString();
-       }
-
-       public static void main(String[] args) throws Exception {
-               // Set up the execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
-
-               ParameterTool parameters = ParameterTool.fromArgs(args);
-               env.getConfig().setGlobalJobParameters(parameters);
-
-               if (! parameters.has("directed")) {
-                       throw new 
ProgramParametrizationException(getUsage("must declare execution mode as 
'--directed true' or '--directed false'"));
-               }
-               boolean directedAlgorithm = parameters.getBoolean("directed");
-
-               int little_parallelism = 
parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-
-               // global and local clustering coefficient results
-               GraphAnalytic gcc;
-               GraphAnalytic acc;
-               DataSet lcc;
-
-               switch (parameters.get("input", "")) {
-                       case "csv": {
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               GraphCsvReader reader = Graph
-                                       
.fromCsvReader(parameters.get("input_filename"), env)
-                                               .ignoreCommentsEdges("#")
-                                               
.lineDelimiterEdges(lineDelimiter)
-                                               
.fieldDelimiterEdges(fieldDelimiter);
-
-                               switch (parameters.get("type", "")) {
-                                       case "integer": {
-                                               Graph<LongValue, NullValue, 
NullValue> graph = reader
-                                                       
.keyType(LongValue.class);
+public class ClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, 
VV, EV>
+extends SimpleDriver<PrintableResult>
+implements Driver<K, VV, EV>, CSV, Hash, Print {
 
-                                               if (directedAlgorithm) {
-                                                       if 
(parameters.getBoolean("simplify", false)) {
-                                                               graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, 
NullValue, NullValue>()
-                                                                               
.setParallelism(little_parallelism));
-                                                       }
+       private static final String DIRECTED = "directed";
 
-                                                       gcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       acc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       lcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               } else {
-                                                       if 
(parameters.getBoolean("simplify", false)) {
-                                                               graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, 
NullValue, NullValue>(false)
-                                                                               
.setParallelism(little_parallelism));
-                                                       }
+       private static final String UNDIRECTED = "undirected";
 
-                                                       gcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       acc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       lcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               }
-                                       } break;
+       private ChoiceParameter order = new ChoiceParameter(this, "order")
+               .addChoices(DIRECTED, UNDIRECTED);
 
-                                       case "string": {
-                                               Graph<StringValue, NullValue, 
NullValue> graph = reader
-                                                       
.keyType(StringValue.class);
+       private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
+               .setDefaultValue(PARALLELISM_DEFAULT);
 
-                                               if (directedAlgorithm) {
-                                                       if 
(parameters.getBoolean("simplify", false)) {
-                                                               graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, 
NullValue, NullValue>()
-                                                                               
.setParallelism(little_parallelism));
-                                                       }
+       private GraphAnalytic<K, VV, EV, ? extends PrintableResult> 
globalClusteringCoefficient;
 
-                                                       gcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       acc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       lcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               } else {
-                                                       if 
(parameters.getBoolean("simplify", false)) {
-                                                               graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, 
NullValue, NullValue>(false)
-                                                                               
.setParallelism(little_parallelism));
-                                                       }
+       private GraphAnalytic<K, VV, EV, ? extends PrintableResult> 
averageClusteringCoefficient;
 
-                                                       gcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       acc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                                       lcc = graph
-                                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue,
 NullValue, NullValue>()
-                                                                       
.setLittleParallelism(little_parallelism));
-                                               }
-                                       } break;
-
-                                       default:
-                                               throw new 
ProgramParametrizationException(getUsage("invalid CSV type"));
-                               }
-                       } break;
-
-                       case "rmat": {
-                               int scale = parameters.getInt("scale", 
DEFAULT_SCALE);
-                               int edgeFactor = 
parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-                               RandomGenerableFactory<JDKRandomGenerator> rnd 
= new JDKRandomGeneratorFactory();
-
-                               long vertexCount = 1L << scale;
-                               long edgeCount = vertexCount * edgeFactor;
-
-                               Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                                       .setParallelism(little_parallelism)
-                                       .generate();
-
-                               if (directedAlgorithm) {
-                                       if (scale > 32) {
-                                               Graph<LongValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>()
-                                                               
.setParallelism(little_parallelism));
-
-                                               gcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               acc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               lcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setIncludeZeroDegreeVertices(false)
-                                                               
.setLittleParallelism(little_parallelism));
-                                       } else {
-                                               Graph<IntValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue())
-                                                               
.setParallelism(little_parallelism))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>()
-                                                               
.setParallelism(little_parallelism));
-
-                                               gcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               acc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               lcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setIncludeZeroDegreeVertices(false)
-                                                               
.setLittleParallelism(little_parallelism));
-                                       }
-                               } else {
-                                       boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+       @Override
+       public String getName() {
+               return this.getClass().getSimpleName();
+       }
 
-                                       if (scale > 32) {
-                                               Graph<LongValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip)
-                                                               
.setParallelism(little_parallelism));
+       @Override
+       public String getShortDescription() {
+               return "measure the connectedness of vertex neighborhoods";
+       }
 
-                                               gcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               acc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               lcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>()
-                                                               
.setIncludeZeroDegreeVertices(false)
-                                                               
.setLittleParallelism(little_parallelism));
-                                       } else {
-                                               Graph<IntValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue())
-                                                               
.setParallelism(little_parallelism))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip)
-                                                               
.setParallelism(little_parallelism));
+       @Override
+       public String getLongDescription() {
+               return WordUtils.wrap(new StrBuilder()
+                       .appendln("The local clustering coefficient measures 
the connectedness of each " +
+                               "vertex's neighborhood. The global clustering 
coefficient measures the " +
+                               "connectedness of the graph. The average clustering 
coefficient is the mean local " +
+                               "clustering coefficient. Each score ranges from 
0.0 (no edges between vertex " +
+                               "neighbors) to 1.0 (neighborhood or graph is a 
clique).")
+                       .appendNewLine()
+                       .append("The algorithm result contains the vertex ID, 
degree, and number of edges " +
+                               "connecting neighbors.")
+                       .toString(), 80);
+       }
 
-                                               gcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               acc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                               lcc = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>()
-                                                               
.setIncludeZeroDegreeVertices(false)
-                                                               
.setLittleParallelism(little_parallelism));
-                                       }
-                               }
-                       } break;
+       @Override
+       public void plan(Graph<K, VV, EV> graph) throws Exception {
+               int lp = littleParallelism.getValue().intValue();
 
-                       default:
-                               throw new 
ProgramParametrizationException(getUsage("invalid input type"));
-               }
+               switch (order.getValue()) {
+                       case DIRECTED:
+                               result = graph
+                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K,
 VV, EV>()
+                                               .setLittleParallelism(lp));
 
-               switch (parameters.get("output", "")) {
-                       case "print":
-                               if (directedAlgorithm) {
-                                       for (Object e: lcc.collect()) {
-                                               
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result
 result =
-                                                       
(org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e;
-                                               
System.out.println(result.toPrintableString());
-                                       }
-                               } else {
-                                       for (Object e: lcc.collect()) {
-                                               
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result
 result =
-                                                       
(org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e;
-                                               
System.out.println(result.toPrintableString());
-                                       }
-                               }
-                               break;
+                               globalClusteringCoefficient = graph
+                                       .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<K,
 VV, EV>()
+                                               .setLittleParallelism(lp));
 
-                       case "hash":
-                               
System.out.println(DataSetUtils.checksumHashCode(lcc));
+                               averageClusteringCoefficient = graph
+                                       .run(new 
org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<K,
 VV, EV>()
+                                               .setLittleParallelism(lp));
                                break;
 
-                       case "csv":
-                               String filename = 
parameters.get("output_filename");
+                       case UNDIRECTED:
+                               result = graph
+                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K,
 VV, EV>()
+                                               .setLittleParallelism(lp));
 
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("output_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+                               globalClusteringCoefficient = graph
+                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<K,
 VV, EV>()
+                                               .setLittleParallelism(lp));
 
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       
parameters.get("output_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               lcc.writeAsCsv(filename, lineDelimiter, 
fieldDelimiter);
-
-                               env.execute("Clustering Coefficient");
+                               averageClusteringCoefficient = graph
+                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<K,
 VV, EV>()
+                                               .setLittleParallelism(lp));
                                break;
-
-                       default:
-                               throw new 
ProgramParametrizationException(getUsage("invalid output type"));
                }
+       }
 
-               System.out.println(gcc.getResult());
-               System.out.println(acc.getResult());
+       @Override
+       public void hash(String executionName) throws Exception {
+               super.hash(executionName);
+               printAnalytics();
+       }
 
-               JobExecutionResult result = env.getLastJobExecutionResult();
+       @Override
+       public void print(String executionName) throws Exception {
+               super.print(executionName);
+               printAnalytics();
+       }
+
+       @Override
+       public void writeCSV(String filename, String lineDelimiter, String 
fieldDelimiter) {
+               super.writeCSV(filename, lineDelimiter, fieldDelimiter);
+               printAnalytics();
+       }
 
-               NumberFormat nf = NumberFormat.getInstance();
-               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
+       private void printAnalytics() {
+               
System.out.println(globalClusteringCoefficient.getResult().toPrintableString());
+               
System.out.println(averageClusteringCoefficient.getResult().toPrintableString());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
new file mode 100644
index 0000000..32263cf
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.library.GSAConnectedComponents;
+
+import java.util.List;
+
+/**
+ * Driver for {@link org.apache.flink.graph.library.GSAConnectedComponents}.
+ *
+ * The gather-sum-apply implementation is used because scatter-gather does not
+ * handle object reuse (see FLINK-5891).
+ */
+public class ConnectedComponents<K extends Comparable<K>, VV, EV>
+extends ParameterizedBase
+implements Driver<K, VV, EV>, CSV, Hash, Print {
+
+       private DataSet<Vertex<K, K>> components;
+
+       @Override
+       public String getName() {
+               return this.getClass().getSimpleName();
+       }
+
+       @Override
+       public String getShortDescription() {
+               return "ConnectedComponents";
+       }
+
+       @Override
+       public String getLongDescription() {
+               return "ConnectedComponents";
+       }
+
+       @Override
+       public void plan(Graph<K, VV, EV> graph) throws Exception {
+               components = graph
+                       .mapVertices(new MapVertices<K, VV>())
+                       .run(new GSAConnectedComponents<K, K, 
EV>(Integer.MAX_VALUE));
+       }
+
+       @Override
+       public void hash(String executionName) throws Exception {
+               Checksum checksum = new ChecksumHashCode<Vertex<K, K>>()
+                       .run(components)
+                       .execute(executionName);
+
+               System.out.println(checksum);
+       }
+
+       @Override
+       public void print(String executionName) throws Exception {
+               Collect<Vertex<K, K>> collector = new Collect<>();
+
+               // Refactored due to openjdk7 compile error: 
https://travis-ci.org/greghogan/flink/builds/200487761
+               List<Vertex<K, K>> records = 
collector.run(components).execute(executionName);
+
+               for (Vertex<K, K> result : records) {
+                       System.out.println(result);
+               }
+       }
+
+       @Override
+       public void writeCSV(String filename, String lineDelimiter, String 
fieldDelimiter) {
+               components
+                       .writeAsCsv(filename, lineDelimiter, fieldDelimiter)
+                               .name("CSV: " + filename);
+       }
+
+       private static final class MapVertices<T, VT>
+       implements MapFunction<Vertex<T, VT>, T> {
+               @Override
+               public T map(Vertex<T, VT> value) throws Exception {
+                       return value.f0;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
new file mode 100644
index 0000000..85f32c3
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+
+import java.util.List;
+
+/**
+ * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}s.
+ */
+public class EdgeList<K, VV, EV>
+extends ParameterizedBase
+implements Driver<K, VV, EV>, CSV, Hash, Print {
+
+       private DataSet<Edge<K, EV>> edges;
+
+       @Override
+       public String getName() {
+               return this.getClass().getSimpleName();
+       }
+
+       @Override
+       public String getShortDescription() {
+               return "the edge list";
+       }
+
+       @Override
+       public String getLongDescription() {
+               return "Pass-through of the graph's edge list.";
+       }
+
+       @Override
+       public void plan(Graph<K, VV, EV> graph) throws Exception {
+               edges = graph
+                       .getEdges();
+       }
+
+       @Override
+       public void hash(String executionName) throws Exception {
+               Checksum checksum = new ChecksumHashCode<Edge<K, EV>>()
+                       .run(edges)
+                       .execute(executionName);
+
+               System.out.println(checksum);
+       }
+
+       @Override
+       public void print(String executionName) throws Exception {
+               Collect<Edge<K, EV>> collector = new Collect<>();
+
+               // Refactored due to openjdk7 compile error: 
https://travis-ci.org/greghogan/flink/builds/200487761
+               List<Edge<K, EV>> records = 
collector.run(edges).execute(executionName);
+
+               for (Edge<K, EV> result : records) {
+                       System.out.println(result);
+               }
+
+       }
+
+       @Override
+       public void writeCSV(String filename, String lineDelimiter, String 
fieldDelimiter) {
+               edges
+                       .writeAsCsv(filename, lineDelimiter, fieldDelimiter)
+                               .name("CSV: " + filename);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
deleted file mode 100644
index c2abbf7..0000000
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.drivers;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-
-import java.text.NumberFormat;
-
-/**
- * Generate an RMat graph for Graph 500.
- *
- * Note that this does not yet implement permutation of vertex labels or edges.
- *
- * @see <a href="http://www.graph500.org/specifications";>Graph 500</a>
- */
-public class Graph500 {
-
-       private static final int DEFAULT_SCALE = 10;
-
-       private static final int DEFAULT_EDGE_FACTOR = 16;
-
-       private static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-       private static String getUsage(String message) {
-               return new StrBuilder()
-                       .appendNewLine()
-                       .appendln("A Graph500 generator using the Recursive 
Matrix (RMat) graph generator.")
-                       .appendNewLine()
-                       .appendln(WordUtils.wrap("The graph matrix contains 
2^scale vertices although not every vertex will" +
-                               " be represented in an edge. The number of 
edges is edge_factor * 2^scale edges" +
-                               " although some edges may be duplicates.", 80))
-                       .appendNewLine()
-                       .appendln("Note: this does not yet implement 
permutation of vertex labels or edges.")
-                       .appendNewLine()
-                       .appendln("usage: Graph500 --directed <true | false> 
--simplify <true | false> --output <print | hash | csv [options]>")
-                       .appendNewLine()
-                       .appendln("options:")
-                       .appendln("  --output print")
-                       .appendln("  --output hash")
-                       .appendln("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]")
-                       .appendNewLine()
-                       .appendln("Usage error: " + message)
-                       .toString();
-       }
-
-       public static void main(String[] args) throws Exception {
-               // Set up the execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
-
-               ParameterTool parameters = ParameterTool.fromArgs(args);
-               env.getConfig().setGlobalJobParameters(parameters);
-
-               if (! parameters.has("directed")) {
-                       throw new 
ProgramParametrizationException(getUsage("must declare execution mode as 
'--directed true' or '--directed false'"));
-               }
-               boolean directed = parameters.getBoolean("directed");
-
-               if (! parameters.has("simplify")) {
-                       throw new 
ProgramParametrizationException(getUsage("must declare '--simplify true' or 
'--simplify false'"));
-               }
-               boolean simplify = parameters.getBoolean("simplify");
-
-
-               // Generate RMat graph
-               int scale = parameters.getInt("scale", DEFAULT_SCALE);
-               int edgeFactor = parameters.getInt("edge_factor", 
DEFAULT_EDGE_FACTOR);
-
-               RandomGenerableFactory<JDKRandomGenerator> rnd = new 
JDKRandomGeneratorFactory();
-
-               long vertexCount = 1L << scale;
-               long edgeCount = vertexCount * edgeFactor;
-
-               boolean clipAndFlip = parameters.getBoolean("clip_and_flip", 
DEFAULT_CLIP_AND_FLIP);
-
-               Graph<LongValue, NullValue, NullValue> graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                       .generate();
-
-               if (directed) {
-                       if (simplify) {
-                               graph = graph
-                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>());
-                       }
-               } else {
-                       if (simplify) {
-                               graph = graph
-                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip));
-                       } else {
-                               graph = graph.getUndirected();
-                       }
-               }
-
-               DataSet<Tuple2<LongValue, LongValue>> edges = graph
-                       .getEdges()
-                       .project(0, 1);
-
-               // Print, hash, or write RMat graph to disk
-               switch (parameters.get("output", "")) {
-               case "print":
-                       System.out.println();
-                       edges.print();
-                       break;
-
-               case "hash":
-                       System.out.println();
-                       
System.out.println(DataSetUtils.checksumHashCode(edges));
-                       break;
-
-               case "csv":
-                       String filename = 
parameters.getRequired("output_filename");
-
-                       String lineDelimiter = StringEscapeUtils.unescapeJava(
-                               parameters.get("output_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                       String fieldDelimiter = StringEscapeUtils.unescapeJava(
-                               parameters.get("output_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                       edges.writeAsCsv(filename, lineDelimiter, 
fieldDelimiter);
-
-                       env.execute("Graph500");
-                       break;
-               default:
-                       throw new 
ProgramParametrizationException(getUsage("invalid output type"));
-               }
-
-               JobExecutionResult result = env.getLastJobExecutionResult();
-
-               NumberFormat nf = NumberFormat.getInstance();
-               System.out.println();
-               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index 9b246df..cc5a894 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -18,224 +18,109 @@
 
 package org.apache.flink.graph.drivers;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import 
org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.CopyableValue;
 
 /**
- * Computes vertex and edge metrics on a directed or undirected graph.
+ * Driver for directed and undirected graph metrics analytics.
  *
  * @see org.apache.flink.graph.library.metric.directed.EdgeMetrics
  * @see org.apache.flink.graph.library.metric.directed.VertexMetrics
  * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
  * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
  */
-public class GraphMetrics {
+public class GraphMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends ParameterizedBase
+implements Driver<K, VV, EV>, Hash, Print {
 
-       private static final int DEFAULT_SCALE = 10;
+       private static final String DIRECTED = "directed";
 
-       private static final int DEFAULT_EDGE_FACTOR = 16;
+       private static final String UNDIRECTED = "undirected";
 
-       private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+       private ChoiceParameter order = new ChoiceParameter(this, "order")
+               .addChoices(DIRECTED, UNDIRECTED);
 
-       private static String getUsage(String message) {
+       private GraphAnalytic<K, VV, EV, ? extends PrintableResult> 
vertexMetrics;
+
+       private GraphAnalytic<K, VV, EV, ? extends PrintableResult> edgeMetrics;
+
+       @Override
+       public String getName() {
+               return this.getClass().getSimpleName();
+       }
+
+       @Override
+       public String getShortDescription() {
+               return "compute vertex and edge metrics";
+       }
+
+       @Override
+       public String getLongDescription() {
                return new StrBuilder()
+                       .appendln("Computes metrics on a directed or undirected 
graph.")
                        .appendNewLine()
-                       .appendln(WordUtils.wrap("Computes vertex and edge 
metrics on a directed or undirected graph.", 80))
-                       .appendNewLine()
-                       .appendln("usage: GraphMetrics --directed <true | 
false> --input <csv | rmat>")
+                       .appendln("Vertex metrics:")
+                       .appendln("- number of vertices")
+                       .appendln("- number of edges")
+                       .appendln("- number of unidirectional edges (directed 
only)")
+                       .appendln("- number of bidirectional edges (directed 
only)")
+                       .appendln("- average degree")
+                       .appendln("- number of triplets")
+                       .appendln("- maximum degree")
+                       .appendln("- maximum out degree (directed only)")
+                       .appendln("- maximum in degree (directed only)")
+                       .appendln("- maximum number of triplets")
                        .appendNewLine()
-                       .appendln("options:")
-                       .appendln("  --input csv --type <integer | string> 
[--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter 
LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
-                       .appendln("  --input rmat [--scale SCALE] 
[--edge_factor EDGE_FACTOR]")
-                       .appendNewLine()
-                       .appendln("Usage error: " + message)
+                       .appendln("Edge metrics:")
+                       .appendln("- number of triangle triplets")
+                       .appendln("- number of rectangle triplets")
+                       .appendln("- maximum number of triangle triplets")
+                       .append("- maximum number of rectangle triplets")
                        .toString();
        }
 
-       public static void main(String[] args) throws Exception {
-               // Set up the execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
+       @Override
+       public void plan(Graph<K, VV, EV> graph) throws Exception {
+               switch (order.getValue()) {
+                       case DIRECTED:
+                               vertexMetrics = graph
+                                       .run(new 
org.apache.flink.graph.library.metric.directed.VertexMetrics<K, VV, EV>());
 
-               ParameterTool parameters = ParameterTool.fromArgs(args);
-               env.getConfig().setGlobalJobParameters(parameters);
+                               edgeMetrics = graph
+                                       .run(new 
org.apache.flink.graph.library.metric.directed.EdgeMetrics<K, VV, EV>());
+                               break;
 
-               if (! parameters.has("directed")) {
-                       throw new 
ProgramParametrizationException(getUsage("must declare execution mode as 
'--directed true' or '--directed false'"));
-               }
-               boolean directedAlgorithm = parameters.getBoolean("directed");
-
-               GraphAnalytic vm;
-               GraphAnalytic em;
-
-               switch (parameters.get("input", "")) {
-                       case "csv": {
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               GraphCsvReader reader = Graph
-                                       
.fromCsvReader(parameters.getRequired("input_filename"), env)
-                                               .ignoreCommentsEdges("#")
-                                               
.lineDelimiterEdges(lineDelimiter)
-                                               
.fieldDelimiterEdges(fieldDelimiter);
-
-                               switch (parameters.get("type", "")) {
-                                       case "integer": {
-                                               Graph<LongValue, NullValue, 
NullValue> graph = reader
-                                                       
.keyType(LongValue.class);
-
-                                               if (directedAlgorithm) {
-                                                       if 
(parameters.getBoolean("simplify", false)) {
-                                                               graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, 
NullValue, NullValue>());
-                                                       }
-
-                                                       vm = graph
-                                                               .run(new 
org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, 
NullValue, NullValue>());
-                                                       em = graph
-                                                               .run(new 
org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, 
NullValue, NullValue>());
-                                               } else {
-                                                       if 
(parameters.getBoolean("simplify", false)) {
-                                                               graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, 
NullValue, NullValue>(false));
-                                                       }
-
-                                                       vm = graph
-                                                               .run(new 
org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, 
NullValue, NullValue>());
-                                                       em = graph
-                                                               .run(new 
org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, 
NullValue, NullValue>());
-                                               }
-                                       } break;
-
-                                       case "string": {
-                                               Graph<StringValue, NullValue, 
NullValue> graph = reader
-                                                       
.keyType(StringValue.class);
-
-                                               if (directedAlgorithm) {
-                                                       if 
(parameters.getBoolean("simplify", false)) {
-                                                               graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, 
NullValue, NullValue>());
-                                                       }
-
-                                                       vm = graph
-                                                               .run(new 
org.apache.flink.graph.library.metric.directed.VertexMetrics<StringValue, 
NullValue, NullValue>());
-                                                       em = graph
-                                                               .run(new 
org.apache.flink.graph.library.metric.directed.EdgeMetrics<StringValue, 
NullValue, NullValue>());
-                                               } else {
-                                                       if 
(parameters.getBoolean("simplify", false)) {
-                                                               graph = graph
-                                                                       
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, 
NullValue, NullValue>(false));
-                                                       }
-
-                                                       vm = graph
-                                                               .run(new 
org.apache.flink.graph.library.metric.undirected.VertexMetrics<StringValue, 
NullValue, NullValue>());
-                                                       em = graph
-                                                               .run(new 
org.apache.flink.graph.library.metric.undirected.EdgeMetrics<StringValue, 
NullValue, NullValue>());
-                                               }
-                                       } break;
-
-                                       default:
-                                               throw new 
ProgramParametrizationException(getUsage("invalid CSV type"));
-                               }
-                               } break;
-
-                       case "rmat": {
-                               int scale = parameters.getInt("scale", 
DEFAULT_SCALE);
-                               int edgeFactor = 
parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-                               RandomGenerableFactory<JDKRandomGenerator> rnd 
= new JDKRandomGeneratorFactory();
-
-                               long vertexCount = 1L << scale;
-                               long edgeCount = vertexCount * edgeFactor;
-
-
-                               Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                                       .generate();
-
-                               if (directedAlgorithm) {
-                                       if (scale > 32) {
-                                               Graph<LongValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>());
-
-                                               vm = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, 
NullValue, NullValue>());
-                                               em = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, 
NullValue, NullValue>());
-                                       } else {
-                                               Graph<IntValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue()))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>());
-
-                                               vm = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.metric.directed.VertexMetrics<IntValue, 
NullValue, NullValue>());
-                                               em = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.metric.directed.EdgeMetrics<IntValue, NullValue, 
NullValue>());
-                                       }
-                               } else {
-                                       boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-                                       if (scale > 32) {
-                                               Graph<LongValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip));
-
-                                               vm = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, 
NullValue, NullValue>());
-                                               em = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, 
NullValue, NullValue>());
-                                       } else {
-                                               Graph<IntValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue()))
-                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip));
-
-                                               vm = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.metric.undirected.VertexMetrics<IntValue, 
NullValue, NullValue>());
-                                               em = newGraph
-                                                       .run(new 
org.apache.flink.graph.library.metric.undirected.EdgeMetrics<IntValue, 
NullValue, NullValue>());
-                                       }
-                               }
-                               } break;
-
-                       default:
-                               throw new 
ProgramParametrizationException(getUsage("invalid input type"));
+                       case UNDIRECTED:
+                               vertexMetrics = graph
+                                       .run(new 
org.apache.flink.graph.library.metric.undirected.VertexMetrics<K, VV, EV>());
+
+                               edgeMetrics = graph
+                                       .run(new 
org.apache.flink.graph.library.metric.undirected.EdgeMetrics<K, VV, EV>());
+                               break;
                }
+       }
 
-               env.execute("Graph Metrics");
+       @Override
+       public void hash(String executionName) throws Exception {
+               print(executionName);
+       }
 
-               System.out.println();
-               System.out.print("Vertex metrics:\n  ");
-               System.out.println(vm.getResult().toString().replace(";", "\n 
"));
-               System.out.println();
-               System.out.print("Edge metrics:\n  ");
-               System.out.println(em.getResult().toString().replace(";", "\n 
"));
+       @Override
+       public void print(String executionName) throws Exception {
+               vertexMetrics.execute(executionName);
 
-               JobExecutionResult result = env.getLastJobExecutionResult();
+               System.out.print("Vertex metrics:\n  ");
+               
System.out.println(vertexMetrics.getResult().toPrintableString().replace(";", 
"\n "));
 
-               NumberFormat nf = NumberFormat.getInstance();
                System.out.println();
-               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
+               System.out.print("Edge metrics:\n  ");
+               
System.out.println(edgeMetrics.getResult().toPrintableString().replace(";", "\n 
"));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index db27f0e..6081fea 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -18,177 +18,51 @@
 
 package org.apache.flink.graph.drivers;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.directed.Simplify;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import 
org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.IterationConvergence;
 import org.apache.flink.graph.library.link_analysis.HITS.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
 
 /**
- * Driver for the library implementation of HITS (Hubs and Authorities).
- *
- * This example reads a simple, undirected graph from a CSV file or generates
- * an undirected RMat graph with the given scale and edge factor then 
calculates
- * hub and authority scores for each vertex.
- *
- * @see org.apache.flink.graph.library.link_analysis.HITS
+ * Driver for {@link org.apache.flink.graph.library.link_analysis.HITS}.
  */
-public class HITS {
+public class HITS<K, VV, EV>
+extends SimpleDriver<Result<K>>
+implements Driver<K, VV, EV>, CSV, Print {
 
        private static final int DEFAULT_ITERATIONS = 10;
 
-       private static final int DEFAULT_SCALE = 10;
+       private IterationConvergence iterationConvergence = new 
IterationConvergence(this, DEFAULT_ITERATIONS);
 
-       private static final int DEFAULT_EDGE_FACTOR = 16;
-
-       private static String getUsage(String message) {
-               return new StrBuilder()
-                       .appendNewLine()
-                       .appendln(WordUtils.wrap("Hyperlink-Induced Topic 
Search computes two interdependent" +
-                               " scores for every vertex in a directed graph. 
A good \"hub\" links to good \"authorities\"" +
-                               " and good \"authorities\" are linked from good 
\"hubs\".", 80))
-                       .appendNewLine()
-                       .appendln("usage: HITS --input <csv | rmat> --output 
<print | hash | csv>")
-                       .appendNewLine()
-                       .appendln("options:")
-                       .appendln("  --input csv --type <integer | string> 
--input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] 
[--input_field_delimiter FIELD_DELIMITER]")
-                       .appendln("  --input rmat [--scale SCALE] 
[--edge_factor EDGE_FACTOR]")
-                       .appendNewLine()
-                       .appendln("  --output print")
-                       .appendln("  --output hash")
-                       .appendln("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]")
-                       .appendNewLine()
-                       .appendln("Usage error: " + message)
-                       .toString();
+       @Override
+       public String getName() {
+               return this.getClass().getSimpleName();
        }
 
-       public static void main(String[] args) throws Exception {
-               // Set up the execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
-
-               ParameterTool parameters = ParameterTool.fromArgs(args);
-               env.getConfig().setGlobalJobParameters(parameters);
-
-               int iterations = parameters.getInt("iterations", 
DEFAULT_ITERATIONS);
-
-               DataSet hits;
-
-               switch (parameters.get("input", "")) {
-                       case "csv": {
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               GraphCsvReader reader = Graph
-                                       
.fromCsvReader(parameters.getRequired("input_filename"), env)
-                                               .ignoreCommentsEdges("#")
-                                               
.lineDelimiterEdges(lineDelimiter)
-                                               
.fieldDelimiterEdges(fieldDelimiter);
-
-                               switch (parameters.get("type", "")) {
-                                       case "integer": {
-                                               hits = reader
-                                                       
.keyType(LongValue.class)
-                                                       .run(new 
org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, 
NullValue>(iterations));
-                                       } break;
-
-                                       case "string": {
-                                               hits = reader
-                                                       
.keyType(StringValue.class)
-                                                       .run(new 
org.apache.flink.graph.library.link_analysis.HITS<StringValue, NullValue, 
NullValue>(iterations));
-                                       } break;
-
-                                       default:
-                                               throw new 
ProgramParametrizationException(getUsage("invalid CSV type"));
-                               }
-                               } break;
-
-                       case "rmat": {
-                               int scale = parameters.getInt("scale", 
DEFAULT_SCALE);
-                               int edgeFactor = 
parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-                               RandomGenerableFactory<JDKRandomGenerator> rnd 
= new JDKRandomGeneratorFactory();
-
-                               long vertexCount = 1L << scale;
-                               long edgeCount = vertexCount * edgeFactor;
-
-                               Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                                       .generate();
-
-                               if (scale > 32) {
-                                       hits = graph
-                                               .run(new Simplify<LongValue, 
NullValue, NullValue>())
-                                               .run(new 
org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, 
NullValue>(iterations));
-                               } else {
-                                       hits = graph
-                                               .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue()))
-                                               .run(new Simplify<IntValue, 
NullValue, NullValue>())
-                                               .run(new 
org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, 
NullValue>(iterations));
-                               }
-                               } break;
-
-                       default:
-                               throw new 
ProgramParametrizationException(getUsage("invalid input type"));
-               }
-
-               switch (parameters.get("output", "")) {
-                       case "print":
-                               System.out.println();
-                               for (Object e: hits.collect()) {
-                                       
System.out.println(((Result)e).toPrintableString());
-                               }
-                               break;
-
-                       case "hash":
-                               System.out.println();
-                               
System.out.println(DataSetUtils.checksumHashCode(hits));
-                               break;
-
-                       case "csv":
-                               String filename = 
parameters.getRequired("output_filename");
-
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("output_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       
parameters.get("output_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               hits.writeAsCsv(filename, lineDelimiter, 
fieldDelimiter);
-
-                               env.execute("HITS");
-                               break;
-                       default:
-                               throw new 
ProgramParametrizationException(getUsage("invalid output type"));
-               }
+       @Override
+       public String getShortDescription() {
+               return "score vertices as hubs and authorities";
+       }
 
-               JobExecutionResult result = env.getLastJobExecutionResult();
+       @Override
+       public String getLongDescription() {
+               return WordUtils.wrap(new StrBuilder()
+                       .appendln("Hyperlink-Induced Topic Search computes two 
interdependent scores for " +
+                               "each vertex in a directed graph. A good 
\"hub\" links to good \"authorities\" " +
+                               "and good \"authorities\" are linked to from 
good \"hubs\".")
+                       .appendNewLine()
+                       .append("The result contains the vertex ID, hub score, 
and authority score.")
+                       .toString(), 80);
+       }
 
-               NumberFormat nf = NumberFormat.getInstance();
-               System.out.println();
-               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
+       @Override
+       public void plan(Graph<K, VV, EV> graph) throws Exception {
+               result = graph
+                       .run(new 
org.apache.flink.graph.library.link_analysis.HITS<K, VV, EV>(
+                               iterationConvergence.getValue().iterations,
+                               
iterationConvergence.getValue().convergenceThreshold));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index 09479a6..1c836ea 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -18,211 +18,57 @@
 
 package org.apache.flink.graph.drivers;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import 
org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
+import org.apache.flink.types.CopyableValue;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Driver for the library implementation of Jaccard Index.
- *
- * This example reads a simple, undirected graph from a CSV file or generates
- * an undirected RMat graph with the given scale and edge factor then 
calculates
- * all non-zero Jaccard Index similarity scores between vertices.
- *
- * @see org.apache.flink.graph.library.similarity.JaccardIndex
+ * Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}.
  */
-public class JaccardIndex {
+public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
+extends SimpleDriver<Result<K>>
+implements Driver<K, VV, EV>, CSV, Hash, Print {
 
-       private static final int DEFAULT_SCALE = 10;
+       private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
+               .setDefaultValue(PARALLELISM_DEFAULT);
 
-       private static final int DEFAULT_EDGE_FACTOR = 16;
+       @Override
+       public String getName() {
+               return this.getClass().getSimpleName();
+       }
 
-       private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+       @Override
+       public String getShortDescription() {
+               return "similarity score as fraction of common neighbors";
+       }
 
-       private static String getUsage(String message) {
-               return new StrBuilder()
-                       .appendNewLine()
-                       .appendln(WordUtils.wrap("The Jaccard Index measures 
the similarity between vertex" +
-                               " neighborhoods and is computed as the number 
of shared neighbors divided by the number of" +
-                               " distinct neighbors. Scores range from 0.0 (no 
shared neighbors) to 1.0 (all neighbors are" +
-                               " shared).", 80))
-                       .appendNewLine()
-                       .appendln(WordUtils.wrap("This algorithm returns 
4-tuples containing two vertex IDs, the" +
-                               " number of shared neighbors, and the number of 
distinct neighbors.", 80))
-                       .appendNewLine()
-                       .appendln("usage: JaccardIndex --input <csv | rmat> 
--output <print | hash | csv>")
-                       .appendNewLine()
-                       .appendln("options:")
-                       .appendln("  --input csv --type <integer | string> 
[--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter 
LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
-                       .appendln("  --input rmat [--scale SCALE] 
[--edge_factor EDGE_FACTOR]")
-                       .appendNewLine()
-                       .appendln("  --output print")
-                       .appendln("  --output hash")
-                       .appendln("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]")
+       @Override
+       public String getLongDescription() {
+               return WordUtils.wrap(new StrBuilder()
+                       .appendln("Jaccard Index measures the similarity 
between vertex neighborhoods and " +
+                               "is computed as the number of shared neighbors 
divided by the number of " +
+                               "distinct neighbors. Scores range from 0.0 (no 
shared neighbors) to 1.0 (all " +
+                               "neighbors are shared).")
                        .appendNewLine()
-                       .appendln("Usage error: " + message)
-                       .toString();
+                       .append("The result contains two vertex IDs, the number 
of shared neighbors, and " +
+                               "the number of distinct neighbors.")
+                       .toString(), 80);
        }
 
-       public static void main(String[] args) throws Exception {
-               // Set up the execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
-
-               ParameterTool parameters = ParameterTool.fromArgs(args);
-               env.getConfig().setGlobalJobParameters(parameters);
-
-               int little_parallelism = 
parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-
-               DataSet ji;
-
-               switch (parameters.get("input", "")) {
-                       case "csv": {
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("input_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               GraphCsvReader reader = Graph
-                                       
.fromCsvReader(parameters.getRequired("input_filename"), env)
-                                               .ignoreCommentsEdges("#")
-                                               
.lineDelimiterEdges(lineDelimiter)
-                                               
.fieldDelimiterEdges(fieldDelimiter);
-
-                               switch (parameters.get("type", "")) {
-                                       case "integer": {
-                                               Graph<LongValue, NullValue, 
NullValue> graph = reader
-                                                       
.keyType(LongValue.class);
-
-                                               if 
(parameters.getBoolean("simplify", false)) {
-                                                       graph = graph
-                                                               .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(false)
-                                                                       
.setParallelism(little_parallelism));
-                                               }
-
-                                               ji = graph
-                                                       .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, 
NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                       } break;
-
-                                       case "string": {
-                                               Graph<StringValue, NullValue, 
NullValue> graph = reader
-                                                       
.keyType(StringValue.class);
-
-                                               if 
(parameters.getBoolean("simplify", false)) {
-                                                       graph = graph
-                                                               .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, 
NullValue>(false)
-                                                                       
.setParallelism(little_parallelism));
-                                               }
-
-                                               ji = graph
-                                                       .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, 
NullValue>()
-                                                               
.setLittleParallelism(little_parallelism));
-                                       } break;
-
-                                       default:
-                                               throw new 
ProgramParametrizationException(getUsage("invalid CSV type"));
-                               }
-                               } break;
-
-                       case "rmat": {
-                               int scale = parameters.getInt("scale", 
DEFAULT_SCALE);
-                               int edgeFactor = 
parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-                               RandomGenerableFactory<JDKRandomGenerator> rnd 
= new JDKRandomGeneratorFactory();
-
-                               long vertexCount = 1L << scale;
-                               long edgeCount = vertexCount * edgeFactor;
-
-                               Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                                       .setParallelism(little_parallelism)
-                                       .generate();
-
-                               boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-                               if (scale > 32) {
-                                       ji = graph
-                                               .run(new Simplify<LongValue, 
NullValue, NullValue>(clipAndFlip)
-                                                       
.setParallelism(little_parallelism))
-                                               .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, 
NullValue>()
-                                                       
.setLittleParallelism(little_parallelism));
-                               } else {
-                                       ji = graph
-                                               .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue())
-                                                       
.setParallelism(little_parallelism))
-                                               .run(new Simplify<IntValue, 
NullValue, NullValue>(clipAndFlip)
-                                                       
.setParallelism(little_parallelism))
-                                               .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, 
NullValue>()
-                                                       
.setLittleParallelism(little_parallelism));
-                               }
-                               } break;
-
-                       default:
-                               throw new 
ProgramParametrizationException(getUsage("invalid input type"));
-               }
-
-               switch (parameters.get("output", "")) {
-                       case "print":
-                               System.out.println();
-                               for (Object e: ji.collect()) {
-                                       Result result = (Result)e;
-                                       
System.out.println(result.toPrintableString());
-                               }
-                               break;
-
-                       case "hash":
-                               System.out.println();
-                               
System.out.println(DataSetUtils.checksumHashCode(ji));
-                               break;
-
-                       case "csv":
-                               String filename = 
parameters.getRequired("output_filename");
-
-                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       parameters.get("output_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
-                                       
parameters.get("output_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-                               ji.writeAsCsv(filename, lineDelimiter, 
fieldDelimiter);
-
-                               env.execute("Jaccard Index");
-                               break;
-
-                       default:
-                               throw new 
ProgramParametrizationException(getUsage("invalid output type"));
-               }
-
-               JobExecutionResult result = env.getLastJobExecutionResult();
+       /**
+        * Runs the Jaccard Index library algorithm on the input graph and
+        * stores the result DataSet for later output by the driver framework.
+        *
+        * NOTE(review): "little parallelism" presumably caps the parallelism
+        * of the algorithm's smaller intermediate operators — confirm against
+        * the LittleParallelism parameter documentation.
+        */
+       @Override
+       public void plan(Graph<K, VV, EV> graph) throws Exception {
+               int lp = littleParallelism.getValue().intValue();
 
-               NumberFormat nf = NumberFormat.getInstance();
-               System.out.println();
-               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
+               result = graph
+                       .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>()
+                               .setLittleParallelism(lp));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
new file mode 100644
index 0000000..8cef077
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.DoubleParameter;
+import org.apache.flink.graph.drivers.parameter.IterationConvergence;
+import org.apache.flink.graph.library.link_analysis.PageRank.Result;
+
+/**
+ * @see org.apache.flink.graph.library.link_analysis.PageRank
+ */
+public class PageRank<K, VV, EV>
+extends SimpleDriver<Result<K>>
+implements Driver<K, VV, EV>, CSV, Print {
+
+       private static final int DEFAULT_ITERATIONS = 10;
+
+       private DoubleParameter dampingFactor = new DoubleParameter(this, 
"damping_factor")
+               .setDefaultValue(0.85)
+               .setMinimumValue(0.0, false)
+               .setMaximumValue(1.0, false);
+
+       private IterationConvergence iterationConvergence = new 
IterationConvergence(this, DEFAULT_ITERATIONS);
+
+       @Override
+       public String getName() {
+               return this.getClass().getSimpleName();
+       }
+
+       @Override
+       public String getShortDescription() {
+               return "score vertices by the number and quality of incoming 
links";
+       }
+
+       @Override
+       public String getLongDescription() {
+               return new StrBuilder()
+                       .appendln("PageRank computes a per-vertex score which 
is the sum of PageRank scores " +
+                               "transmitted over in-edges. Each vertex's score 
is divided evenly among " +
+                               "out-edges. High-scoring vertices are linked to 
by other high-scoring vertices.")
+                       .appendNewLine()
+                       .append("The result contains the vertex ID and PageRank 
score.")
+                       .toString();
+       }
+
+       @Override
+       public void plan(Graph<K, VV, EV> graph) throws Exception {
+               result = graph
+                       .run(new 
org.apache.flink.graph.library.link_analysis.PageRank<K, VV, EV>(
+                               dampingFactor.getValue(),
+                               iterationConvergence.getValue().iterations,
+                               
iterationConvergence.getValue().convergenceThreshold));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
new file mode 100644
index 0000000..98bdfc5
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+
+import java.util.List;
+
+/**
+ * A base driver storing a single result {@link DataSet} with values
+ * implementing {@link PrintableResult}.
+ *
+ * @param <R> algorithm's result type
+ */
+public abstract class SimpleDriver<R extends PrintableResult>
+extends ParameterizedBase {
+
+       protected DataSet<? extends R> result;
+
+       public void hash(String executionName) throws Exception {
+               Checksum checksum = new ChecksumHashCode<R>()
+                       .run((DataSet<R>) result)
+                       .execute(executionName);
+
+               System.out.println(checksum);
+       }
+
+       public void print(String executionName) throws Exception {
+               Collect<R> collector = new Collect<>();
+
+               // Refactored due to openjdk7 compile error: 
https://travis-ci.org/greghogan/flink/builds/200487761
+               List<R> records = collector.run((DataSet<R>) 
result).execute(executionName);
+
+               for (R result : records) {
+                       System.out.println(result.toPrintableString());
+               }
+       }
+
+       public void writeCSV(String filename, String lineDelimiter, String 
fieldDelimiter) {
+               result
+                       .writeAsCsv(filename, lineDelimiter, fieldDelimiter)
+                               .name("CSV: " + filename);
+       }
+}

Reply via email to