[ 
https://issues.apache.org/jira/browse/SPARK-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sjk updated SPARK-5036:
-----------------------
    Attachment: s2.jpeg

> Better support sending partial messages in Pregel API
> -----------------------------------------------------
>
>                 Key: SPARK-5036
>                 URL: https://issues.apache.org/jira/browse/SPARK-5036
>             Project: Spark
>          Issue Type: Improvement
>          Components: GraphX
>            Reporter: sjk
>         Attachments: s1.jpeg, s2.jpeg
>
>
> Better support sending partial messages in Pregel API
> 1. the requirement
> In many iterative graph algorithms, only a subset of the vertices (we call 
> them ActiveVertexes) needs to send messages to its neighbours in each 
> iteration. In many cases, ActiveVertexes are the vertices whose attributes 
> changed between the previous and the current iteration. To implement this 
> requirement, we can use the Pregel API plus a flag (e.g., `bool isAttrChanged`) 
> in each vertex's attribute.
> However, after the `aggregateMessages` or `mapReduceTriplets` call of each 
> iteration, we need to reset this flag to its initial value in every vertex, 
> which requires a heavy `joinVertices`. 
> We found a more efficient way to meet this requirement and want to discuss 
> it here.
> Consider the following simple example.
> In the i-th iteration, the previous attribute of each vertex is `Attr` and 
> the newly computed attribute is `NewAttr`:
> |VID| Attr| NewAttr| Neighbours|
> |:----|:-----|:----|:------|
> | 1 | 4| 5| 2, 3 |
> | 2 | 3| 2| 1, 4 |
> | 3 | 2| 2| 1, 4 |
> | 4|  3| 4| 1, 2, 3 |
> Our requirements are:
> 1.    Set each vertex's `Attr` to `NewAttr` in the i-th iteration.
> 2.    For each vertex whose `Attr != NewAttr`, send messages to its 
> neighbours in the next iteration's `aggregateMessages`.
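>
> To make the two requirements concrete, here is a minimal plain-Scala sketch 
> (no Spark involved) applied to the example table. The names `Row`, `active`, 
> `messages` and `updated` are hypothetical and only serve this illustration.
>
> ```scala
> // Rows of the example table: (VID, Attr, NewAttr, Neighbours).
> final case class Row(vid: Long, attr: Int, newAttr: Int, neighbours: Seq[Long])
>
> val rows = Seq(
>   Row(1L, 4, 5, Seq(2L, 3L)),
>   Row(2L, 3, 2, Seq(1L, 4L)),
>   Row(3L, 2, 2, Seq(1L, 4L)),
>   Row(4L, 3, 4, Seq(1L, 2L, 3L))
> )
>
> // Requirement 1: every vertex adopts NewAttr.
> val updated: Map[Long, Int] = rows.map(r => r.vid -> r.newAttr).toMap
>
> // Requirement 2: only vertices whose attribute changed are active.
> val active: Seq[Long] = rows.filter(r => r.attr != r.newAttr).map(_.vid)
>
> // Messages for the next iteration, as (source, destination) pairs.
> val messages: Seq[(Long, Long)] =
>   rows.filter(r => r.attr != r.newAttr)
>     .flatMap(r => r.neighbours.map(n => (r.vid, n)))
> ```
>
> In this example vertices 1, 2 and 4 are active, while vertex 3 (whose 
> attribute stays 2) sends nothing.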
> We found it hard to implement this requirement efficiently with the current 
> Pregel API. The reason is that we need not only to run `pregel()` to compute 
> `NewAttr` (2), but also to run `outJoin()` to satisfy (1).
> A simple idea is to keep an `isAttrChanged: Boolean` (solution 1) or a 
> `flag: Int` (solution 2) in each vertex's attribute.
> 2. two solutions
> -----------
> 2.1 solution 1: label and reset `isAttrChanged:Boolean` of Vertex Attr
> ![alt text](s1.jpeg "Title")
> 1. init messages with `aggregateMessages`
>       it returns a messageRDD
> 2. `innerJoin`
>       apply the messages to the vertices that received them; this returns a 
> new VertexRDD holding the values computed by the user-defined function 
> `vprog`, with `isAttrChanged = true` set
> 3. `outerJoinVertices`
>       merge the changed vertices back into the whole graph, so the graph is 
> now up to date
> 4. `aggregateMessages`
>       it returns a messageRDD
> 5. `joinVertices`
>       reset every `isAttrChanged` of the vertex attr to false
>       ```
>       // reset isAttrChanged to false on every vertex
>       g = updateG.joinVertices(updateG.vertices) {
>         (vid, oriVertex, updateGVertex) => updateGVertex.reset()
>       }
>       ```
>    Here the `isAttrChanged` variable of every vertex attribute object must be 
> reset to false. If `isAttrChanged` is not reset, the vertex will send 
> messages again in the next iteration even though it did not change.
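>
> Why the reset is needed can be seen in a small plain-Scala sketch of such a 
> vertex attribute. `FlaggedAttr`, `update` and `shouldSend` are hypothetical 
> names, and setting the flag only when the value differs is an assumption 
> made for this illustration.
>
> ```scala
> // Hypothetical vertex attribute for solution 1: a value plus a change flag.
> final case class FlaggedAttr(value: Int, isAttrChanged: Boolean) {
>   // Return a copy with the flag cleared; the value is left untouched.
>   def reset(): FlaggedAttr = copy(isAttrChanged = false)
> }
>
> // vprog-style update: store the new value and record whether it differs.
> def update(old: FlaggedAttr, newValue: Int): FlaggedAttr =
>   FlaggedAttr(newValue, isAttrChanged = newValue != old.value)
>
> // sendMsg-style guard: only changed vertices send.
> def shouldSend(attr: FlaggedAttr): Boolean = attr.isAttrChanged
>
> // A vertex that changed in iteration i ...
> val afterIterI = update(FlaggedAttr(4, isAttrChanged = false), 5)
> // ... but receives no message in iteration i + 1 is never updated again,
> // so without a reset its stale flag would still be true.
> val staleSend = shouldSend(afterIterI)
> val resetSend = shouldSend(afterIterI.reset())
> ```
>
> A vertex that receives no message is skipped by the `innerJoin`, so nothing 
> but an explicit pass over all vertices can clear its flag. That full pass is 
> the costly part of this solution.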
> **result:**  
> *     Edge: 890041895 
> *     Vertex: 181640208
> *     Iterate: 150 times
> *     Cost total: 8.4h
> *     could not run until 0 messages remained (i.e., to convergence)
> 2.2 solution 2: color vertex
> ![alt text](s2.jpeg "Title")
> iteration process:
> 1. innerJoin 
>   `vprog` is used as a partially applied function, e.g. `vprog(curIter, _: 
> VertexId, _: VD, _: A)`
>   ` i = i + 1; val curIter = i`. 
>   in `vprog`, the user can read `curIter` and assign it to `flag`.
> 2. outerJoinVertices
>       `graph = graph.outerJoinVertices(changedVerts) { (vid, old, newOpt) => 
> newOpt.getOrElse(old)}.cache()`
> 3. aggregateMessages
>       `sendMsg` is a partially applied function, e.g. `sendMsg(curIter, _: 
> EdgeContext[VD, ED, A])`
>       **in `sendMsg`, compare `curIter` with `flag` to determine whether to 
> send a message**
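>
> The three steps above could be combined roughly as follows. This is only a 
> sketch under stated assumptions, not the code that produced the results: the 
> attribute class `Colored`, the driver `run`, the `Int` edge and message 
> types, and the min-propagation logic inside `vprog` are all hypothetical. 
> Only `aggregateMessages`, `innerJoin`, `outerJoinVertices` and `EdgeContext` 
> are existing GraphX API.
>
> ```scala
> import org.apache.spark.graphx.{EdgeContext, Graph, VertexId}
>
> object PartialPregel {
>   // Hypothetical attribute: a value plus the iteration in which it last changed.
>   final case class Colored(value: Int, flag: Int)
>
>   // vprog with the iteration number as an extra leading parameter.
>   // Assumption: the new value is the minimum of the old value and the message.
>   def vprog(curIter: Int, vid: VertexId, attr: Colored, msg: Int): Colored = {
>     val newValue = math.min(attr.value, msg)
>     if (newValue != attr.value) Colored(newValue, curIter) else attr
>   }
>
>   // sendMsg with the iteration number as an extra leading parameter:
>   // only a source vertex colored in the current iteration sends.
>   def sendMsg(curIter: Int, ctx: EdgeContext[Colored, Int, Int]): Unit =
>     if (ctx.srcAttr.flag == curIter) ctx.sendToDst(ctx.srcAttr.value)
>
>   def run(initial: Graph[Colored, Int], maxIter: Int): Graph[Colored, Int] = {
>     var graph = initial.cache()
>     // Initial round: vertices created with flag == 0 all send once.
>     var messages = graph
>       .aggregateMessages[Int](ctx => sendMsg(0, ctx), (a, b) => math.min(a, b))
>       .cache()
>     var i = 0
>     while (i < maxIter && messages.count() > 0) {
>       i = i + 1
>       val curIter = i
>       // 1. innerJoin: run vprog only on vertices that received a message.
>       val changedVerts = graph.vertices
>         .innerJoin(messages) { (vid, attr, msg) => vprog(curIter, vid, attr, msg) }
>         .cache()
>       // 2. outerJoinVertices: merge the updated vertices into the graph.
>       graph = graph.outerJoinVertices(changedVerts) { (vid, old, newOpt) =>
>         newOpt.getOrElse(old)
>       }.cache()
>       // 3. aggregateMessages: stale flags never match curIter, so no reset.
>       messages = graph
>         .aggregateMessages[Int](ctx => sendMsg(curIter, ctx), (a, b) => math.min(a, b))
>         .cache()
>     }
>     graph
>   }
> }
> ```
>
> Because a flag written in an earlier iteration can never equal the current 
> `curIter`, no pass over all vertices is needed to clear it. The sketch sends 
> only from source to destination and omits unpersisting old RDDs for brevity.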
> ####  result
> raw data       from
> *     vertex: 181640208
> *     edge: 890041895
> |  | average cost per iteration (s) | cost of 150 iterations | cost of 420 iterations |
> | ------------ | ------------- | ------------ | ------------ |
> | solution 1 | 188 | 7.8h | cannot finish |
> | solution 2 | 24 | 1.2h | 3.1h |
> | comparison | 7x | 6.5x | only solution 2 finished (3.1h) |
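>
> The comparison row can be cross-checked from the totals with a few lines of 
> plain Scala. The variable names are hypothetical, and reading the 
> per-iteration column in seconds is an assumption that the totals appear to 
> support.
>
> ```scala
> // Totals from the table, in hours, for 150 iterations.
> val solution1Hours = 7.8
> val solution2Hours = 1.2
> val iterations = 150
>
> // Speedup on total cost for 150 iterations.
> val speedup = solution1Hours / solution2Hours
>
> // Per-iteration cost implied by the totals, in seconds.
> val sol1SecondsPerIter = solution1Hours * 3600 / iterations
> val sol2SecondsPerIter = solution2Hours * 3600 / iterations
> ```
>
> The totals give a speedup of about 6.5 and imply roughly 187 seconds per 
> iteration for solution 1, close to the reported 188.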
>     
> ##            the end
>     
> I think the second solution (Pregel + a flag) is better.
> It properly supports iterative graph algorithms in which only a part of the 
> vertices send messages to their neighbours in each iteration.
> We will use it in our production environment.
> ----EOF----



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
