Repository: spark
Updated Branches:
  refs/heads/master fe7246fea -> d0a5c32bd


[SPARK-12655][GRAPHX] GraphX does not unpersist RDDs

Some VertexRDDs and EdgeRDDs are created during the intermediate steps of
g.connectedComponents() but are unnecessarily left cached after the method
returns. The fix is to unpersist these RDDs once they are no longer in use.

A test case is added to confirm the fix for the reported bug.

Author: Jason Lee <cj...@us.ibm.com>

Closes #10713 from jasoncl/SPARK-12655.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0a5c32b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0a5c32b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0a5c32b

Branch: refs/heads/master
Commit: d0a5c32bd05841f411a342a80c5da9f73f30d69a
Parents: fe7246f
Author: Jason Lee <cj...@us.ibm.com>
Authored: Fri Jan 15 12:04:05 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Jan 15 12:04:05 2016 +0000

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/graphx/Pregel.scala |  2 +-
 .../spark/graphx/lib/ConnectedComponents.scala      |  4 +++-
 .../scala/org/apache/spark/graphx/GraphSuite.scala  | 16 ++++++++++++++++
 3 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d0a5c32b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index b908860..7960827 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -151,7 +151,7 @@ object Pregel extends Logging {
       // count the iteration
       i += 1
     }
-
+    messages.unpersist(blocking = false)
     g
   } // end of apply
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d0a5c32b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index 859f896..f72cbb1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -47,9 +47,11 @@ object ConnectedComponents {
       }
     }
     val initialMessage = Long.MaxValue
-    Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
+    val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
       vprog = (id, attr, msg) => math.min(attr, msg),
       sendMsg = sendMessage,
       mergeMsg = (a, b) => math.min(a, b))
+    ccGraph.unpersist()
+    pregelGraph
   } // end of connectedComponents
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d0a5c32b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 1f5e27d..2fbc6f0 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -428,4 +428,20 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("unpersist graph RDD") {
+    withSpark { sc =>
+      val vert = sc.parallelize(List((1L, "a"), (2L, "b"), (3L, "c")), 1)
+      val edges = sc.parallelize(List(Edge[Long](1L, 2L), Edge[Long](1L, 3L)), 1)
+      val g0 = Graph(vert, edges)
+      val g = g0.partitionBy(PartitionStrategy.EdgePartition2D, 2)
+      val cc = g.connectedComponents()
+      assert(sc.getPersistentRDDs.nonEmpty)
+      cc.unpersist()
+      g.unpersist()
+      g0.unpersist()
+      vert.unpersist()
+      edges.unpersist()
+      assert(sc.getPersistentRDDs.isEmpty)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to