[FLINK-1201] [gelly] added label propagation step in MusicProfiles

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32d9d2b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32d9d2b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32d9d2b1

Branch: refs/heads/master
Commit: 32d9d2b1342df40051257dc64f534b0a7da340ee
Parents: 0393bc1
Author: vasia <vasilikikala...@gmail.com>
Authored: Mon Jan 5 20:58:14 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:15 2015 +0100

----------------------------------------------------------------------
 .../flink/graph/example/MusicProfiles.java      | 66 +++++++++++++++++---
 .../graph/example/utils/MusicProfilesData.java  | 65 +++++++++++++++++++
 2 files changed, 122 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/32d9d2b1/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index d74e339..02f6554 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -2,9 +2,11 @@ package flink.graphs.example;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -20,26 +22,41 @@ import flink.graphs.EdgeDirection;
 import flink.graphs.EdgesFunctionWithVertexValue;
 import flink.graphs.Graph;
 import flink.graphs.Vertex;
+import flink.graphs.example.utils.MusicProfilesData;
+import flink.graphs.library.LabelPropagation;
 import flink.graphs.utils.Tuple3ToEdgeMap;
 
 public class MusicProfiles implements ProgramDescription {
 
+       /**
+        * This example demonstrates how to mix the "record" Flink API with the graph API.
+        * The input is a set of <userId, songId, playCount> triplets and a set of
+        * bad records, i.e. song ids that should not be trusted.
+        * Initially, we use the record API to filter out the bad records.
+        * Then, we use the graph API to create a user -> song weighted bipartite graph
+        * and compute the top song (most listened) per user.
+        * Then, we use the record API again to create a user-user similarity graph
+        * based on common songs, where two users that listen to the same song are connected.
+        * Finally, we use the graph API to run the label propagation community detection
+        * algorithm on the similarity graph.
+        */
        public static void main (String [] args) throws Exception {
        
        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+       final long numberOfLabels = 3;
+       final int numIterations = 10;
 
        /** 
         *  Read the user-song-play triplets
         *  The format is <userID>\t<songID>\t<playcount>
         */
-       DataSet<Tuple3<String, String, Integer>> triplets = 
env.readCsvFile(args[0])
-                       
.lineDelimiter("\n").fieldDelimiter('\t').types(String.class, String.class, 
Integer.class);
-       
+       DataSet<Tuple3<String, String, Integer>> triplets = 
MusicProfilesData.getUserSongTriplets(env);
+
        /**
         *  Read the mismatches dataset and extract the songIDs
         *  The format is "ERROR: <songID trackID> song_title"
         */
-       DataSet<Tuple1<String>> mismatches = env.readTextFile(args[1]).map(new 
ExtractMismatchSongIds());
+       DataSet<Tuple1<String>> mismatches = 
MusicProfilesData.getMismatches(env).map(new ExtractMismatchSongIds());
 
        /**
         *  Filter out the mismatches from the triplets dataset
@@ -58,7 +75,8 @@ public class MusicProfiles implements ProgramDescription {
         *  Get the top track (most listened) for each user
         */
        DataSet<Tuple2<String, String>> usersWithTopTrack = 
userSongGraph.reduceOnEdges(new GetTopSongPerUser(), 
-                       EdgeDirection.OUT);
+                       EdgeDirection.OUT).filter(new FilterSongNodes());
+       usersWithTopTrack.print();
 
        /**
         * Create a user-user similarity graph, based on common songs, 
@@ -69,6 +87,15 @@ public class MusicProfiles implements ProgramDescription {
                        .reduceGroup(new CreateSimilarUserEdges()).distinct();
        Graph<String, NullValue, NullValue> similarUsersGraph = 
Graph.create(similarUsers, env).getUndirected();
 
+       /**
+        * Detect user communities using the label propagation library method
+        */
+       DataSet<Vertex<String, Long>> verticesWithCommunity = 
similarUsersGraph.mapVertices(
+                       new InitVertexLabels(numberOfLabels))
+                       .run(new 
LabelPropagation<String>(numIterations)).getVertices();
+       verticesWithCommunity.print();
+
+       env.execute();
     }
 
     @SuppressWarnings("serial")
@@ -89,12 +116,21 @@ public class MusicProfiles implements ProgramDescription {
                                Collector<Tuple3<String, String, Integer>> out) 
{
                        if (!invalidSongs.iterator().hasNext()) {
                                // this is a valid triplet
-                               out.collect(triplets.iterator().next());
+                               for (Tuple3<String, String, Integer> triplet : 
triplets) {
+                                       out.collect(triplet);                   
                
+                               }
                        }
                }
     }
 
     @SuppressWarnings("serial")
+       public static final class FilterSongNodes implements FilterFunction<Tuple2<String, String>> {
+               // Applied to the output of reduceOnEdges(new GetTopSongPerUser(), ...) in main.
+
+               /**
+                * Keeps a (vertexId, topSong) tuple only if its song field is non-empty.
+                * NOTE(review): judging by the class name, the empty-song tuples are expected
+                * to come from song vertices; confirm against GetTopSongPerUser.
+                *
+                * @param value the (vertexId, topSong) tuple to test
+                * @return true if the song field is not the empty string
+                */
+               @Override
+               public boolean filter(Tuple2<String, String> value) throws Exception {
+                       final String topSong = value.f1;
+                       return !topSong.isEmpty();
+               }
+       }
+
+    @SuppressWarnings("serial")
        public static final class GetTopSongPerUser implements 
