git commit: [SPARK-1592][streaming] Automatically remove streaming input blocks
Repository: spark
Updated Branches:
  refs/heads/master 35e3d199f -> 526a518bf

[SPARK-1592][streaming] Automatically remove streaming input blocks

The raw input data is stored as blocks in BlockManagers. Earlier, they were cleared by the cleaner TTL. Since streaming no longer requires the cleaner TTL to be set, these blocks never get cleared. This increases Spark's memory usage, which is not even accounted for or shown in the Spark storage UI. It can also cause the data blocks to spill over to disk, which eventually slows down the receiving of data (persisting to memory becomes bottlenecked by writing to disk). The solution in this PR is to automatically remove those blocks. The mechanism to keep track of which BlockRDDs (which present the raw data blocks as RDDs) can be safely cleared already exists; this PR uses it to explicitly remove the blocks from the BlockRDDs.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #512 from tdas/block-rdd-unpersist and squashes the following commits:

d25e610 [Tathagata Das] Merge remote-tracking branch 'apache/master' into block-rdd-unpersist
5f46d69 [Tathagata Das] Merge remote-tracking branch 'apache/master' into block-rdd-unpersist
2c320cd [Tathagata Das] Updated configuration with spark.streaming.unpersist setting.
2d4b2fd [Tathagata Das] Automatically removed input blocks

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/526a518b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/526a518b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/526a518b

Branch: refs/heads/master
Commit: 526a518bf32ad55b926a26f16086f445fd0ae29f
Parents: 35e3d19
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Thu Apr 24 18:18:22 2014 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Apr 24 18:18:22 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/BlockRDD.scala   | 45 ++--
 docs/configuration.md                           |  7 +-
 .../scala/org/apache/spark/streaming/Time.scala |  2 +-
 .../spark/streaming/dstream/DStream.scala       | 16 -
 .../spark/streaming/BasicOperationsSuite.scala  | 76 +++-
 .../spark/streaming/InputStreamsSuite.scala     | 13
 .../spark/streaming/NetworkReceiverSuite.scala  |  1 +
 7 files changed, 135 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/526a518b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index e6c4a6d..c64da88 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -19,24 +19,30 @@ package org.apache.spark.rdd

 import scala.reflect.ClassTag

-import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark._
 import org.apache.spark.storage.{BlockId, BlockManager}
+import scala.Some

 private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
   val index = idx
 }

 private[spark]
-class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId])
+class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
   extends RDD[T](sc, Nil) {

   @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
+  @volatile private var _isValid = true

-  override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
-    new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
-  }).toArray
+  override def getPartitions: Array[Partition] = {
+    assertValid()
+    (0 until blockIds.size).map(i => {
+      new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
+    }).toArray
+  }

   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    assertValid()
     val blockManager = SparkEnv.get.blockManager
     val blockId = split.asInstanceOf[BlockRDDPartition].blockId
     blockManager.get(blockId) match {
@@ -47,7 +53,36 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId
   }

   override def getPreferredLocations(split: Partition): Seq[String] = {
+    assertValid()
     locations_(split.asInstanceOf[BlockRDDPartition].blockId)
   }
+
+  /**
+   * Remove the data blocks that this BlockRDD is made from. NOTE: This is an
+   * irreversible operation, as the data in the blocks cannot be recovered back
+   * once removed. Use it with caution.
+   */
+  private[spark] def removeBlocks() {
+    blockIds.foreach { blockId =>
+      sc.env.blockManager.master.removeBlock(blockId)
+    }
+    _isValid = false
+  }
+
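The diff is cut off before the validity helpers that getPartitions, compute, and getPreferredLocations call. A minimal sketch of how isValid and assertValid could be defined inside BlockRDD, consistent with the _isValid flag introduced above (SparkException is available via the file's wildcard org.apache.spark._ import); the method bodies and the exception message here are assumptions, not the literal patch:

  /** Whether this BlockRDD is still usable, i.e. removeBlocks() has not been called. */
  private[spark] def isValid: Boolean = _isValid

  /** Fail fast if this BlockRDD's blocks have already been removed. */
  private[spark] def assertValid() {
    if (!_isValid) {
      throw new SparkException(
        "Attempted to use " + toString + " after its blocks have been removed!")
    }
  }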
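The DStream.scala change that actually triggers the removal appears in the diffstat but not in this excerpt. The idea: when a batch's RDDs age out of the remember window and spark.streaming.unpersist is enabled, unpersist them and, for BlockRDDs, also delete the backing input blocks. Below is an illustrative sketch, not the literal patch: clearOldRDDs, the Long millisecond batch keys, and the HashMap are stand-ins for DStream internals, and the code is placed under org.apache.spark so the private[spark] removeBlocks() is visible.

package org.apache.spark.streaming.dstream

import scala.collection.mutable

import org.apache.spark.rdd.{BlockRDD, RDD}

object BlockCleanupSketch {
  // Drop RDDs of batches older than the remember window and free their storage.
  def clearOldRDDs(
      generatedRDDs: mutable.HashMap[Long, RDD[_]],
      currentTimeMs: Long,
      rememberDurationMs: Long,
      unpersistData: Boolean): Unit = {
    val oldRDDs = generatedRDDs.filter { case (t, _) => t <= currentTimeMs - rememberDurationMs }
    generatedRDDs --= oldRDDs.keys
    if (unpersistData) {
      oldRDDs.values.foreach { rdd =>
        rdd.unpersist(blocking = false) // drop cached copies of derived data
        rdd match {
          case b: BlockRDD[_] => b.removeBlocks() // also delete the raw input blocks
          case _ => // nothing extra to free
        }
      }
    }
  }
}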
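The docs/configuration.md change documents the spark.streaming.unpersist setting that gates this behavior. A minimal sketch of setting it explicitly when building a streaming application (the app name and batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("BlockCleanupExample")
  // Let Spark Streaming unpersist generated RDDs and remove their raw
  // input blocks once they are no longer needed (SPARK-1592).
  .set("spark.streaming.unpersist", "true")

val ssc = new StreamingContext(conf, Seconds(1))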
git commit: [SPARK-1592][streaming] Automatically remove streaming input blocks
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 521d43599 -> a3b6d8523

[SPARK-1592][streaming] Automatically remove streaming input blocks

Same change as the master commit above, cherry picked onto branch-1.0.

(cherry picked from commit 526a518bf32ad55b926a26f16086f445fd0ae29f)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3b6d852
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3b6d852
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3b6d852

Branch: refs/heads/branch-1.0
Commit: a3b6d852337c5c4c9afd4942699f35baaa6f691d
Parents: 521d435
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Thu Apr 24 18:18:22 2014 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Apr 24 18:18:47 2014 -0700