[gelly] [refactoring] Removed the "Example" suffix from all Gelly example class names; added the "Algorithm" suffix to the library method class names
This closes #625 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f4039dc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f4039dc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f4039dc Branch: refs/heads/master Commit: 8f4039dcb326a1af276ac4b93ffe5ea4da3e19bc Parents: 1479973 Author: andralungu <lungu.an...@gmail.com> Authored: Sun Apr 26 20:09:04 2015 +0200 Committer: vasia <va...@apache.org> Committed: Tue May 19 22:38:03 2015 +0200 ---------------------------------------------------------------------- .../flink/graph/example/CommunityDetection.java | 140 ++++++++++++ .../graph/example/ConnectedComponents.java | 141 ++++++++++++ .../example/ConnectedComponentsExample.java | 141 ------------ .../graph/example/EuclideanGraphExample.java | 210 ------------------ .../graph/example/EuclideanGraphWeighing.java | 210 ++++++++++++++++++ .../graph/example/GSAConnectedComponents.java | 176 +++++++++++++++ .../example/GSAConnectedComponentsExample.java | 176 --------------- .../example/GSASingleSourceShortestPaths.java | 178 +++++++++++++++ .../GSASingleSourceShortestPathsExample.java | 178 --------------- .../graph/example/JaccardSimilarityMeasure.java | 214 +++++++++++++++++++ .../JaccardSimilarityMeasureExample.java | 214 ------------------- .../flink/graph/example/LabelPropagation.java | 170 +++++++++++++++ .../graph/example/LabelPropagationExample.java | 170 --------------- .../flink/graph/example/MusicProfiles.java | 4 +- .../apache/flink/graph/example/PageRank.java | 149 +++++++++++++ .../flink/graph/example/PageRankExample.java | 149 ------------- .../SimpleCommunityDetectionExample.java | 140 ------------ .../example/SingleSourceShortestPaths.java | 133 ++++++++++++ .../SingleSourceShortestPathsExample.java | 133 ------------ .../example/utils/CommunityDetectionData.java | 65 ++++++ .../utils/ConnectedComponentsDefaultData.java | 52 +++++ 
.../utils/ConnectedComponentsExampleData.java | 52 ----- .../utils/EdgeWithLongIdNullValueParser.java | 33 --- .../graph/example/utils/EuclideanGraphData.java | 10 +- .../utils/SimpleCommunityDetectionData.java | 65 ------ .../utils/SingleSourceShortestPathsData.java | 4 + .../library/CommunityDetectionAlgorithm.java | 172 +++++++++++++++ .../graph/library/ConnectedComponents.java | 88 -------- .../library/ConnectedComponentsAlgorithm.java | 88 ++++++++ .../flink/graph/library/LabelPropagation.java | 111 ---------- .../library/LabelPropagationAlgorithm.java | 114 ++++++++++ .../apache/flink/graph/library/PageRank.java | 100 --------- .../flink/graph/library/PageRankAlgorithm.java | 104 +++++++++ .../graph/library/SimpleCommunityDetection.java | 172 --------------- .../library/SingleSourceShortestPaths.java | 108 ---------- .../SingleSourceShortestPathsAlgorithm.java | 111 ++++++++++ .../flink/graph/test/GatherSumApplyITCase.java | 10 +- .../test/example/CommunityDetectionITCase.java | 100 +++++++++ .../test/example/ConnectedComponentsITCase.java | 10 +- ...ctedComponentsWithRandomisedEdgesITCase.java | 4 +- .../example/EuclideanGraphExampleITCase.java | 77 ------- .../example/EuclideanGraphWeighingITCase.java | 77 +++++++ .../JaccardSimilarityMeasureExampleITCase.java | 72 ------- .../example/JaccardSimilarityMeasureITCase.java | 72 +++++++ .../example/LabelPropagationExampleITCase.java | 143 ------------- .../test/example/LabelPropagationITCase.java | 143 +++++++++++++ .../example/SimpleCommunityDetectionITCase.java | 100 --------- .../SingleSourceShortestPathsITCase.java | 4 +- 48 files changed, 2634 insertions(+), 2653 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java new file mode 100644 index 0000000..f9434d3 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.CommunityDetectionData; +import org.apache.flink.graph.library.CommunityDetectionAlgorithm; +import org.apache.flink.graph.utils.Tuple3ToEdgeMap; + +/** + * This example shows how to use the {@link org.apache.flink.graph.library.CommunityDetectionAlgorithm} + * library method: + * <ul> + * <li> with the edge data set given as a parameter + * <li> with default data + * </ul> + * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId, weight which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1\t2\t1.0\n1\t3\t2.0\n</code> defines two edges, + * 1-2 with weight 1.0 and 1-3 with weight 2.0. 
+ * + * Usage <code>CommunityDetection <edge path> <result path> + * <number of iterations> <delta></code><br> + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.CommunityDetectionData} + */ +public class CommunityDetection implements ProgramDescription { + + @SuppressWarnings("serial") + public static void main(String [] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + // set up the execution environment + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // set up the graph + DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); + Graph<Long, Long, Double> graph = Graph.fromDataSet(edges, + new MapFunction<Long, Long>() { + @Override + public Long map(Long label) throws Exception { + return label; + } + }, env); + + // the result is in the form of <vertexId, communityId>, where the communityId is the label + // which the vertex converged to + DataSet<Vertex<Long, Long>> communityVertices = + graph.run(new CommunityDetectionAlgorithm(maxIterations, delta)).getVertices(); + + // emit result + if (fileOutput) { + communityVertices.writeAsCsv(outputPath, "\n", ","); + } else { + communityVertices.print(); + } + + env.execute("Executing Community Detection Example"); + } + + @Override + public String getDescription() { + return "Community Detection"; + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String edgeInputPath = null; + private static String outputPath = null; + private static Integer maxIterations = CommunityDetectionData.MAX_ITERATIONS; + private static Double delta = CommunityDetectionData.DELTA; + + private static boolean parseParameters(String [] args) { + if(args.length > 0) { + if(args.length != 4) { + System.err.println("Usage 
CommunityDetection <edge path> <output path> " + + "<num iterations> <delta>"); + return false; + } + + fileOutput = true; + edgeInputPath = args[0]; + outputPath = args[1]; + maxIterations = Integer.parseInt(args[2]); + delta = Double.parseDouble(args[3]); + + } else { + System.out.println("Executing SimpleCommunityDetection example with default parameters and built-in default data."); + System.out.println("Provide parameters to read input data from files."); + System.out.println("Usage CommunityDetection <edge path> <output path> " + + "<num iterations> <delta>"); + } + + return true; + } + + private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { + + if(fileOutput) { + return env.readCsvFile(edgeInputPath) + .ignoreComments("#") + .fieldDelimiter("\t") + .lineDelimiter("\n") + .types(Long.class, Long.class, Double.class) + .map(new Tuple3ToEdgeMap<Long, Double>()); + } else { + return CommunityDetectionData.getDefaultEdgeDataSet(env); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java new file mode 100644 index 0000000..b7c9045 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData; +import org.apache.flink.graph.library.ConnectedComponentsAlgorithm; +import org.apache.flink.types.NullValue; + +/** + * This example shows how to use the {@link org.apache.flink.graph.library.ConnectedComponentsAlgorithm} + * library method: + * <ul> + * <li> with the edge data set given as a parameter + * <li> with default data + * </ul> + * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1\t2\n1\t3\n</code> defines two edges, + * 1-2 and 1-3. 
+ * + * Usage <code>ConnectedComponents <edge path> <result path> + * <number of iterations> </code><br> + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData} + */ +public class ConnectedComponents implements ProgramDescription { + + @SuppressWarnings("serial") + public static void main(String [] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env); + + Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() { + @Override + public Long map(Long value) throws Exception { + return value; + } + }, env); + + DataSet<Vertex<Long, Long>> verticesWithMinIds = graph + .run(new ConnectedComponentsAlgorithm(maxIterations)).getVertices(); + + // emit result + if (fileOutput) { + verticesWithMinIds.writeAsCsv(outputPath, "\n", ","); + } else { + verticesWithMinIds.print(); + } + + env.execute("Connected Components Example"); + } + + @Override + public String getDescription() { + return "Connected Components Example"; + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String edgeInputPath = null; + private static String outputPath = null; + private static Integer maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS; + + private static boolean parseParameters(String [] args) { + if(args.length > 0) { + if(args.length != 3) { + System.err.println("Usage ConnectedComponents <edge path> <output path> " + + "<num iterations>"); + return false; + } + + fileOutput = true; + edgeInputPath = args[0]; + outputPath = args[1]; + maxIterations = Integer.parseInt(args[2]); + + } else { + 
System.out.println("Executing ConnectedComponents example with default parameters and built-in default data."); + System.out.println("Provide parameters to read input data from files."); + System.out.println("Usage ConnectedComponents <edge path> <output path> " + + "<num iterations>"); + } + + return true; + } + + @SuppressWarnings("serial") + private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) { + + if(fileOutput) { + return env.readCsvFile(edgeInputPath) + .ignoreComments("#") + .fieldDelimiter("\t") + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { + @Override + public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception { + return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance()); + } + }); + } else { + return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java deleted file mode 100644 index a185a70..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.ConnectedComponentsExampleData; -import org.apache.flink.graph.library.ConnectedComponents; -import org.apache.flink.types.NullValue; - -/** - * This example shows how to use the {@link org.apache.flink.graph.library.ConnectedComponents} - * library method: - * <ul> - * <li> with the edge data set given as a parameter - * <li> with default data - * </ul> - * - * The input file is a plain text file and must be formatted as follows: - * Edges are represented by tuples of srcVertexId, trgVertexId which are - * separated by tabs. Edges themselves are separated by newlines. - * For example: <code>1\t2\n1\t3\n</code> defines two edges, - * 1-2 with and 1-3. 
- * - * Usage <code>ConnectedComponents <edge path> <result path> - * <number of iterations> </code><br> - * If no parameters are provided, the program is run with default data from - * {@link org.apache.flink.graph.example.utils.ConnectedComponentsExampleData} - */ -public class ConnectedComponentsExample implements ProgramDescription { - - @SuppressWarnings("serial") - public static void main(String [] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env); - - Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() { - @Override - public Long map(Long value) throws Exception { - return value; - } - }, env); - - DataSet<Vertex<Long, Long>> verticesWithMinIds = graph - .run(new ConnectedComponents(maxIterations)).getVertices(); - - // emit result - if (fileOutput) { - verticesWithMinIds.writeAsCsv(outputPath, "\n", ","); - } else { - verticesWithMinIds.print(); - } - - env.execute("Connected Components Example"); - } - - @Override - public String getDescription() { - return "Connected Components Example"; - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String edgeInputPath = null; - private static String outputPath = null; - private static Integer maxIterations = ConnectedComponentsExampleData.MAX_ITERATIONS; - - private static boolean parseParameters(String [] args) { - if(args.length > 0) { - if(args.length != 3) { - System.err.println("Usage ConnectedComponents <edge path> <output path> " + - "<num iterations>"); - return false; - } - - fileOutput = true; - edgeInputPath = args[0]; - outputPath = args[1]; - maxIterations = Integer.parseInt(args[2]); - - } else { - 
System.out.println("Executing ConnectedComponents example with default parameters and built-in default data."); - System.out.println("Provide parameters to read input data from files."); - System.out.println("Usage ConnectedComponents <edge path> <output path> " + - "<num iterations>"); - } - - return true; - } - - @SuppressWarnings("serial") - private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) { - - if(fileOutput) { - return env.readCsvFile(edgeInputPath) - .ignoreComments("#") - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { - @Override - public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception { - return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance()); - } - }); - } else { - return ConnectedComponentsExampleData.getDefaultEdgeDataSet(env); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java deleted file mode 100644 index fa08084..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Triplet; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.EuclideanGraphData; - -import java.io.Serializable; - -/** - * Given a directed, unweighted graph, with vertex values representing points in a plan, - * return a weighted graph where the edge weights are equal to the Euclidean distance between the - * src and the trg vertex values. - * - * <p> - * Input files are plain text files and must be formatted as follows: - * <ul> - * <li> Vertices are represented by their vertexIds and vertex values and are separated by newlines, - * the value being formed of two doubles separated by a comma. - * For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices - * <li> Edges are represented by pairs of srcVertexId, trgVertexId separated by commas. - * Edges themselves are separated by newlines. - * For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3. 
- * </ul> - * </p> - * - * Usage <code>EuclideanGraphExample <vertex path> <edge path> <result path></code><br> - * If no parameters are provided, the program is run with default data from - * {@link org.apache.flink.graph.example.utils.EuclideanGraphData} - */ -@SuppressWarnings("serial") -public class EuclideanGraphExample implements ProgramDescription { - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env); - - DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); - - Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env); - - // the edge value will be the Euclidean distance between its src and trg vertex - DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets() - .map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() { - - @Override - public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet) - throws Exception { - - Vertex<Long, Point> srcVertex = triplet.getSrcVertex(); - Vertex<Long, Point> trgVertex = triplet.getTrgVertex(); - - return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(), - srcVertex.getValue().euclideanDistance(trgVertex.getValue())); - } - }); - - Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight, - new MapFunction<Tuple2<Double, Double>, Double>() { - - @Override - public Double map(Tuple2<Double, Double> distance) throws Exception { - return distance.f1; - } - }); - - // retrieve the edges from the final result - DataSet<Edge<Long, Double>> result = resultedGraph.getEdges(); - - // emit result - if (fileOutput) { - result.writeAsCsv(outputPath, "\n", ","); - } else { - result.print(); - } - - env.execute("Euclidean Graph Example"); - } - - @Override - public String getDescription() { - return 
"Weighing a graph by computing the Euclidean distance " + - "between its vertices"; - } - - // ************************************************************************* - // DATA TYPES - // ************************************************************************* - - /** - * A simple two-dimensional point. - */ - public static class Point implements Serializable { - - public double x, y; - - public Point() {} - - public Point(double x, double y) { - this.x = x; - this.y = y; - } - - public double euclideanDistance(Point other) { - return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y)); - } - - @Override - public String toString() { - return x + " " + y; - } - } - - // ****************************************************************************************************************** - // UTIL METHODS - // ****************************************************************************************************************** - - private static boolean fileOutput = false; - - private static String verticesInputPath = null; - - private static String edgesInputPath = null; - - private static String outputPath = null; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - if (args.length == 3) { - fileOutput = true; - verticesInputPath = args[0]; - edgesInputPath = args[1]; - outputPath = args[2]; - } else { - System.out.println("Executing Euclidean Graph example with default parameters and built-in default data."); - System.out.println("Provide parameters to read input data from files."); - System.out.println("See the documentation for the correct format of input files."); - System.err.println("Usage: EuclideanGraphExample <input vertices path> <input edges path>" + - " <output path>"); - return false; - } - } - return true; - } - - private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(verticesInputPath) - .lineDelimiter("\n") - .types(Long.class, 
Double.class, Double.class) - .map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() { - - @Override - public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception { - return new Vertex<Long, Point>(value.f0, new Point(value.f1, value.f2)); - } - }); - } else { - return EuclideanGraphData.getDefaultVertexDataSet(env); - } - } - - private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() { - - @Override - public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception { - return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0); - } - }); - } else { - return EuclideanGraphData.getDefaultEdgeDataSet(env); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java new file mode 100644 index 0000000..565ef69 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Triplet; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.EuclideanGraphData; + +import java.io.Serializable; + +/** + * Given a directed, unweighted graph, with vertex values representing points in a plan, + * return a weighted graph where the edge weights are equal to the Euclidean distance between the + * src and the trg vertex values. + * + * <p> + * Input files are plain text files and must be formatted as follows: + * <ul> + * <li> Vertices are represented by their vertexIds and vertex values and are separated by newlines, + * the value being formed of two doubles separated by a comma. + * For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices + * <li> Edges are represented by pairs of srcVertexId, trgVertexId separated by commas. + * Edges themselves are separated by newlines. + * For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3. 
+ * </ul> + * </p> + * + * Usage <code>EuclideanGraphWeighing <vertex path> <edge path> <result path></code><br> + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.EuclideanGraphData} + */ +@SuppressWarnings("serial") +public class EuclideanGraphWeighing implements ProgramDescription { + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env); + + DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); + + Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env); + + // the edge value will be the Euclidean distance between its src and trg vertex + DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets() + .map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() { + + @Override + public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet) + throws Exception { + + Vertex<Long, Point> srcVertex = triplet.getSrcVertex(); + Vertex<Long, Point> trgVertex = triplet.getTrgVertex(); + + return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(), + srcVertex.getValue().euclideanDistance(trgVertex.getValue())); + } + }); + + Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight, + new MapFunction<Tuple2<Double, Double>, Double>() { + + @Override + public Double map(Tuple2<Double, Double> distance) throws Exception { + return distance.f1; + } + }); + + // retrieve the edges from the final result + DataSet<Edge<Long, Double>> result = resultedGraph.getEdges(); + + // emit result + if (fileOutput) { + result.writeAsCsv(outputPath, "\n", ","); + } else { + result.print(); + } + + env.execute("Euclidean Graph Weighing Example"); + } + + @Override + public String getDescription() 
{ + return "Weighing a graph by computing the Euclidean distance " + + "between its vertices"; + } + + // ************************************************************************* + // DATA TYPES + // ************************************************************************* + + /** + * A simple two-dimensional point. + */ + public static class Point implements Serializable { + + public double x, y; + + public Point() {} + + public Point(double x, double y) { + this.x = x; + this.y = y; + } + + public double euclideanDistance(Point other) { + return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y)); + } + + @Override + public String toString() { + return x + " " + y; + } + } + + // ****************************************************************************************************************** + // UTIL METHODS + // ****************************************************************************************************************** + + private static boolean fileOutput = false; + + private static String verticesInputPath = null; + + private static String edgesInputPath = null; + + private static String outputPath = null; + + private static boolean parseParameters(String[] args) { + + if (args.length > 0) { + if (args.length == 3) { + fileOutput = true; + verticesInputPath = args[0]; + edgesInputPath = args[1]; + outputPath = args[2]; + } else { + System.out.println("Executing Euclidean Graph Weighing example with default parameters and built-in default data."); + System.out.println("Provide parameters to read input data from files."); + System.out.println("See the documentation for the correct format of input files."); + System.err.println("Usage: EuclideanGraphWeighing <input vertices path> <input edges path>" + + " <output path>"); + return false; + } + } + return true; + } + + private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(verticesInputPath) + .lineDelimiter("\n") + 
.types(Long.class, Double.class, Double.class) + .map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() { + + @Override + public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception { + return new Vertex<Long, Point>(value.f0, new Point(value.f1, value.f2)); + } + }); + } else { + return EuclideanGraphData.getDefaultVertexDataSet(env); + } + } + + private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(edgesInputPath) + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() { + + @Override + public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception { + return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0); + } + }); + } else { + return EuclideanGraphData.getDefaultEdgeDataSet(env); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java new file mode 100755 index 0000000..30855e4 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.gsa.ApplyFunction; +import org.apache.flink.graph.gsa.GatherFunction; +import org.apache.flink.graph.gsa.SumFunction; +import org.apache.flink.graph.gsa.Neighbor; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; + +/** + * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration + */ +public class GSAConnectedComponents implements ProgramDescription { + + // -------------------------------------------------------------------------------------------- + // Program + // -------------------------------------------------------------------------------------------- + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env); + + Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env); + + // Execute the GSA iteration + Graph<Long, Long, NullValue> result 
= + graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(), + new UpdateComponentId(), maxIterations); + + // Extract the vertices as the result + DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices(); + + // emit result + if (fileOutput) { + connectedComponents.writeAsCsv(outputPath, "\n", " "); + } else { + connectedComponents.print(); + } + + env.execute("GSA Connected Components"); + } + + @SuppressWarnings("serial") + private static final class InitVertices implements MapFunction<Long, Long> { + + public Long map(Long vertexId) { + return vertexId; + } + } + + // -------------------------------------------------------------------------------------------- + // Connected Components UDFs + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("serial") + private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> { + + public Long gather(Neighbor<Long, NullValue> neighbor) { + return neighbor.getNeighborValue(); + } + }; + + @SuppressWarnings("serial") + private static final class SelectMinId extends SumFunction<Long, NullValue, Long> { + + public Long sum(Long newValue, Long currentValue) { + return Math.min(newValue, currentValue); + } + }; + + @SuppressWarnings("serial") + private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> { + + public void apply(Long summedValue, Long origValue) { + if (summedValue < origValue) { + setResult(summedValue); + } + } + } + + // -------------------------------------------------------------------------------------------- + // Util methods + // -------------------------------------------------------------------------------------------- + + private static boolean fileOutput = false; + private static String edgeInputPath = null; + private static String outputPath = null; + + private static int maxIterations = 16; + + private static boolean parseParameters(String[] args) 
{ + + if (args.length > 0) { + // parse input arguments + fileOutput = true; + + if (args.length != 3) { + System.err.println("Usage: GSAConnectedComponents <edge path> " + + "<result path> <max iterations>"); + return false; + } + + edgeInputPath = args[0]; + outputPath = args[1]; + maxIterations = Integer.parseInt(args[2]); + } else { + System.out.println("Executing GSA Connected Components example with built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" Usage: GSAConnectedComponents <edge path> <result path> <max iterations>"); + } + return true; + } + + @SuppressWarnings("serial") + private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(edgeInputPath) + .fieldDelimiter("\t") + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { + + public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception { + return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance()); + } + }); + } + + // Generates 3 components of size 2 + return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() { + @Override + public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception { + out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance())); + } + }); + } + + @Override + public String getDescription() { + return "GSA Connected Components"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java deleted file mode 100755 index 7c39123..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.gsa.ApplyFunction; -import org.apache.flink.graph.gsa.GatherFunction; -import org.apache.flink.graph.gsa.SumFunction; -import org.apache.flink.graph.gsa.Neighbor; -import org.apache.flink.types.NullValue; -import org.apache.flink.util.Collector; - -/** - * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration - */ -public class GSAConnectedComponentsExample implements ProgramDescription { - - // -------------------------------------------------------------------------------------------- - // Program - // -------------------------------------------------------------------------------------------- - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env); - - Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env); - - // Execute the GSA iteration - Graph<Long, Long, NullValue> result = - graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(), - new UpdateComponentId(), maxIterations); - - // Extract the vertices as the result - DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices(); - - // emit result - if (fileOutput) { - connectedComponents.writeAsCsv(outputPath, "\n", " "); - } else { - connectedComponents.print(); - } - - env.execute("GSA Connected 
Components"); - } - - @SuppressWarnings("serial") - private static final class InitVertices implements MapFunction<Long, Long> { - - public Long map(Long vertexId) { - return vertexId; - } - } - - // -------------------------------------------------------------------------------------------- - // Connected Components UDFs - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("serial") - private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> { - - public Long gather(Neighbor<Long, NullValue> neighbor) { - return neighbor.getNeighborValue(); - } - }; - - @SuppressWarnings("serial") - private static final class SelectMinId extends SumFunction<Long, NullValue, Long> { - - public Long sum(Long newValue, Long currentValue) { - return Math.min(newValue, currentValue); - } - }; - - @SuppressWarnings("serial") - private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> { - - public void apply(Long summedValue, Long origValue) { - if (summedValue < origValue) { - setResult(summedValue); - } - } - }; - - // -------------------------------------------------------------------------------------------- - // Util methods - // -------------------------------------------------------------------------------------------- - - private static boolean fileOutput = false; - private static String edgeInputPath = null; - private static String outputPath = null; - - private static int maxIterations = 16; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - - if (args.length != 3) { - System.err.println("Usage: GSAConnectedComponentsExample <edge path> " + - "<result path> <max iterations>"); - return false; - } - - edgeInputPath = args[0]; - outputPath = args[1]; - maxIterations = Integer.parseInt(args[2]); - } else { - System.out.println("Executing GSA Connected Components example 
with built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: GSAConnectedComponentsExample <edge path> <result path> <max iterations>"); - } - return true; - } - - @SuppressWarnings("serial") - private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgeInputPath) - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { - - public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception { - return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance()); - } - }); - } - - // Generates 3 components of size 2 - return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() { - @Override - public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception { - out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance())); - } - }); - } - - @Override - public String getDescription() { - return "GSA Connected Components"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java new file mode 100755 index 0000000..bbc344f --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; +import org.apache.flink.graph.gsa.ApplyFunction; +import org.apache.flink.graph.gsa.GatherFunction; +import org.apache.flink.graph.gsa.SumFunction; +import org.apache.flink.graph.gsa.Neighbor; +import org.apache.flink.graph.utils.Tuple3ToEdgeMap; + +/** + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration + */ +public class GSASingleSourceShortestPaths implements ProgramDescription { + + // -------------------------------------------------------------------------------------------- + // Program + // -------------------------------------------------------------------------------------------- + + public static void main(String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env); + + Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env); + + // Execute the GSA iteration + Graph<Long, Double, Double> result = graph + .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(), + new UpdateDistance(), maxIterations); + + // Extract the vertices as the result + DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices(); + + // emit result + if(fileOutput) { + singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " "); + } else { + singleSourceShortestPaths.print(); + } + + env.execute("GSA Single Source Shortest Paths"); + } + + @SuppressWarnings("serial") + private static final class InitVertices implements MapFunction<Long, Double>{ + + private long srcId; + + public InitVertices(long srcId) { + this.srcId = srcId; + } + + public Double map(Long id) { + if (id.equals(srcId)) { + return 0.0; + } + else { + return Double.POSITIVE_INFINITY; + } + } + } + + // -------------------------------------------------------------------------------------------- + // Single Source Shortest Path UDFs + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("serial") + private static final class CalculateDistances extends GatherFunction<Double, Double, Double> { + + public Double gather(Neighbor<Double, Double> neighbor) { + return neighbor.getNeighborValue() + neighbor.getEdgeValue(); + } + }; + + @SuppressWarnings("serial") + private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> { + + public Double sum(Double newValue, Double currentValue) { + return Math.min(newValue, currentValue); + } + }; + + @SuppressWarnings("serial") + private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> { + + public void apply(Double newDistance, Double oldDistance) 
{ + if (newDistance < oldDistance) { + setResult(newDistance); + } + } + } + + // -------------------------------------------------------------------------------------------- + // Util methods + // -------------------------------------------------------------------------------------------- + + private static boolean fileOutput = false; + + private static Long srcVertexId = 1l; + + private static String edgesInputPath = null; + + private static String outputPath = null; + + private static int maxIterations = 5; + + private static boolean parseParameters(String[] args) { + + if (args.length > 0) { + if(args.length != 4) { + System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" + + " <input edges path> <output path> <num iterations>"); + return false; + } + + fileOutput = true; + srcVertexId = Long.parseLong(args[0]); + edgesInputPath = args[1]; + outputPath = args[2]; + maxIterations = Integer.parseInt(args[3]); + } else { + System.out.println("Executing GSASingle Source Shortest Paths example " + + "with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" + + " <input edges path> <output path> <num iterations>"); + } + return true; + } + + private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(edgesInputPath) + .fieldDelimiter("\t") + .lineDelimiter("\n") + .types(Long.class, Long.class, Double.class) + .map(new Tuple3ToEdgeMap<Long, Double>()); + } else { + return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); + } + } + + @Override + public String getDescription() { + return "GSA Single Source Shortest Paths"; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java deleted file mode 100755 index 75cbd78..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; -import org.apache.flink.graph.gsa.ApplyFunction; -import org.apache.flink.graph.gsa.GatherFunction; -import org.apache.flink.graph.gsa.SumFunction; -import org.apache.flink.graph.gsa.Neighbor; -import org.apache.flink.graph.utils.Tuple3ToEdgeMap; - -/** - * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration - */ -public class GSASingleSourceShortestPathsExample implements ProgramDescription { - - // -------------------------------------------------------------------------------------------- - // Program - // -------------------------------------------------------------------------------------------- - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env); - - Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env); - - // Execute the GSA iteration - Graph<Long, Double, Double> result = graph - .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(), - new UpdateDistance(), maxIterations); - - // Extract the vertices as the result - DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices(); - - // emit result - if(fileOutput) { - singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " "); - } else { - singleSourceShortestPaths.print(); - } - - env.execute("GSA Single Source Shortest Paths 
Example"); - } - - @SuppressWarnings("serial") - private static final class InitVertices implements MapFunction<Long, Double>{ - - private long srcId; - - public InitVertices(long srcId) { - this.srcId = srcId; - } - - public Double map(Long id) { - if (id.equals(srcId)) { - return 0.0; - } - else { - return Double.POSITIVE_INFINITY; - } - } - } - - // -------------------------------------------------------------------------------------------- - // Single Source Shortest Path UDFs - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("serial") - private static final class CalculateDistances extends GatherFunction<Double, Double, Double> { - - public Double gather(Neighbor<Double, Double> neighbor) { - return neighbor.getNeighborValue() + neighbor.getEdgeValue(); - } - }; - - @SuppressWarnings("serial") - private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> { - - public Double sum(Double newValue, Double currentValue) { - return Math.min(newValue, currentValue); - } - }; - - @SuppressWarnings("serial") - private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> { - - public void apply(Double newDistance, Double oldDistance) { - if (newDistance < oldDistance) { - setResult(newDistance); - } - } - }; - - // -------------------------------------------------------------------------------------------- - // Util methods - // -------------------------------------------------------------------------------------------- - - private static boolean fileOutput = false; - - private static Long srcVertexId = 1l; - - private static String edgesInputPath = null; - - private static String outputPath = null; - - private static int maxIterations = 5; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - if(args.length != 4) { - System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" + - " <input edges 
path> <output path> <num iterations>"); - return false; - } - - fileOutput = true; - srcVertexId = Long.parseLong(args[0]); - edgesInputPath = args[1]; - outputPath = args[2]; - maxIterations = Integer.parseInt(args[3]); - } else { - System.out.println("Executing GSASingle Source Shortest Paths example " - + "with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" + - " <input edges path> <output path> <num iterations>"); - } - return true; - } - - private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class, Double.class) - .map(new Tuple3ToEdgeMap<Long, Double>()); - } else { - return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); - } - } - - @Override - public String getDescription() { - return "GSA Single Source Shortest Paths"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java new file mode 100644 index 0000000..dddaf41 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.EdgesFunction; +import org.apache.flink.graph.Triplet; +import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; + +import java.util.HashSet; + +/** + * Given a directed, unweighted graph, return a weighted graph where the edge values are equal + * to the Jaccard similarity coefficient - the number of common neighbors divided by the size + * of the union of neighbor sets - for the src and target vertices. + * + * <p> + * Input files are plain text files and must be formatted as follows: + * <br> + * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs. + * Edges themselves are separated by newlines. 
+ * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3. + * </p> + * + * Usage <code> JaccardSimilarityMeasure <edge path> <result path></code><br> + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData} + */ +@SuppressWarnings("serial") +public class JaccardSimilarityMeasure implements ProgramDescription { + + public static void main(String [] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); + + Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env); + + DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors = + graph.groupReduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL); + + Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env); + + // the edge value will be the Jaccard similarity coefficient(number of common neighbors/ all neighbors) + DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = graphWithVertexValues.getTriplets() + .map(new WeighEdgesMapper()); + + DataSet<Edge<Long, Double>> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight, + new MapFunction<Tuple2<Double, Double>, Double>() { + + @Override + public Double map(Tuple2<Double, Double> value) throws Exception { + return value.f1; + } + }).getEdges(); + + // emit result + if (fileOutput) { + result.writeAsCsv(outputPath, "\n", ","); + } else { + result.print(); + } + + env.execute("Executing Jaccard Similarity Measure"); + } + + @Override + public String getDescription() { + return "Vertex Jaccard Similarity Measure"; + } + + /** + * Each vertex will have a HashSet containing its neighbor ids as value. 
+ */ + private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long, HashSet<Long>>> { + + @Override + public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges, + Collector<Vertex<Long, HashSet<Long>>> out) throws Exception { + + HashSet<Long> neighborsHashSet = new HashSet<Long>(); + long vertexId = -1; + + for(Tuple2<Long, Edge<Long, Double>> edge : edges) { + neighborsHashSet.add(getNeighborID(edge)); + vertexId = edge.f0; + } + out.collect(new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet)); + } + } + + /** + * The edge weight will be the Jaccard coefficient, which is computed as follows: + * + * Consider the edge x-y + * We denote by sizeX and sizeY, the neighbors hash set size of x and y respectively. + * sizeX+sizeY = union + intersection of neighborhoods + * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods + * The intersection can then be deduced. + * + * The Jaccard similarity coefficient is then, the intersection/union. + */ + private static class WeighEdgesMapper implements MapFunction<Triplet<Long, HashSet<Long>, Double>, + Tuple3<Long, Long, Double>> { + + @Override + public Tuple3<Long, Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet) + throws Exception { + + Vertex<Long, HashSet<Long>> source = triplet.getSrcVertex(); + Vertex<Long, HashSet<Long>> target = triplet.getTrgVertex(); + + long unionPlusIntersection = source.getValue().size() + target.getValue().size(); + // within a HashSet, all elements are distinct + source.getValue().addAll(target.getValue()); + // the source value contains the union + long union = source.getValue().size(); + long intersection = unionPlusIntersection - union; + + return new Tuple3<Long, Long, Double>(source.getId(), target.getId(), (double) intersection/union); + } + } + + /** + * Helper method that extracts the neighborId given an edge. 
+	 * @param edge pair of a vertex id (f0) and one of the edges incident to that vertex (f1)
+	 * @return the edge's target id if f0 is the edge's source, otherwise the edge's source id
+	 */
+	private static Long getNeighborID(Tuple2<Long, Edge<Long, Double>> edge) {
+		if(edge.f1.getSource().equals(edge.f0)) { // ids are boxed Longs: compare by value, == only tests reference identity
+			return edge.f1.getTarget();
+		} else {
+			return edge.f1.getSource();
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false; // set once <edge path> and <output path> are given; also selects file input below
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static boolean parseParameters(String [] args) {
+		if(args.length > 0) {
+			if(args.length != 2) {
+				System.err.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+		} else {
+			System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
+		}
+
+		return true;
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.ignoreComments("#")
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+						@Override
+						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0); // input is unweighted: every edge starts at weight 0
+						}
+					});
+		} else {
+			return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
----------------------------------------------------------------------
diff --git
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java deleted file mode 100644 index 2783a29..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.EdgesFunction; -import org.apache.flink.graph.Triplet; -import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData; -import org.apache.flink.types.NullValue; -import org.apache.flink.util.Collector; - -import java.util.HashSet; - -/** - * Given a directed, unweighted graph, return a weighted graph where the edge values are equal - * to the Jaccard similarity coefficient - the number of common neighbors divided by the the size - * of the union of neighbor sets - for the src and target vertices. - * - * <p> - * Input files are plain text files and must be formatted as follows: - * <br> - * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs. - * Edges themselves are separated by newlines. - * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3. 
- * </p> - * - * Usage <code> JaccardSimilarityMeasureExample <edge path> <result path></code><br> - * If no parameters are provided, the program is run with default data from - * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData} - */ -@SuppressWarnings("serial") -public class JaccardSimilarityMeasureExample implements ProgramDescription { - - public static void main(String [] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); - - Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env); - - DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors = - graph.groupReduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL); - - Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env); - - // the edge value will be the Jaccard similarity coefficient(number of common neighbors/ all neighbors) - DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = graphWithVertexValues.getTriplets() - .map(new WeighEdgesMapper()); - - DataSet<Edge<Long, Double>> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight, - new MapFunction<Tuple2<Double, Double>, Double>() { - - @Override - public Double map(Tuple2<Double, Double> value) throws Exception { - return value.f1; - } - }).getEdges(); - - // emit result - if (fileOutput) { - result.writeAsCsv(outputPath, "\n", ","); - } else { - result.print(); - } - - env.execute("Executing Jaccard Similarity Measure"); - } - - @Override - public String getDescription() { - return "Vertex Jaccard Similarity Measure"; - } - - /** - * Each vertex will have a HashSet containing its neighbor ids as value. 
- */ - private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long, HashSet<Long>>> { - - @Override - public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges, - Collector<Vertex<Long, HashSet<Long>>> out) throws Exception { - - HashSet<Long> neighborsHashSet = new HashSet<Long>(); - long vertexId = -1; - - for(Tuple2<Long, Edge<Long, Double>> edge : edges) { - neighborsHashSet.add(getNeighborID(edge)); - vertexId = edge.f0; - } - out.collect(new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet)); - } - } - - /** - * The edge weight will be the Jaccard coefficient, which is computed as follows: - * - * Consider the edge x-y - * We denote by sizeX and sizeY, the neighbors hash set size of x and y respectively. - * sizeX+sizeY = union + intersection of neighborhoods - * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods - * The intersection can then be deduced. - * - * The Jaccard similarity coefficient is then, the intersection/union. - */ - private static class WeighEdgesMapper implements MapFunction<Triplet<Long, HashSet<Long>, Double>, - Tuple3<Long, Long, Double>> { - - @Override - public Tuple3<Long, Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet) - throws Exception { - - Vertex<Long, HashSet<Long>> source = triplet.getSrcVertex(); - Vertex<Long, HashSet<Long>> target = triplet.getTrgVertex(); - - long unionPlusIntersection = source.getValue().size() + target.getValue().size(); - // within a HashSet, all elements are distinct - source.getValue().addAll(target.getValue()); - // the source value contains the union - long union = source.getValue().size(); - long intersection = unionPlusIntersection - union; - - return new Tuple3<Long, Long, Double>(source.getId(), target.getId(), (double) intersection/union); - } - } - - /** - * Helper method that extracts the neighborId given an edge. 
- * @param edge - * @return - */ - private static Long getNeighborID(Tuple2<Long, Edge<Long, Double>> edge) { - if(edge.f1.getSource() == edge.f0) { - return edge.f1.getTarget(); - } else { - return edge.f1.getSource(); - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String edgeInputPath = null; - private static String outputPath = null; - - private static boolean parseParameters(String [] args) { - if(args.length > 0) { - if(args.length != 2) { - System.err.println("Usage JaccardSimilarityMeasureExample <edge path> <output path>"); - return false; - } - - fileOutput = true; - edgeInputPath = args[0]; - outputPath = args[1]; - } else { - System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data."); - System.out.println("Provide parameters to read input data from files."); - System.out.println("Usage JaccardSimilarityMeasureExample <edge path> <output path>"); - } - - return true; - } - - private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { - - if(fileOutput) { - return env.readCsvFile(edgeInputPath) - .ignoreComments("#") - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() { - @Override - public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception { - return new Edge<Long, Double>(tuple2.f0, tuple2.f1, new Double(0)); - } - }); - } else { - return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java new file mode 100644 index 0000000..4012a4e --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.library.LabelPropagationAlgorithm; +import org.apache.flink.graph.utils.Tuple2ToVertexMap; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; + +/** + * This example uses the label propagation algorithm to detect communities by + * propagating labels. 
Initially, each vertex is assigned its id as its label. + * The vertices iteratively propagate their labels to their neighbors and adopt + * the most frequent label among their neighbors. The algorithm converges when + * no vertex changes value or the maximum number of iterations has been + * reached. + * + * The edges input file is expected to contain one edge per line, with long IDs + * in the following format: "<sourceVertexID>\t<targetVertexID>". + * + * The vertices input file is expected to contain one vertex per line, with long IDs + * and long vertex values, in the following format: "<vertexID>\t<vertexValue>". + * + * If no arguments are provided, the example runs with a random graph of 100 vertices. + */ +public class LabelPropagation implements ProgramDescription { + + public static void main(String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + // Set up the execution environment + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // Set up the graph + DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env); + DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env); + + Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env); + + // Set up the program + DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run( + new LabelPropagationAlgorithm<Long>(maxIterations)).getVertices(); + + // Emit results + if(fileOutput) { + verticesWithCommunity.writeAsCsv(outputPath, "\n", ","); + } else { + verticesWithCommunity.print(); + } + + // Execute the program + env.execute("Label Propagation Example"); + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String vertexInputPath = null; + private static String edgeInputPath = null; + private static String outputPath = null; + private
static long numVertices = 100; + private static int maxIterations = 10; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + if(args.length != 4) { + System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>"); + return false; + } + + fileOutput = true; + vertexInputPath = args[0]; + edgeInputPath = args[1]; + outputPath = args[2]; + maxIterations = Integer.parseInt(args[3]); + } else { + System.out.println("Executing LabelPropagation example with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>"); + } + return true; + } + + @SuppressWarnings("serial") + private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) { + + if (fileOutput) { + return env.readCsvFile(vertexInputPath) + .fieldDelimiter("\t") + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new Tuple2ToVertexMap<Long, Long>()); + } + + return env.generateSequence(1, numVertices).map( + new MapFunction<Long, Vertex<Long, Long>>() { + public Vertex<Long, Long> map(Long l) throws Exception { + return new Vertex<Long, Long>(l, l); + } + }); + } + + @SuppressWarnings("serial") + private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) { + + if (fileOutput) { + return env.readCsvFile(edgeInputPath) + .fieldDelimiter("\t") + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { + @Override + public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception { + return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance()); + } + }); + } + + return env.generateSequence(1, numVertices).flatMap( + new 
FlatMapFunction<Long, Edge<Long, NullValue>>() { + @Override + public void flatMap(Long key, + Collector<Edge<Long, NullValue>> out) { + int numOutEdges = (int) (Math.random() * (numVertices / 2)); + for (int i = 0; i < numOutEdges; i++) { + long target = (long) (Math.random() * numVertices) + 1; + out.collect(new Edge<Long, NullValue>(key, target, + NullValue.getInstance())); + } + } + }); + } + + @Override + public String getDescription() { + return "Label Propagation Example"; + } +} \ No newline at end of file