[posts] Fix Gelly post markup

This closes #7.
Project: http://git-wip-us.apache.org/repos/asf/flink-web/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink-web/commit/d571b040
Tree: http://git-wip-us.apache.org/repos/asf/flink-web/tree/d571b040
Diff: http://git-wip-us.apache.org/repos/asf/flink-web/diff/d571b040

Branch: refs/heads/asf-site
Commit: d571b0407aa5e79796128ba8488798749ff2fe47
Parents: 544db09
Author: Ufuk Celebi <u...@apache.org>
Authored: Tue Aug 25 15:03:54 2015 +0200
Committer: Ufuk Celebi <u...@apache.org>
Committed: Tue Aug 25 15:30:10 2015 +0200

----------------------------------------------------------------------
 _posts/2015-08-24-introducing-flink-gelly.md | 175 +++++++++++++---------
 1 file changed, 101 insertions(+), 74 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink-web/blob/d571b040/_posts/2015-08-24-introducing-flink-gelly.md
----------------------------------------------------------------------
diff --git a/_posts/2015-08-24-introducing-flink-gelly.md b/_posts/2015-08-24-introducing-flink-gelly.md
index 63b0cd5..fe32a61 100644
--- a/_posts/2015-08-24-introducing-flink-gelly.md
+++ b/_posts/2015-08-24-introducing-flink-gelly.md
@@ -1,26 +1,34 @@
 ---
 layout: post
-title: "Introducing Flink Gelly"
+title: "Introducing Gelly: Graph Processing with Apache Flink"
 date: 2015-08-24
 categories: news
 ---

-This blog post introduces Gelly, Flink's graph-processing API. Due to its native iteration
-support, among other features, Apache Flink is a suitable platform for large-scale graph analytics.
-By leveraging delta iterations, Gelly is able to map various graph processing models, such as
-vertex-centric or gather-sum-apply, to Flink dataflows.
+This blog post introduces **Gelly**, Apache Flink's *graph-processing API and library*. Flink's native support
+for iterations makes it a suitable platform for large-scale graph analytics.
+By leveraging delta iterations, Gelly is able to map various graph processing models such as
+vertex-centric or gather-sum-apply to Flink dataflows.

-Gelly allows Flink users to perform end-to-end data analysis, without having to build complex
-pipelines and combine different systems. Gelly can be seamlessly used with Flink's DataSet API,
-which means that pre-processing, graph creation, graph analysis and post-processing can be done
-in the same application. At the end of this post, we will go through a step-by-step example,
-in order to demonstrate that loading, transformation, filtering, graph creation and analysis,
-can be performed with a single Flink program.
+Gelly allows Flink users to perform end-to-end data analysis in a single system.
+Gelly can be seamlessly used with Flink's DataSet API,
+which means that pre-processing, graph creation, analysis, and post-processing can be done
+in the same application. At the end of this post, we will go through a step-by-step example
+in order to demonstrate that loading, transformation, filtering, graph creation, and analysis
+can be performed in a single Flink program.

-<a href="#top"></a>

 **Overview**

-[Back to top](#top)
+1. [What is Gelly?](#what-is-gelly)
+2. [Graph Representation and Creation](#graph-representation-and-creation)
+3. [Transformations and Utilities](#transformations-and-utilities)
+4. [Iterative Graph Processing](#iterative-graph-processing)
+5. [Library of Graph Algorithms](#library-of-graph-algorithms)
+6. [Use-Case: Music Profiles](#use-case-music-profiles)
+7. [Ongoing and Future Work](#ongoing-and-future-work)
+
+<a href="#top"></a>

 ## What is Gelly?

@@ -39,13 +47,14 @@ processing and introduces a library of graph algorithms.

 In Gelly, a graph is represented by a DataSet of vertices and a DataSet of edges.
 A vertex is defined by its unique ID and a value, whereas an edge is defined by its source ID,
-target ID and value. A vertex or edge for which a value is not specified will simply have the
+target ID, and value. A vertex or edge for which a value is not specified will simply have the
 value type set to `NullValue`.

-A graph can, therefore, be created from:
-1. a DataSet of edges and an optional DataSet of vertices using `Graph.fromDataSet()`
-2. a DataSet of Tuple3 and an optional DataSet of Tuple2 using `Graph.fromTupleDataSet()`
-3. a Collection of edges and an optional Collection of vertices using `Graph.fromCollection()`
+A graph can be created from:
+
+1. **DataSet of edges** and an optional **DataSet of vertices** using `Graph.fromDataSet()`
+2. **DataSet of Tuple3** and an optional **DataSet of Tuple2** using `Graph.fromTupleDataSet()`
+3. **Collection of edges** and an optional **Collection of vertices** using `Graph.fromCollection()`

 In all three cases, if the vertices are not provided, Gelly will automatically produce the vertex
 IDs from the edge source and target IDs.

@@ -66,12 +75,15 @@ The transformation methods enable several Graph operations, using high-level fun
 the ones provided by the batch processing API. These transformations can be applied one after
 the other, yielding a new Graph after each step, in a fashion similar to operators on DataSets:

-`inputGraph.getUndirected().mapEdges(new CustomEdgeMapper());`
+```java
+inputGraph.getUndirected().mapEdges(new CustomEdgeMapper());
+```
+
+Transformations can be applied on:

-Transformations can be applied:
-1. on vertices: `mapVertices`, `joinWithVertices`, `filterOnVertices`, `addVertex`, ...
-2. on edges: `mapEdges`, `filterOnEdges`, `removeEdge`, ...
-3. on triplets (source vertex, target vertex, edge): `getTriplets`
+1. **Vertices**: `mapVertices`, `joinWithVertices`, `filterOnVertices`, `addVertex`, ...
+2. **Edges**: `mapEdges`, `filterOnEdges`, `removeEdge`, ...
+3. **Triplets** (source vertex, target vertex, edge): `getTriplets`

 #### Neighborhood Aggregations

@@ -82,23 +94,26 @@ This provides a vertex-centric view, where each vertex can access its neighborin
 i.e. the edge value and the vertex ID of the edge endpoint. In order to also access the
 neighboring vertices' values, one should call the `reduceOnNeighbors()` function. The scope of
 the neighborhood is defined by the EdgeDirection parameter, which can be IN, OUT or ALL,
-to gather in-coming, out-going or all edges (neighbors) of a vertex. The two neighborhood
+to gather in-coming, out-going or all edges (neighbors) of a vertex.
+
+The two neighborhood
 functions mentioned above can only be used when the aggregation function is associative and
 commutative. In case the function does not comply with these restrictions or if it is desirable
 to return zero, one or more values per vertex, the more general `groupReduceOnEdges()` and
 `groupReduceOnNeighbors()` functions must be called.

-Consider, the following graph, for instance.
+Consider the following graph, for instance:

 <center>
 <img src="{{ site.baseurl }}/img/blog/neighborhood.png" style="width:60%;margin:15px">
 </center>

-Assume you would want to compute the sum of the values of all incoming neighbors, for each vertex.
-Since the sum is an associative and commutative operation and since the neighbors' values are needed,
-we will call the `reduceOnNeighbors()` aggregation method:
+Assume you would want to compute the sum of the values of all incoming neighbors for each vertex.
+We will call the `reduceOnNeighbors()` aggregation method since the sum is an associative and
+commutative operation and the neighbors' values are needed:

-`graph.reduceOnNeighbors(new SumValues(), EdgeDirection.IN);`
+```java
+graph.reduceOnNeighbors(new SumValues(), EdgeDirection.IN);
+```
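The `SumValues` function itself is not shown in the post. For readers who want to try the snippet, here is a minimal self-contained sketch that also demonstrates creation method (1), `Graph.fromDataSet()`. The sample values and the `ReduceNeighborsFunction` implementation are illustrative assumptions based on the 0.10-era Gelly API:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.ReduceNeighborsFunction;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.NullValue;

public class NeighborhoodSumExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // vertices carry a numeric value; edges carry no value (NullValue)
        DataSet<Vertex<Long, Long>> vertices = env.fromElements(
            new Vertex<Long, Long>(1L, 5L),
            new Vertex<Long, Long>(2L, 3L),
            new Vertex<Long, Long>(3L, 7L));

        DataSet<Edge<Long, NullValue>> edges = env.fromElements(
            new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance()),
            new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance()),
            new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));

        // creation method (1) from the list above: Graph.fromDataSet()
        Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);

        // sum of the values of all in-coming neighbors, per vertex;
        // vertex 1 has no in-coming edges and does not appear in the result
        DataSet<Tuple2<Long, Long>> neighborSums =
            graph.reduceOnNeighbors(new SumValues(), EdgeDirection.IN);

        neighborSums.print();
    }

    // associative and commutative reducer, as required by reduceOnNeighbors()
    public static final class SumValues implements ReduceNeighborsFunction<Long> {
        @Override
        public Long reduceNeighbors(Long firstNeighborValue, Long secondNeighborValue) {
            return firstNeighborValue + secondNeighborValue;
        }
    }
}
```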
 The vertex with id 1 is the only node that has no incoming edges. The result is therefore:

@@ -155,8 +170,8 @@ it to propagate its old distance to its neighbors; as they have already taken it into account.
 Flink's `IterateDelta` operator permits exploitation of this property as well as the execution
 of computations solely on the active parts of the graph. The operator receives two inputs:

-1. the ** Solution Set **, which represents the current state of the input and
-2. the ** Workset**, , which determines which parts of the graph will be recomputed in the next iteration.
+1. the **Solution Set**, which represents the current state of the input and
+2. the **Workset**, which determines which parts of the graph will be recomputed in the next iteration.

 In the SSSP example above, the Workset contains the vertices which update their distances.
 The user-defined iterative function is applied on these inputs to produce state updates.

@@ -196,9 +211,9 @@ vertex values do not need to be recomputed during an iteration.

 Let us reconsider the Single Source Shortest Paths algorithm. In each iteration, a vertex:

-1. *[Gather]* retrieves distances from its neighbors summed up with the corresponding edge values;
-2. *[Sum]* compares the newly obtained distances in order to extract the minimum;
-3. *[Apply]* and finally adopts the minimum distance computed in the sum step,
+1. **Gather** retrieves distances from its neighbors summed up with the corresponding edge values;
+2. **Sum** compares the newly obtained distances in order to extract the minimum;
+3. **Apply** and finally adopts the minimum distance computed in the sum step,
 provided that it is lower than its current value.

 If a vertex's value does not change during an iteration, it no longer propagates its distance.
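To make the three steps concrete, here is a rough sketch of SSSP expressed with Gelly's gather-sum-apply iteration (`runGatherSumApplyIteration()`). The user-defined function names and signatures are assumptions based on the 0.10-era Gelly documentation, not code from this post:

```java
// gather: propose a candidate distance per neighbor, i.e. the neighbor's
// current distance plus the value of the connecting edge
private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
    public Double gather(Neighbor<Double, Double> neighbor) {
        return neighbor.getNeighborValue() + neighbor.getEdgeValue();
    }
}

// sum: out of all candidate distances, keep the minimum
private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
    public Double sum(Double newValue, Double currentValue) {
        return Math.min(newValue, currentValue);
    }
}

// apply: adopt the minimum candidate only if it improves the current value,
// so that unchanged vertices stop propagating their distance
private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
    public void apply(Double newDistance, Double oldDistance) {
        if (newDistance < oldDistance) {
            setResult(newDistance);
        }
    }
}

// plug the three functions together; maxIterations bounds the delta iteration
Graph<Long, Double, Double> shortestPaths = graph.runGatherSumApplyIteration(
    new CalculateDistances(), new ChooseMinDistance(), new UpdateDistance(), maxIterations);
```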
@@ -230,7 +245,7 @@ We currently have implementations of the following algorithms:

 1. PageRank
 2. Single-Source-Shortest-Paths
 3. Label Propagation
-4. Community Detection (based on [this paper](http://arxiv.org/pdf/0808.2633.pdf) )
+4. Community Detection (based on [this paper](http://arxiv.org/pdf/0808.2633.pdf))
 5. Connected Components
 6. GSA Connected Components
 7. GSA PageRank

@@ -243,6 +258,7 @@ as well as computation of common graph metrics.

 [Back to top](#top)

 ## Use-Case: Music Profiles
+
 In the following section, we go through a use-case scenario that combines the Flink DataSet API
 with Gelly in order to process users' music preferences to suggest additions to their playlist.

@@ -254,31 +270,35 @@ on the common songs and use this resulting graph to detect communities by callin
 library method.

 For running the example implementation, please use the 0.10-SNAPSHOT version of Flink as a
-dependency. The full example code base can be found [here](https://github.com/apache/flink/blob/master/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java); The public data set used for testing
+dependency. The full example code base can be found [here](https://github.com/apache/flink/blob/master/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java). The public data set used for testing
 can be found [here](http://labrosa.ee.columbia.edu/millionsong/tasteprofile). This data set contains
 **48,373,586** real user-id, song-id and play-count triplets.

+**Note:** The code snippets in this post try to reduce verbosity by skipping type parameters of generic functions. Please have a look at [the full example](https://github.com/apache/flink/blob/master/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java) for the correct and complete code.
+
 #### Filtering out Bad Records

-After reading the (user-id, song-id, play-count) triplets from a CSV file and after parsing a
+After reading the `(user-id, song-id, play-count)` triplets from a CSV file and after parsing a
 text file in order to retrieve the list of songs that a user would not want to include in a
 playlist, we use a coGroup function to filter out the mismatches.

 ```java
 // read the user-song-play triplets.
 DataSet<Tuple3<String, String, Integer>> triplets =
-    getUserSongTripletsData(env);
+    getUserSongTripletsData(env);

 // read the mismatches dataset and extract the songIDs
 DataSet<Tuple3<String, String, Integer>> validTriplets = triplets
-    .coGroup(mismatches).where(1).equalTo(0)
-    .with(new CoGroupFunction {
-        void coGroup(Iterable triplets, Iterable invalidSongs, Collector out) {
-            if (!invalidSongs.iterator().hasNext())
-                for (Tuple3 triplet : triplets) // valid triplet
+    .coGroup(mismatches).where(1).equalTo(0)
+    .with(new CoGroupFunction() {
+        void coGroup(Iterable triplets, Iterable invalidSongs, Collector out) {
+            if (!invalidSongs.iterator().hasNext()) {
+                for (Tuple3 triplet : triplets) { // valid triplet
                     out.collect(triplet);
-        }
-    }
+                }
+            }
+        }
+    });
 ```
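Since the snippet above skips generic types (see the note at the beginning of this section), a fully-typed variant is sketched below for reference. The element type of the `mismatches` DataSet, assumed here to be `Tuple1<String>` song ids, is an illustration only:

```java
DataSet<Tuple3<String, String, Integer>> validTriplets = triplets
    .coGroup(mismatches).where(1).equalTo(0)
    .with(new CoGroupFunction<Tuple3<String, String, Integer>, Tuple1<String>,
            Tuple3<String, String, Integer>>() {
        @Override
        public void coGroup(Iterable<Tuple3<String, String, Integer>> triplets,
                Iterable<Tuple1<String>> invalidSongs,
                Collector<Tuple3<String, String, Integer>> out) {
            // forward the triplets of a song only if the song has no
            // match in the mismatches data set
            if (!invalidSongs.iterator().hasNext()) {
                for (Tuple3<String, String, Integer> triplet : triplets) {
                    out.collect(triplet);
                }
            }
        }
    });
```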
 The coGroup simply takes the triplets whose song-id (second field) matches the song-id from the

@@ -312,7 +332,7 @@ basically iterate through the edge value and collect the target (song) of the ma
 ```java
 //get the top track (most listened to) for each user
 DataSet<Tuple2> usersWithTopTrack = userSongGraph
-    .groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT);
+    .groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT);

 class GetTopSongPerUser implements EdgesFunctionWithVertexValue {
     void iterateEdges(Vertex vertex, Iterable<Edge> edges) {

@@ -330,7 +350,7 @@ class GetTopSongPerUser implements EdgesFunctionWithVertexValue {
     }
 }
 ```

-#### Creating a user-user Similarity Graph
+#### Creating a User-User Similarity Graph

 Clustering users based on common interests, in this case, common top songs, could prove to be
 very useful for advertisements or for recommending new musical compilations. In a user-user graph,

@@ -356,22 +376,27 @@ straightforward as a call to the `Graph.fromDataSet()` method.

 ```java
 // create a user-user similarity graph:
 // two users that listen to the same song are connected
 DataSet<Edge> similarUsers = userSongGraph.getEdges()
-// filter out user-song edges that are below the playcount threshold
-    .filter(new FilterFunction<Edge<String, Integer>>() {
-        public boolean filter(Edge<String, Integer> edge) {
-            return (edge.getValue() > playcountThreshold);
-        }
-    }).groupBy(1)
-    .reduceGroup(new GroupReduceFunction() {
-        void reduce(Iterable<Edge> edges, Collector<Edge> out) {
-            List users = new ArrayList();
-            for (Edge edge : edges)
-                users.add(edge.getSource());
-            for (int i = 0; i < users.size() - 1; i++)
-                for (int j = i+1; j < users.size() - 1; j++)
-                    out.collect(new Edge(users.get(i), users.get(j)));
-        }
-    }).distinct();
+    // filter out user-song edges that are below the playcount threshold
+    .filter(new FilterFunction<Edge<String, Integer>>() {
+        public boolean filter(Edge<String, Integer> edge) {
+            return (edge.getValue() > playcountThreshold);
+        }
+    })
+    .groupBy(1)
+    .reduceGroup(new GroupReduceFunction() {
+        void reduce(Iterable<Edge> edges, Collector<Edge> out) {
+            List users = new ArrayList();
+            for (Edge edge : edges) {
+                users.add(edge.getSource());
+            }
+            for (int i = 0; i < users.size() - 1; i++) {
+                for (int j = i + 1; j < users.size(); j++) {
+                    out.collect(new Edge(users.get(i), users.get(j)));
+                }
+            }
+        }
+    })
+    .distinct();

 Graph similarUsersGraph = Graph.fromDataSet(similarUsers).getUndirected();
 ```

@@ -388,21 +413,23 @@ among their neighbors.

 ```java
 // detect user communities using label propagation
 // initialize each vertex with a unique numeric label
 DataSet<Tuple2<String, Long>> idsWithInitialLabels = DataSetUtils
-    .zipWithUniqueId(similarUsersGraph.getVertexIds())
-    .map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() {
-        @Override
-        public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception {
-            return new Tuple2<String, Long>(tuple2.f1, tuple2.f0);
-        }
-    });
+    .zipWithUniqueId(similarUsersGraph.getVertexIds())
+    .map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() {
+        @Override
+        public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception {
+            return new Tuple2<String, Long>(tuple2.f1, tuple2.f0);
+        }
+    });

 // update the vertex values and run the label propagation algorithm
 DataSet<Vertex> verticesWithCommunity = similarUsersGraph
-    .joinWithVertices(idsWithlLabels, new MapFunction() {
-        public Long map(Tuple2 idWithLabel) {
-            return idWithLabel.f1;
-        }
-    }).run(new LabelPropagation(numIterations)).getVertices();
+    .joinWithVertices(idsWithInitialLabels, new MapFunction() {
+        public Long map(Tuple2 idWithLabel) {
+            return idWithLabel.f1;
+        }
+    })
+    .run(new LabelPropagation(numIterations))
+    .getVertices();
 ```

 [Back to top](#top)
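As a possible follow-up, which is not part of the original example, the result can be summarized with the plain DataSet API, for instance by counting how many users ended up in each community. The `Vertex<String, Long>` element type is taken from the label propagation output above:

```java
// count the number of users per detected community label
DataSet<Tuple2<Long, Long>> communitySizes = verticesWithCommunity
    .map(new MapFunction<Vertex<String, Long>, Tuple2<Long, Long>>() {
        @Override
        public Tuple2<Long, Long> map(Vertex<String, Long> vertex) {
            return new Tuple2<Long, Long>(vertex.getValue(), 1L);
        }
    })
    .groupBy(0)
    .sum(1);
```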