Julien MASSIOT created SPARK-19023:
--------------------------------------

             Summary: Memory leak on GraphX with an iterative algorithm and checkpoint on the graph
                 Key: SPARK-19023
                 URL: https://issues.apache.org/jira/browse/SPARK-19023
             Project: Spark
          Issue Type: Bug
          Components: GraphX
    Affects Versions: 2.0.2
            Reporter: Julien MASSIOT


I am facing OOMs within a Spark Streaming application that uses GraphX.
While trying to reproduce the issue in a simple application, I was able to identify what appear to be two kinds of memory leaks.
  
*Leak 1*

It can be reproduced with the following simple Scala application, which simulates more or less what I am doing in my Spark Streaming application (each iteration of the loop standing in for one micro-batch).

{code:title=TestGraph.scala|borderStyle=solid}

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object TestGraph {
  case class ImpactingEvent(entityInstance: String)
  case class ImpactedNode(entityInstance: String)
  case class RelationInstance(relationType: String)

  var impactingGraph: Graph[ImpactedNode, RelationInstance] = null

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TestImpactingPropagation").setMaster("local")
    conf.set("spark.checkpoint.checkpointAllMarkedAncestors", "true")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    // Checkpointing requires a checkpoint directory; adjust the path as needed.
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val vertices: RDD[(VertexId, ImpactedNode)] = sc.parallelize(Array(
      (1L, ImpactedNode("Node1")), (2L, ImpactedNode("Node2")), (3L, ImpactedNode("Node3"))))

    val edges: RDD[Edge[RelationInstance]] = sc.parallelize(Array(
      Edge(1L, 2L, RelationInstance("Required")), Edge(1L, 2L, RelationInstance("Failover"))))

    impactingGraph = Graph(vertices, edges, null)

    // Each iteration simulates one micro-batch: transform the graph, checkpoint it,
    // then materialize it.
    for (x <- 1 to 10) {
      impactingGraph = propagateEvent(impactingGraph, ImpactingEvent("node1"), sc)

      impactingGraph.checkpoint()
      impactingGraph.edges.count()
      impactingGraph.vertices.count()
    }
    println("Hello")
    // Keep the JVM alive so a heap dump can be taken.
    Thread.sleep(10000000)
  }

  private def propagateEvent(
      impactingGraph: Graph[ImpactedNode, RelationInstance],
      event: ImpactingEvent,
      sc: SparkContext): Graph[ImpactedNode, RelationInstance] = {
    val graph = impactingGraph.mapVertices((id, node) => node).cache()
    impactingGraph.unpersist(true)
    graph.cache()
  }
}

{code}
  
In this simple application, I just apply a mapVertices transformation to the graph and then checkpoint the graph, and I repeat this operation 10 times.
After the application has finished the loop, I take a heap dump.

In this heap dump, I can see 11 "live" GraphImpl instances in memory.
My expectation is to have only 1 (the one referenced by the global variable impactingGraph).
  
The "leak" comes from the f function of a MapPartitionsRDD (which is referenced by the partitionsRDD variable of my VertexRDD).
This f closure holds an outer reference to the graph created in the previous iteration.

I can see that the clearDependencies method of MapPartitionsRDD does not reset the f function to null.
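
For reference, this is roughly what MapPartitionsRDD does today (simplified from the 2.0.2 sources, other members omitted): only the parent RDD is dropped, while the closure stays around.

{code:title=MapPartitionsRDD.scala (current, simplified)|borderStyle=solid}
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  // ... compute / getPartitions / partitioner omitted ...

  override def clearDependencies() {
    super.clearDependencies()
    prev = null   // the parent is dropped, but f (and its outer references) is kept
  }
}
{code}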
  
While looking for similar issues, I found this pull request:
[https://github.com/apache/spark/pull/3545]

In that pull request, the f variable is reset to null in the clearDependencies method of ZippedPartitionsRDD.
I do not understand why the same is not done in MapPartitionsRDD.
I tried patching spark-core to set f to null in clearDependencies of MapPartitionsRDD, and this fixed the leak in this simple use case.
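
Roughly, the change I tested looks like this (a sketch, not the exact patch): f becomes a var so it can be dropped once the RDD has been checkpointed, mirroring what the pull request above did for ZippedPartitionsRDD.

{code:title=MapPartitionsRDD.scala (patched, sketch)|borderStyle=solid}
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    var f: (TaskContext, Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  // ... compute / getPartitions / partitioner unchanged and omitted ...

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
    f = null   // drop the closure so it no longer pins the graph from the previous iteration
  }
}
{code}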

Don't you think the f variable should also be reset to null in MapPartitionsRDD?


*Leak 2*


Now I do the same thing, except that in the propagateEvent method I also perform a joinVertices on the graph in addition to the mapVertices.
This can be seen in the following application:

{code:title=TestGraph.scala|borderStyle=solid}

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object TestGraph {
  case class ImpactingEvent(entityInstance: String)
  case class ImpactedNode(entityInstance: String)
  case class RelationInstance(relationType: String)

  var impactingGraph: Graph[ImpactedNode, RelationInstance] = null

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TestImpactingPropagation").setMaster("local")
    conf.set("spark.checkpoint.checkpointAllMarkedAncestors", "true")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    // Checkpointing requires a checkpoint directory; adjust the path as needed.
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val vertices: RDD[(VertexId, ImpactedNode)] = sc.parallelize(Array(
      (1L, ImpactedNode("Node1")), (2L, ImpactedNode("Node2")), (3L, ImpactedNode("Node3"))))

    val edges: RDD[Edge[RelationInstance]] = sc.parallelize(Array(
      Edge(1L, 2L, RelationInstance("Required")), Edge(1L, 2L, RelationInstance("Failover"))))

    impactingGraph = Graph(vertices, edges, null)

    for (x <- 1 to 10) {
      impactingGraph = propagateEvent(impactingGraph, ImpactingEvent("node1"), sc)

      impactingGraph.checkpoint()
      impactingGraph.edges.count()
      impactingGraph.vertices.count()
    }
    println("Hello")
    // Keep the JVM alive so a heap dump can be taken.
    Thread.sleep(10000000)
  }

  private def propagateEvent(
      impactingGraph: Graph[ImpactedNode, RelationInstance],
      event: ImpactingEvent,
      sc: SparkContext): Graph[ImpactedNode, RelationInstance] = {
    var graph = impactingGraph.mapVertices((id, node) => node).cache()
    // Same as before, plus a joinVertices on the transformed graph.
    val verticesToJoin: RDD[(VertexId, String)] =
      sc.parallelize(Array((1L, "Node1"), (2L, "Node2")))
    graph = graph.joinVertices(verticesToJoin) { (id, src, toJoin) => src }
    impactingGraph.unpersist(true)
    graph.cache()
  }
}

{code}

When running this application and taking a heap dump, I can still see 11 "live" GraphImpl instances in memory where I expect only 1, even with the patch described in the previous section applied.

Analyzing this dump, I can see that the "leak" comes from a reference to an array of partitions held by the "partitions_" variable of the EdgeRDD. This array of partitions contains a reference to a MapPartitionsRDD which in turn references the graph created by the previous iteration, similarly to what is described in the *Leak 1* section.

This array of partitions is referenced twice:
* once by the "partitions_" variable of the partitionsRDD embedded within the EdgeRDD
* once by the "partitions_" variable of the EdgeRDD itself

This comes from the getPartitions method of EdgeRDD:

{code:title=EdgeRDD.scala|borderStyle=solid}

  override protected def getPartitions: Array[Partition] = partitionsRDD.partitions

{code}
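
For context, RDD.partitions lazily caches the result of getPartitions in the "partitions_" field (roughly as below, simplified from RDD.scala). Since EdgeRDD.getPartitions returns partitionsRDD.partitions, both RDDs end up caching the very same array.

{code:title=RDD.scala (partitions, simplified)|borderStyle=solid}
  final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions   // for EdgeRDD this is partitionsRDD.partitions
      }
      partitions_
    }
  }
{code}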

After checkpoint() and count() are called on the graph edges, the reference to this array is cleared inside the partitionsRDD of the EdgeRDD.
This is done through the following method:

{code:title=RDD.scala|borderStyle=solid}

  /**
   * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
   * created from the checkpoint file, and forget its old dependencies and partitions.
   */
  private[spark] def markCheckpointed(): Unit = {
    clearDependencies()
    partitions_ = null
    deps = null    // Forget the constructor argument for dependencies too
  }

{code}

But this is not done for the "partitions_" variable of the EdgeRDD itself.
Indeed, the markCheckpointed() method is not called on the EdgeRDD itself but 
only on the partitionsRDD embedded within the EdgeRDD.
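
If I read the code correctly, this is because EdgeRDDImpl delegates checkpointing to its partitionsRDD, so only the inner RDD is ever checkpointed and hence only the inner RDD gets markCheckpointed() (sketched from the 2.0.2 sources):

{code:title=EdgeRDDImpl.scala (simplified)|borderStyle=solid}
  override def checkpoint(): Unit = {
    // Only the wrapped partitionsRDD is checkpointed, so markCheckpointed()
    // is never invoked on the EdgeRDD itself.
    partitionsRDD.checkpoint()
  }
{code}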

Due to that, we still have a reference to this array of partitions (that 
references a MapPartitionsRDD that references the graph of the previous 
iteration).

I am able to work around this leak by calling checkpoint and count on the edges just after the mapVertices and before the joinVertices (provided the patch described in the previous section is also applied to MapPartitionsRDD); see the sketch below.
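
A sketch of that workaround, based on the propagateEvent method above (an illustration of what I did, not a proposed fix):

{code:title=TestGraph.scala (workaround, sketch)|borderStyle=solid}
    private def propagateEvent(impactingGraph: Graph[ImpactedNode, RelationInstance],
        event: ImpactingEvent, sc: SparkContext): Graph[ImpactedNode, RelationInstance] = {
      var graph = impactingGraph.mapVertices((id, node) => node).cache()
      // Checkpoint and materialize right after mapVertices, before joinVertices,
      // so the stale closure/partition references can be dropped here.
      graph.checkpoint()
      graph.edges.count()
      graph.vertices.count()
      val verticesToJoin: RDD[(VertexId, String)] =
        sc.parallelize(Array((1L, "Node1"), (2L, "Node2")))
      graph = graph.joinVertices(verticesToJoin) { (id, src, toJoin) => src }
      impactingGraph.unpersist(true)
      graph.cache()
    }
{code}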

But this workaround does not seem clean to me.
In my mind:
* either the "partitions_" variable of the EdgeRDD should be reset to null after a checkpoint is performed on the Graph,
* or the "partitions_" variable of the EdgeRDD should not reference the same array of partitions as the one referenced by the "partitions_" variable of the partitionsRDD (I am not sure this "partitions_" is really useful on the EdgeRDD at all).

What do you think?


