[FLINK-1522][FLINK-1576] Updated LabelPropagationExample and test

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e306c629
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e306c629
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e306c629

Branch: refs/heads/master
Commit: e306c62968b94ed5124405c9ff8c02147cdba250
Parents: 8961bd1
Author: balidani <[email protected]>
Authored: Fri Feb 20 11:30:57 2015 +0100
Committer: Vasia Kalavri <[email protected]>
Committed: Wed Mar 4 21:04:06 2015 +0100

----------------------------------------------------------------------
 flink-staging/flink-gelly/pom.xml               | 60 ++++++------
 .../graph/example/LabelPropagationExample.java  | 87 ++++++++++++++++--
 .../test/LabelPropagationExampleITCase.java     | 97 ++++++++++++++++++++
 3 files changed, 207 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e306c629/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
old mode 100644
new mode 100755
index 8e00f98..13ed002
--- a/flink-staging/flink-gelly/pom.xml
+++ b/flink-staging/flink-gelly/pom.xml
@@ -53,34 +53,34 @@ under the License.
                </dependency>
        </dependencies>
 
-       <!-- See main pom.xml for explanation of profiles -->
-       <profiles>
-               <profile>
-                       <id>hadoop-1</id>
-                       <activation>
-                               <property>
-                                       <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
-                                       <!--hadoop1--><name>hadoop.profile</name><value>1</value>
-                               </property>
-                       </activation>
-                       <dependencies>
-                               <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
-                               <dependency>
-                                       <groupId>com.google.guava</groupId>
-                                       <artifactId>guava</artifactId>
-                                       <version>${guava.version}</version>
-                                       <scope>provided</scope>
-                               </dependency>
-                       </dependencies>
-               </profile>
-               <profile>
-                       <id>hadoop-2</id>
-                       <activation>
-                               <property>
-                                       <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
-                                       <!--hadoop2--><name>!hadoop.profile</name>
-                               </property>
-                       </activation>
-               </profile>
-       </profiles>
+    <!-- See main pom.xml for explanation of profiles -->
+    <profiles>
+        <profile>
+            <id>hadoop-1</id>
+            <activation>
+                <property>
+                    <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+                    <!--hadoop1--><name>hadoop.profile</name><value>1</value>
+                </property>
+            </activation>
+            <dependencies>
+                <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
+                <dependency>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                    <version>${guava.version}</version>
+                    <scope>provided</scope>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>hadoop-2</id>
+            <activation>
+                <property>
+                    <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+                    <!--hadoop2--><name>!hadoop.profile</name>
+                </property>
+            </activation>
+        </profile>
+    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e306c629/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
index c490bb3..78cb5d5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
@@ -42,31 +43,84 @@ public class LabelPropagationExample implements ProgramDescription {
 
        public static void main(String[] args) throws Exception {
 
+               if(!parseParameters(args)) {
+                       return;
+               }
+
+               // Set up the execution environment
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
+               // Set up the graph
                DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
                DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
 
                Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
 
+               // Set up the program
                DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
                                new LabelPropagation<Long>(maxIterations)).getVertices();
 
-               verticesWithCommunity.print();
+               // Emit results
+               if(fileOutput) {
+                       verticesWithCommunity.writeAsCsv(outputPath, "\n", ",");
+               } else {
+                       verticesWithCommunity.print();
+               }
 
-               env.execute();
+               // Execute the program
+               env.execute("Label Propagation Example");
        }
 
-       @Override
-       public String getDescription() {
-               return "Label Propagation Example";
-       }
+       // *************************************************************************
+       //     UTIL METHODS
+       // *************************************************************************
 
+       private static boolean fileOutput = false;
+       private static String vertexInputPath = null;
+       private static String edgeInputPath = null;
+       private static String outputPath = null;
        private static long numVertices = 100;
-       private static int maxIterations = 20;
+       private static int maxIterations = 10;
+
+       private static boolean parseParameters(String[] args) {
+
+               if(args.length > 0) {
+                       if(args.length != 5) {
+                               System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num vertices> <num iterations>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       vertexInputPath = args[0];
+                       edgeInputPath = args[1];
+                       outputPath = args[2];
+                       numVertices = Integer.parseInt(args[3]);
+                       maxIterations = Integer.parseInt(args[4]);
+               } else {
+                       System.out.println("Executing LabelPropagation example with default parameters and built-in default data.");
+                       System.out.println("  Provide parameters to read input data from files.");
+                       System.out.println("  See the documentation for the correct format of input files.");
+                       System.out.println("  Usage: LabelPropagation <vertex path> <edge path> <output path> <num vertices> <num iterations>");
+               }
+               return true;
+       }
 
        @SuppressWarnings("serial")
        private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
+
+               if (fileOutput) {
+                       return env.readCsvFile(vertexInputPath)
+                                       .fieldDelimiter(" ")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class)
+                                       .map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() {
+                                               @Override
+                                               public Vertex<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+                                                       return new Vertex<Long, Long>(value.f0, value.f1);
+                                               }
+                                       });
+               }
+
                return env.generateSequence(1, numVertices).map(
                                new MapFunction<Long, Vertex<Long, Long>>() {
                                        public Vertex<Long, Long> map(Long l) throws Exception {
@@ -77,6 +131,20 @@ public class LabelPropagationExample implements ProgramDescription {
 
        @SuppressWarnings("serial")
        private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
+
+               if (fileOutput) {
+                       return env.readCsvFile(edgeInputPath)
+                                       .fieldDelimiter(" ")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class)
+                                       .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+                                               @Override
+                                               public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+                                                       return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+                                               }
+                                       });
+               }
+
                return env.generateSequence(1, numVertices).flatMap(
                                new FlatMapFunction<Long, Edge<Long, NullValue>>() {
                                        @Override
@@ -91,4 +159,9 @@ public class LabelPropagationExample implements ProgramDescription {
                                        }
                                });
        }
+
+       @Override
+       public String getDescription() {
+               return "Label Propagation Example";
+       }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e306c629/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
new file mode 100755
index 0000000..d5b2239
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.LabelPropagationExample;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class LabelPropagationExampleITCase extends MultipleProgramsTestBase {
+
+       public LabelPropagationExampleITCase(ExecutionMode mode){
+               super(mode);
+       }
+
+    private String resultPath;
+    private String expectedResult;
+
+       private String verticesPath;
+       private String edgesPath;
+
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+
+               final String vertices = "1 1\n" +
+                               "2 2\n" +
+                               "3 3\n" +
+                               "4 4\n" +
+                               "5 5\n";
+
+               final String edges = "1 2\n" +
+                               "1 3\n" +
+                               "2 3\n" +
+                               "3 4\n" +
+                               "3 5\n" +
+                               "4 5\n" +
+                               "5 1\n";
+
+               File verticesFile = tempFolder.newFile();
+               Files.write(vertices, verticesFile, Charsets.UTF_8);
+
+               File edgesFile = tempFolder.newFile();
+               Files.write(edges, edgesFile, Charsets.UTF_8);
+
+               verticesPath = verticesFile.toURI().toString();
+               edgesPath = edgesFile.toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       @Test
+       public void testLabelPropagation() throws Exception {
+               /*
+                * Test the label propagation example
+                */
+               LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "5", "16"});
+
+               expectedResult = "1,5\n" +
+                       "2,5\n" +
+                       "3,5\n" +
+                       "4,5\n" +
+                       "5,5\n";
+       }
+}

Reply via email to