Repository: flink Updated Branches: refs/heads/master 441ebf1ff -> f1dd914de
[FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example This closes #968 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1dd914d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1dd914d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1dd914d Branch: refs/heads/master Commit: f1dd914de21313a90c3799438f1318349dd5d6df Parents: 441ebf1 Author: vasia <va...@apache.org> Authored: Fri Jul 31 22:12:18 2015 +0200 Committer: vasia <va...@apache.org> Committed: Fri Aug 7 11:33:21 2015 +0200 ---------------------------------------------------------------------- .../flink/graph/example/MusicProfiles.java | 41 +++++++++++++------- .../graph/test/example/MusicProfilesITCase.java | 2 +- 2 files changed, 27 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f1dd914d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java index a535216..0fc45bd 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java @@ -46,15 +46,17 @@ import org.apache.flink.util.Collector; public class MusicProfiles implements ProgramDescription { /** - * This example demonstrates how to mix the "record" Flink API with the - * graph API. The input is a set <userId - songId - playCount> triplets and - * a set of bad records,i.e. song ids that should not be trusted. Initially, - * we use the record API to filter out the bad records. 
Then, we use the - * graph API to create a user -> song weighted bipartite graph and compute - * the top song (most listened) per user. Then, we use the record API again, - * to create a user-user similarity graph, based on common songs, where two - * users that listen to the same song are connected. Finally, we use the - * graph API to run the label propagation community detection algorithm on + * This example demonstrates how to mix Flink's DataSet API with the Gelly API. + * The input is a set of <userId - songId - playCount> triplets and + * a set of bad records, i.e. song IDs that should not be trusted. + * Initially, we use the DataSet API to filter out the bad records. + * Then, we use Gelly to create a user -> song weighted bipartite graph and compute + * the top song (most listened) per user. + * Then, we use the DataSet API again, to create a user-user similarity graph, + * based on common songs, where users that are listeners of the same song + * are connected. A user counts as a listener of a song only if their playcount + * for that song is strictly greater than a user-defined threshold. + * Finally, we use Gelly to run the label propagation community detection algorithm on * the similarity graph. * * The triplets input is expected to be given as one triplet per line, @@ -116,7 +118,13 @@ public class MusicProfiles implements ProgramDescription { * create an edge between each pair of its in-neighbors.
*/ DataSet<Edge<String, NullValue>> similarUsers = userSongGraph - .getEdges().groupBy(1) + .getEdges() + // filter out user-song edges that are below the playcount threshold + .filter(new FilterFunction<Edge<String, Integer>>() { + public boolean filter(Edge<String, Integer> edge) { + return (edge.getValue() > playcountThreshold); + } + }).groupBy(1) .reduceGroup(new CreateSimilarUserEdges()).distinct(); Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers, @@ -241,6 +249,8 @@ public class MusicProfiles implements ProgramDescription { private static String topTracksOutputPath = null; + private static int playcountThreshold = 0; + private static String communitiesOutputPath = null; private static int maxIterations = 10; @@ -248,10 +258,10 @@ public class MusicProfiles implements ProgramDescription { private static boolean parseParameters(String[] args) { if(args.length > 0) { - if(args.length != 5) { + if(args.length != 6) { System.err.println("Usage: MusicProfiles <input user song triplets path>" + " <input song mismatches path> <output top tracks path> " - + "<output communities path> <num iterations>"); + + "<playcount threshold> <output communities path> <num iterations>"); return false; } @@ -259,15 +269,16 @@ public class MusicProfiles implements ProgramDescription { userSongTripletsInputPath = args[0]; mismatchesInputPath = args[1]; topTracksOutputPath = args[2]; - communitiesOutputPath = args[3]; - maxIterations = Integer.parseInt(args[4]); + playcountThreshold = Integer.parseInt(args[3]); + communitiesOutputPath = args[4]; + maxIterations = Integer.parseInt(args[5]); } else { System.out.println("Executing Music Profiles example with default parameters and built-in default data."); System.out.println(" Provide parameters to read input data from files."); System.out.println(" See the documentation for the correct format of input files."); System.out.println("Usage: MusicProfiles <input user song triplets path>" + " <input song 
mismatches path> <output top tracks path> " - + "<output communities path> <num iterations>"); + + "<playcount threshold> <output communities path> <num iterations>"); } return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/f1dd914d/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java index 0410d41..5aa9f26 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java @@ -73,7 +73,7 @@ public class MusicProfilesITCase extends MultipleProgramsTestBase { @Test public void testMusicProfilesExample() throws Exception { - MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, communitiesResultPath, + MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, "0", communitiesResultPath, MusicProfilesData.MAX_ITERATIONS + ""}); expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT; }