git commit: [SPARK-1592][streaming] Automatically remove streaming input blocks

2014-04-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 35e3d199f -> 526a518bf


[SPARK-1592][streaming] Automatically remove streaming input blocks

The raw input data is stored as blocks in BlockManagers. Earlier they were 
cleared by the cleaner TTL. Now that streaming no longer requires the cleaner 
TTL to be set, these blocks would never get cleared. This increases Spark's 
memory usage, which is neither accounted for nor shown in the Spark storage UI. 
It may cause the data blocks to spill over to disk, which eventually slows down 
the receiving of data (persisting to memory becomes bottlenecked by writing to disk).

The solution in this PR is to automatically remove those blocks. The mechanism 
for tracking which BlockRDDs (which present the raw data blocks as an RDD) can 
be safely cleared already exists; this PR simply uses it to explicitly remove 
the blocks underlying those BlockRDDs.
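
Conceptually, the cleanup path looks like the sketch below. This is an
illustrative sketch only, not code from the patch: the method name
cleanupOldBlocks, the oldRDDs argument, and the unpersistEnabled flag are
hypothetical stand-ins for however the streaming code decides that a batch's
RDDs are no longer needed.

import org.apache.spark.rdd.{BlockRDD, RDD}

// Hypothetical sketch of the internal cleanup step: once the RDDs generated for
// an old batch fall out of the DStream's remember window, unpersist them and,
// for BlockRDDs, also drop the raw receiver blocks from the BlockManagers.
def cleanupOldBlocks(oldRDDs: Seq[RDD[_]], unpersistEnabled: Boolean): Unit = {
  if (unpersistEnabled) {
    oldRDDs.foreach { rdd =>
      rdd.unpersist(blocking = false)   // clear any cached copies of the RDD
      rdd match {
        case blockRDD: BlockRDD[_] =>
          blockRDD.removeBlocks()       // irreversibly remove the input blocks
        case _ =>                       // other RDD types hold no input blocks
      }
    }
  }
}

Here unpersist only drops cached copies of the RDD, while removeBlocks()
(introduced in the diff below) irreversibly deletes the underlying input blocks
from the BlockManagers.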

Author: Tathagata Das tathagata.das1...@gmail.com

Closes #512 from tdas/block-rdd-unpersist and squashes the following commits:

d25e610 [Tathagata Das] Merge remote-tracking branch 'apache/master' into 
block-rdd-unpersist
5f46d69 [Tathagata Das] Merge remote-tracking branch 'apache/master' into 
block-rdd-unpersist
2c320cd [Tathagata Das] Updated configuration with spark.streaming.unpersist 
setting.
2d4b2fd [Tathagata Das] Automatically removed input blocks


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/526a518b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/526a518b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/526a518b

Branch: refs/heads/master
Commit: 526a518bf32ad55b926a26f16086f445fd0ae29f
Parents: 35e3d19
Author: Tathagata Das tathagata.das1...@gmail.com
Authored: Thu Apr 24 18:18:22 2014 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Thu Apr 24 18:18:22 2014 -0700

--
 .../scala/org/apache/spark/rdd/BlockRDD.scala   | 45 ++--
 docs/configuration.md   |  7 +-
 .../scala/org/apache/spark/streaming/Time.scala |  2 +-
 .../spark/streaming/dstream/DStream.scala   | 16 -
 .../spark/streaming/BasicOperationsSuite.scala  | 76 +++-
 .../spark/streaming/InputStreamsSuite.scala | 13 
 .../spark/streaming/NetworkReceiverSuite.scala  |  1 +
 7 files changed, 135 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/526a518b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index e6c4a6d..c64da88 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -19,24 +19,30 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark._
 import org.apache.spark.storage.{BlockId, BlockManager}
+import scala.Some
 
 private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
   val index = idx
 }
 
 private[spark]
-class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId])
+class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
   extends RDD[T](sc, Nil) {
 
   @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
+  @volatile private var _isValid = true
 
-  override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
-    new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
-  }).toArray
+  override def getPartitions: Array[Partition] = {
+    assertValid()
+    (0 until blockIds.size).map(i => {
+      new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
+    }).toArray
+  }
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    assertValid()
     val blockManager = SparkEnv.get.blockManager
     val blockId = split.asInstanceOf[BlockRDDPartition].blockId
     blockManager.get(blockId) match {
@@ -47,7 +53,36 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
+    assertValid()
     locations_(split.asInstanceOf[BlockRDDPartition].blockId)
   }
+
+  /**
+   * Remove the data blocks that this BlockRDD is made from. NOTE: This is an
+   * irreversible operation, as the data in the blocks cannot be recovered back
+   * once removed. Use it with caution.
+   */
+  private[spark] def removeBlocks() {
+    blockIds.foreach { blockId =>
+      sc.env.blockManager.master.removeBlock(blockId)
+    }
+    _isValid = false
+  }
+
+ 
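
For completeness, the spark.streaming.unpersist setting mentioned in the
squashed commits above is what lets applications opt in or out of this cleanup.
Below is a minimal, illustrative sketch of setting it when building a streaming
application; the master URL, application name, and batch interval are arbitrary
placeholders, not values from the patch.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative sketch: spark.streaming.unpersist controls whether the RDDs
// generated by Spark Streaming (and, with this change, their raw input blocks)
// are cleaned up automatically once they are no longer needed.
val conf = new SparkConf()
  .setMaster("local[2]")                       // placeholder master URL
  .setAppName("StreamingBlockCleanupExample")  // placeholder application name
  .set("spark.streaming.unpersist", "true")    // enable automatic block removal
val ssc = new StreamingContext(conf, Seconds(1))  // placeholder batch interval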

git commit: [SPARK-1592][streaming] Automatically remove streaming input blocks

2014-04-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 521d43599 -> a3b6d8523


[SPARK-1592][streaming] Automatically remove streaming input blocks

The raw input data is stored as blocks in BlockManagers. Earlier they were 
cleared by the cleaner TTL. Now that streaming no longer requires the cleaner 
TTL to be set, these blocks would never get cleared. This increases Spark's 
memory usage, which is neither accounted for nor shown in the Spark storage UI. 
It may cause the data blocks to spill over to disk, which eventually slows down 
the receiving of data (persisting to memory becomes bottlenecked by writing to disk).

The solution in this PR is to automatically remove those blocks. The mechanism 
for tracking which BlockRDDs (which present the raw data blocks as an RDD) can 
be safely cleared already exists; this PR simply uses it to explicitly remove 
the blocks underlying those BlockRDDs.

Author: Tathagata Das tathagata.das1...@gmail.com

Closes #512 from tdas/block-rdd-unpersist and squashes the following commits:

d25e610 [Tathagata Das] Merge remote-tracking branch 'apache/master' into 
block-rdd-unpersist
5f46d69 [Tathagata Das] Merge remote-tracking branch 'apache/master' into 
block-rdd-unpersist
2c320cd [Tathagata Das] Updated configuration with spark.streaming.unpersist 
setting.
2d4b2fd [Tathagata Das] Automatically removed input blocks

(cherry picked from commit 526a518bf32ad55b926a26f16086f445fd0ae29f)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3b6d852
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3b6d852
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3b6d852

Branch: refs/heads/branch-1.0
Commit: a3b6d852337c5c4c9afd4942699f35baaa6f691d
Parents: 521d435
Author: Tathagata Das tathagata.das1...@gmail.com
Authored: Thu Apr 24 18:18:22 2014 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Thu Apr 24 18:18:47 2014 -0700

--
 .../scala/org/apache/spark/rdd/BlockRDD.scala   | 45 ++--
 docs/configuration.md   |  7 +-
 .../scala/org/apache/spark/streaming/Time.scala |  2 +-
 .../spark/streaming/dstream/DStream.scala   | 16 -
 .../spark/streaming/BasicOperationsSuite.scala  | 76 +++-
 .../spark/streaming/InputStreamsSuite.scala | 13 
 .../spark/streaming/NetworkReceiverSuite.scala  |  1 +
 7 files changed, 135 insertions(+), 25 deletions(-)
--

