Repository: flink
Updated Branches:
  refs/heads/master 441ebf1ff -> f1dd914de


[FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example

This closes #968


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1dd914d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1dd914d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1dd914d

Branch: refs/heads/master
Commit: f1dd914de21313a90c3799438f1318349dd5d6df
Parents: 441ebf1
Author: vasia <va...@apache.org>
Authored: Fri Jul 31 22:12:18 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Fri Aug 7 11:33:21 2015 +0200

----------------------------------------------------------------------
 .../flink/graph/example/MusicProfiles.java      | 41 +++++++++++++-------
 .../graph/test/example/MusicProfilesITCase.java |  2 +-
 2 files changed, 27 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1dd914d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index a535216..0fc45bd 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -46,15 +46,17 @@ import org.apache.flink.util.Collector;
 public class MusicProfiles implements ProgramDescription {
 
        /**
-        * This example demonstrates how to mix the "record" Flink API with the
-        * graph API. The input is a set <userId - songId - playCount> triplets 
and
-        * a set of bad records,i.e. song ids that should not be trusted. 
Initially,
-        * we use the record API to filter out the bad records. Then, we use the
-        * graph API to create a user -> song weighted bipartite graph and 
compute
-        * the top song (most listened) per user. Then, we use the record API 
again,
-        * to create a user-user similarity graph, based on common songs, where 
two
-        * users that listen to the same song are connected. Finally, we use the
-        * graph API to run the label propagation community detection algorithm 
on
+        * This example demonstrates how to mix the DataSet Flink API with the 
Gelly API.
+        * The input is a set <userId - songId - playCount> triplets and
+        * a set of bad records, i.e. song ids that should not be trusted.
+        * Initially, we use the DataSet API to filter out the bad records.
+        * Then, we use Gelly to create a user -> song weighted bipartite graph 
and compute
+        * the top song (most listened) per user.
+        * Then, we use the DataSet API again, to create a user-user similarity 
graph,
+        * based on common songs, where users that are listeners of the same 
song
+        * are connected. A user-defined threshold on the playcount value
+        * defines when a user is considered to be a listener of a song.
+        * Finally, we use the graph API to run the label propagation community 
detection algorithm on
         * the similarity graph.
         *
         * The triplets input is expected to be given as one triplet per line,
@@ -116,7 +118,13 @@ public class MusicProfiles implements ProgramDescription {
                 * create an edge between each pair of its in-neighbors.
                 */
                DataSet<Edge<String, NullValue>> similarUsers = userSongGraph
-                               .getEdges().groupBy(1)
+                               .getEdges()
+                               // filter out user-song edges that are below the playcount threshold
+                               .filter(new FilterFunction<Edge<String, Integer>>() {
+                                       public boolean filter(Edge<String, Integer> edge) {
+                                               return (edge.getValue() > playcountThreshold);
+                                       }
+                               }).groupBy(1)
                                .reduceGroup(new CreateSimilarUserEdges()).distinct();
 
                Graph<String, Long, NullValue> similarUsersGraph = 
Graph.fromDataSet(similarUsers,
@@ -241,6 +249,8 @@ public class MusicProfiles implements ProgramDescription {
 
        private static String topTracksOutputPath = null;
 
+       private static int playcountThreshold = 0;
+
        private static String communitiesOutputPath = null;
 
        private static int maxIterations = 10;
@@ -248,10 +258,10 @@ public class MusicProfiles implements ProgramDescription {
        private static boolean parseParameters(String[] args) {
 
                if(args.length > 0) {
-                       if(args.length != 5) {
+                       if(args.length != 6) {
                                System.err.println("Usage: MusicProfiles <input 
user song triplets path>" +
                                                " <input song mismatches path> 
<output top tracks path> "
-                                               + "<output communities path> 
<num iterations>");
+                                               + "<playcount threshold> 
<output communities path> <num iterations>");
                                return false;
                        }
 
@@ -259,15 +269,16 @@ public class MusicProfiles implements ProgramDescription {
                        userSongTripletsInputPath = args[0];
                        mismatchesInputPath = args[1];
                        topTracksOutputPath = args[2];
-                       communitiesOutputPath = args[3];
-                       maxIterations = Integer.parseInt(args[4]);
+                       playcountThreshold = Integer.parseInt(args[3]);
+                       communitiesOutputPath = args[4];
+                       maxIterations = Integer.parseInt(args[5]);
                } else {
                        System.out.println("Executing Music Profiles example 
with default parameters and built-in default data.");
                        System.out.println("  Provide parameters to read input 
data from files.");
                        System.out.println("  See the documentation for the 
correct format of input files.");
                        System.out.println("Usage: MusicProfiles <input user 
song triplets path>" +
                                        " <input song mismatches path> <output 
top tracks path> "
-                                       + "<output communities path> <num 
iterations>");
+                                       + "<playcount threshold> <output 
communities path> <num iterations>");
                }
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f1dd914d/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
index 0410d41..5aa9f26 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
@@ -73,7 +73,7 @@ public class MusicProfilesITCase extends MultipleProgramsTestBase {
 
        @Test
        public void testMusicProfilesExample() throws Exception {
-               MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, communitiesResultPath,
+               MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, "0", communitiesResultPath,
                                MusicProfilesData.MAX_ITERATIONS + ""});
                expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT;
        }

Reply via email to