[FLINK-5912] [gelly] Inputs for CSV and graph generators

Create Input classes for reading graphs from CSV as well as for each of
the graph generators.

This closes #3626


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ded25be4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ded25be4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ded25be4

Branch: refs/heads/table-retraction
Commit: ded25be4d8fc51f2a58cbd3264daffe48dca6b04
Parents: 963f46e
Author: Greg Hogan <c...@greghogan.com>
Authored: Mon Mar 27 10:08:59 2017 -0400
Committer: Greg Hogan <c...@greghogan.com>
Committed: Fri Mar 31 11:16:05 2017 -0400

----------------------------------------------------------------------
 .../apache/flink/graph/drivers/input/CSV.java   | 115 ++++++++++++
 .../graph/drivers/input/CompleteGraph.java      |  60 +++++++
 .../flink/graph/drivers/input/CycleGraph.java   |  60 +++++++
 .../flink/graph/drivers/input/EmptyGraph.java   |  55 ++++++
 .../flink/graph/drivers/input/GridGraph.java    | 144 +++++++++++++++
 .../graph/drivers/input/HypercubeGraph.java     |  60 +++++++
 .../flink/graph/drivers/input/PathGraph.java    |  60 +++++++
 .../flink/graph/drivers/input/RMatGraph.java    | 174 +++++++++++++++++++
 .../graph/drivers/input/SingletonEdgeGraph.java |  60 +++++++
 .../flink/graph/drivers/input/StarGraph.java    |  60 +++++++
 .../graph/drivers/parameter/Parameterized.java  |   2 +-
 .../drivers/parameter/ParameterizedBase.java    |   2 +-
 .../flink/graph/drivers/parameter/Simplify.java | 137 +++++++++++++++
 .../graph/drivers/parameter/SimplifyTest.java   |  56 ++++++
 14 files changed, 1043 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
