Repository: spark
Updated Branches:
  refs/heads/branch-2.0 00a48075a -> 48a8dc8b8


[SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD

## What changes were proposed in this pull request?

EdgeRDD/VertexRDD overrides checkpoint() and isCheckpointed() to forward these 
to the internal partitionRDD. So when checkpoint() is called on them, its the 
partitionRDD that actually gets checkpointed. However since isCheckpointed() 
also overridden to call partitionRDD.isCheckpointed, 
EdgeRDD/VertexRDD.isCheckpointed returns true even though this RDD is actually 
not checkpointed.

This would have been fine except the RDD's internal logic for computing the RDD 
depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is 
true, when computing Spark tries to read checkpoint data of VertexRDD/EdgeRDD 
even though they are not actually checkpointed. Through a crazy sequence of 
call forwarding, it reads checkpoint data of partitionsRDD and tries to cast it 
to types in Vertex/EdgeRDD. This leads to ClassCastException.

The minimal fix that does not change any public behavior is to modify RDD 
internal to not use public override-able API for internal logic.
## How was this patch tested?

New unit tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #15396 from tdas/SPARK-14804.

(cherry picked from commit 47d5d0ddb06c7d2c86515d9556c41dc80081f560)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48a8dc8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48a8dc8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48a8dc8b

Branch: refs/heads/branch-2.0
Commit: 48a8dc8b856d90c369592ca5ea16618fe6edb2d5
Parents: 00a4807
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed Jan 25 17:17:34 2017 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Jan 25 17:18:02 2017 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  5 ++--
 .../org/apache/spark/graphx/EdgeRDDSuite.scala  | 27 ++++++++++++++++++++
 .../apache/spark/graphx/VertexRDDSuite.scala    | 26 +++++++++++++++++++
 3 files changed, 56 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/48a8dc8b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 7013396..cb0eac2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1581,14 +1581,15 @@ abstract class RDD[T: ClassTag](
   /**
    * Return whether this RDD is checkpointed and materialized, either reliably 
or locally.
    */
-  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
+  def isCheckpointed: Boolean = isCheckpointedAndMaterialized
 
   /**
    * Return whether this RDD is checkpointed and materialized, either reliably 
or locally.
    * This is introduced as an alias for `isCheckpointed` to clarify the 
semantics of the
    * return value. Exposed for testing.
    */
-  private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
+  private[spark] def isCheckpointedAndMaterialized: Boolean =
+    checkpointData.exists(_.isCheckpointed)
 
   /**
    * Return whether this RDD is marked for local checkpointing.

http://git-wip-us.apache.org/repos/asf/spark/blob/48a8dc8b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
index f1ecc9e..7a24e32 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -33,4 +34,30 @@ class EdgeRDDSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
+  test("checkpointing") {
+    withSpark { sc =>
+      val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 
3), (2L, 3)))
+      val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+      sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+      edges.checkpoint()
+
+      // EdgeRDD not yet checkpointed
+      assert(!edges.isCheckpointed)
+      assert(!edges.isCheckpointedAndMaterialized)
+      assert(!edges.partitionsRDD.isCheckpointed)
+      assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+      val data = edges.collect().toSeq // force checkpointing
+
+      // EdgeRDD shows up as checkpointed, but internally it is not.
+      // Only internal partitionsRDD is checkpointed.
+      assert(edges.isCheckpointed)
+      assert(!edges.isCheckpointedAndMaterialized)
+      assert(edges.partitionsRDD.isCheckpointed)
+      assert(edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+      assert(edges.collect().toSeq ===  data) // test checkpointed RDD
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/48a8dc8b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 0bb9e0a..8e63043 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
 import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -197,4 +198,29 @@ class VertexRDDSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
+  test("checkpoint") {
+    withSpark { sc =>
+      val n = 100
+      val verts = vertices(sc, n)
+      sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+      verts.checkpoint()
+
+      // VertexRDD not yet checkpointed
+      assert(!verts.isCheckpointed)
+      assert(!verts.isCheckpointedAndMaterialized)
+      assert(!verts.partitionsRDD.isCheckpointed)
+      assert(!verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+      val data = verts.collect().toSeq // force checkpointing
+
+      // VertexRDD shows up as checkpointed, but internally it is not.
+      // Only internal partitionsRDD is checkpointed.
+      assert(verts.isCheckpointed)
+      assert(!verts.isCheckpointedAndMaterialized)
+      assert(verts.partitionsRDD.isCheckpointed)
+      assert(verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+      assert(verts.collect().toSeq === data) // test checkpointed RDD
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to