Repository: spark
Updated Branches:
  refs/heads/master 4bd85d06e -> 0985d2c30
[SPARK-8707] RDD#toDebugString fails if any cached RDD has invalid partitions

Added numPartitions(evaluate: Boolean) to RDD. With evaluate = true, the
method is the same as partitions.length. With evaluate = false, it checks for
checkpointed or already evaluated partitions in the RDD to get the number of
partitions; if neither applies, it returns -1. RDDInfo.partitionNum calls
numPartitions only when it is accessed.

Author: navis.ryu <na...@apache.org>

Closes #7127 from navis/SPARK-8707.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0985d2c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0985d2c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0985d2c3

Branch: refs/heads/master
Commit: 0985d2c30e031f80892987f7c3581d15dd210303
Parents: 4bd85d0
Author: navis.ryu <na...@apache.org>
Authored: Wed Sep 2 22:11:11 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Sep 2 22:11:51 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++++-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala      | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0985d2c3/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7388870..cbfe8bf 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1516,8 +1516,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   @DeveloperApi
   def getRDDStorageInfo: Array[RDDInfo] = {
+    getRDDStorageInfo(_ => true)
+  }
+
+  private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
     assertNotStopped()
-    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+    val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray
     StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
     rddInfos.filter(_.isCached)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0985d2c3/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 081c721..7dd2bc5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1666,7 +1666,7 @@ abstract class RDD[T: ClassTag](
     import Utils.bytesToString

     val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
-    val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
+    val storageInfo = rdd.context.getRDDStorageInfo(_.id == rdd.id).map(info =>
       " CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
         info.numCachedPartitions, bytesToString(info.memSize),
         bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))
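----------------------------------------------------------------------

To illustrate why the filter matters, here is a minimal sketch of the failure
mode this change avoids. It assumes a running SparkContext `sc` (e.g. in
spark-shell); the variable names and the unreadable input path below are
hypothetical, chosen only for illustration.

  val healthy = sc.parallelize(1 to 100, 4).cache()
  healthy.count()  // materialize the cached partitions

  // Register a second cached RDD whose partitions cannot be computed, e.g.
  // because its input path no longer exists. cache() registers the RDD in
  // persistentRdds immediately, without evaluating its partitions.
  val broken = sc.textFile("hdfs://nonexistent-host/gone.txt").cache()

  // Before this change, toDebugString used the no-arg getRDDStorageInfo,
  // which built an RDDInfo (and so evaluated partitions) for EVERY persistent
  // RDD; describing `healthy` could therefore throw because of `broken`.
  // With getRDDStorageInfo(_.id == rdd.id), only `healthy` is inspected.
  println(healthy.toDebugString)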