new file mode 100644
index 0000000..58b65b6
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.drivers.parameter.Simplify;
+import org.apache.flink.graph.drivers.parameter.StringParameter;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+/**
+ * Read a {@link Graph} from a CSV file using {@link IntValue},
+ * {@link LongValue}, or {@link StringValue} keys.
+ *
+ * @param <K> key type
+ */
+public class CSV<K extends Comparable<K>>
+extends ParameterizedBase
+implements Input<K, NullValue, NullValue> {
+
+       private static final String INTEGER = "integer";
+
+       private static final String LONG = "long";
+
+       private static final String STRING = "string";
+
+       private ChoiceParameter type = new ChoiceParameter(this, "type")
+               .setDefaultValue(INTEGER)
+               .addChoices(LONG, STRING);
+
+       private StringParameter inputFilename = new StringParameter(this, 
"input_filename");
+
+       private StringParameter commentPrefix = new StringParameter(this, 
"comment_prefix")
+               .setDefaultValue("#");
+
+       private StringParameter lineDelimiter = new StringParameter(this, 
"input_line_delimiter")
+               .setDefaultValue(CsvInputFormat.DEFAULT_LINE_DELIMITER);
+
+       private StringParameter fieldDelimiter = new StringParameter(this, 
"input_field_delimiter")
+               .setDefaultValue(CsvInputFormat.DEFAULT_FIELD_DELIMITER);
+
+       private Simplify simplify = new Simplify(this);
+
+       @Override
+       public String getName() {
+               return CSV.class.getSimpleName();
+       }
+
+       @Override
+       public String getIdentity() {
+               return WordUtils.capitalize(getName()) + 
WordUtils.capitalize(type.getValue()) + " (" + inputFilename + ")";
+       }
+
+       /**
+        * Generate the graph as configured.
+        *
+        * @param env execution environment
+        * @return input graph
+        */
+       public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) 
throws Exception {
+               GraphCsvReader reader = 
Graph.fromCsvReader(inputFilename.getValue(), env)
+                       .ignoreCommentsEdges(commentPrefix.getValue())
+                       .lineDelimiterEdges(lineDelimiter.getValue())
+                       .fieldDelimiterEdges(fieldDelimiter.getValue());
+
+               Graph<K, NullValue, NullValue> graph;
+
+               switch (type.getValue()) {
+                       case INTEGER:
+                               graph = (Graph<K, NullValue, NullValue>) reader
+                                       .keyType(IntValue.class);
+                               break;
+
+                       case LONG:
+                               graph = (Graph<K, NullValue, NullValue>) reader
+                                       .keyType(LongValue.class);
+                               break;
+
+                       case STRING:
+                               graph = (Graph<K, NullValue, NullValue>) reader
+                                       .keyType(StringValue.class);
+                               break;
+
+                       default:
+                               throw new 
ProgramParametrizationException("Unknown type '" + type.getValue() + "'");
+               }
+
+               return simplify.simplify(graph);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
new file mode 100644
index 0000000..c31c5a8
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static 
org.apache.flink.graph.generator.CompleteGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.CompleteGraph}.
+ */
+public class CompleteGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+       private LongParameter vertexCount = new LongParameter(this, 
"vertex_count")
+               .setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+       private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
+               .setDefaultValue(PARALLELISM_DEFAULT);
+
+       @Override
+       public String getName() {
+               return CompleteGraph.class.getSimpleName();
+       }
+
+       @Override
+       public String getIdentity() {
+               return getName() + " (" + vertexCount.getValue() + ")";
+       }
+
+       @Override
+       public Graph<LongValue, NullValue, NullValue> 
create(ExecutionEnvironment env) {
+               return new org.apache.flink.graph.generator.CompleteGraph(env, 
vertexCount.getValue())
+                       .setParallelism(littleParallelism.getValue().intValue())
+                       .generate();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
new file mode 100644
index 0000000..df66dab
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.CycleGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.CycleGraph}.
+ */
+public class CycleGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+       private LongParameter vertexCount = new LongParameter(this, 
"vertex_count")
+               .setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+       private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
+               .setDefaultValue(PARALLELISM_DEFAULT);
+
+       @Override
+       public String getName() {
+               return CycleGraph.class.getSimpleName();
+       }
+
+       @Override
+       public String getIdentity() {
+               return getName() + " (" + vertexCount + ")";
+       }
+
+       @Override
+       public Graph<LongValue, NullValue, NullValue> 
create(ExecutionEnvironment env) {
+               return new org.apache.flink.graph.generator.CycleGraph(env, 
vertexCount.getValue())
+                       .setParallelism(littleParallelism.getValue().intValue())
+                       .generate();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
new file mode 100644
index 0000000..794317b
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.graph.generator.EmptyGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate an {@link org.apache.flink.graph.generator.EmptyGraph}.
+ */
+public class EmptyGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+       private LongParameter vertexCount = new LongParameter(this, 
"vertex_count")
+               .setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+       @Override
+       public String getName() {
+               return EmptyGraph.class.getSimpleName();
+       }
+
+       @Override
+       public String getIdentity() {
+               return getName() + " (" + vertexCount + ")";
+       }
+
+       @Override
+       public Graph<LongValue, NullValue, NullValue> 
create(ExecutionEnvironment env) {
+               return new org.apache.flink.graph.generator.EmptyGraph(env, 
vertexCount.getValue())
+                       .generate();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
new file mode 100644
index 0000000..d502215
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.GridGraph}.
+ */
+public class GridGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+       private static final String PREFIX = "dim";
+
+       private List<Dimension> dimensions = new ArrayList<>();
+
+       private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
+               .setDefaultValue(PARALLELISM_DEFAULT);
+
+       @Override
+       public String getName() {
+               return GridGraph.class.getSimpleName();
+       }
+
+       @Override
+       public String getUsage() {
+               return "--dim0 size:wrap_endpoints [--dim1 size:wrap_endpoints 
[--dim2 ...]]" + super.getUsage();
+       }
+
+       @Override
+       public void configure(ParameterTool parameterTool) throws 
ProgramParametrizationException {
+               super.configure(parameterTool);
+
+               // add dimensions as ordered by dimension ID (dim0, dim1, dim2, 
...)
+
+               Map<Integer, String> dimensionMap = new TreeMap<>();
+
+               // first parse all dimensions into a sorted map
+               for (Map.Entry<String, String> entry : 
parameterTool.toMap().entrySet()) {
+                       if (entry.getKey().startsWith(PREFIX)) {
+                               int dimensionId = 
Integer.parseInt(entry.getKey().substring(PREFIX.length()));
+                               dimensionMap.put(dimensionId, entry.getValue());
+                       }
+               }
+
+               // then store dimensions in order
+               for (String field : dimensionMap.values()) {
+                       dimensions.add(new Dimension(field));
+               }
+       }
+
+       @Override
+       public String getIdentity() {
+               return getName() + " (" + dimensions + ")";
+       }
+
+       @Override
+       public Graph<LongValue, NullValue, NullValue> 
create(ExecutionEnvironment env) {
+               org.apache.flink.graph.generator.GridGraph graph = new 
org.apache.flink.graph.generator.GridGraph(env);
+
+               for (Dimension dimension : dimensions) {
+                       graph.addDimension(dimension.size, 
dimension.wrapEndpoints);
+               }
+
+               return graph
+                       .setParallelism(littleParallelism.getValue().intValue())
+                       .generate();
+       }
+
+       /**
+        * Stores and parses the size and endpoint wrapping configuration for a
+        * {@link org.apache.flink.graph.generator.GridGraph} dimension.
+        */
+       private static class Dimension {
+               private long size;
+
+               private boolean wrapEndpoints;
+
+               /**
+                * Configuration string to be parsed. The size integer and 
endpoint
+                * wrapping boolean must be separated by a colon.
+                *
+                * @param field configuration string
+                */
+               public Dimension(String field) {
+                       ProgramParametrizationException exception = new 
ProgramParametrizationException("Grid dimension must use " +
+                               "a colon to separate the integer size and 
boolean indicating whether the dimension endpoints are " +
+                               "connected: '" + field + "'");
+
+                       if (! field.contains(":")) {
+                               throw exception;
+                       }
+
+                       String[] parts = field.split(":");
+
+                       if (parts.length != 2) {
+                               throw exception;
+                       }
+
+                       try {
+                               size = Long.parseLong(parts[0]);
+                               wrapEndpoints = Boolean.parseBoolean(parts[1]);
+                       } catch(NumberFormatException ex) {
+                               throw exception;
+                       }
+               }
+
+               @Override
+               public String toString() {
+                       return Long.toString(size) + (wrapEndpoints ? "+" : 
"⊞");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
new file mode 100644
index 0000000..3b87b00
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static 
org.apache.flink.graph.generator.HypercubeGraph.MINIMUM_DIMENSIONS;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.HypercubeGraph}.
+ */
+public class HypercubeGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+       private LongParameter dimensions = new LongParameter(this, "dimensions")
+               .setMinimumValue(MINIMUM_DIMENSIONS);
+
+       private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
+               .setDefaultValue(PARALLELISM_DEFAULT);
+
+       @Override
+       public String getName() {
+               return HypercubeGraph.class.getSimpleName();
+       }
+
+       @Override
+       public String getIdentity() {
+               return getName() + " (" + dimensions + ")";
+       }
+
+       @Override
+       public Graph<LongValue, NullValue, NullValue> 
create(ExecutionEnvironment env) {
+               return new org.apache.flink.graph.generator.HypercubeGraph(env, 
dimensions.getValue())
+                       .setParallelism(littleParallelism.getValue().intValue())
+                       .generate();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
new file mode 100644
index 0000000..968628c
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.PathGraph}.
+ */
+public class PathGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+       private LongParameter vertexCount = new LongParameter(this, 
"vertex_count")
+               .setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+       private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
+               .setDefaultValue(PARALLELISM_DEFAULT);
+
+       @Override
+       public String getName() {
+               return PathGraph.class.getSimpleName();
+       }
+
+       @Override
+       public String getIdentity() {
+               return getName() + " (" + vertexCount + ")";
+       }
+
+       @Override
+       public Graph<LongValue, NullValue, NullValue> 
create(ExecutionEnvironment env) {
+               return new org.apache.flink.graph.generator.PathGraph(env, 
vertexCount.getValue())
+                       .setParallelism(littleParallelism.getValue().intValue())
+                       .generate();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
new file mode 100644
index 0000000..e4e6a4c
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToStringValue;
+import 
org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.DoubleParameter;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.drivers.parameter.Simplify;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Generate an {@code RMatGraph} with {@link IntValue}, {@link LongValue},
+ * or {@link StringValue} keys.
+ *
+ * @see org.apache.flink.graph.generator.RMatGraph
+ *
+ * @param <K> key type
+ */
+public class RMatGraph<K extends Comparable<K>>
+extends ParameterizedBase
+implements Input<K, NullValue, NullValue> {
+
+       private static final String INTEGER = "integer";
+
+       private static final String LONG = "long";
+
+       private static final String STRING = "string";
+
+       private ChoiceParameter type = new ChoiceParameter(this, "type")
+               .setDefaultValue(INTEGER)
+               .addChoices(LONG, STRING);
+
+       // generate graph with 2^scale vertices
+       private LongParameter scale = new LongParameter(this, "scale")
+               .setDefaultValue(10)
+               .setMinimumValue(1);
+
+       // generate graph with edgeFactor * 2^scale edges
+       private LongParameter edgeFactor = new LongParameter(this, 
"edge_factor")
+               .setDefaultValue(16)
+               .setMinimumValue(1);
+
+       // matrix parameters "a", "b", "c", and implicitly "d = 1 - a - b - c"
+       // describe the skew in the recursive matrix
+       private DoubleParameter a = new DoubleParameter(this, "a")
+               
.setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_A)
+               .setMinimumValue(0.0, false);
+
+       private DoubleParameter b = new DoubleParameter(this, "b")
+               
.setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_B)
+               .setMinimumValue(0.0, false);
+
+       private DoubleParameter c = new DoubleParameter(this, "c")
+               
.setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_C)
+               .setMinimumValue(0.0, false);
+
+       // noise randomly pertubates the matrix parameters for each successive 
bit
+       // for each generated edge
+       private BooleanParameter noiseEnabled = new BooleanParameter(this, 
"noise_enabled");
+
+       private DoubleParameter noise = new DoubleParameter(this, "noise")
+               
.setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_NOISE)
+               .setMinimumValue(0.0, true)
+               .setMaximumValue(2.0, true);
+
+       private Simplify simplify = new Simplify(this);
+
+       private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
+               .setDefaultValue(PARALLELISM_DEFAULT);
+
+       @Override
+       public String getName() {
+               return RMatGraph.class.getSimpleName();
+       }
+
+       @Override
+       public String getIdentity() {
+               return getName() + WordUtils.capitalize(type.getValue()) +
+                       " (s" + scale + "e" + edgeFactor + 
simplify.getShortString() + ")";
+       }
+
+       /**
+        * Generate the graph as configured.
+        *
+        * @param env Flink execution environment
+        * @return input graph
+        */
+       public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) 
throws Exception {
+               int lp = littleParallelism.getValue().intValue();
+
+               RandomGenerableFactory<JDKRandomGenerator> rnd = new 
JDKRandomGeneratorFactory();
+
+               long vertexCount = 1L << scale.getValue();
+               long edgeCount = vertexCount * edgeFactor.getValue();
+
+               Graph<LongValue, NullValue, NullValue> rmatGraph = new 
org.apache.flink.graph.generator.RMatGraph<>(
+                               env, rnd, vertexCount, edgeCount)
+                       .setConstants(a.getValue().floatValue(), 
b.getValue().floatValue(), c.getValue().floatValue())
+                       .setNoise(noiseEnabled.getValue(), 
noise.getValue().floatValue())
+                       .setParallelism(lp)
+                       .generate();
+
+               Graph<K, NullValue, NullValue> graph;
+
+               switch (type.getValue()) {
+                       case INTEGER:
+                               if (scale.getValue() > 32) {
+                                       throw new 
ProgramParametrizationException(
+                                               "Scale '" + scale.getValue() + 
"' must be no greater than 32 for type 'integer'");
+                               }
+                               graph = (Graph<K, NullValue, NullValue>) 
rmatGraph
+                                       .run(new TranslateGraphIds<LongValue, 
IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()));
+                               break;
+
+                       case LONG:
+                               if (scale.getValue() > 64) {
+                                       throw new 
ProgramParametrizationException(
+                                               "Scale '" + scale.getValue() + 
"' must be no greater than 64 for type 'long'");
+                               }
+                               graph = (Graph<K, NullValue, NullValue>) 
rmatGraph;
+                               break;
+
+                       case STRING:
+                               // scale bound is same as LONG since keys are 
generated as LongValue
+                               if (scale.getValue() > 64) {
+                                       throw new 
ProgramParametrizationException(
+                                               "Scale '" + scale.getValue() + 
"' must be no greater than 64 for type 'string'");
+                               }
+                               graph = (Graph<K, NullValue, NullValue>) 
rmatGraph
+                                       .run(new TranslateGraphIds<LongValue, 
StringValue, NullValue, NullValue>(new LongValueToStringValue()));
+                               break;
+
+                       default:
+                               throw new 
ProgramParametrizationException("Unknown type '" + type.getValue() + "'");
+               }
+
+               // simplify *after* the translation from LongValue to IntValue 
or
+               // StringValue to improve the performance of the simplify 
operators
+               return simplify.simplify(graph);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
new file mode 100644
index 0000000..502826f
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.SingletonEdgeGraph}.
+ */
+public class SingletonEdgeGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+       private LongParameter vertexPairCount = new LongParameter(this, 
"vertex_pair_count")
+               .setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+       private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
+               .setDefaultValue(PARALLELISM_DEFAULT);
+
+       @Override
+       public String getName() {
+               return SingletonEdgeGraph.class.getSimpleName();
+       }
+
+       @Override
+       public String getIdentity() {
+               return getName() + " (" + vertexPairCount + ")";
+       }
+
+       @Override
+       public Graph<LongValue, NullValue, NullValue> 
create(ExecutionEnvironment env) {
+               return new 
org.apache.flink.graph.generator.SingletonEdgeGraph(env, 
vertexPairCount.getValue())
+                       .setParallelism(littleParallelism.getValue().intValue())
+                       .generate();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
new file mode 100644
index 0000000..b794f5c
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.StarGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.StarGraph}.
+ */
+public class StarGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+       private LongParameter vertexCount = new LongParameter(this, 
"vertex_count")
+               .setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+       private LongParameter littleParallelism = new LongParameter(this, 
"little_parallelism")
+               .setDefaultValue(PARALLELISM_DEFAULT);
+
+       @Override
+       public String getName() {
+               return StarGraph.class.getSimpleName();
+       }
+
+       @Override
+       public String getIdentity() {
+               return getName() + " (" + vertexCount + ")";
+       }
+
+       @Override
+       public Graph<LongValue, NullValue, NullValue> 
create(ExecutionEnvironment env) {
+               return new org.apache.flink.graph.generator.StarGraph(env, 
vertexCount.getValue())
+                       .setParallelism(littleParallelism.getValue().intValue())
+                       .generate();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
index b24f8cf..7b291be 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
@@ -39,7 +39,7 @@ public interface Parameterized {
         *
         * @return command-line documentation string
         */
-       String getParameterization();
+       String getUsage();
 
        /**
         * Read parameter values from the command-line arguments.

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
index 3b9b80a..5f36ff5 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
@@ -45,7 +45,7 @@ implements Parameterized {
        }
 
        @Override
-       public String getParameterization() {
+       public String getUsage() {
                StrBuilder strBuilder = new StrBuilder();
 
                // print parameters as ordered list

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
new file mode 100644
index 0000000..3e9fd9a
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.Simplify.Ordering;
+import org.apache.flink.types.NullValue;
+
+/**
+ * A simple graph has no self-loops (edges where the source and target vertices
+ * are the same) and no duplicate edges. Flink stores an undirected graph as
+ * a directed graph where each undirected edge is represented by a directed
+ * edge in each direction.
+ *
+ * This {@link Parameter} indicates whether to simplify the graph and if the
+ * graph should be directed or undirected.
+ */
+public class Simplify
+implements Parameter<Ordering> {
+
+       public enum Ordering {
+               // leave the graph unchanged
+               NONE,
+
+               // create a simple, directed graph
+               DIRECTED,
+
+               // create a simple, undirected graph
+               UNDIRECTED,
+
+               // create a simple, undirected graph
+               // remove input edges where source < target before symmetrizing 
the graph
+               UNDIRECTED_CLIP_AND_FLIP,
+       }
+
+       private Ordering value;
+
+       /**
+        * Add this parameter to the list of parameters stored by owner.
+        *
+        * @param owner the {@link Parameterized} using this {@link Parameter}
+        */
+       public Simplify(ParameterizedBase owner) {
+               owner.addParameter(this);
+       }
+
+       @Override
+       public String getUsage() {
+               return "[--simplify <directed | undirected [--clip_and_flip]>]";
+       }
+
+       @Override
+       public void configure(ParameterTool parameterTool) {
+               String ordering = parameterTool.get("simplify");
+
+               if (ordering == null) {
+                       value = Ordering.NONE;
+               } else {
+                       switch (ordering.toLowerCase()) {
+                               case "directed":
+                                       value = Ordering.DIRECTED;
+                                       break;
+                               case "undirected":
+                                       value = 
parameterTool.has("clip_and_flip") ? Ordering.UNDIRECTED_CLIP_AND_FLIP : 
Ordering.UNDIRECTED;
+                                       break;
+                               default:
+                                       throw new 
ProgramParametrizationException(
+                                               "Expected 'directed' or 
'undirected' ordering but received '" + ordering + "'");
+                       }
+               }
+       }
+
+       @Override
+       public Ordering getValue() {
+               return value;
+       }
+
+       /**
+        * Simplify the given graph based on the configured value.
+        *
+        * @param graph input graph
+        * @param <T> graph key type
+        * @return output graph
+        * @throws Exception on error
+        */
+       public <T extends Comparable<T>> Graph<T, NullValue, NullValue> 
simplify(Graph<T, NullValue, NullValue> graph)
+                       throws Exception {
+
+               switch (value) {
+                       case DIRECTED:
+                               graph = graph
+                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<T, NullValue, NullValue>());
+                               break;
+                       case UNDIRECTED:
+                               graph = graph
+                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<T, NullValue, 
NullValue>(false));
+                               break;
+                       case UNDIRECTED_CLIP_AND_FLIP:
+                               graph = graph
+                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<T, NullValue, 
NullValue>(true));
+                               break;
+               }
+
+               return graph;
+       }
+
+       public String getShortString() {
+               switch (value) {
+                       case DIRECTED:
+                               return "d";
+                       case UNDIRECTED:
+                               return "u";
+                       case UNDIRECTED_CLIP_AND_FLIP:
+                               return "ɔ";
+                       default:
+                               return "";
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java
new file mode 100644
index 0000000..12ae7dc
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.drivers.parameter.Simplify.Ordering;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SimplifyTest
+extends ParameterTestBase {
+
+       private Simplify parameter;
+
+       @Before
+       public void setup() {
+               super.setup();
+
+               parameter = new Simplify(owner);
+       }
+
+       @Test
+       public void testWithDirected() {
+               parameter.configure(ParameterTool.fromArgs(new 
String[]{"--simplify", "directed"}));
+               Assert.assertEquals(Ordering.DIRECTED, parameter.getValue());
+       }
+
+       @Test
+       public void testWithUndirected() {
+               parameter.configure(ParameterTool.fromArgs(new 
String[]{"--simplify", "undirected"}));
+               Assert.assertEquals(Ordering.UNDIRECTED, parameter.getValue());
+       }
+
+       @Test
+       public void testWithNoParameter() {
+               parameter.configure(ParameterTool.fromArgs(new String[]{}));
+               Assert.assertEquals(Ordering.NONE, parameter.getValue());
+       }
+}

Reply via email to