[posts] Fix Gelly post markup

This closes #7.


Project: http://git-wip-us.apache.org/repos/asf/flink-web/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink-web/commit/d571b040
Tree: http://git-wip-us.apache.org/repos/asf/flink-web/tree/d571b040
Diff: http://git-wip-us.apache.org/repos/asf/flink-web/diff/d571b040

Branch: refs/heads/asf-site
Commit: d571b0407aa5e79796128ba8488798749ff2fe47
Parents: 544db09
Author: Ufuk Celebi <u...@apache.org>
Authored: Tue Aug 25 15:03:54 2015 +0200
Committer: Ufuk Celebi <u...@apache.org>
Committed: Tue Aug 25 15:30:10 2015 +0200

----------------------------------------------------------------------
 _posts/2015-08-24-introducing-flink-gelly.md | 175 +++++++++++++---------
 1 file changed, 101 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink-web/blob/d571b040/_posts/2015-08-24-introducing-flink-gelly.md
----------------------------------------------------------------------
diff --git a/_posts/2015-08-24-introducing-flink-gelly.md b/_posts/2015-08-24-introducing-flink-gelly.md
index 63b0cd5..fe32a61 100644
--- a/_posts/2015-08-24-introducing-flink-gelly.md
+++ b/_posts/2015-08-24-introducing-flink-gelly.md
@@ -1,26 +1,34 @@
 ---
 layout: post
-title:  "Introducing Flink Gelly"
+title:  "Introducing Gelly: Graph Processing with Apache Flink"
 date:   2015-08-24 
 
 categories: news
 ---
 
-This blog post introduces Gelly, Flink’s graph-processing API. Due to its native iteration
-support, among other features, Apache Flink is a suitable platform for large-scale graph analytics.
-By leveraging delta iterations, Gelly is able to map various graph processing models, such as
-vertex-centric or gather-sum-apply, to Flink dataflows.
+This blog post introduces **Gelly**, Apache Flink's *graph-processing API and library*. Flink's native support
+for iterations makes it a suitable platform for large-scale graph analytics.
+By leveraging delta iterations, Gelly is able to map various graph processing models such as
+vertex-centric or gather-sum-apply to Flink dataflows.
 
-Gelly allows Flink users to perform end-to-end data analysis, without having to build complex
-pipelines and combine different systems. Gelly can be seamlessly used with Flink's DataSet API,
-which means that pre-processing, graph creation, graph analysis and post-processing can be done
-in the same application. At the end of this post, we will go through a step-by-step example,
-in order to demonstrate that loading, transformation, filtering, graph creation and analysis,
-can be performed with a single Flink program.
+Gelly allows Flink users to perform end-to-end data analysis in a single system.
+Gelly can be seamlessly used with Flink's DataSet API,
+which means that pre-processing, graph creation, analysis, and post-processing can be done
+in the same application. At the end of this post, we will go through a step-by-step example
+in order to demonstrate that loading, transformation, filtering, graph creation, and analysis
+can be performed in a single Flink program.
 
-<a href="#top"></a>
+**Overview**
 
-[Back to top](#top)
+1. [What is Gelly?](#what-is-gelly)
+2. [Graph Representation and Creation](#graph-representation-and-creation)
+3. [Transformations and Utilities](#transformations-and-utilities)
+4. [Iterative Graph Processing](#iterative-graph-processing)
+5. [Library of Graph Algorithms](#library-of-graph-algorithms)
+6. [Use-Case: Music Profiles](#use-case-music-profiles)
+7. [Ongoing and Future Work](#ongoing-and-future-work)
+
+<a href="#top"></a>
 
 ## What is Gelly?
 
@@ -39,13 +47,14 @@ processing and introduces a library of graph algorithms.
 
 In Gelly, a graph is represented by a DataSet of vertices and a DataSet of edges.
 A vertex is defined by its unique ID and a value, whereas an edge is defined by its source ID,
-target ID and value. A vertex or edge for which a value is not specified will simply have the
+target ID, and value. A vertex or edge for which a value is not specified will simply have the
 value type set to `NullValue`.
 
-A graph can, therefore, be created from:
-1. a DataSet of edges and an optional DataSet of vertices using `Graph.fromDataSet()`  
-2. a DataSet of Tuple3 and an optional DataSet of Tuple2  using `Graph.fromTupleDataSet()`  
-3. a Collection of edges and an optional Collection of vertices using `Graph.fromCollection()`  
+A graph can be created from:
+
+1. **DataSet of edges** and an optional **DataSet of vertices** using `Graph.fromDataSet()`
+2. **DataSet of Tuple3** and an optional **DataSet of Tuple2** using `Graph.fromTupleDataSet()`
+3. **Collection of edges** and an optional **Collection of vertices** using `Graph.fromCollection()`
 
 In all three cases, if the vertices are not provided,
 Gelly will automatically produce the vertex IDs from the edge source and target IDs.
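
As a quick illustration, here is a minimal sketch of the third variant, with made-up IDs and values; since no vertices are passed in, Gelly derives them from the edge endpoints, and the vertex value type defaults to `NullValue`:

```java
// using org.apache.flink.graph.{Graph, Edge} and org.apache.flink.types.NullValue
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// example edges with Long IDs and Double values (made up for illustration)
List<Edge<Long, Double>> edges = new ArrayList<>();
edges.add(new Edge<>(1L, 2L, 0.5));
edges.add(new Edge<>(2L, 3L, 1.5));

// no vertex input: vertex IDs are produced from the edge endpoints
Graph<Long, NullValue, Double> graph = Graph.fromCollection(edges, env);
```
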
@@ -66,12 +75,15 @@ The transformation methods enable several Graph operations, using high-level fun
 the ones provided by the batch processing API. These transformations can be applied one after the
 other, yielding a new Graph after each step, in a fashion similar to operators on DataSets: 
 
-`inputGraph.getUndirected().mapEdges(new CustomEdgeMapper());`
+```java
+inputGraph.getUndirected().mapEdges(new CustomEdgeMapper());
+```
+
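The `CustomEdgeMapper` used above is not shown in the post; a possible implementation, assuming `Long` vertex IDs and `Double` edge values, could simply rescale each edge weight:

```java
// hypothetical mapper for the snippet above: doubles every edge value;
// mapEdges() hands in each Edge and expects the new edge value back
class CustomEdgeMapper implements MapFunction<Edge<Long, Double>, Double> {
    @Override
    public Double map(Edge<Long, Double> edge) {
        return edge.getValue() * 2.0;
    }
}
```
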
+Transformations can be applied on:
 
-Transformations can be applied:
-1. on vertices: `mapVertices`, `joinWithVertices`, `filterOnVertices`, `addVertex`, ...  
-2. on edges: `mapEdges`, `filterOnEdges`, `removeEdge`, ...   
-3. on triplets (source vertex, target vertex, edge): `getTriplets`  
+1. **Vertices**: `mapVertices`, `joinWithVertices`, `filterOnVertices`, `addVertex`, ...  
+2. **Edges**: `mapEdges`, `filterOnEdges`, `removeEdge`, ...   
+3. **Triplets** (source vertex, target vertex, edge): `getTriplets`  
 
 #### Neighborhood Aggregations
 
@@ -82,23 +94,26 @@ This provides a vertex-centric view, where each vertex can access its neighborin
 i.e. the edge value and the vertex ID of the edge endpoint. In order to also access the
 neighboring vertices’ values, one should call the `reduceOnNeighbors()` function.
 The scope of the neighborhood is defined by the EdgeDirection parameter, which can be IN, OUT or ALL,
-to gather in-coming, out-going or all edges (neighbors) of a vertex. The two neighborhood
+to gather in-coming, out-going or all edges (neighbors) of a vertex.
+
+The two neighborhood
 functions mentioned above can only be used when the aggregation function is associative and commutative.
 In case the function does not comply with these restrictions or if it is desirable to return zero,
 one or more values per vertex, the more general  `groupReduceOnEdges()` and 
 `groupReduceOnNeighbors()` functions must be called.
 
-Consider, the following graph, for instance.
+Consider the following graph, for instance:
 
 <center>
 <img src="{{ site.baseurl }}/img/blog/neighborhood.png" style="width:60%;margin:15px">
 </center>
 
-Assume you would want to compute the sum of the values of all incoming neighbors, for each vertex.
-Since the sum is an associative and commutative operation and since the neighbors’ values are needed, 
-we will call the `reduceOnNeighbors()` aggregation method: 
+Assume you would want to compute the sum of the values of all incoming neighbors for each vertex.
+We will call the `reduceOnNeighbors()` aggregation method since the sum is an associative and commutative operation and the neighbors’ values are needed:
 
-`graph.reduceOnNeighbors(new SumValues(), EdgeDirection.IN);`
+```java
+graph.reduceOnNeighbors(new SumValues(), EdgeDirection.IN);
+```
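
The `SumValues` function is not spelled out in the post. Assuming `Long` vertex values, a possible implementation against Gelly's `ReduceNeighborsFunction` interface could look like this (a sketch, not the post's actual class):

```java
// combines a pair of neighbor values into one; Gelly applies this pairwise,
// which is why the function must be associative and commutative
class SumValues implements ReduceNeighborsFunction<Long> {
    @Override
    public Long reduceNeighbors(Long firstNeighborValue, Long secondNeighborValue) {
        return firstNeighborValue + secondNeighborValue;
    }
}
```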
 
 The vertex with id 1 is the only node that has no incoming edges. The result is therefore:
 
@@ -155,8 +170,8 @@ it to propagate its old distance to its neighbors; as they have already taken it
 Flink’s `IterateDelta` operator permits exploitation of this property as well as the
 execution of computations solely on the active parts of the graph. The operator receives two inputs:
 
-1. the ** Solution Set **, which represents the current state of the input and
-2. the ** Workset**, , which determines which parts of the graph will be recomputed in the next iteration. 
+1. the **Solution Set**, which represents the current state of the input and
+2. the **Workset**, which determines which parts of the graph will be recomputed in the next iteration.
 
 In the SSSP example above, the Workset contains the vertices which update their distances.
 The user-defined iterative function is applied on these inputs to produce state updates.
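
To make the Solution Set/Workset interplay concrete, here is a minimal sketch of a plain DataSet delta iteration; the `env` variable, the datasets, and the trivial step function are made up for illustration, and the real SSSP plan is more involved:

```java
// a toy delta iteration on (id, value) pairs; both sets start out identical
DataSet<Tuple2<Long, Double>> initial = env.fromElements(
        new Tuple2<>(1L, 0.0), new Tuple2<>(2L, 100.0));

// the solution set is keyed on field 0; run at most 10 iterations
DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration =
        initial.iterateDelta(initial, 10, 0);

// step function: compute candidate updates from the current workset
DataSet<Tuple2<Long, Double>> candidates = iteration.getWorkset()
        .map(new MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>>() {
            public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
                return new Tuple2<>(value.f0, Math.min(value.f1, 10.0));
            }
        });

// the first argument is merged into the solution set,
// the second becomes the workset of the next iteration
DataSet<Tuple2<Long, Double>> result = iteration.closeWith(candidates, candidates);
```
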
@@ -196,9 +211,9 @@ vertex values do not need to be recomputed during an iteration.
 
 Let us reconsider the Single Source Shortest Paths algorithm. In each iteration, a vertex:
 
-1. *[Gather]* retrieves distances from its neighbors summed up with the corresponding edge values; 
-2. *[Sum]* compares the newly obtained distances in order to extract the minimum;
-3. *[Apply]* and finally adopts the minimum distance computed in the sum step,
+1. **Gather** retrieves distances from its neighbors summed up with the corresponding edge values; 
+2. **Sum** compares the newly obtained distances in order to extract the minimum;
+3. **Apply** and finally adopts the minimum distance computed in the sum step,
 provided that it is lower than its current value. If a vertex’s value does not change during
 an iteration, it no longer propagates its distance.
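
These three phases map onto Gelly's gather-sum-apply base classes. A hedged sketch, assuming `Double` distances and `Long` vertex IDs (not the library's exact implementation):

```java
// Gather: candidate distance through a neighbor = its distance + edge weight
class GatherDistances extends GatherFunction<Double, Double, Double> {
    public Double gather(Neighbor<Double, Double> neighbor) {
        return neighbor.getNeighborValue() + neighbor.getEdgeValue();
    }
}

// Sum: keep the minimum of the gathered candidate distances
class MinDistance extends SumFunction<Double, Double, Double> {
    public Double sum(Double newValue, Double currentValue) {
        return Math.min(newValue, currentValue);
    }
}

// Apply: adopt the new distance only if it improves on the current one
class UpdateDistance extends ApplyFunction<Long, Double, Double> {
    public void apply(Double newDistance, Double currentDistance) {
        if (newDistance < currentDistance) {
            setResult(newDistance);
        }
    }
}
```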
 
@@ -230,7 +245,7 @@ We currently have implementations of the following algorithms:
 1. PageRank
 2. Single-Source-Shortest-Paths
 3. Label Propagation
-4. Community Detection (based on [this paper](http://arxiv.org/pdf/0808.2633.pdf) )
+4. Community Detection (based on [this paper](http://arxiv.org/pdf/0808.2633.pdf))
 5. Connected Components
 6. GSA Connected Components
 7. GSA PageRank
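
Invoking a library algorithm is a single call on the graph, mirroring the use-case code later in this post. A sketch, assuming a `Graph` instance named `graph` and with a made-up iteration count (type parameters omitted, as the post does throughout):

```java
// run label propagation for 30 iterations and read back the labeled vertices
DataSet<Vertex> communities = graph
        .run(new LabelPropagation(30))
        .getVertices();
```
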
@@ -243,6 +258,7 @@ as well as computation of common graph metrics.
 [Back to top](#top)
 
 ## Use-Case: Music Profiles
+
 In the following section, we go through a use-case scenario that combines the Flink DataSet API
 with Gelly in order to process users’ music preferences to suggest additions to their playlist.
 
@@ -254,31 +270,35 @@ on the common songs and use this resulting graph to detect communities by callin
 library method. 
 
 For running the example implementation, please use the 0.10-SNAPSHOT version of Flink as a
-dependency. The full example code base can be found [here](https://github.com/apache/flink/blob/master/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java); The public data set used for testing
+dependency. The full example code base can be found [here](https://github.com/apache/flink/blob/master/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java). The public data set used for testing
 can be found [here](http://labrosa.ee.columbia.edu/millionsong/tasteprofile). This data set contains **48,373,586** real user-id, song-id and play-count triplets.
 
+**Note:** The code snippets in this post try to reduce verbosity by skipping type parameters of generic functions. Please have a look at [the full example](https://github.com/apache/flink/blob/master/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java) for the correct and complete code.
+
 #### Filtering out Bad Records
 
-After reading the (user-id, song-id, play-count) triplets from a CSV file and after parsing a
+After reading the `(user-id, song-id, play-count)` triplets from a CSV file and after parsing a
 text file in order to retrieve the list of songs that a user would not want to include in a
 playlist, we use a coGroup function to filter out the mismatches.
 
 ```java
 // read the user-song-play triplets.
 DataSet<Tuple3<String, String, Integer>> triplets =
-                                    getUserSongTripletsData(env);
+    getUserSongTripletsData(env);
 
 // read the mismatches dataset and extract the songIDs
 DataSet<Tuple3<String, String, Integer>> validTriplets = triplets
-              .coGroup(mismatches).where(1).equalTo(0)
-              .with(new CoGroupFunction {
-                     void coGroup(Iterable triplets, Iterable invalidSongs, Collector out) {
-                        if (!invalidSongs.iterator().hasNext())
-                            for (Tuple3 triplet : triplets) // valid triplet
+        .coGroup(mismatches).where(1).equalTo(0)
+        .with(new CoGroupFunction() {
+                void coGroup(Iterable triplets, Iterable invalidSongs, Collector out) {
+                        if (!invalidSongs.iterator().hasNext()) {
+                            for (Tuple3 triplet : triplets) { // valid triplet
                                 out.collect(triplet);
-                     }
-              }
+                            }
+                        }
+                    }
+        });
 ```
 
 The coGroup simply takes the triplets whose song-id (second field) matches the song-id from the
@@ -312,7 +332,7 @@ basically iterate through the edge value and collect the target (song) of the ma
 ```java
 //get the top track (most listened to) for each user
 DataSet<Tuple2> usersWithTopTrack = userSongGraph
-               .groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT);
+        .groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT);
 
 class GetTopSongPerUser implements EdgesFunctionWithVertexValue {
     void iterateEdges(Vertex vertex, Iterable<Edge> edges) {
@@ -330,7 +350,7 @@ class GetTopSongPerUser implements EdgesFunctionWithVertexValue {
 }
 ```
 
-#### Creating a user-user Similarity Graph
+#### Creating a User-User Similarity Graph
 
 Clustering users based on common interests, in this case, common top songs, could prove to be
 very useful for advertisements or for recommending new musical compilations. In a user-user graph,
@@ -356,22 +376,27 @@ straightforward as a call to the `Graph.fromDataSet()` method.
 // create a user-user similarity graph:
 // two users that listen to the same song are connected
 DataSet<Edge> similarUsers = userSongGraph.getEdges()
-// filter out user-song edges that are below the playcount threshold
-                       .filter(new FilterFunction<Edge<String, Integer>>() {
-                            public boolean filter(Edge<String, Integer> edge) {
-                                return (edge.getValue() > playcountThreshold);
+        // filter out user-song edges that are below the playcount threshold
+        .filter(new FilterFunction<Edge<String, Integer>>() {
+                public boolean filter(Edge<String, Integer> edge) {
+                    return (edge.getValue() > playcountThreshold);
+                }
+        })
+        .groupBy(1)
+        .reduceGroup(new GroupReduceFunction() {
+                void reduce(Iterable<Edge> edges, Collector<Edge> out) {
+                    List users = new ArrayList();
+                    for (Edge edge : edges) {
+                        users.add(edge.getSource());
+                        for (int i = 0; i < users.size() - 1; i++) {
+                            for (int j = i + 1; j < users.size(); j++) {
+                                out.collect(new Edge(users.get(i), users.get(j)));
                             }
-                       }).groupBy(1)
-                       .reduceGroup(new GroupReduceFunction() {
-                            void reduce(Iterable<Edge> edges, Collector<Edge> out) {
-                                List users = new ArrayList();
-                                for (Edge edge : edges)
-                                    users.add(edge.getSource());
-                                    for (int i = 0; i < users.size() - 1; i++)
-                                        for (int j = i+1; j < users.size() - 1; j++)
-                                            out.collect(new Edge(users.get(i), users.get(j)));
-                            }
-                       }).distinct();
+                        }
+                    }
+                }
+        })
+        .distinct();
 
 Graph similarUsersGraph = Graph.fromDataSet(similarUsers).getUndirected();
 ```
@@ -388,21 +413,23 @@ among their neighbors.
 // detect user communities using label propagation
 // initialize each vertex with a unique numeric label
 DataSet<Tuple2<String, Long>> idsWithInitialLabels = DataSetUtils
-               .zipWithUniqueId(similarUsersGraph.getVertexIds())
-               .map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() {
-                    @Override
-                    public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception {
-                        return new Tuple2<String, Long>(tuple2.f1, tuple2.f0);
-                    }
-               });
+        .zipWithUniqueId(similarUsersGraph.getVertexIds())
+        .map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() {
+                @Override
+                public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception {
+                    return new Tuple2<String, Long>(tuple2.f1, tuple2.f0);
+                }
+        });
 
 // update the vertex values and run the label propagation algorithm
 DataSet<Vertex> verticesWithCommunity = similarUsersGraph
-              .joinWithVertices(idsWithlLabels, new MapFunction() {
-                    public Long map(Tuple2 idWithLabel) {
-                        return idWithLabel.f1;
-                    }
-              }).run(new LabelPropagation(numIterations)).getVertices();
+        .joinWithVertices(idsWithInitialLabels, new MapFunction() {
+                public Long map(Tuple2 idWithLabel) {
+                    return idWithLabel.f1;
+                }
+        })
+        .run(new LabelPropagation(numIterations))
+        .getVertices();
 ```
 
 [Back to top](#top)
