Repository: spark Updated Branches: refs/heads/master fe7246fea -> d0a5c32bd
[SPARK-12655][GRAPHX] GraphX does not unpersist RDDs Some VertexRDD and EdgeRDD are created during the intermediate step of g.connectedComponents() but unnecessarily left cached after the method is done. The fix is to unpersist these RDDs once they are no longer in use. A test case is added to confirm the fix for the reported bug. Author: Jason Lee <cj...@us.ibm.com> Closes #10713 from jasoncl/SPARK-12655. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0a5c32b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0a5c32b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0a5c32b Branch: refs/heads/master Commit: d0a5c32bd05841f411a342a80c5da9f73f30d69a Parents: fe7246f Author: Jason Lee <cj...@us.ibm.com> Authored: Fri Jan 15 12:04:05 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Fri Jan 15 12:04:05 2016 +0000 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/graphx/Pregel.scala | 2 +- .../spark/graphx/lib/ConnectedComponents.scala | 4 +++- .../scala/org/apache/spark/graphx/GraphSuite.scala | 16 ++++++++++++++++ 3 files changed, 20 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d0a5c32b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index b908860..7960827 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -151,7 +151,7 @@ object Pregel extends Logging { // count the iteration i += 1 } - + messages.unpersist(blocking = false) g } // end of apply http://git-wip-us.apache.org/repos/asf/spark/blob/d0a5c32b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index 859f896..f72cbb1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -47,9 +47,11 @@ object ConnectedComponents { } } val initialMessage = Long.MaxValue - Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)( + val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)( vprog = (id, attr, msg) => math.min(attr, msg), sendMsg = sendMessage, mergeMsg = (a, b) => math.min(a, b)) + ccGraph.unpersist() + pregelGraph } // end of connectedComponents } http://git-wip-us.apache.org/repos/asf/spark/blob/d0a5c32b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala ---------------------------------------------------------------------- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 1f5e27d..2fbc6f0 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -428,4 +428,20 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext { } } + test("unpersist graph RDD") { + withSpark { sc => + val vert = sc.parallelize(List((1L, "a"), (2L, "b"), (3L, "c")), 1) + val edges = sc.parallelize(List(Edge[Long](1L, 2L), Edge[Long](1L, 3L)), 1) + val g0 = Graph(vert, edges) + val g = g0.partitionBy(PartitionStrategy.EdgePartition2D, 2) + val cc = g.connectedComponents() + assert(sc.getPersistentRDDs.nonEmpty) + cc.unpersist() + g.unpersist() + g0.unpersist() + vert.unpersist() + edges.unpersist() + assert(sc.getPersistentRDDs.isEmpty) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org