[FLINK-1522][FLINK-1576] Updated LabelPropagationExample and test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e306c629 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e306c629 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e306c629 Branch: refs/heads/master Commit: e306c62968b94ed5124405c9ff8c02147cdba250 Parents: 8961bd1 Author: balidani <[email protected]> Authored: Fri Feb 20 11:30:57 2015 +0100 Committer: Vasia Kalavri <[email protected]> Committed: Wed Mar 4 21:04:06 2015 +0100 ---------------------------------------------------------------------- flink-staging/flink-gelly/pom.xml | 60 ++++++------ .../graph/example/LabelPropagationExample.java | 87 ++++++++++++++++-- .../test/LabelPropagationExampleITCase.java | 97 ++++++++++++++++++++ 3 files changed, 207 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e306c629/flink-staging/flink-gelly/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml old mode 100644 new mode 100755 index 8e00f98..13ed002 --- a/flink-staging/flink-gelly/pom.xml +++ b/flink-staging/flink-gelly/pom.xml @@ -53,34 +53,34 @@ under the License. </dependency> </dependencies> - <!-- See main pom.xml for explanation of profiles --> - <profiles> - <profile> - <id>hadoop-1</id> - <activation> - <property> - <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh --> - <!--hadoop1--><name>hadoop.profile</name><value>1</value> - </property> - </activation> - <dependencies> - <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively --> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - <scope>provided</scope> - </dependency> - </dependencies> - </profile> - <profile> - <id>hadoop-2</id> - <activation> - <property> - <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh --> - <!--hadoop2--><name>!hadoop.profile</name> - </property> - </activation> - </profile> - </profiles> + <!-- See main pom.xml for explanation of profiles --> + <profiles> + <profile> + <id>hadoop-1</id> + <activation> + <property> + <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh --> + <!--hadoop1--><name>hadoop.profile</name><value>1</value> + </property> + </activation> + <dependencies> + <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively --> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + </profile> + <profile> + <id>hadoop-2</id> + <activation> + <property> + <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh --> + <!--hadoop2--><name>!hadoop.profile</name> + </property> + </activation> + </profile> + </profiles> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/e306c629/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java index c490bb3..78cb5d5 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; @@ -42,31 +43,84 @@ public class LabelPropagationExample implements ProgramDescription { public static void main(String[] args) throws Exception { + if(!parseParameters(args)) { + return; + } + + // Set up the execution environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + // Set up the graph DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env); DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env); Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env); + // Set up the program DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run( new LabelPropagation<Long>(maxIterations)).getVertices(); - verticesWithCommunity.print(); + // Emit results + if(fileOutput) { + verticesWithCommunity.writeAsCsv(outputPath, "\n", ","); + } else { + verticesWithCommunity.print(); + } - env.execute(); + // Execute the program + env.execute("Label Propagation Example"); } - @Override - public String getDescription() { - return "Label Propagation Example"; - } + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + private static boolean fileOutput = false; + private static String vertexInputPath = null; + private static String edgeInputPath = null; + private static String outputPath = null; private static long numVertices = 100; - private static int maxIterations = 20; + private static int maxIterations = 10; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + if(args.length != 5) { + System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num vertices> <num iterations>"); + return false; + } + + fileOutput = true; + vertexInputPath = args[0]; + edgeInputPath = args[1]; + outputPath = args[2]; + numVertices = Integer.parseInt(args[3]); + maxIterations = Integer.parseInt(args[4]); + } else { + System.out.println("Executing LabelPropagation example with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" Usage: LabelPropagation <vertex path> <edge path> <output path> <num vertices> <num iterations>"); + } + return true; + } @SuppressWarnings("serial") private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) { + + if (fileOutput) { + return env.readCsvFile(vertexInputPath) + .fieldDelimiter(" ") + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() { + @Override + public Vertex<Long, Long> map(Tuple2<Long, Long> value) throws Exception { + return new Vertex<Long, Long>(value.f0, value.f1); + } + }); + } + return env.generateSequence(1, numVertices).map( new MapFunction<Long, Vertex<Long, Long>>() { public Vertex<Long, Long> map(Long l) throws Exception { @@ -77,6 +131,20 @@ public class LabelPropagationExample implements ProgramDescription { @SuppressWarnings("serial") private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) { + + if (fileOutput) { + return env.readCsvFile(edgeInputPath) + .fieldDelimiter(" ") + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { + @Override + public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception { + return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance()); + } + }); + } + return env.generateSequence(1, numVertices).flatMap( new FlatMapFunction<Long, Edge<Long, NullValue>>() { @Override @@ -91,4 +159,9 @@ public class LabelPropagationExample implements ProgramDescription { } }); } + + @Override + public String getDescription() { + return "Label Propagation Example"; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e306c629/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java new file mode 100755 index 0000000..d5b2239 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.flink.graph.example.LabelPropagationExample; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; + +@RunWith(Parameterized.class) +public class LabelPropagationExampleITCase extends MultipleProgramsTestBase { + + public LabelPropagationExampleITCase(ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + private String verticesPath; + private String edgesPath; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + + final String vertices = "1 1\n" + + "2 2\n" + + "3 3\n" + + "4 4\n" + + "5 5\n"; + + final String edges = "1 2\n" + + "1 3\n" + + "2 3\n" + + "3 4\n" + + "3 5\n" + + "4 5\n" + + "5 1\n"; + + File verticesFile = tempFolder.newFile(); + Files.write(vertices, verticesFile, Charsets.UTF_8); + + File edgesFile = tempFolder.newFile(); + Files.write(edges, edgesFile, Charsets.UTF_8); + + verticesPath = verticesFile.toURI().toString(); + edgesPath = edgesFile.toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testLabelPropagation() throws Exception { + /* + * Test the label propagation example + */ + LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "5", "16"}); + + expectedResult = "1,5\n" + + "2,5\n" + + "3,5\n" + + "4,5\n" + + "5,5\n"; + } +}
