[ 
https://issues.apache.org/jira/browse/SPARK-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sjk updated SPARK-5036:
-----------------------
    Description: 
Better support sending partial messages in Pregel API

1. The requirement

In many iterative graph algorithms, only some of the vertices (we call them 
ActiveVertexes) need to send messages to their neighbours in each iteration. In 
many cases, ActiveVertexes are the vertices whose attributes changed between 
the previous and the current iteration. To implement this requirement, we can 
use the Pregel API plus a flag (e.g., `bool isAttrChanged`) in each vertex's 
attribute. 

However, after the `aggregateMessages` or `mapReduceTriplets` call of each 
iteration, we need to reset this flag to its initial value in every vertex, 
which requires an expensive `joinVertices`. 

We found a more efficient way to meet this requirement and want to discuss it 
here.


Consider the following simple example.

In the i-th iteration, the previous attribute of each vertex is `Attr` and the 
newly computed attribute is `NewAttr`:

| VID | Attr | NewAttr | Neighbours |
|:----|:-----|:--------|:-----------|
| 1   | 4    | 5       | 2, 3       |
| 2   | 3    | 2       | 1, 4       |
| 3   | 2    | 2       | 1, 4       |
| 4   | 3    | 4       | 1, 2, 3    |

Our requirements are:

1.      Set each vertex's `Attr` to `NewAttr` in the i-th iteration.
2.      For each vertex whose `Attr != NewAttr`, send a message to its 
neighbours in the next iteration's `aggregateMessages`.
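The two requirements can be checked against the example table with a small plain-Scala sketch. It uses ordinary collections rather than GraphX, and the names `Row`, `active`, `nextAttr` and `receivers` are illustrative assumptions, not part of any API.

```scala
// Plain-Scala model of the example table; not GraphX code.
object ActiveVertexExample {
  // One row of the example table.
  final case class Row(vid: Long, attr: Int, newAttr: Int, neighbours: Seq[Long])

  val rows: Seq[Row] = Seq(
    Row(1L, 4, 5, Seq(2L, 3L)),
    Row(2L, 3, 2, Seq(1L, 4L)),
    Row(3L, 2, 2, Seq(1L, 4L)),
    Row(4L, 3, 4, Seq(1L, 2L, 3L))
  )

  // Requirement 1: every vertex carries NewAttr into the next iteration.
  val nextAttr: Map[Long, Int] = rows.map(r => r.vid -> r.newAttr).toMap

  // Requirement 2: a vertex is active when its attribute changed.
  val active: Seq[Long] = rows.filter(r => r.attr != r.newAttr).map(_.vid)

  // Vertices that receive at least one message in the next iteration.
  val receivers: Set[Long] =
    rows.filter(r => active.contains(r.vid)).flatMap(_.neighbours).toSet

  def main(args: Array[String]): Unit = {
    println(s"active = $active")
    println(s"receivers = $receivers")
  }
}
```

In this example vertices 1, 2 and 4 are active, while vertex 3 keeps its attribute and stays silent.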


We found it hard to implement this requirement efficiently with the current 
Pregel API. The reason is that we need not only `pregel()` to compute 
`NewAttr` (2) but also `outJoin()` to satisfy (1).

A simple idea is to keep an `isAttrChanged: Boolean` (solution 1) or a 
`flag: Int` (solution 2) in each vertex's attribute.

2. Two solutions
-----------

2.1 Solution 1: set and reset `isAttrChanged: Boolean` in the vertex attribute

![alt text](s1.jpeg "Title")

1. Initialise the messages with `aggregateMessages`, which returns a message 
   RDD.
2. `innerJoin`: apply the messages to the vertices that received them. This 
   returns a new VertexRDD holding the values computed by the custom logic 
   function `vprog`, with `isAttrChanged = true`.
3. `outerJoinVertices`: merge the changed vertices back into the whole graph, 
   which gives the updated graph.
4. `aggregateMessages`: returns a message RDD.
5. `joinVertices`: reset `isAttrChanged` to false in every vertex attribute.

   ```
   // Reset isAttrChanged to false on every vertex.
   g = updateG.joinVertices(updateG.vertices) {
     (vid, oriVertex, updateGVertex) => updateGVertex.reset()
   }
   ```
   The reset has to clear the flag inside each vertex attribute object.

If `isAttrChanged` is not reset, the vertex will send messages again in the 
next iteration even though its attribute did not change.
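The cost of solution 1 comes from the reset pass touching every vertex. The following plain-Scala sketch models that behaviour with an ordinary `Map` instead of a GraphX graph; `VAttr`, `resetAll` and `senders` are hypothetical names chosen for illustration.

```scala
// Plain-Scala model of the Boolean-flag approach; not GraphX code.
object BooleanFlagModel {
  // Vertex attribute carrying the value and the "changed" flag.
  final case class VAttr(value: Int, isAttrChanged: Boolean) {
    def reset(): VAttr = copy(isAttrChanged = false)
  }

  // Step 5: the reset rewrites the attribute of every vertex, changed or not.
  def resetAll(vertices: Map[Long, VAttr]): Map[Long, VAttr] =
    vertices.map { case (vid, attr) => vid -> attr.reset() }

  // Vertices that would send in the next message pass.
  def senders(vertices: Map[Long, VAttr]): Set[Long] =
    vertices.collect { case (vid, attr) if attr.isAttrChanged => vid }.toSet

  def main(args: Array[String]): Unit = {
    val afterIteration = Map(1L -> VAttr(5, true), 3L -> VAttr(2, false))
    // Without the reset, vertex 1 keeps sending in later iterations.
    println(senders(afterIteration))
    // With the reset, nobody sends until vprog marks a vertex again.
    println(senders(resetAll(afterIteration)))
  }
}
```

Note that `resetAll` visits all vertices even when only a few of them changed, which mirrors the full `joinVertices` pass described in step 5.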

**Result:**

*       Edges: 890041895
*       Vertices: 181640208
*       Iterations: 150
*       Total cost: 8.4h
*       could not run until no messages were left


2.2 Solution 2: color the vertices

![alt text](s2.jpeg "Title")

Iteration process:

1. `innerJoin`
   `vprog` is used as a partially applied function of the form 
   `vprog(curIter, _: VertexId, _: VD, _: A)`, where `curIter` comes from 
   `i = i + 1; val curIter = i`. 
   Inside `vprog`, the user can read `curIter` and assign it to `flag`.
2. `outerJoinVertices`
   `graph = graph.outerJoinVertices(changedVerts) { (vid, old, newOpt) => 
   newOpt.getOrElse(old)}.cache()`
3. `aggregateMessages`
   `sendMsg` is a partially applied function of the form 
   `sendMsg(curIter, _: EdgeContext[VD, ED, A])`.
   **In `sendMsg`, compare `curIter` with `flag` to decide whether to send a 
   message.**
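The flag comparison in solution 2 can be modelled in plain Scala as follows. This is a sketch under assumptions rather than the code from the pull request: the min-style update in `vprog`, the initial flag value 0, iteration numbers starting at 1, and the names `VAttr` and `shouldSend` are all illustrative.

```scala
// Plain-Scala model of the iteration-stamp approach; not GraphX code.
object IterationStampModel {
  // Vertex attribute carrying the value and the iteration in which it last changed.
  final case class VAttr(value: Int, flag: Int)

  // vprog with the current iteration supplied first, so that it can be
  // partially applied once per iteration. The min update is only an example.
  def vprog(curIter: Int)(vid: Long, old: VAttr, msg: Int): VAttr = {
    val updated = math.min(old.value, msg)
    if (updated != old.value) VAttr(updated, curIter) else old
  }

  // sendMsg side: a source vertex sends only if it was stamped in this iteration.
  def shouldSend(curIter: Int)(src: VAttr): Boolean = src.flag == curIter

  def main(args: Array[String]): Unit = {
    val changed = vprog(1)(1L, VAttr(5, 0), 3)   // value changes, stamped with 1
    val unchanged = vprog(1)(3L, VAttr(2, 0), 3) // value stays, stamp stays 0
    println(shouldSend(1)(changed))
    println(shouldSend(1)(unchanged))
    println(shouldSend(2)(changed)) // stale stamp, no reset pass required
  }
}
```

Because a stamp from an earlier iteration never equals the current `curIter`, stale flags expire on their own and no per-iteration reset over all vertices is needed.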

#### Result

Raw data:

*       vertices: 181640208
*       edges: 890041895

|            | average cost per iteration | cost of 150 iterations | cost of 420 iterations |
| ---------- | -------------------------- | ---------------------- | ---------------------- |
| solution 1 | 188m                       | 7.8h                   | cannot finish          |
| solution 2 | 24                         | 1.2h                   | 3.1h                   |
| comparison | 7x                         | 6.5x                   | finished in 3.1h       |

    
## The end

I think the second solution (Pregel + a flag) is better. 
It properly supports iterative graph algorithms in which only some of the 
vertices send messages to their neighbours in each iteration.

We will use it in our production environment.

pr: https://github.com/apache/spark/pull/3866

----EOF----




> Better support sending partial messages in Pregel API
> -----------------------------------------------------
>
>                 Key: SPARK-5036
>                 URL: https://issues.apache.org/jira/browse/SPARK-5036
>             Project: Spark
>          Issue Type: Improvement
>          Components: GraphX
>            Reporter: sjk
>         Attachments: s1.jpeg, s2.jpeg
>
>


