[FLINK-5912] [gelly] Inputs for CSV and graph generators Create Input classes for reading graphs from CSV as well as for each of the graph generators.
This closes #3626 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ded25be4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ded25be4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ded25be4 Branch: refs/heads/table-retraction Commit: ded25be4d8fc51f2a58cbd3264daffe48dca6b04 Parents: 963f46e Author: Greg Hogan <c...@greghogan.com> Authored: Mon Mar 27 10:08:59 2017 -0400 Committer: Greg Hogan <c...@greghogan.com> Committed: Fri Mar 31 11:16:05 2017 -0400 ---------------------------------------------------------------------- .../apache/flink/graph/drivers/input/CSV.java | 115 ++++++++++++ .../graph/drivers/input/CompleteGraph.java | 60 +++++++ .../flink/graph/drivers/input/CycleGraph.java | 60 +++++++ .../flink/graph/drivers/input/EmptyGraph.java | 55 ++++++ .../flink/graph/drivers/input/GridGraph.java | 144 +++++++++++++++ .../graph/drivers/input/HypercubeGraph.java | 60 +++++++ .../flink/graph/drivers/input/PathGraph.java | 60 +++++++ .../flink/graph/drivers/input/RMatGraph.java | 174 +++++++++++++++++++ .../graph/drivers/input/SingletonEdgeGraph.java | 60 +++++++ .../flink/graph/drivers/input/StarGraph.java | 60 +++++++ .../graph/drivers/parameter/Parameterized.java | 2 +- .../drivers/parameter/ParameterizedBase.java | 2 +- .../flink/graph/drivers/parameter/Simplify.java | 137 +++++++++++++++ .../graph/drivers/parameter/SimplifyTest.java | 56 ++++++ 14 files changed, 1043 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java new file mode 100644 index 0000000..58b65b6 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.client.program.ProgramParametrizationException; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphCsvReader; +import org.apache.flink.graph.drivers.parameter.ChoiceParameter; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.graph.drivers.parameter.Simplify; +import org.apache.flink.graph.drivers.parameter.StringParameter; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.types.StringValue; + +/** + * Read a {@link Graph} from a CSV file using {@link IntValue}, + * {@link LongValue}, or {@link StringValue} keys. + * + * @param <K> key type + */ +public class CSV<K extends Comparable<K>> +extends ParameterizedBase +implements Input<K, NullValue, NullValue> { + + private static final String INTEGER = "integer"; + + private static final String LONG = "long"; + + private static final String STRING = "string"; + + private ChoiceParameter type = new ChoiceParameter(this, "type") + .setDefaultValue(INTEGER) + .addChoices(LONG, STRING); + + private StringParameter inputFilename = new StringParameter(this, "input_filename"); + + private StringParameter commentPrefix = new StringParameter(this, "comment_prefix") + .setDefaultValue("#"); + + private StringParameter lineDelimiter = new StringParameter(this, "input_line_delimiter") + .setDefaultValue(CsvInputFormat.DEFAULT_LINE_DELIMITER); + + private StringParameter fieldDelimiter = new StringParameter(this, "input_field_delimiter") + .setDefaultValue(CsvInputFormat.DEFAULT_FIELD_DELIMITER); + + private Simplify simplify = new Simplify(this); + + @Override + public String getName() { + return CSV.class.getSimpleName(); + } + + 
@Override + public String getIdentity() { + return WordUtils.capitalize(getName()) + WordUtils.capitalize(type.getValue()) + " (" + inputFilename + ")"; + } + + /** + * Generate the graph as configured. + * + * @param env execution environment + * @return input graph + */ + public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) throws Exception { + GraphCsvReader reader = Graph.fromCsvReader(inputFilename.getValue(), env) + .ignoreCommentsEdges(commentPrefix.getValue()) + .lineDelimiterEdges(lineDelimiter.getValue()) + .fieldDelimiterEdges(fieldDelimiter.getValue()); + + Graph<K, NullValue, NullValue> graph; + + switch (type.getValue()) { + case INTEGER: + graph = (Graph<K, NullValue, NullValue>) reader + .keyType(IntValue.class); + break; + + case LONG: + graph = (Graph<K, NullValue, NullValue>) reader + .keyType(LongValue.class); + break; + + case STRING: + graph = (Graph<K, NullValue, NullValue>) reader + .keyType(StringValue.class); + break; + + default: + throw new ProgramParametrizationException("Unknown type '" + type.getValue() + "'"); + } + + return simplify.simplify(graph); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java new file mode 100644 index 0000000..c31c5a8 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.graph.generator.CompleteGraph.MINIMUM_VERTEX_COUNT; + +/** + * Generate a {@link org.apache.flink.graph.generator.CompleteGraph}. 
+ */ +public class CompleteGraph +extends ParameterizedBase +implements Input<LongValue, NullValue, NullValue> { + + private LongParameter vertexCount = new LongParameter(this, "vertex_count") + .setMinimumValue(MINIMUM_VERTEX_COUNT); + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return CompleteGraph.class.getSimpleName(); + } + + @Override + public String getIdentity() { + return getName() + " (" + vertexCount.getValue() + ")"; + } + + @Override + public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) { + return new org.apache.flink.graph.generator.CompleteGraph(env, vertexCount.getValue()) + .setParallelism(littleParallelism.getValue().intValue()) + .generate(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java new file mode 100644 index 0000000..df66dab --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.graph.generator.CycleGraph.MINIMUM_VERTEX_COUNT; + +/** + * Generate a {@link org.apache.flink.graph.generator.CycleGraph}. 
+ */ +public class CycleGraph +extends ParameterizedBase +implements Input<LongValue, NullValue, NullValue> { + + private LongParameter vertexCount = new LongParameter(this, "vertex_count") + .setMinimumValue(MINIMUM_VERTEX_COUNT); + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return CycleGraph.class.getSimpleName(); + } + + @Override + public String getIdentity() { + return getName() + " (" + vertexCount + ")"; + } + + @Override + public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) { + return new org.apache.flink.graph.generator.CycleGraph(env, vertexCount.getValue()) + .setParallelism(littleParallelism.getValue().intValue()) + .generate(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java new file mode 100644 index 0000000..794317b --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import static org.apache.flink.graph.generator.EmptyGraph.MINIMUM_VERTEX_COUNT; + +/** + * Generate an {@link org.apache.flink.graph.generator.EmptyGraph}. + */ +public class EmptyGraph +extends ParameterizedBase +implements Input<LongValue, NullValue, NullValue> { + + private LongParameter vertexCount = new LongParameter(this, "vertex_count") + .setMinimumValue(MINIMUM_VERTEX_COUNT); + + @Override + public String getName() { + return EmptyGraph.class.getSimpleName(); + } + + @Override + public String getIdentity() { + return getName() + " (" + vertexCount + ")"; + } + + @Override + public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) { + return new org.apache.flink.graph.generator.EmptyGraph(env, vertexCount.getValue()) + .generate(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java new file mode 100644 index 0000000..d502215 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.client.program.ProgramParametrizationException; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * Generate a {@link org.apache.flink.graph.generator.GridGraph}. 
+ */ +public class GridGraph +extends ParameterizedBase +implements Input<LongValue, NullValue, NullValue> { + + private static final String PREFIX = "dim"; + + private List<Dimension> dimensions = new ArrayList<>(); + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return GridGraph.class.getSimpleName(); + } + + @Override + public String getUsage() { + return "--dim0 size:wrap_endpoints [--dim1 size:wrap_endpoints [--dim2 ...]]" + super.getUsage(); + } + + @Override + public void configure(ParameterTool parameterTool) throws ProgramParametrizationException { + super.configure(parameterTool); + + // add dimensions as ordered by dimension ID (dim0, dim1, dim2, ...) + + Map<Integer, String> dimensionMap = new TreeMap<>(); + + // first parse all dimensions into a sorted map + for (Map.Entry<String, String> entry : parameterTool.toMap().entrySet()) { + if (entry.getKey().startsWith(PREFIX)) { + int dimensionId = Integer.parseInt(entry.getKey().substring(PREFIX.length())); + dimensionMap.put(dimensionId, entry.getValue()); + } + } + + // then store dimensions in order + for (String field : dimensionMap.values()) { + dimensions.add(new Dimension(field)); + } + } + + @Override + public String getIdentity() { + return getName() + " (" + dimensions + ")"; + } + + @Override + public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) { + org.apache.flink.graph.generator.GridGraph graph = new org.apache.flink.graph.generator.GridGraph(env); + + for (Dimension dimension : dimensions) { + graph.addDimension(dimension.size, dimension.wrapEndpoints); + } + + return graph + .setParallelism(littleParallelism.getValue().intValue()) + .generate(); + } + + /** + * Stores and parses the size and endpoint wrapping configuration for a + * {@link org.apache.flink.graph.generator.GridGraph} dimension. 
+ */ + private static class Dimension { + private long size; + + private boolean wrapEndpoints; + + /** + * Configuration string to be parsed. The size integer and endpoint + * wrapping boolean must be separated by a colon. + * + * @param field configuration string + */ + public Dimension(String field) { + ProgramParametrizationException exception = new ProgramParametrizationException("Grid dimension must use " + + "a colon to separate the integer size and boolean indicating whether the dimension endpoints are " + + "connected: '" + field + "'"); + + if (! field.contains(":")) { + throw exception; + } + + String[] parts = field.split(":"); + + if (parts.length != 2) { + throw exception; + } + + try { + size = Long.parseLong(parts[0]); + wrapEndpoints = Boolean.parseBoolean(parts[1]); + } catch(NumberFormatException ex) { + throw exception; + } + } + + @Override + public String toString() { + return Long.toString(size) + (wrapEndpoints ? "+" : "⊞"); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java new file mode 100644 index 0000000..3b87b00 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.graph.generator.HypercubeGraph.MINIMUM_DIMENSIONS; + +/** + * Generate a {@link org.apache.flink.graph.generator.HypercubeGraph}. 
+ */ +public class HypercubeGraph +extends ParameterizedBase +implements Input<LongValue, NullValue, NullValue> { + + private LongParameter dimensions = new LongParameter(this, "dimensions") + .setMinimumValue(MINIMUM_DIMENSIONS); + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return HypercubeGraph.class.getSimpleName(); + } + + @Override + public String getIdentity() { + return getName() + " (" + dimensions + ")"; + } + + @Override + public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) { + return new org.apache.flink.graph.generator.HypercubeGraph(env, dimensions.getValue()) + .setParallelism(littleParallelism.getValue().intValue()) + .generate(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java new file mode 100644 index 0000000..968628c --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT; + +/** + * Generate a {@link org.apache.flink.graph.generator.PathGraph}. 
+ */ +public class PathGraph +extends ParameterizedBase +implements Input<LongValue, NullValue, NullValue> { + + private LongParameter vertexCount = new LongParameter(this, "vertex_count") + .setMinimumValue(MINIMUM_VERTEX_COUNT); + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return PathGraph.class.getSimpleName(); + } + + @Override + public String getIdentity() { + return getName() + " (" + vertexCount + ")"; + } + + @Override + public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) { + return new org.apache.flink.graph.generator.PathGraph(env, vertexCount.getValue()) + .setParallelism(littleParallelism.getValue().intValue()) + .generate(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java new file mode 100644 index 0000000..e4e6a4c --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.commons.lang3.text.WordUtils; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.client.program.ProgramParametrizationException; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.translate.TranslateGraphIds; +import org.apache.flink.graph.asm.translate.translators.LongValueToStringValue; +import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; +import org.apache.flink.graph.drivers.parameter.BooleanParameter; +import org.apache.flink.graph.drivers.parameter.ChoiceParameter; +import org.apache.flink.graph.drivers.parameter.DoubleParameter; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.graph.drivers.parameter.Simplify; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.types.StringValue; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * Generate an {@code RMatGraph} with {@link IntValue}, {@link LongValue}, + * or {@link StringValue} keys. 
+ * + * @see org.apache.flink.graph.generator.RMatGraph + * + * @param <K> key type + */ +public class RMatGraph<K extends Comparable<K>> +extends ParameterizedBase +implements Input<K, NullValue, NullValue> { + + private static final String INTEGER = "integer"; + + private static final String LONG = "long"; + + private static final String STRING = "string"; + + private ChoiceParameter type = new ChoiceParameter(this, "type") + .setDefaultValue(INTEGER) + .addChoices(LONG, STRING); + + // generate graph with 2^scale vertices + private LongParameter scale = new LongParameter(this, "scale") + .setDefaultValue(10) + .setMinimumValue(1); + + // generate graph with edgeFactor * 2^scale edges + private LongParameter edgeFactor = new LongParameter(this, "edge_factor") + .setDefaultValue(16) + .setMinimumValue(1); + + // matrix parameters "a", "b", "c", and implicitly "d = 1 - a - b - c" + // describe the skew in the recursive matrix + private DoubleParameter a = new DoubleParameter(this, "a") + .setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_A) + .setMinimumValue(0.0, false); + + private DoubleParameter b = new DoubleParameter(this, "b") + .setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_B) + .setMinimumValue(0.0, false); + + private DoubleParameter c = new DoubleParameter(this, "c") + .setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_C) + .setMinimumValue(0.0, false); + + // noise randomly perturbs the matrix parameters for each successive bit + // for each generated edge + private BooleanParameter noiseEnabled = new BooleanParameter(this, "noise_enabled"); + + private DoubleParameter noise = new DoubleParameter(this, "noise") + .setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_NOISE) + .setMinimumValue(0.0, true) + .setMaximumValue(2.0, true); + + private Simplify simplify = new Simplify(this); + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + 
.setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return RMatGraph.class.getSimpleName(); + } + + @Override + public String getIdentity() { + return getName() + WordUtils.capitalize(type.getValue()) + + " (s" + scale + "e" + edgeFactor + simplify.getShortString() + ")"; + } + + /** + * Generate the graph as configured. + * + * @param env Flink execution environment + * @return input graph + */ + public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) throws Exception { + int lp = littleParallelism.getValue().intValue(); + + RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); + + long vertexCount = 1L << scale.getValue(); + long edgeCount = vertexCount * edgeFactor.getValue(); + + Graph<LongValue, NullValue, NullValue> rmatGraph = new org.apache.flink.graph.generator.RMatGraph<>( + env, rnd, vertexCount, edgeCount) + .setConstants(a.getValue().floatValue(), b.getValue().floatValue(), c.getValue().floatValue()) + .setNoise(noiseEnabled.getValue(), noise.getValue().floatValue()) + .setParallelism(lp) + .generate(); + + Graph<K, NullValue, NullValue> graph; + + switch (type.getValue()) { + case INTEGER: + if (scale.getValue() > 32) { + throw new ProgramParametrizationException( + "Scale '" + scale.getValue() + "' must be no greater than 32 for type 'integer'"); + } + graph = (Graph<K, NullValue, NullValue>) rmatGraph + .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())); + break; + + case LONG: + if (scale.getValue() > 64) { + throw new ProgramParametrizationException( + "Scale '" + scale.getValue() + "' must be no greater than 64 for type 'long'"); + } + graph = (Graph<K, NullValue, NullValue>) rmatGraph; + break; + + case STRING: + // scale bound is same as LONG since keys are generated as LongValue + if (scale.getValue() > 64) { + throw new ProgramParametrizationException( + "Scale '" + scale.getValue() + "' must be no greater than 64 
for type 'string'"); + } + graph = (Graph<K, NullValue, NullValue>) rmatGraph + .run(new TranslateGraphIds<LongValue, StringValue, NullValue, NullValue>(new LongValueToStringValue())); + break; + + default: + throw new ProgramParametrizationException("Unknown type '" + type.getValue() + "'"); + } + + // simplify *after* the translation from LongValue to IntValue or + // StringValue to improve the performance of the simplify operators + return simplify.simplify(graph); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java new file mode 100644 index 0000000..502826f --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT; + +/** + * Generate a {@link org.apache.flink.graph.generator.SingletonEdgeGraph}. + */ +public class SingletonEdgeGraph +extends ParameterizedBase +implements Input<LongValue, NullValue, NullValue> { + + private LongParameter vertexPairCount = new LongParameter(this, "vertex_pair_count") + .setMinimumValue(MINIMUM_VERTEX_COUNT); + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return SingletonEdgeGraph.class.getSimpleName(); + } + + @Override + public String getIdentity() { + return getName() + " (" + vertexPairCount + ")"; + } + + @Override + public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) { + return new org.apache.flink.graph.generator.SingletonEdgeGraph(env, vertexPairCount.getValue()) + .setParallelism(littleParallelism.getValue().intValue()) + .generate(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java new file mode 100644 index 0000000..b794f5c --- /dev/null +++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.graph.generator.StarGraph.MINIMUM_VERTEX_COUNT; + +/** + * Generate a {@link org.apache.flink.graph.generator.StarGraph}. 
+ */ +public class StarGraph +extends ParameterizedBase +implements Input<LongValue, NullValue, NullValue> { + + private LongParameter vertexCount = new LongParameter(this, "vertex_count") + .setMinimumValue(MINIMUM_VERTEX_COUNT); + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return StarGraph.class.getSimpleName(); + } + + @Override + public String getIdentity() { + return getName() + " (" + vertexCount + ")"; + } + + @Override + public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) { + return new org.apache.flink.graph.generator.StarGraph(env, vertexCount.getValue()) + .setParallelism(littleParallelism.getValue().intValue()) + .generate(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java index b24f8cf..7b291be 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java @@ -39,7 +39,7 @@ public interface Parameterized { * * @return command-line documentation string */ - String getParameterization(); + String getUsage(); /** * Read parameter values from the command-line arguments. 
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java index 3b9b80a..5f36ff5 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java @@ -45,7 +45,7 @@ implements Parameterized { } @Override - public String getParameterization() { + public String getUsage() { StrBuilder strBuilder = new StrBuilder(); // print parameters as ordered list http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java new file mode 100644 index 0000000..3e9fd9a --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.parameter; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.client.program.ProgramParametrizationException; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.Simplify.Ordering; +import org.apache.flink.types.NullValue; + +/** + * A simple graph has no self-loops (edges where the source and target vertices + * are the same) and no duplicate edges. Flink stores an undirected graph as + * a directed graph where each undirected edge is represented by a directed + * edge in each direction. + * + * This {@link Parameter} indicates whether to simplify the graph and if the + * graph should be directed or undirected. + */ +public class Simplify +implements Parameter<Ordering> { + + public enum Ordering { + // leave the graph unchanged + NONE, + + // create a simple, directed graph + DIRECTED, + + // create a simple, undirected graph + UNDIRECTED, + + // create a simple, undirected graph + // remove input edges where source < target before symmetrizing the graph + UNDIRECTED_CLIP_AND_FLIP, + } + + private Ordering value; + + /** + * Add this parameter to the list of parameters stored by owner. 
+ * + * @param owner the {@link Parameterized} using this {@link Parameter} + */ + public Simplify(ParameterizedBase owner) { + owner.addParameter(this); + } + + @Override + public String getUsage() { + return "[--simplify <directed | undirected [--clip_and_flip]>]"; + } + + @Override + public void configure(ParameterTool parameterTool) { + String ordering = parameterTool.get("simplify"); + + if (ordering == null) { + value = Ordering.NONE; + } else { + switch (ordering.toLowerCase()) { + case "directed": + value = Ordering.DIRECTED; + break; + case "undirected": + value = parameterTool.has("clip_and_flip") ? Ordering.UNDIRECTED_CLIP_AND_FLIP : Ordering.UNDIRECTED; + break; + default: + throw new ProgramParametrizationException( + "Expected 'directed' or 'undirected' ordering but received '" + ordering + "'"); + } + } + } + + @Override + public Ordering getValue() { + return value; + } + + /** + * Simplify the given graph based on the configured value. + * + * @param graph input graph + * @param <T> graph key type + * @return output graph + * @throws Exception on error + */ + public <T extends Comparable<T>> Graph<T, NullValue, NullValue> simplify(Graph<T, NullValue, NullValue> graph) + throws Exception { + + switch (value) { + case DIRECTED: + graph = graph + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<T, NullValue, NullValue>()); + break; + case UNDIRECTED: + graph = graph + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<T, NullValue, NullValue>(false)); + break; + case UNDIRECTED_CLIP_AND_FLIP: + graph = graph + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<T, NullValue, NullValue>(true)); + break; + } + + return graph; + } + + public String getShortString() { + switch (value) { + case DIRECTED: + return "d"; + case UNDIRECTED: + return "u"; + case UNDIRECTED_CLIP_AND_FLIP: + return "ü"; + default: + return ""; + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java new file mode 100644 index 0000000..12ae7dc --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.drivers.parameter; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.drivers.parameter.Simplify.Ordering; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SimplifyTest +extends ParameterTestBase { + + private Simplify parameter; + + @Before + public void setup() { + super.setup(); + + parameter = new Simplify(owner); + } + + @Test + public void testWithDirected() { + parameter.configure(ParameterTool.fromArgs(new String[]{"--simplify", "directed"})); + Assert.assertEquals(Ordering.DIRECTED, parameter.getValue()); + } + + @Test + public void testWithUndirected() { + parameter.configure(ParameterTool.fromArgs(new String[]{"--simplify", "undirected"})); + Assert.assertEquals(Ordering.UNDIRECTED, parameter.getValue()); + } + + @Test + public void testWithNoParameter() { + parameter.configure(ParameterTool.fromArgs(new String[]{})); + Assert.assertEquals(Ordering.NONE, parameter.getValue()); + } +}