Github user jegonzal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1217#discussion_r14227560
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala ---
    @@ -158,4 +169,125 @@ object Pregel extends Logging {
         g
       } // end of apply
     
    +  /**
    +   * Execute a Pregel-like iterative vertex-parallel abstraction.  The
    +   * user-defined vertex-program `vprog` is executed in parallel on
    +   * each vertex receiving any inbound messages and computing a new
    +   * value for the vertex.  The `sendMsg` function is then invoked on
    +   * all out-edges and is used to compute an optional message to the
    +   * destination vertex. The `mergeMsg` function is a commutative
    +   * associative function used to combine messages destined to the
    +   * same vertex.
    +   *
    +   * On the first iteration all vertices receive the `initialMsg` and
    +   * on subsequent iterations if a vertex does not receive a message
    +   * then the vertex-program is not invoked.
    +   *
    +   * This function iterates until there are no remaining messages, or
    +   * for `maxIterations` iterations.
    +   *
    +   * @tparam VD the vertex data type
    +   * @tparam ED the edge data type
    +   * @tparam A the Pregel message type
    +   *
    +   * @param graph the input graph.
    +   *
    +   * @param initialMsg the message each vertex will receive at the on
    +   * the first iteration
    +   *
    +   * @param maxIterations the maximum number of iterations to run for
    +   *
    +   * @param activeDirection the direction of edges incident to a vertex 
that received a message in
    +   * the previous round on which to run `sendMsg`. For example, if this is 
`EdgeDirection.Out`, only
    +   * out-edges of vertices that received a message in the previous round 
will run. The default is
    +   * `EdgeDirection.Either`, which will run `sendMsg` on edges where 
either side received a message
    +   * in the previous round. If this is `EdgeDirection.Both`, `sendMsg` 
will only run on edges where
    +   * *both* vertices received a message.
    +   *
    +   * @param vprog the user-defined vertex program which runs on each
    +   * vertex and receives the inbound message and computes a new vertex
    +   * value.  On the first iteration the vertex program is invoked on
    +   * all vertices and is passed the default message.  On subsequent
    +   * iterations the vertex program is only invoked on those vertices
    +   * that receive messages.
    +   *
    +   * @param sendMsg a user supplied function that is applied to out
    +   * edges of vertices that received messages in the current
    +   * iteration
    +   *
    +   * @param mergeMsg a user supplied function that takes two incoming
    +   * messages of type A and merges them into a single message of type
    +   * A.  ''This function must be commutative and associative and
    +   * ideally the size of A should not increase.''
    +   *
    +   * @return the resulting graph at the end of the computation
    +   *
    +   */
    +  def run[VD: ClassTag, ED: ClassTag, A: ClassTag]
    +  (graph: Graph[VD, ED],
    +   maxIterations: Int = Int.MaxValue,
    +   activeDirection: EdgeDirection = EdgeDirection.Either)
    +  (vertexProgram: (VertexId, VD, Option[A], VertexContext) => VD,
    +   sendMsg: (EdgeTriplet[VD, ED], EdgeContext) => Iterator[(VertexId, A)],
    +   mergeMsg: (A, A) => A)
    +  : Graph[VD, ED] =
    +  {
    +    // Initialize the graph with all vertices active
    +    var g: Graph[(VD, Boolean), ED] = graph.mapVertices { (vid, vdata) => 
(vdata, true) }.cache()
    +    // Determine the set of vertices that did not vote to halt
    +    var activeVertices = g.vertices
    +    var numActive = activeVertices.count()
    +    var i = 0
    +    while (numActive > 0 && i < maxIterations) {
    +      // The send message wrapper removes the active fields from the 
triplet and places them in the edge context.
    +      def sendMessageWrapper(triplet: EdgeTriplet[(VD, Boolean),ED]): 
Iterator[(VertexId, A)] = {
    +        val simpleTriplet = new EdgeTriplet[VD, ED]()
    +        simpleTriplet.set(triplet)
    +        simpleTriplet.srcAttr = triplet.srcAttr._1
    +        simpleTriplet.dstAttr = triplet.dstAttr._1
    +        val ctx = new EdgeContext(i, triplet.srcAttr._2, 
triplet.dstAttr._2)
    +        sendMsg(simpleTriplet, ctx)
    +      }
    +
    +      // Compute the messages for all the active vertices
    +      val messages = g.mapReduceTriplets(sendMessageWrapper, mergeMsg, 
Some((activeVertices, activeDirection)))
    +
    +      // get a reference to the current graph so that we can unpersist it 
once the new graph is created.
    +      val prevG = g
    +
    +      // Receive the messages to the subset of active vertices
    +      g = g.outerJoinVertices(messages){ (vid, dataAndActive, msgOpt) =>
    +        val (vdata, active) = dataAndActive
    +        // If the vertex voted to halt and received no message then we can 
skip the vertex program
    +        if (!active && msgOpt.isEmpty) {
    +          dataAndActive
    +        } else {
    +          val ctx = new VertexContext(i, active)
    +          // The vertex program is either active or received a message (or 
both).
    +          // A vertex program should vote to halt again even if it has 
previously voted to halt
    +          (vertexProgram(vid, vdata, msgOpt, ctx), ctx.isActive)
    +        }
    +      }.cache()
    +
    +      // Recompute the active vertices (those that have not voted to halt)
    +      activeVertices = g.vertices.filter(v => v._2._2)
    +
    +      // Force all computation!
    +      numActive = activeVertices.count()
    +
    +      // Unpersist the RDDs hidden by newly-materialized RDDs
    +//      prevG.unpersistVertices(blocking=false)
    +//      prevG.edges.unpersist(blocking=false)
    --- End diff --
    
    Uncommenting lines 279 and 280 leads to a substantial slow down in later 
iterations indicating that there is still an issue with `unpersist`.  
@ankurdave any thoughts?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to