Repository: spark
Updated Branches:
  refs/heads/master 4bd85d06e -> 0985d2c30


[SPARK-8707] RDD#toDebugString fails if any cached RDD has invalid partitions

Added numPartitions(evaluate: Boolean) to RDD. With evaluate = true the method
is equivalent to partitions.length. With evaluate = false, it consults only
checkpointed or already-evaluated partitions to determine the partition count,
and returns -1 when neither is available. RDDInfo.partitionNum calls
numPartitions lazily, only when it is accessed.
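For illustration only, a minimal Scala sketch of the semantics described
above. This is not the merged patch (the diff below takes a filter-based
route through getRDDStorageInfo instead), and because RDD's materialized
partition array is private, the "already evaluated" case is approximated
here via isCheckpointed:

    import org.apache.spark.rdd.RDD

    object PartitionCount {
      // Hedged sketch of numPartitions(evaluate): with evaluate = true it
      // matches partitions.length; with evaluate = false it only reports a
      // count that is safe to read without evaluating the RDD.
      def numPartitions(rdd: RDD[_], evaluate: Boolean): Int = {
        if (evaluate) {
          rdd.partitions.length   // may invoke getPartitions
        } else if (rdd.isCheckpointed) {
          rdd.partitions.length   // checkpointed partitions are concrete
        } else {
          -1                      // unknown without evaluation
        }
      }
    }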

Author: navis.ryu <na...@apache.org>

Closes #7127 from navis/SPARK-8707.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0985d2c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0985d2c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0985d2c3

Branch: refs/heads/master
Commit: 0985d2c30e031f80892987f7c3581d15dd210303
Parents: 4bd85d0
Author: navis.ryu <na...@apache.org>
Authored: Wed Sep 2 22:11:11 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Sep 2 22:11:51 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++++-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala      | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0985d2c3/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7388870..cbfe8bf 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1516,8 +1516,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   @DeveloperApi
   def getRDDStorageInfo: Array[RDDInfo] = {
+    getRDDStorageInfo(_ => true)
+  }
+
+  private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
     assertNotStopped()
-    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+    val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray
     StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
     rddInfos.filter(_.isCached)
   }
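
For context, a hedged usage sketch of the new private[spark] overload (so
it is only callable inside Spark internals): because the predicate is
applied before RDDInfo.fromRdd, info objects are constructed only for
matching RDDs. Here sc is an active SparkContext and rdd a persisted RDD,
both assumed for illustration:

    // Only the matching RDD is mapped through RDDInfo.fromRdd; other
    // persisted RDDs, including any with invalid partitions, are skipped.
    val infos = sc.getRDDStorageInfo(_.id == rdd.id)
    infos.foreach(info =>
      println(s"RDD ${info.id}: ${info.numCachedPartitions} cached partitions"))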

http://git-wip-us.apache.org/repos/asf/spark/blob/0985d2c3/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 081c721..7dd2bc5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1666,7 +1666,7 @@ abstract class RDD[T: ClassTag](
       import Utils.bytesToString
 
       val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
-      val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
+      val storageInfo = rdd.context.getRDDStorageInfo(_.id == rdd.id).map(info =>
         "    CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
           info.numCachedPartitions, bytesToString(info.memSize),
           bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))
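
This one-line change in RDD#toDebugString is the actual fix: previously
every persisted RDD was mapped through RDDInfo.fromRdd and only then
filtered by id, so debugging one RDD could fail when any other cached RDD
had invalid partitions. Passing the predicate into getRDDStorageInfo means
only the current RDD's info is ever computed.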

