[FLINK-5913] [gelly] Example drivers. Replace existing and create new algorithm Driver implementations for each of the library methods.
This closes #3635 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a48357db Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a48357db Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a48357db Branch: refs/heads/table-retraction Commit: a48357db8c4187fd08f3b17880899ebbcb5d3b5e Parents: ded25be Author: Greg Hogan <c...@greghogan.com> Authored: Wed Oct 26 15:18:50 2016 -0400 Committer: Greg Hogan <c...@greghogan.com> Committed: Fri Mar 31 11:17:26 2017 -0400 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Usage.java | 2 - .../apache/flink/graph/drivers/AdamicAdar.java | 71 ++++ .../graph/drivers/ClusteringCoefficient.java | 378 +++++-------------- .../graph/drivers/ConnectedComponents.java | 105 ++++++ .../apache/flink/graph/drivers/EdgeList.java | 92 +++++ .../apache/flink/graph/drivers/Graph500.java | 165 -------- .../flink/graph/drivers/GraphMetrics.java | 265 ++++--------- .../org/apache/flink/graph/drivers/HITS.java | 188 ++------- .../flink/graph/drivers/JaccardIndex.java | 224 ++--------- .../apache/flink/graph/drivers/PageRank.java | 74 ++++ .../flink/graph/drivers/SimpleDriver.java | 65 ++++ .../flink/graph/drivers/TriangleListing.java | 362 +++++------------- .../drivers/parameter/IterationConvergence.java | 89 +++++ .../graph/examples/ConnectedComponents.java | 141 ------- .../examples/GSASingleSourceShortestPaths.java | 4 +- .../parameter/IterationConvergenceTest.java | 66 ++++ .../examples/ConnectedComponentsITCase.java | 72 ---- .../main/java/org/apache/flink/graph/Graph.java | 16 +- .../graph/library/ConnectedComponents.java | 5 +- .../graph/library/GSAConnectedComponents.java | 8 +- .../flink/graph/library/LabelPropagation.java | 5 +- .../clustering/directed/TriangleListing.java | 2 +- .../undirected/LocalClusteringCoefficient.java | 2 +- .../graph/library/link_analysis/PageRank.java | 8 +- 
.../graph/library/similarity/AdamicAdar.java | 2 +- .../graph/library/similarity/JaccardIndex.java | 2 +- .../apache/flink/graph/utils/GraphUtils.java | 10 +- .../flink/graph/utils/NullValueEdgeMapper.java | 32 -- 28 files changed, 919 insertions(+), 1536 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java index d923bf0..642fe5b 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java @@ -28,7 +28,6 @@ public class Usage { private static final Class[] DRIVERS = new Class[]{ org.apache.flink.graph.drivers.ClusteringCoefficient.class, - org.apache.flink.graph.drivers.Graph500.class, org.apache.flink.graph.drivers.GraphMetrics.class, org.apache.flink.graph.drivers.HITS.class, org.apache.flink.graph.drivers.JaccardIndex.class, @@ -36,7 +35,6 @@ public class Usage { }; private static final Class[] EXAMPLES = new Class[]{ - org.apache.flink.graph.examples.ConnectedComponents.class, org.apache.flink.graph.examples.EuclideanGraphWeighing.class, org.apache.flink.graph.examples.GSASingleSourceShortestPaths.class, org.apache.flink.graph.examples.IncrementalSSSP.class, http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java new file mode 100644 index 0000000..742c1de --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.commons.lang3.text.StrBuilder; +import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.graph.library.similarity.AdamicAdar.Result; +import org.apache.flink.types.CopyableValue; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}. 
+ */ +public class AdamicAdar<K extends CopyableValue<K>, VV, EV> +extends SimpleDriver<Result<K>> +implements Driver<K, VV, EV>, CSV, Print { + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public String getShortDescription() { + return "similarity score weighted by centerpoint degree"; + } + + @Override + public String getLongDescription() { + return WordUtils.wrap(new StrBuilder() + .appendln("Adamic-Adar measures the similarity between vertex neighborhoods and is " + + "computed as the sum of the inverse logarithm of centerpoint degree over shared " + + "neighbors.") + .appendNewLine() + .append("The algorithm result contains two vertex IDs and the similarity score.") + .toString(), 80); + } + + @Override + public void plan(Graph<K, VV, EV> graph) throws Exception { + int lp = littleParallelism.getValue().intValue(); + + result = graph + .run(new org.apache.flink.graph.library.similarity.AdamicAdar<K, VV, EV>() + .setLittleParallelism(lp)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java index 004390d..c463c0a 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java @@ -18,333 +18,127 @@ package org.apache.flink.graph.drivers; -import 
org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAnalytic; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.StringValue; - -import java.text.NumberFormat; +import org.apache.flink.graph.asm.result.PrintableResult; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.ChoiceParameter; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.types.CopyableValue; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; /** - * Driver for the library implementations of Global and Local Clustering Coefficient. 
- * - * This example reads a simple directed or undirected graph from a CSV file or - * generates an RMat graph with the given scale and edge factor then calculates - * the local clustering coefficient for each vertex and the global clustering - * coefficient for the graph. + * Driver for directed and undirected clustering coefficient algorithm and analytics. * + * @see org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient * @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient * @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient + * @see org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient * @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient */ -public class ClusteringCoefficient { - - private static final int DEFAULT_SCALE = 10; - - private static final int DEFAULT_EDGE_FACTOR = 16; - - private static final boolean DEFAULT_CLIP_AND_FLIP = true; - - private static String getUsage(String message) { - return new StrBuilder() - .appendNewLine() - .appendln(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" + - " vertex's neighborhood and the global clustering coefficient measures the connectedness of the graph." 
+ - " Scores range from 0.0 (no edges between neighbors or vertices) to 1.0 (neighborhood or graph" + - " is a clique).", 80)) - .appendNewLine() - .appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" + - " the vertex, and the number of edges between vertex neighbors.", 80)) - .appendNewLine() - .appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat> --output <print | hash | csv>") - .appendNewLine() - .appendln("options:") - .appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") - .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]") - .appendNewLine() - .appendln(" --output print") - .appendln(" --output hash") - .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") - .appendNewLine() - .appendln("Usage error: " + message) - .toString(); - } - - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); - - if (! 
parameters.has("directed")) { - throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'")); - } - boolean directedAlgorithm = parameters.getBoolean("directed"); - - int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); - - // global and local clustering coefficient results - GraphAnalytic gcc; - GraphAnalytic acc; - DataSet lcc; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.get("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - Graph<LongValue, NullValue, NullValue> graph = reader - .keyType(LongValue.class); +public class ClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV> +extends SimpleDriver<PrintableResult> +implements Driver<K, VV, EV>, CSV, Hash, Print { - if (directedAlgorithm) { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>() - .setParallelism(little_parallelism)); - } + private static final String DIRECTED = "directed"; - gcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = graph - .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = graph - .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } else { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false) - .setParallelism(little_parallelism)); - } + private static final String UNDIRECTED = "undirected"; - gcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - } break; + private ChoiceParameter order = new ChoiceParameter(this, "order") + .addChoices(DIRECTED, UNDIRECTED); - case "string": { - Graph<StringValue, NullValue, NullValue> graph = reader - .keyType(StringValue.class); + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); - if (directedAlgorithm) { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>() - .setParallelism(little_parallelism)); - } + private GraphAnalytic<K, VV, EV, ? 
extends PrintableResult> globalClusteringCoefficient; - gcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = graph - .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } else { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false) - .setParallelism(little_parallelism)); - } + private GraphAnalytic<K, VV, EV, ? extends PrintableResult> averageClusteringCoefficient; - gcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid CSV type")); - } - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - Graph<LongValue, NullValue, NullValue> graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount) - .setParallelism(little_parallelism) - .generate(); - - if (directedAlgorithm) { - if (scale > 32) { - Graph<LongValue, NullValue, NullValue> newGraph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>() - .setParallelism(little_parallelism)); - - gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false) - .setLittleParallelism(little_parallelism)); - } else { - Graph<IntValue, NullValue, NullValue> newGraph = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>() - .setParallelism(little_parallelism)); - - gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false) - .setLittleParallelism(little_parallelism)); - } - } else { - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", 
DEFAULT_CLIP_AND_FLIP); + @Override + public String getName() { + return this.getClass().getSimpleName(); + } - if (scale > 32) { - Graph<LongValue, NullValue, NullValue> newGraph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip) - .setParallelism(little_parallelism)); + @Override + public String getShortDescription() { + return "measure the connectedness of vertex neighborhoods"; + } - gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false) - .setLittleParallelism(little_parallelism)); - } else { - Graph<IntValue, NullValue, NullValue> newGraph = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip) - .setParallelism(little_parallelism)); + @Override + public String getLongDescription() { + return WordUtils.wrap(new StrBuilder() + .appendln("The local clustering coefficient measures the connectedness of each " + + "vertex's neighborhood. The global clustering coefficient measures the " + + "connected of the graph. The average clustering coefficient is the mean local " + + "clustering coefficient. 
Each score ranges from 0.0 (no edges between vertex " + + "neighbors) to 1.0 (neighborhood or graph is a clique).") + .appendNewLine() + .append("The algorithm result contains the vertex ID, degree, and number of edges " + + "connecting neighbors.") + .toString(), 80); + } - gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - acc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>() - .setIncludeZeroDegreeVertices(false) - .setLittleParallelism(little_parallelism)); - } - } - } break; + @Override + public void plan(Graph<K, VV, EV> graph) throws Exception { + int lp = littleParallelism.getValue().intValue(); - default: - throw new ProgramParametrizationException(getUsage("invalid input type")); - } + switch (order.getValue()) { + case DIRECTED: + result = graph + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K, VV, EV>() + .setLittleParallelism(lp)); - switch (parameters.get("output", "")) { - case "print": - if (directedAlgorithm) { - for (Object e: lcc.collect()) { - org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result result = - (org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e; - System.out.println(result.toPrintableString()); - } - } else { - for (Object e: lcc.collect()) { - org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result result = - (org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e; - System.out.println(result.toPrintableString()); - } - } - break; + 
globalClusteringCoefficient = graph + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<K, VV, EV>() + .setLittleParallelism(lp)); - case "hash": - System.out.println(DataSetUtils.checksumHashCode(lcc)); + averageClusteringCoefficient = graph + .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<K, VV, EV>() + .setLittleParallelism(lp)); break; - case "csv": - String filename = parameters.get("output_filename"); + case UNDIRECTED: + result = graph + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K, VV, EV>() + .setLittleParallelism(lp)); - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); + globalClusteringCoefficient = graph + .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<K, VV, EV>() + .setLittleParallelism(lp)); - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - - env.execute("Clustering Coefficient"); + averageClusteringCoefficient = graph + .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<K, VV, EV>() + .setLittleParallelism(lp)); break; - - default: - throw new ProgramParametrizationException(getUsage("invalid output type")); } + } - System.out.println(gcc.getResult()); - System.out.println(acc.getResult()); + @Override + public void hash(String executionName) throws Exception { + super.hash(executionName); + printAnalytics(); + } - JobExecutionResult result = env.getLastJobExecutionResult(); + @Override + public void print(String executionName) throws Exception { + super.print(executionName); + printAnalytics(); + } + + @Override + public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { + 
super.writeCSV(filename, lineDelimiter, fieldDelimiter); + printAnalytics(); + } - NumberFormat nf = NumberFormat.getInstance(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + private void printAnalytics() { + System.out.println(globalClusteringCoefficient.getResult().toPrintableString()); + System.out.println(averageClusteringCoefficient.getResult().toPrintableString()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java new file mode 100644 index 0000000..32263cf --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; +import org.apache.flink.graph.asm.dataset.Collect; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.graph.library.GSAConnectedComponents; + +import java.util.List; + +/** + * Driver for {@link org.apache.flink.graph.library.GSAConnectedComponents}. + * + * The gather-sum-apply implementation is used because scatter-gather does not + * handle object reuse (see FLINK-5891). + */ +public class ConnectedComponents<K extends Comparable<K>, VV, EV> +extends ParameterizedBase +implements Driver<K, VV, EV>, CSV, Hash, Print { + + private DataSet<Vertex<K, K>> components; + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public String getShortDescription() { + return "ConnectedComponents"; + } + + @Override + public String getLongDescription() { + return "ConnectedComponents"; + } + + @Override + public void plan(Graph<K, VV, EV> graph) throws Exception { + components = graph + .mapVertices(new MapVertices<K, VV>()) + .run(new GSAConnectedComponents<K, K, EV>(Integer.MAX_VALUE)); + } + + @Override + public void hash(String executionName) throws Exception { + Checksum checksum = new ChecksumHashCode<Vertex<K, K>>() + .run(components) + .execute(executionName); + + System.out.println(checksum); + } + + @Override + public void print(String executionName) throws Exception { + Collect<Vertex<K, K>> collector = new Collect<>(); + + // Refactored due to openjdk7 compile error: 
https://travis-ci.org/greghogan/flink/builds/200487761 + List<Vertex<K, K>> records = collector.run(components).execute(executionName); + + for (Vertex<K, K> result : records) { + System.out.println(result); + } + } + + @Override + public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { + components + .writeAsCsv(filename, lineDelimiter, fieldDelimiter) + .name("CSV: " + filename); + } + + private static final class MapVertices<T, VT> + implements MapFunction<Vertex<T, VT>, T> { + @Override + public T map(Vertex<T, VT> value) throws Exception { + return value.f0; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java new file mode 100644 index 0000000..85f32c3 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; +import org.apache.flink.graph.asm.dataset.Collect; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; + +import java.util.List; + +/** + * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}s. + */ +public class EdgeList<K, VV, EV> +extends ParameterizedBase +implements Driver<K, VV, EV>, CSV, Hash, Print { + + private DataSet<Edge<K, EV>> edges; + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public String getShortDescription() { + return "the edge list"; + } + + @Override + public String getLongDescription() { + return "Pass-through of the graph's edge list."; + } + + @Override + public void plan(Graph<K, VV, EV> graph) throws Exception { + edges = graph + .getEdges(); + } + + @Override + public void hash(String executionName) throws Exception { + Checksum checksum = new ChecksumHashCode<Edge<K, EV>>() + .run(edges) + .execute(executionName); + + System.out.println(checksum); + } + + @Override + public void print(String executionName) throws Exception { + Collect<Edge<K, EV>> collector = new Collect<>(); + + // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 + List<Edge<K, EV>> records = collector.run(edges).execute(executionName); + + for (Edge<K, EV> result : records) { + System.out.println(result); + } + + } + + @Override + public void writeCSV(String filename, String 
lineDelimiter, String fieldDelimiter) { + edges + .writeAsCsv(filename, lineDelimiter, fieldDelimiter) + .name("CSV: " + filename); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java deleted file mode 100644 index c2abbf7..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.graph.drivers; - -import org.apache.commons.lang3.StringEscapeUtils; -import org.apache.commons.lang3.text.StrBuilder; -import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.client.program.ProgramParametrizationException; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; - -import java.text.NumberFormat; - -/** - * Generate an RMat graph for Graph 500. - * - * Note that this does not yet implement permutation of vertex labels or edges. - * - * @see <a href="http://www.graph500.org/specifications">Graph 500</a> - */ -public class Graph500 { - - private static final int DEFAULT_SCALE = 10; - - private static final int DEFAULT_EDGE_FACTOR = 16; - - private static final boolean DEFAULT_CLIP_AND_FLIP = true; - - private static String getUsage(String message) { - return new StrBuilder() - .appendNewLine() - .appendln("A Graph500 generator using the Recursive Matrix (RMat) graph generator.") - .appendNewLine() - .appendln(WordUtils.wrap("The graph matrix contains 2^scale vertices although not every vertex will" + - " be represented in an edge. 
The number of edges is edge_factor * 2^scale edges" + - " although some edges may be duplicates.", 80)) - .appendNewLine() - .appendln("Note: this does not yet implement permutation of vertex labels or edges.") - .appendNewLine() - .appendln("usage: Graph500 --directed <true | false> --simplify <true | false> --output <print | hash | csv [options]>") - .appendNewLine() - .appendln("options:") - .appendln(" --output print") - .appendln(" --output hash") - .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") - .appendNewLine() - .appendln("Usage error: " + message) - .toString(); - } - - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); - - if (! parameters.has("directed")) { - throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'")); - } - boolean directed = parameters.getBoolean("directed"); - - if (! 
parameters.has("simplify")) { - throw new ProgramParametrizationException(getUsage("must declare '--simplify true' or '--simplify false'")); - } - boolean simplify = parameters.getBoolean("simplify"); - - - // Generate RMat graph - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate(); - - if (directed) { - if (simplify) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()); - } - } else { - if (simplify) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)); - } else { - graph = graph.getUndirected(); - } - } - - DataSet<Tuple2<LongValue, LongValue>> edges = graph - .getEdges() - .project(0, 1); - - // Print, hash, or write RMat graph to disk - switch (parameters.get("output", "")) { - case "print": - System.out.println(); - edges.print(); - break; - - case "hash": - System.out.println(); - System.out.println(DataSetUtils.checksumHashCode(edges)); - break; - - case "csv": - String filename = parameters.getRequired("output_filename"); - - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - edges.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - - env.execute("Graph500"); - break; - default: - throw new ProgramParametrizationException(getUsage("invalid 
output type")); - } - - JobExecutionResult result = env.getLastJobExecutionResult(); - - NumberFormat nf = NumberFormat.getInstance(); - System.out.println(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java index 9b246df..cc5a894 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java @@ -18,224 +18,109 @@ package org.apache.flink.graph.drivers; -import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.text.StrBuilder; -import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAnalytic; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import 
org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.StringValue; - -import java.text.NumberFormat; +import org.apache.flink.graph.asm.result.PrintableResult; +import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.ChoiceParameter; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.types.CopyableValue; /** - * Computes vertex and edge metrics on a directed or undirected graph. + * Driver for directed and undirected graph metrics analytics. * * @see org.apache.flink.graph.library.metric.directed.EdgeMetrics * @see org.apache.flink.graph.library.metric.directed.VertexMetrics * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics */ -public class GraphMetrics { +public class GraphMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV> +extends ParameterizedBase +implements Driver<K, VV, EV>, Hash, Print { - private static final int DEFAULT_SCALE = 10; + private static final String DIRECTED = "directed"; - private static final int DEFAULT_EDGE_FACTOR = 16; + private static final String UNDIRECTED = "undirected"; - private static final boolean DEFAULT_CLIP_AND_FLIP = true; + private ChoiceParameter order = new ChoiceParameter(this, "order") + .addChoices(DIRECTED, UNDIRECTED); - private static String getUsage(String message) { + private GraphAnalytic<K, VV, EV, ? extends PrintableResult> vertexMetrics; + + private GraphAnalytic<K, VV, EV, ? 
extends PrintableResult> edgeMetrics; + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public String getShortDescription() { + return "compute vertex and edge metrics"; + } + + @Override + public String getLongDescription() { return new StrBuilder() + .appendln("Computes metrics on a directed or undirected graph.") .appendNewLine() - .appendln(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80)) - .appendNewLine() - .appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat>") + .appendln("Vertex metrics:") + .appendln("- number of vertices") + .appendln("- number of edges") + .appendln("- number of unidirectional edges (directed only)") + .appendln("- number of bidirectional edges (directed only)") + .appendln("- average degree") + .appendln("- number of triplets") + .appendln("- maximum degree") + .appendln("- maximum out degree (directed only)") + .appendln("- maximum in degree (directed only)") + .appendln("- maximum number of triplets") .appendNewLine() - .appendln("options:") - .appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") - .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]") - .appendNewLine() - .appendln("Usage error: " + message) + .appendln("Edge metrics:") + .appendln("- number of triangle triplets") + .appendln("- number of rectangle triplets") + .appendln("- maximum number of triangle triplets") + .append("- maximum number of rectangle triplets") .toString(); } - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); + @Override + public void plan(Graph<K, VV, EV> graph) throws Exception { + switch (order.getValue()) { + case 
DIRECTED: + vertexMetrics = graph + .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<K, VV, EV>()); - ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); + edgeMetrics = graph + .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<K, VV, EV>()); + break; - if (! parameters.has("directed")) { - throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'")); - } - boolean directedAlgorithm = parameters.getBoolean("directed"); - - GraphAnalytic vm; - GraphAnalytic em; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.getRequired("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - Graph<LongValue, NullValue, NullValue> graph = reader - .keyType(LongValue.class); - - if (directedAlgorithm) { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()); - } - - vm = graph - .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>()); - em = graph - .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>()); - } else { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)); - } - - vm = graph - .run(new 
org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>()); - em = graph - .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>()); - } - } break; - - case "string": { - Graph<StringValue, NullValue, NullValue> graph = reader - .keyType(StringValue.class); - - if (directedAlgorithm) { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()); - } - - vm = graph - .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<StringValue, NullValue, NullValue>()); - em = graph - .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<StringValue, NullValue, NullValue>()); - } else { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)); - } - - vm = graph - .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<StringValue, NullValue, NullValue>()); - em = graph - .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<StringValue, NullValue, NullValue>()); - } - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid CSV type")); - } - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate(); - - if (directedAlgorithm) { - if (scale > 32) { - Graph<LongValue, NullValue, NullValue> newGraph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()); - 
- vm = newGraph - .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>()); - em = newGraph - .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>()); - } else { - Graph<IntValue, NullValue, NullValue> newGraph = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())) - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()); - - vm = newGraph - .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<IntValue, NullValue, NullValue>()); - em = newGraph - .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<IntValue, NullValue, NullValue>()); - } - } else { - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - - if (scale > 32) { - Graph<LongValue, NullValue, NullValue> newGraph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)); - - vm = newGraph - .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>()); - em = newGraph - .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>()); - } else { - Graph<IntValue, NullValue, NullValue> newGraph = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())) - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)); - - vm = newGraph - .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<IntValue, NullValue, NullValue>()); - em = newGraph - .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<IntValue, NullValue, NullValue>()); - } - } - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid input type")); + case UNDIRECTED: + 
vertexMetrics = graph + .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<K, VV, EV>()); + + edgeMetrics = graph + .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<K, VV, EV>()); + break; } + } - env.execute("Graph Metrics"); + @Override + public void hash(String executionName) throws Exception { + print(executionName); + } - System.out.println(); - System.out.print("Vertex metrics:\n "); - System.out.println(vm.getResult().toString().replace(";", "\n ")); - System.out.println(); - System.out.print("Edge metrics:\n "); - System.out.println(em.getResult().toString().replace(";", "\n ")); + @Override + public void print(String executionName) throws Exception { + vertexMetrics.execute(executionName); - JobExecutionResult result = env.getLastJobExecutionResult(); + System.out.print("Vertex metrics:\n "); + System.out.println(vertexMetrics.getResult().toPrintableString().replace(";", "\n ")); - NumberFormat nf = NumberFormat.getInstance(); System.out.println(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + System.out.print("Edge metrics:\n "); + System.out.println(edgeMetrics.getResult().toPrintableString().replace(";", "\n ")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java index db27f0e..6081fea 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java @@ -18,177 +18,51 @@ package org.apache.flink.graph.drivers; -import org.apache.commons.lang3.StringEscapeUtils; 
import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.simple.directed.Simplify; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.IterationConvergence; import org.apache.flink.graph.library.link_analysis.HITS.Result; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.StringValue; - -import java.text.NumberFormat; /** - * Driver for the library implementation of HITS (Hubs and Authorities). - * - * This example reads a simple, undirected graph from a CSV file or generates - * an undirected RMat graph with the given scale and edge factor then calculates - * hub and authority scores for each vertex. - * - * @see org.apache.flink.graph.library.link_analysis.HITS + * Driver for {@link org.apache.flink.graph.library.link_analysis.HITS}. 
*/ -public class HITS { +public class HITS<K, VV, EV> +extends SimpleDriver<Result<K>> +implements Driver<K, VV, EV>, CSV, Print { private static final int DEFAULT_ITERATIONS = 10; - private static final int DEFAULT_SCALE = 10; + private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS); - private static final int DEFAULT_EDGE_FACTOR = 16; - - private static String getUsage(String message) { - return new StrBuilder() - .appendNewLine() - .appendln(WordUtils.wrap("Hyperlink-Induced Topic Search computes two interdependent" + - " scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" + - " and good \"authorities\" are linked from good \"hubs\".", 80)) - .appendNewLine() - .appendln("usage: HITS --input <csv | rmat> --output <print | hash | csv>") - .appendNewLine() - .appendln("options:") - .appendln(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") - .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]") - .appendNewLine() - .appendln(" --output print") - .appendln(" --output hash") - .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") - .appendNewLine() - .appendln("Usage error: " + message) - .toString(); + @Override + public String getName() { + return this.getClass().getSimpleName(); } - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); - - int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS); - - DataSet hits; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter 
= StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.getRequired("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - hits = reader - .keyType(LongValue.class) - .run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations)); - } break; - - case "string": { - hits = reader - .keyType(StringValue.class) - .run(new org.apache.flink.graph.library.link_analysis.HITS<StringValue, NullValue, NullValue>(iterations)); - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid CSV type")); - } - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate(); - - if (scale > 32) { - hits = graph - .run(new Simplify<LongValue, NullValue, NullValue>()) - .run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations)); - } else { - hits = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())) - .run(new Simplify<IntValue, NullValue, NullValue>()) - .run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations)); - } - } break; - - default: - throw new 
ProgramParametrizationException(getUsage("invalid input type")); - } - - switch (parameters.get("output", "")) { - case "print": - System.out.println(); - for (Object e: hits.collect()) { - System.out.println(((Result)e).toPrintableString()); - } - break; - - case "hash": - System.out.println(); - System.out.println(DataSetUtils.checksumHashCode(hits)); - break; - - case "csv": - String filename = parameters.getRequired("output_filename"); - - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - - env.execute("HITS"); - break; - default: - throw new ProgramParametrizationException(getUsage("invalid output type")); - } + @Override + public String getShortDescription() { + return "score vertices as hubs and authorities"; + } - JobExecutionResult result = env.getLastJobExecutionResult(); + @Override + public String getLongDescription() { + return WordUtils.wrap(new StrBuilder() + .appendln("Hyperlink-Induced Topic Search computes two interdependent scores for " + + "each vertex in a directed graph. 
A good \"hub\" links to good \"authorities\" " + + "and good \"authorities\" are linked to from good \"hubs\".") + .appendNewLine() + .append("The result contains the vertex ID, hub score, and authority score.") + .toString(), 80); + } - NumberFormat nf = NumberFormat.getInstance(); - System.out.println(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + @Override + public void plan(Graph<K, VV, EV> graph) throws Exception { + result = graph + .run(new org.apache.flink.graph.library.link_analysis.HITS<K, VV, EV>( + iterationConvergence.getValue().iterations, + iterationConvergence.getValue().convergenceThreshold)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java index 09479a6..1c836ea 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java @@ -18,211 +18,57 @@ package org.apache.flink.graph.drivers; -import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import 
org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.simple.undirected.Simplify; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.LongParameter; import org.apache.flink.graph.library.similarity.JaccardIndex.Result; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.StringValue; - -import java.text.NumberFormat; +import org.apache.flink.types.CopyableValue; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; /** - * Driver for the library implementation of Jaccard Index. - * - * This example reads a simple, undirected graph from a CSV file or generates - * an undirected RMat graph with the given scale and edge factor then calculates - * all non-zero Jaccard Index similarity scores between vertices. - * - * @see org.apache.flink.graph.library.similarity.JaccardIndex + * Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}. 
*/ -public class JaccardIndex { +public class JaccardIndex<K extends CopyableValue<K>, VV, EV> +extends SimpleDriver<Result<K>> +implements Driver<K, VV, EV>, CSV, Hash, Print { - private static final int DEFAULT_SCALE = 10; + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); - private static final int DEFAULT_EDGE_FACTOR = 16; + @Override + public String getName() { + return this.getClass().getSimpleName(); + } - private static final boolean DEFAULT_CLIP_AND_FLIP = true; + @Override + public String getShortDescription() { + return "similarity score as fraction of common neighbors"; + } - private static String getUsage(String message) { - return new StrBuilder() - .appendNewLine() - .appendln(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" + - " neighborhoods and is computed as the number of shared neighbors divided by the number of" + - " distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" + - " shared).", 80)) - .appendNewLine() - .appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" + - " number of shared neighbors, and the number of distinct neighbors.", 80)) - .appendNewLine() - .appendln("usage: JaccardIndex --input <csv | rmat> --output <print | hash | csv>") - .appendNewLine() - .appendln("options:") - .appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") - .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]") - .appendNewLine() - .appendln(" --output print") - .appendln(" --output hash") - .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") + @Override + public String getLongDescription() { + return WordUtils.wrap(new StrBuilder() + 
.appendln("Jaccard Index measures the similarity between vertex neighborhoods and " + + "is computed as the number of shared neighbors divided by the number of " + + "distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all " + + "neighbors are shared).") .appendNewLine() - .appendln("Usage error: " + message) - .toString(); + .append("The result contains two vertex IDs, the number of shared neighbors, and " + + "the number of distinct neighbors.") + .toString(), 80); } - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); - - int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); - - DataSet ji; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.getRequired("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - Graph<LongValue, NullValue, NullValue> graph = reader - .keyType(LongValue.class); - - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false) - .setParallelism(little_parallelism)); - } - - ji = graph - .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>() - 
.setLittleParallelism(little_parallelism)); - } break; - - case "string": { - Graph<StringValue, NullValue, NullValue> graph = reader - .keyType(StringValue.class); - - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false) - .setParallelism(little_parallelism)); - } - - ji = graph - .run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid CSV type")); - } - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .setParallelism(little_parallelism) - .generate(); - - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - - if (scale > 32) { - ji = graph - .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } else { - ji = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()) - .setParallelism(little_parallelism)) - .run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>() - .setLittleParallelism(little_parallelism)); - } - } break; - - default: - throw new 
ProgramParametrizationException(getUsage("invalid input type")); - } - - switch (parameters.get("output", "")) { - case "print": - System.out.println(); - for (Object e: ji.collect()) { - Result result = (Result)e; - System.out.println(result.toPrintableString()); - } - break; - - case "hash": - System.out.println(); - System.out.println(DataSetUtils.checksumHashCode(ji)); - break; - - case "csv": - String filename = parameters.getRequired("output_filename"); - - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - ji.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - - env.execute("Jaccard Index"); - break; - - default: - throw new ProgramParametrizationException(getUsage("invalid output type")); - } - - JobExecutionResult result = env.getLastJobExecutionResult(); + @Override + public void plan(Graph<K, VV, EV> graph) throws Exception { + int lp = littleParallelism.getValue().intValue(); - NumberFormat nf = NumberFormat.getInstance(); - System.out.println(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + result = graph + .run(new org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>() + .setLittleParallelism(lp)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java new file mode 100644 index 0000000..8cef077 --- /dev/null +++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.commons.lang3.text.StrBuilder; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.DoubleParameter; +import org.apache.flink.graph.drivers.parameter.IterationConvergence; +import org.apache.flink.graph.library.link_analysis.PageRank.Result; + +/** + * @see org.apache.flink.graph.library.link_analysis.PageRank + */ +public class PageRank<K, VV, EV> +extends SimpleDriver<Result<K>> +implements Driver<K, VV, EV>, CSV, Print { + + private static final int DEFAULT_ITERATIONS = 10; + + private DoubleParameter dampingFactor = new DoubleParameter(this, "damping_factor") + .setDefaultValue(0.85) + .setMinimumValue(0.0, false) + .setMaximumValue(1.0, false); + + private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS); + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + 
@Override + public String getShortDescription() { + return "score vertices by the number and quality of incoming links"; + } + + @Override + public String getLongDescription() { + return new StrBuilder() + .appendln("PageRank computes a per-vertex score which is the sum of PageRank scores " + + "transmitted over in-edges. Each vertex's score is divided evenly among " + + "out-edges. High-scoring vertices are linked to by other high-scoring vertices.") + .appendNewLine() + .append("The result contains the vertex ID and PageRank score.") + .toString(); + } + + @Override + public void plan(Graph<K, VV, EV> graph) throws Exception { + result = graph + .run(new org.apache.flink.graph.library.link_analysis.PageRank<K, VV, EV>( + dampingFactor.getValue(), + iterationConvergence.getValue().iterations, + iterationConvergence.getValue().convergenceThreshold)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java new file mode 100644 index 0000000..98bdfc5 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; +import org.apache.flink.graph.asm.dataset.Collect; +import org.apache.flink.graph.asm.result.PrintableResult; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; + +import java.util.List; + +/** + * A base driver storing a single result {@link DataSet} with values + * implementing {@link PrintableResult}. + * + * @param <R> algorithm's result type + */ +public abstract class SimpleDriver<R extends PrintableResult> +extends ParameterizedBase { + + protected DataSet<? extends R> result; + + public void hash(String executionName) throws Exception { + Checksum checksum = new ChecksumHashCode<R>() + .run((DataSet<R>) result) + .execute(executionName); + + System.out.println(checksum); + } + + public void print(String executionName) throws Exception { + Collect<R> collector = new Collect<>(); + + // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 + List<R> records = collector.run((DataSet<R>) result).execute(executionName); + + for (R result : records) { + System.out.println(result.toPrintableString()); + } + } + + public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { + result + .writeAsCsv(filename, lineDelimiter, fieldDelimiter) + .name("CSV: " + filename); + } +}