EdgesFunctionWithVertexValue
                <String, NullValue, Integer, Tuple2<String, String>> {
                public Tuple2<String, String> iterateEdges(Vertex<String, 
NullValue> vertex,    
@@ -120,14 +156,26 @@ public class MusicProfiles implements ProgramDescription {
                                listeners.add(edge.getSource());
                        }
                        for (int i=0; i < listeners.size()-1; i++) {
-                               out.collect(new Edge<String, 
NullValue>(listeners.get(i), listeners.get(i+1)));
+                               out.collect(new Edge<String, 
NullValue>(listeners.get(i), listeners.get(i+1),
+                                               NullValue.getInstance()));
                        }
                }
     }
+
+       /**
+        * Assigns every vertex an initial label drawn uniformly at random from
+        * the range [0, numberOfLabels), as input for the label propagation step.
+        */
+       @SuppressWarnings("serial")
+       public static final class InitVertexLabels implements MapFunction<Vertex<String, NullValue>, Long> {
+
+               private final long numberOfLabels;
+
+               // One generator per function instance, created on first use and reused for
+               // every record (the previous version allocated a new Random per vertex).
+               // transient: it is not serialized with the function, so each deserialized
+               // instance builds its own generator instead of sharing a copied state.
+               private transient Random randomGenerator;
+
+               /**
+                * @param labels number of distinct initial labels; must be in [1, Integer.MAX_VALUE]
+                *               because labels are drawn with Random.nextInt(int)
+                * @throws IllegalArgumentException if labels is outside that range
+                */
+               public InitVertexLabels(long labels) {
+                       // Validate eagerly: an out-of-range value would otherwise be truncated by the
+                       // (int) cast below, and either fail per record or silently shrink the label range.
+                       if (labels <= 0 || labels > Integer.MAX_VALUE) {
+                               throw new IllegalArgumentException("numberOfLabels must be in [1, "
+                                               + Integer.MAX_VALUE + "], got " + labels);
+                       }
+                       this.numberOfLabels = labels;
+               }
+
+               /**
+                * @param value the vertex to label (its id and value are not used)
+                * @return a random label in [0, numberOfLabels)
+                */
+               @Override
+               public Long map(Vertex<String, NullValue> value) {
+                       if (randomGenerator == null) {
+                               randomGenerator = new Random();
+                       }
+                       return (long) randomGenerator.nextInt((int) numberOfLabels);
+               }
+       }
        
        @Override
        public String getDescription() {
-               return null;
+               // Short, human-readable name reported for this example program.
+               final String description = "Music Profiles Example";
+               return description;
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/32d9d2b1/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
new file mode 100644
index 0000000..cfe9c88
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
@@ -0,0 +1,65 @@
+package flink.graphs.example.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * Hard-coded sample input for the MusicProfiles example: user-song play counts
+ * and "mismatch" error lines naming song ids that should not be trusted.
+ */
+public class MusicProfilesData {
+
+       /**
+        * Returns the sample (userId, songId, playCount) triplets.
+        *
+        * @param env the execution environment used to create the DataSet
+        * @return a DataSet holding the triplets in the order they are listed below
+        */
+       public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env) {
+               final List<Tuple3<String, String, Integer>> plays = new ArrayList<Tuple3<String, String, Integer>>();
+
+               plays.add(play("user_1", "song_1", 100));
+               plays.add(play("user_1", "song_2", 10));
+               plays.add(play("user_1", "song_3", 20));
+               plays.add(play("user_1", "song_4", 30));
+               plays.add(play("user_1", "song_5", 1));
+
+               plays.add(play("user_2", "song_6", 40));
+               plays.add(play("user_2", "song_7", 10));
+               plays.add(play("user_2", "song_8", 3));
+
+               plays.add(play("user_3", "song_1", 100));
+               plays.add(play("user_3", "song_2", 10));
+               plays.add(play("user_3", "song_3", 20));
+               plays.add(play("user_3", "song_8", 30));
+               plays.add(play("user_3", "song_9", 1));
+               plays.add(play("user_3", "song_10", 8));
+               plays.add(play("user_3", "song_11", 90));
+               plays.add(play("user_3", "song_12", 30));
+               plays.add(play("user_3", "song_13", 34));
+               plays.add(play("user_3", "song_14", 17));
+
+               plays.add(play("user_4", "song_1", 100));
+               plays.add(play("user_4", "song_6", 10));
+               plays.add(play("user_4", "song_8", 20));
+               plays.add(play("user_4", "song_12", 30));
+               plays.add(play("user_4", "song_13", 1));
+               plays.add(play("user_4", "song_15", 1));
+
+               plays.add(play("user_5", "song_3", 300));
+               plays.add(play("user_5", "song_4", 4));
+               plays.add(play("user_5", "song_5", 5));
+               plays.add(play("user_5", "song_8", 8));
+               plays.add(play("user_5", "song_9", 9));
+               plays.add(play("user_5", "song_10", 10));
+               plays.add(play("user_5", "song_12", 12));
+               plays.add(play("user_5", "song_13", 13));
+               plays.add(play("user_5", "song_15", 15));
+
+               plays.add(play("user_6", "song_6", 30));
+
+               return env.fromCollection(plays);
+       }
+
+       /**
+        * Returns the sample mismatch lines, formatted as "ERROR: <songId trackId> title".
+        *
+        * @param env the execution environment used to create the DataSet
+        * @return a DataSet holding one line per untrusted song
+        */
+       public static DataSet<String> getMismatches(ExecutionEnvironment env) {
+               final List<String> mismatchLines = new ArrayList<String>();
+               mismatchLines.add("ERROR: <song_8 track_8> Sever");
+               mismatchLines.add("ERROR: <song_15 track_15> Black Trees");
+               return env.fromCollection(mismatchLines);
+       }
+
+       /** Builds one (userId, songId, playCount) triplet. */
+       private static Tuple3<String, String, Integer> play(String userId, String songId, int playCount) {
+               return new Tuple3<String, String, Integer>(userId, songId, playCount);
+       }
+}
+

Reply via email to