Repository: spark
Updated Branches:
  refs/heads/master 965c82d8c -> 47d5d0ddb


[SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD

## What changes were proposed in this pull request?

EdgeRDD/VertexRDD override checkpoint() and isCheckpointed to forward them
to the internal partitionsRDD. So when checkpoint() is called on them, it's the
partitionsRDD that actually gets checkpointed. However, since isCheckpointed is
also overridden to call partitionsRDD.isCheckpointed,
EdgeRDD/VertexRDD.isCheckpointed returns true even though the EdgeRDD/VertexRDD
itself is not actually checkpointed.

This would have been fine, except that the RDD's internal logic for computing
the RDD depends on isCheckpointed. So for VertexRDD/EdgeRDD, since
isCheckpointed is true, Spark tries to read the checkpoint data of the
VertexRDD/EdgeRDD when computing it, even though it is not actually
checkpointed. Through a convoluted sequence of call forwarding, it reads the
checkpoint data of partitionsRDD and tries to cast it to the types expected by
VertexRDD/EdgeRDD. This leads to a ClassCastException.

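The minimal fix that does not change any public behavior is to modify the RDD
internals so that they do not rely on a public, overridable API for internal
logic. Concretely, the actual check now lives in the private[spark]
isCheckpointedAndMaterialized, and the public isCheckpointed delegates to it
(instead of the other way around), so overriding isCheckpointed no longer
affects the internal check.
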
## How was this patch tested?

New unit tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #15396 from tdas/SPARK-14804.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47d5d0dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47d5d0dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47d5d0dd

Branch: refs/heads/master
Commit: 47d5d0ddb06c7d2c86515d9556c41dc80081f560
Parents: 965c82d
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed Jan 25 17:17:34 2017 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Jan 25 17:17:34 2017 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  5 ++--
 .../org/apache/spark/graphx/EdgeRDDSuite.scala  | 27 ++++++++++++++++++++
 .../apache/spark/graphx/VertexRDDSuite.scala    | 26 +++++++++++++++++++
 3 files changed, 56 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47d5d0dd/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a7e01f3..0359508 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1610,14 +1610,15 @@ abstract class RDD[T: ClassTag](
   /**
    * Return whether this RDD is checkpointed and materialized, either reliably or locally.
    */
-  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
+  def isCheckpointed: Boolean = isCheckpointedAndMaterialized
 
   /**
    * Return whether this RDD is checkpointed and materialized, either reliably or locally.
    * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
    * return value. Exposed for testing.
    */
-  private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
+  private[spark] def isCheckpointedAndMaterialized: Boolean =
+    checkpointData.exists(_.isCheckpointed)
 
   /**
    * Return whether this RDD is marked for local checkpointing.

http://git-wip-us.apache.org/repos/asf/spark/blob/47d5d0dd/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
index f1ecc9e..7a24e32 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -33,4 +34,30 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("checkpointing") {
+    withSpark { sc =>
+      val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
+      val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+      sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+      edges.checkpoint()
+
+      // EdgeRDD not yet checkpointed
+      assert(!edges.isCheckpointed)
+      assert(!edges.isCheckpointedAndMaterialized)
+      assert(!edges.partitionsRDD.isCheckpointed)
+      assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+      val data = edges.collect().toSeq // force checkpointing
+
+      // EdgeRDD shows up as checkpointed, but internally it is not.
+      // Only internal partitionsRDD is checkpointed.
+      assert(edges.isCheckpointed)
+      assert(!edges.isCheckpointedAndMaterialized)
+      assert(edges.partitionsRDD.isCheckpointed)
+      assert(edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+      assert(edges.collect().toSeq ===  data) // test checkpointed RDD
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/47d5d0dd/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 0bb9e0a..8e63043 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
 import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -197,4 +198,29 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("checkpoint") {
+    withSpark { sc =>
+      val n = 100
+      val verts = vertices(sc, n)
+      sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+      verts.checkpoint()
+
+      // VertexRDD not yet checkpointed
+      assert(!verts.isCheckpointed)
+      assert(!verts.isCheckpointedAndMaterialized)
+      assert(!verts.partitionsRDD.isCheckpointed)
+      assert(!verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+      val data = verts.collect().toSeq // force checkpointing
+
+      // VertexRDD shows up as checkpointed, but internally it is not.
+      // Only internal partitionsRDD is checkpointed.
+      assert(verts.isCheckpointed)
+      assert(!verts.isCheckpointedAndMaterialized)
+      assert(verts.partitionsRDD.isCheckpointed)
+      assert(verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+      assert(verts.collect().toSeq === data) // test checkpointed RDD
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to