Repository: spark
Updated Branches:
refs/heads/branch-1.6 6a9f19dd5 -> 5830828ef
[SPARK-12655][GRAPHX] GraphX does not unpersist RDDs
Some VertexRDD and EdgeRDD instances are created during the intermediate steps of
g.connectedComponents() but are unnecessarily left cached after the method returns.
The fix is to unpersist these RDDs once they are no longer in use.
A test case is added to confirm the fix for the reported bug.
Author: Jason Lee
Closes #10713 from jasoncl/SPARK-12655.
(cherry picked from commit d0a5c32bd05841f411a342a80c5da9f73f30d69a)
Signed-off-by: Sean Owen
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5830828e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5830828e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5830828e
Branch: refs/heads/branch-1.6
Commit: 5830828efbf863df510a2b5b17d76214863ff48f
Parents: 6a9f19d
Author: Jason Lee
Authored: Fri Jan 15 12:04:05 2016 +0000
Committer: Sean Owen
Committed: Tue Jun 7 09:25:04 2016 +0100
--
.../scala/org/apache/spark/graphx/Pregel.scala | 2 +-
.../spark/graphx/lib/ConnectedComponents.scala | 4 +++-
.../scala/org/apache/spark/graphx/GraphSuite.scala | 17 +++++++++++++++++
3 files changed, 21 insertions(+), 2 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/5830828e/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 2ca60d5..8a89295 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -151,7 +151,7 @@ object Pregel extends Logging {
// count the iteration
i += 1
}
-
+messages.unpersist(blocking = false)
g
} // end of apply
http://git-wip-us.apache.org/repos/asf/spark/blob/5830828e/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
--
diff --git
a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index 859f896..f72cbb1 100644
---
a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++
b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -47,9 +47,11 @@ object ConnectedComponents {
}
}
val initialMessage = Long.MaxValue
-Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
+val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection =
EdgeDirection.Either)(
vprog = (id, attr, msg) => math.min(attr, msg),
sendMsg = sendMessage,
mergeMsg = (a, b) => math.min(a, b))
+ccGraph.unpersist()
+pregelGraph
} // end of connectedComponents
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5830828e/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 9acbd79..a46c5da 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -428,6 +428,23 @@ class GraphSuite extends SparkFunSuite with
LocalSparkContext {
}
}
+ test("unpersist graph RDD") {
+withSpark { sc =>
+ val vert = sc.parallelize(List((1L, "a"), (2L, "b"), (3L, "c")), 1)
+ val edges = sc.parallelize(List(Edge[Long](1L, 2L), Edge[Long](1L, 3L)),
1)
+ val g0 = Graph(vert, edges)
+ val g = g0.partitionBy(PartitionStrategy.EdgePartition2D, 2)
+ val cc = g.connectedComponents()
+ assert(sc.getPersistentRDDs.nonEmpty)
+ cc.unpersist()
+ g.unpersist()
+ g0.unpersist()
+ vert.unpersist()
+ edges.unpersist()
+ assert(sc.getPersistentRDDs.isEmpty)
+}
+ }
+
test("SPARK-14219: pickRandomVertex") {
withSpark { sc =>
val vert = sc.parallelize(List((1L, "a")), 1)
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org