spark git commit: [SPARK-12655][GRAPHX] GraphX does not unpersist RDDs

2016-06-07 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 6a9f19dd5 -> 5830828ef


[SPARK-12655][GRAPHX] GraphX does not unpersist RDDs

Some VertexRDD and EdgeRDD are created during the intermediate step of 
g.connectedComponents() but unnecessarily left cached after the method is done. 
The fix is to unpersist these RDDs once they are no longer in use.

A test case is added to confirm the fix for the reported bug.

Author: Jason Lee 

Closes #10713 from jasoncl/SPARK-12655.

(cherry picked from commit d0a5c32bd05841f411a342a80c5da9f73f30d69a)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5830828e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5830828e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5830828e

Branch: refs/heads/branch-1.6
Commit: 5830828efbf863df510a2b5b17d76214863ff48f
Parents: 6a9f19d
Author: Jason Lee 
Authored: Fri Jan 15 12:04:05 2016 +0000
Committer: Sean Owen 
Committed: Tue Jun 7 09:25:04 2016 +0100

--
 .../scala/org/apache/spark/graphx/Pregel.scala |  2 +-
 .../spark/graphx/lib/ConnectedComponents.scala |  4 +++-
 .../scala/org/apache/spark/graphx/GraphSuite.scala | 17 +++++++++++++++++
 3 files changed, 21 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5830828e/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 2ca60d5..8a89295 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -151,7 +151,7 @@ object Pregel extends Logging {
   // count the iteration
   i += 1
 }
-
+messages.unpersist(blocking = false)
 g
   } // end of apply
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5830828e/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index 859f896..f72cbb1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -47,9 +47,11 @@ object ConnectedComponents {
   }
 }
 val initialMessage = Long.MaxValue
-Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
+val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
   vprog = (id, attr, msg) => math.min(attr, msg),
   sendMsg = sendMessage,
   mergeMsg = (a, b) => math.min(a, b))
+ccGraph.unpersist()
+pregelGraph
   } // end of connectedComponents
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5830828e/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 9acbd79..a46c5da 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -428,6 +428,23 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("unpersist graph RDD") {
+withSpark { sc =>
+  val vert = sc.parallelize(List((1L, "a"), (2L, "b"), (3L, "c")), 1)
+  val edges = sc.parallelize(List(Edge[Long](1L, 2L), Edge[Long](1L, 3L)), 1)
+  val g0 = Graph(vert, edges)
+  val g = g0.partitionBy(PartitionStrategy.EdgePartition2D, 2)
+  val cc = g.connectedComponents()
+  assert(sc.getPersistentRDDs.nonEmpty)
+  cc.unpersist()
+  g.unpersist()
+  g0.unpersist()
+  vert.unpersist()
+  edges.unpersist()
+  assert(sc.getPersistentRDDs.isEmpty)
+}
+  }
+
   test("SPARK-14219: pickRandomVertex") {
 withSpark { sc =>
   val vert = sc.parallelize(List((1L, "a")), 1)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12655][GRAPHX] GraphX does not unpersist RDDs

2016-01-15 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master fe7246fea -> d0a5c32bd


[SPARK-12655][GRAPHX] GraphX does not unpersist RDDs

Some VertexRDD and EdgeRDD are created during the intermediate step of 
g.connectedComponents() but unnecessarily left cached after the method is done. 
The fix is to unpersist these RDDs once they are no longer in use.

A test case is added to confirm the fix for the reported bug.

Author: Jason Lee 

Closes #10713 from jasoncl/SPARK-12655.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0a5c32b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0a5c32b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0a5c32b

Branch: refs/heads/master
Commit: d0a5c32bd05841f411a342a80c5da9f73f30d69a
Parents: fe7246f
Author: Jason Lee 
Authored: Fri Jan 15 12:04:05 2016 +0000
Committer: Sean Owen 
Committed: Fri Jan 15 12:04:05 2016 +0000

--
 .../main/scala/org/apache/spark/graphx/Pregel.scala |  2 +-
 .../spark/graphx/lib/ConnectedComponents.scala  |  4 +++-
 .../scala/org/apache/spark/graphx/GraphSuite.scala  | 16 ++++++++++++++++
 3 files changed, 20 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d0a5c32b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index b908860..7960827 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -151,7 +151,7 @@ object Pregel extends Logging {
   // count the iteration
   i += 1
 }
-
+messages.unpersist(blocking = false)
 g
   } // end of apply
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d0a5c32b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index 859f896..f72cbb1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -47,9 +47,11 @@ object ConnectedComponents {
   }
 }
 val initialMessage = Long.MaxValue
-Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
+val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
   vprog = (id, attr, msg) => math.min(attr, msg),
   sendMsg = sendMessage,
   mergeMsg = (a, b) => math.min(a, b))
+ccGraph.unpersist()
+pregelGraph
   } // end of connectedComponents
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d0a5c32b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 1f5e27d..2fbc6f0 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -428,4 +428,20 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("unpersist graph RDD") {
+withSpark { sc =>
+  val vert = sc.parallelize(List((1L, "a"), (2L, "b"), (3L, "c")), 1)
+  val edges = sc.parallelize(List(Edge[Long](1L, 2L), Edge[Long](1L, 3L)), 1)
+  val g0 = Graph(vert, edges)
+  val g = g0.partitionBy(PartitionStrategy.EdgePartition2D, 2)
+  val cc = g.connectedComponents()
+  assert(sc.getPersistentRDDs.nonEmpty)
+  cc.unpersist()
+  g.unpersist()
+  g0.unpersist()
+  vert.unpersist()
+  edges.unpersist()
+  assert(sc.getPersistentRDDs.isEmpty)
+}
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org