[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2016-04-18 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r60153562
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in memory,
+    // but BlockManager will be able to serialize this block to the WAL,
+    // and hence count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
--- End diff --

Just ran into another bug related to test code from this PR! This part of 
the test should guarantee that the block actually won't fit, either by making 
the block size absolutely huge (e.g. 5 megabytes) or by pushing asserts into 
`testRecordcount`. As it stands now, this test doesn't exercise the right thing 
because the block actually _is_ stored in master.
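A quick arithmetic check (a standalone sketch; the sizes are taken from the test being discussed) shows how thin the margin is, and why a 5 MB block would remove the ambiguity:

```scala
// Values from the test: a 12000-byte store with spark.storage.unrollFraction = 0.4.
val unrollSpace = (12000 * 0.4).toInt   // 4800 bytes free for unrolling
val fragileBlock = 70 * 100             // 7000 bytes: over the limit, but barely
val hugeBlock = 5 * 1024 * 1024         // 5 MB: unambiguously too large to fit
assert(fragileBlock > unrollSpace)
assert(hugeBlock > unrollSpace)
println(s"unroll=$unrollSpace fragile=$fragileBlock huge=$hugeBlock")
```

A 7000-byte block only exceeds the unroll space by ~46%, so small accounting changes in BlockManager could silently make it fit; a 5 MB block cannot.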


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2016-02-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r52854361
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -57,10 +56,12 @@ class ReceivedBlockHandlerSuite
   val serializer = new KryoSerializer(conf)
   val manualClock = new ManualClock
   val blockManagerSize = 1000
+  val blockManagerBuffer = new ArrayBuffer[BlockManager]()
--- End diff --

This seems like a huge anti-pattern to me! Unless you're testing the 
interaction of multiple block managers, creating a buffer to manage cleanup 
suggests bad test design.





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2016-02-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r52854321
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in memory,
+    // but BlockManager will be able to serialize this block to the WAL,
+    // and hence count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in memory,
+    // but BlockManager will be able to serialize this block to disk,
+    // and hence count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block with the MEMORY_ONLY StorageLevel.
+    // BlockManager will not be able to unroll this block, and hence it will not
+    // tryToPut this block, resulting in a SparkException.
+    storageLevel = StorageLevel.MEMORY_ONLY
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
+        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      }
+    }
+  }
+
+  private def testCountWithBlockManagerBasedBlockHandler(
+      isBlockManagerBasedBlockHandler: Boolean) {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ByteBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  private def createBlockManager(
+      maxMem: Long,
+      conf: SparkConf,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+    val transfer = new NioBlockTransferService(conf, securityMgr)
+    val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+m

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2016-02-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r52854308
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -251,9 +377,21 @@ class ReceivedBlockHandlerSuite
 (blockIds, storeResults)
   }
 
+  /** Store single block using a handler */
+  private def storeSingleBlock(
+      handler: ReceivedBlockHandler,
+      block: ReceivedBlock
+    ): (StreamBlockId, ReceivedBlockStoreResult) = {
+    val blockId = generateBlockId
+    val blockStoreResult = handler.storeBlock(blockId, block)
+    logDebug("Done inserting")
--- End diff --

This seems like a very low-value log message. At the very least I would 
have included the `blockId` and the outcome of storing it (success or 
failure).
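For illustration, a sketch of what a more useful log line might look like; the `blockId` value and the `storedOk` flag below are made up, standing in for the suite's `StreamBlockId` and store result:

```scala
// Hypothetical stand-ins for the suite's blockId and store outcome.
val blockId = "input-0-1438862135000"
val storedOk = true

// Carry the block id and the outcome, instead of a bare "Done inserting".
val msg = s"Stored block $blockId (success = $storedOk)"
println(msg)
```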





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2016-02-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r52854271
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in memory,
+    // but BlockManager will be able to serialize this block to the WAL,
+    // and hence count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in memory,
+    // but BlockManager will be able to serialize this block to disk,
+    // and hence count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block with the MEMORY_ONLY StorageLevel.
+    // BlockManager will not be able to unroll this block, and hence it will not
+    // tryToPut this block, resulting in a SparkException.
+    storageLevel = StorageLevel.MEMORY_ONLY
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
--- End diff --

Kind of weird to assign the exception to a variable and then not write any 
asserts over the exception or its message.
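A self-contained sketch of the missing assertion; `intercept` below is a minimal stand-in for ScalaTest's, and `IllegalStateException` substitutes for `SparkException`, which isn't available outside Spark:

```scala
import scala.reflect.ClassTag

// Minimal stand-in for ScalaTest's intercept: runs the body, requires that it
// throws a T, and returns the caught exception so it can be asserted on.
def intercept[T <: Throwable](body: => Unit)(implicit ct: ClassTag[T]): T =
  try { body; throw new AssertionError("expected exception was not thrown") }
  catch { case e: Throwable if ct.runtimeClass.isInstance(e) => e.asInstanceOf[T] }

// The review's point: having captured `thrown`, actually assert on it.
val thrown = intercept[IllegalStateException] {
  throw new IllegalStateException("not enough space to unroll block")
}
assert(thrown.getMessage.contains("unroll"))
println("message assertion passed")
```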





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2016-02-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r52854252
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
--- End diff --

This method of switching the handlers in tests is also bad because it 
doesn't generalize to a third type of handler. Instead, I think it would have 
been better to just pass either `withBlockManagerBasedBlockHandler` or 
`withWriteAheadLogBasedBlockHandler` into some function, since both of those 
methods are instances of a common function supertype. Higher-order functions 
are a great language feature and should be used more in test code where 
appropriate.
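A minimal sketch of that suggestion, with stand-in types since the suite's `ReceivedBlockHandler` and `with*BlockHandler` helpers aren't available here: both helpers share one function type, so the test takes the helper itself instead of a boolean flag.

```scala
// Stand-ins for the suite's handler machinery.
trait Handler { def name: String }
type WithHandler = (Handler => Unit) => Unit

val withBlockManagerBasedHandler: WithHandler =
  body => body(new Handler { val name = "block-manager" })
val withWriteAheadLogBasedHandler: WithHandler =
  body => body(new Handler { val name = "write-ahead-log" })

// The test takes the helper itself; a third handler kind is just another
// value of the same function type, with no new boolean parameter needed.
var seen = List.empty[String]
def testCount(withHandler: WithHandler): Unit =
  withHandler(h => seen = seen :+ h.name)

testCount(withBlockManagerBasedHandler)
testCount(withWriteAheadLogBasedHandler)
println(seen.mkString(", "))
```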





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2016-02-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r52854187
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
--- End diff --

I think that this is a bad function name: if I'm only reading this line and 
not jumping around in the IDE, how am I supposed to know what this boolean 
parameter denotes?





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2016-02-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r52854163
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
--- End diff --

This is an example of a testing anti-pattern which I've seen a lot: this 
test is actually testing two completely independent cases, one with a 
BlockManager-based block handler and another with a WAL-based handler. As a 
result, this deserves to be _two_ `test` cases so that they can fail 
independently. As things stand now, a change could introduce a bug which causes 
only the BlockManager case to fail, and that would cause the second case to be 
skipped rather than run.

The fact that this test is four lines long and half of those lines are 
comments explaining the two cases is another hint that this should be two 
separate tests, since then the name in `test()` could explain each case.
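A self-contained sketch of the point (the `test` helper below is a toy stand-in for ScalaTest's registration): when the cases are registered separately, a failure in one does not prevent the other from running.

```scala
// Toy test registry: records whether each named body passed or failed.
val results = scala.collection.mutable.LinkedHashMap[String, Boolean]()
def test(name: String)(body: => Unit): Unit =
  results(name) = try { body; true } catch { case _: Throwable => false }

// One test per handler kind; a failure in the first no longer hides the second.
test("count messages with BlockManagerBasedBlockHandler") {
  throw new RuntimeException("simulated failure in the first case")
}
test("count messages with WriteAheadLogBasedBlockHandler") {
  assert(1 + 1 == 2)
}

results.foreach { case (n, ok) => println(s"$n -> ${if (ok) "passed" else "failed"}") }
```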





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2016-02-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r52853940
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in memory,
+    // but BlockManager will be able to serialize this block to the WAL,
+    // and hence count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in memory,
+    // but BlockManager will be able to serialize this block to disk,
+    // and hence count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block with the MEMORY_ONLY StorageLevel.
+    // BlockManager will not be able to unroll this block, and hence it will not
+    // tryToPut this block, resulting in a SparkException.
+    storageLevel = StorageLevel.MEMORY_ONLY
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
+        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      }
+    }
+  }
+
+  private def testCountWithBlockManagerBasedBlockHandler(
+      isBlockManagerBasedBlockHandler: Boolean) {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ByteBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  private def createBlockManager(
+      maxMem: Long,
+      conf: SparkConf,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+    val transfer = new NioBlockTransferService(conf, securityMgr)
+    val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+m

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2016-02-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r52853898
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in memory,
+    // but BlockManager will be able to serialize this block to the WAL,
+    // and hence count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in memory,
+    // but BlockManager will be able to serialize this block to disk,
+    // and hence count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block with the MEMORY_ONLY StorageLevel.
+    // BlockManager will not be able to unroll this block, and hence it will not
+    // tryToPut this block, resulting in a SparkException.
+    storageLevel = StorageLevel.MEMORY_ONLY
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
+        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      }
+    }
+  }
+
+  private def testCountWithBlockManagerBasedBlockHandler(
+      isBlockManagerBasedBlockHandler: Boolean) {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ByteBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  private def createBlockManager(
+      maxMem: Long,
+      conf: SparkConf,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+    val transfer = new NioBlockTransferService(conf, securityMgr)
+    val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+m

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2016-02-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r52853882
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in memory,
+    // but BlockManager will be able to serialize this block to the WAL,
+    // and hence count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in memory,
+    // but BlockManager will be able to serialize this block to disk,
+    // and hence count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block with the MEMORY_ONLY StorageLevel.
+    // BlockManager will not be able to unroll this block, and hence it will not
+    // tryToPut this block, resulting in a SparkException.
+    storageLevel = StorageLevel.MEMORY_ONLY
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
+        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      }
+    }
+  }
+
+  private def testCountWithBlockManagerBasedBlockHandler(
+      isBlockManagerBasedBlockHandler: Boolean) {
+    // ByteBufferBlock-MEMORY_ONLY
--- End diff --

I think that this is a messy / bad way of writing this test. If we want to 
test a whole bunch of cases, then I think it would make sense to use a for-loop 
to build up some test cases. I'd also omit the `//` comments here, since 
they're not helpful to me as a reader.
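A sketch of the table-driven shape the comment suggests; the `Case` class and the rows below are illustrative stand-ins, not taken from the suite:

```scala
// Hypothetical case table for the record-count checks; in the real suite each
// row would feed a call like testRecordcount(...) instead of copy-pasted calls.
final case class Case(label: String, storageLevel: String, records: Int)

val cases = Seq(
  Case("ArrayBufferBlock", "MEMORY_ONLY", 25),
  Case("ArrayBufferBlock", "DISK_ONLY", 50),
  Case("IteratorBlock", "MEMORY_AND_DISK", 150)
)

for (c <- cases) {
  // The label makes each case self-describing, replacing the // comments.
  println(s"${c.label}-${c.storageLevel}: expecting ${c.records} records")
  assert(c.records > 0)
}
```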





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2016-02-14 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-184022738
  
I know it's about half a year too late to be complaining about this, but 
the formatting and control flow in this patch's tests make things really hard 
to debug, and there are a lot of code formatting issues here. Next time, I think 
we should push for stronger test code quality on patches like this one, since 
this one is a giant mess and is very unpleasant to understand and edit.





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-19 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-113656399
  
Thank you very much for this patch. This was a very important one,
especially the tests.

On Thu, Jun 18, 2015 at 8:02 PM, asfgit wrote:

> Closed #6707 via 3eaed87.
>
> —
> Reply to this email directly or view it on GitHub.






[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-18 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-113354439
  
Also merged to branch 1.4, available in Spark 1.4.1





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-18 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-113353258
  
I merged it to master, will be available in Spark 1.5. 





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/6707





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32798195
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // BlockManager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in memory,
+    // but the BlockManager will be able to serialize this block to the WAL,
+    // so count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in memory,
+    // but the BlockManager will be able to serialize this block to disk,
+    // so count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block with the MEMORY_ONLY StorageLevel.
+    // The BlockManager will not be able to unroll this block, so it will not
+    // tryToPut this block, resulting in a SparkException.
+    storageLevel = StorageLevel.MEMORY_ONLY
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
+        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      }
+    }
+  }
+
+  private def testCountWithBlockManagerBasedBlockHandler(isBlockManagerBasedBlockHandler: Boolean) {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ByteBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  private def createBlockManager(
+      maxMem: Long,
+      conf: SparkConf,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+    val transfer = new NioBlockTransferService(conf, securityMgr)
+    val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+manage
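The unroll arithmetic in the comments above (a 4800-byte unroll budget versus a 7000-byte block) can be checked with a standalone sketch. All names below are illustrative stand-ins, not types from the Spark test suite:

```scala
// Standalone check of the unroll budget: with 12000 bytes of storage
// memory and spark.storage.unrollFraction = 0.4, the unroll budget is
// 4800 bytes, so a block of 70 records x 100 bytes (7000 bytes)
// cannot be unrolled in memory.
object UnrollBudgetCheck {
  val maxMem: Long = 12000L
  val unrollFraction: Double = 0.4
  val unrollBudget: Long = (maxMem * unrollFraction).toLong // 4800 bytes

  val blockSize: Long = 70L * 100L // 7000 bytes

  def fitsInMemory: Boolean = blockSize <= unrollBudget

  def main(args: Array[String]): Unit =
    println(s"budget=$unrollBudget blockSize=$blockSize fits=$fitsInMemory")
}
```

This is why the test expects the MEMORY_ONLY store to fail while the MEMORY_AND_DISK and WAL paths still report the correct record count.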

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32798076
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // BlockManager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in memory,
+    // but the BlockManager will be able to serialize this block to the WAL,
+    // so count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in memory,
+    // but the BlockManager will be able to serialize this block to disk,
+    // so count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block with the MEMORY_ONLY StorageLevel.
+    // The BlockManager will not be able to unroll this block, so it will not
+    // tryToPut this block, resulting in a SparkException.
+    storageLevel = StorageLevel.MEMORY_ONLY
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
+        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      }
+    }
+  }
+
+  private def testCountWithBlockManagerBasedBlockHandler(isBlockManagerBasedBlockHandler: Boolean) {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ByteBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  private def createBlockManager(
+      maxMem: Long,
+      conf: SparkConf,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+    val transfer = new NioBlockTransferService(conf, securityMgr)
+    val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+manage

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32798033
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // BlockManager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in memory,
+    // but the BlockManager will be able to serialize this block to the WAL,
+    // so count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in memory,
+    // but the BlockManager will be able to serialize this block to disk,
+    // so count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block with the MEMORY_ONLY StorageLevel.
+    // The BlockManager will not be able to unroll this block, so it will not
+    // tryToPut this block, resulting in a SparkException.
+    storageLevel = StorageLevel.MEMORY_ONLY
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
+        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      }
+    }
+  }
+
+  private def testCountWithBlockManagerBasedBlockHandler(isBlockManagerBasedBlockHandler: Boolean) {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ByteBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  private def createBlockManager(
+      maxMem: Long,
+      conf: SparkConf,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+    val transfer = new NioBlockTransferService(conf, securityMgr)
+    val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+manage

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32797903
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // BlockManager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in memory,
+    // but the BlockManager will be able to serialize this block to the WAL,
+    // so count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in memory,
+    // but the BlockManager will be able to serialize this block to disk,
+    // so count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block with the MEMORY_ONLY StorageLevel.
+    // The BlockManager will not be able to unroll this block, so it will not
+    // tryToPut this block, resulting in a SparkException.
+    storageLevel = StorageLevel.MEMORY_ONLY
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
+        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      }
+    }
+  }
+
+  private def testCountWithBlockManagerBasedBlockHandler(isBlockManagerBasedBlockHandler: Boolean) {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ByteBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  private def createBlockManager(
+      maxMem: Long,
+      conf: SparkConf,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+    val transfer = new NioBlockTransferService(conf, securityMgr)
+    val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+manage

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32796888
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -251,9 +377,20 @@ class ReceivedBlockHandlerSuite
 (blockIds, storeResults)
   }
 
+  /** Store block using a handler */
+  private def storeBlock(
--- End diff --

Yeah, I know that. I meant using that for the single-block case. You could have done
```
def storeSingleBlock(params): (.., ...) = {
  storeBlocks(params).head
}
```
which would have deduplicated the code further. But this is a nit; this is fine.
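A self-contained sketch of the dedupe suggested here: implement the single-block helper on top of the multi-block one instead of repeating the store logic. `BlockId`, `StoreResult`, and `storeBlocks` below are illustrative stand-ins, not the suite's actual types or Spark APIs:

```scala
// Derive the single-block variant from the multi-block one:
// "store a one-element batch, take the head of the results".
object StoreDedupe {
  case class BlockId(id: Int)
  case class StoreResult(numRecords: Option[Long])

  // Stand-in for the suite's multi-block store helper.
  def storeBlocks(blocks: Seq[Seq[Byte]]): Seq[(BlockId, StoreResult)] =
    blocks.zipWithIndex.map { case (block, i) =>
      (BlockId(i), StoreResult(Some(block.length.toLong)))
    }

  // Single-block helper defined in terms of the multi-block one.
  def storeSingleBlock(block: Seq[Byte]): (BlockId, StoreResult) =
    storeBlocks(Seq(block)).head
}
```

Keeping one code path for both cases means a fix to the store logic cannot drift between the two helpers.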





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-18 Thread dibbhatt
Github user dibbhatt commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-113174385
  
Hi @tdas, let me know if the latest changes are fine.





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-111733856
  
Hi @tdas, kindly let me know how this looks. Just to mention: the global state won't get changed if individual test cases modify the blockManager or storageLevel, because a `before` block resets them back to the default settings before each test case runs. I tried to incorporate all the comments; do let me know if this is fine.
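The reset behaviour described above can be sketched without any test framework: a reset step runs before each test body, so a test that swaps in its own blockManager or storageLevel cannot leak that change into the next test. Names below are illustrative, not the suite's:

```scala
// Minimal before-each-test reset: runTest restores the shared fixture
// to its default before executing the test body.
object BeforeEachReset {
  var storageLevel: String = _

  def before(): Unit = storageLevel = "MEMORY_ONLY_SER" // default fixture

  def runTest(body: => Unit): Unit = { before(); body }
}
```

A test may freely override `storageLevel`; the next `runTest` call sees the default again.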





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32370361
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(true, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(true, StorageLevel.MEMORY_ONLY,
--- End diff --

Done





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369729
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -62,6 +61,19 @@ class ReceivedBlockHandlerSuite
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
   var tempDirectory: File = null
+  var storageLevel = StorageLevel.MEMORY_ONLY_SER
+
+  private def createBlockManager(
--- End diff --

Done.





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369727
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(true, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(true, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(true, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(true, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(true, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages") {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(false, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(false, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+    storageLevel = StorageLevel.MEMORY_ONLY
+    blockManager = createBlockManager(12000)
+    val block = List.fill(70)(new Array[Byte](100))
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to
+    // store this block with the MEMORY_ONLY StorageLevel. The BlockManager will not be able
+    // to unroll this block, so it will not tryToPut this block, resulting in a SparkException.
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
+        val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator))
+      }
+      assert(thrown.getMessage ===
+        "Could not store input-1-1000 to block manager with storage level " + storageLevel)
+    }
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK - isFullyConsumed") {
+    blockManager = createBlockManager(12000)
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to
+    // store this block in memory, but the BlockManager will be able to serialize this block
+    // to disk, so count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+  }
+
+  test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+    blockManager = createBlockManager(12000)
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to
+    // store this block in MEMORY, but Bl

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369721
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(true, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(true, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(true, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(true, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(true, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages") {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(false, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(false, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+    storageLevel = StorageLevel.MEMORY_ONLY
+    blockManager = createBlockManager(12000)
+    val block = List.fill(70)(new Array[Byte](100))
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to
+    // store this block with the MEMORY_ONLY StorageLevel. The BlockManager will not be able
+    // to unroll this block, so it will not tryToPut this block, resulting in a SparkException.
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
+        val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator))
+      }
+      assert(thrown.getMessage ===
+        "Could not store input-1-1000 to block manager with storage level " + storageLevel)
+    }
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK - isFullyConsumed") {
+    blockManager = createBlockManager(12000)
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to
+    // store this block in memory, but the BlockManager will be able to serialize this block
+    // to disk, so count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+  }
+
+  test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+    blockManager = createBlockManager(12000)
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to
+    // store this block in MEMORY, but Bl

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369724
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
@@ -225,7 +225,7 @@ class ReceivedBlockTrackerSuite
   /** Generate blocks infos using random ids */
   def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
 List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
-  BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)
+  BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L
--- End diff --

this is not an issue


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369704
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+storageLevel = StorageLevel.MEMORY_ONLY
+blockManager = createBlockManager(12000)
+val block = List.fill(70)(new Array[Byte](100))
+// spark.storage.unrollFraction set to 0.4 for BlockManager
+// With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store
+// this block with MEMORY_ONLY StorageLevel. BlockManager will not be able to unroll this block
+// and hence it will not tryToPut this block, resulting in a SparkException
+withBlockManagerBasedBlockHandler { handler =>
+  val thrown = intercept[SparkException] {
+val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator))
+  }
+  assert(thrown.getMessage ===
+"Could not store input-1-1000 to block manager with storage level " + storageLevel)
+}
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK - isFullyConsumed") {
+blockManager = createBlockManager(12000)
--- End diff --

done



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369701
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+storageLevel = StorageLevel.MEMORY_ONLY
+blockManager = createBlockManager(12000)
--- End diff --

implemented as you suggested



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369697
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala ---
@@ -151,12 +165,17 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
*/
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
 
+var numRecords = None: Option[Long]
 // Serialize the block so that it can be inserted into both
 val serializedBlock = block match {
   case ArrayBufferBlock(arrayBuffer) =>
-blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+  numRecords = Some(arrayBuffer.size.toLong)
--- End diff --

done
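The counting pattern in the quoted diff (capture the record count before the collection is serialized, since an iterator can only be consumed once) can be sketched without Spark. The serialization step below is a plain stand-in for `blockManager.dataSerialize`; only the count-before-consume ordering is the point.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the numRecords pattern from the diff above. The real code
// serializes via BlockManager; here a simple copy stands in for it.
var numRecords: Option[Long] = None
val arrayBuffer = ArrayBuffer.fill(25)(0)
numRecords = Some(arrayBuffer.size.toLong) // count first...
val serialized = arrayBuffer.iterator.toArray // ...then consume the iterator
```

Counting after the iterator has been consumed would always yield zero, which is why the diff records the size eagerly for `ArrayBufferBlock`.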



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369699
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
--- End diff --

done



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369693
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -62,6 +61,19 @@ class ReceivedBlockHandlerSuite
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
   var tempDirectory: File = null
+  var storageLevel = StorageLevel.MEMORY_ONLY_SER
--- End diff --

implemented as you suggested



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369255
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -62,6 +61,19 @@ class ReceivedBlockHandlerSuite
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
   var tempDirectory: File = null
+  var storageLevel = StorageLevel.MEMORY_ONLY_SER
--- End diff --

I can even set the StorageLevel back to its default value in the before block, which resets the global variable before every test. Anyone writing a new test can check the before block to see what the global state is.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369028
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -251,9 +377,20 @@ class ReceivedBlockHandlerSuite
 (blockIds, storeResults)
   }
 
+  /** Store block using a handler */
+  private def storeBlock(
--- End diff --

The existing storeBlocks takes a sequence of blocks and returns a sequence of blockIds and BlockResults. In my test cases I need to count the messages in a single block. I could rename it to storeSingleBlock, which would be more intuitive. I can also generate a random block id there using generateBlockId.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369010
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+storageLevel = StorageLevel.MEMORY_ONLY
+blockManager = createBlockManager(12000)
--- End diff --

And have the before block create a global BlockManager before every test.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-13 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32369007
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+storageLevel = StorageLevel.MEMORY_ONLY
+blockManager = createBlockManager(12000)
--- End diff --

Why do you think it's a leak? blockManager is defined as a var, and there is already an after block which closes the BlockManager after every test.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32367807
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
@@ -225,7 +225,7 @@ class ReceivedBlockTrackerSuite
   /** Generate blocks infos using random ids */
   def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
 List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
-  BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)
+  BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L
--- End diff --

I did not change Random.nextInt. I just added 0 because BlockManagerBasedStoreResult now takes the record count. This test case tests a record count of 0, which @zsxwing added and which I changed after the merge.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-111631847
  
Overall, the code change looks good. The additional tests need a bit more refactoring to be perfect and leak-proof. Thank you very much for all this extra effort to implement this much-needed array of tests. :)



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32359635
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -62,6 +61,19 @@ class ReceivedBlockHandlerSuite
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
   var tempDirectory: File = null
+  var storageLevel = StorageLevel.MEMORY_ONLY_SER
+
+  private def createBlockManager(
--- End diff --

Also move this definition below in the file. All the private defs are 
defined below.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32359579
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -62,6 +61,19 @@ class ReceivedBlockHandlerSuite
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
   var tempDirectory: File = null
+  var storageLevel = StorageLevel.MEMORY_ONLY_SER
+
+  private def createBlockManager(
--- End diff --

There are two places where BlockManager instantiation logic is present (the other one is 10 lines below). Can you consolidate them for cleanliness? That is, make the other one use createBlockManager.




[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32359354
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -62,6 +61,19 @@ class ReceivedBlockHandlerSuite
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
   var tempDirectory: File = null
+  var storageLevel = StorageLevel.MEMORY_ONLY_SER
--- End diff --

Aah, I see what you have done. It was a val, but you made it a var so that it can be modified to set a different storage level, which `withXYZBasedBlockHandler` can use. This is non-intuitive and changes global state, which can affect other tests. Not a good idea. Rather, make the storageLevel an *optional* parameter in `withXYZBasedBlockHandler`, with the earlier global value `StorageLevel.MEMORY_ONLY_SER` as the default. Then we don't even need the global variable `storageLevel` at all.
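A minimal sketch of the suggested shape, with hypothetical stand-ins for the suite's types (the real `withBlockManagerBasedBlockHandler` wraps an actual handler; here only the optional-default-parameter idea is shown):

```scala
// Hypothetical stand-ins for StorageLevel, to illustrate replacing the
// mutable global storageLevel var with an optional default parameter.
case class StorageLevel(name: String)
object StorageLevels {
  val MEMORY_ONLY_SER = StorageLevel("MEMORY_ONLY_SER")
  val MEMORY_ONLY = StorageLevel("MEMORY_ONLY")
}

// Callers that don't care get the old default; tests that need another
// level pass it explicitly, without touching any shared state.
def withBlockManagerBasedBlockHandler[T](
    storageLevel: StorageLevel = StorageLevels.MEMORY_ONLY_SER)(
    body: StorageLevel => T): T =
  body(storageLevel)

val defaultLevel = withBlockManagerBasedBlockHandler() { level => level }
val customLevel =
  withBlockManagerBasedBlockHandler(StorageLevels.MEMORY_ONLY) { level => level }
```

Because the level is scoped to the call, a test that picks MEMORY_ONLY cannot leak that choice into later tests.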



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32359042
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+storageLevel = StorageLevel.MEMORY_ONLY
+blockManager = createBlockManager(12000)
--- End diff --

Also DO NOT OVERRIDE the global BlockManager var here. This changes the 
behavior of all subsequent tests that may get added later.
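A generic sketch of honoring this advice, with a stub type standing in for Spark's BlockManager (the names here are illustrative, not the suite's real fields): bind the test-local instance to a val so the suite-wide var is untouched.

```scala
// Stub standing in for Spark's BlockManager, to illustrate scoping only.
final class BlockManagerStub(val maxMemBytes: Long)

var blockManager = new BlockManagerStub(10000) // suite-wide default
// Inside one test: a local val instead of reassigning the global var,
// so tests added later still see the shared default instance.
val localBlockManager = new BlockManagerStub(12000)
```

The test still gets its specially sized manager, but nothing observable changes for subsequent tests.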



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32358886
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---
@@ -251,9 +377,20 @@ class ReceivedBlockHandlerSuite
 (blockIds, storeResults)
   }
 
+  /** Store block using a handler */
+  private def storeBlock(
--- End diff --

Instead of adding this, please use the existing `storeBlocks`. It already 
randomizes block IDs to prevent conflicts from the same block ID being 
reused, so it is better to use it than to store with the same block ID every 
time and mask errors.

Also, use the returned block ID to clean up the block later, in 
`testRecordCount`.
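The randomized-ID idea tdas refers to can be sketched as follows. This is an 
illustrative stand-in, not the suite's actual `storeBlocks` helper (whose 
signature is not shown here); `StreamBlockId` mirrors Spark's real case class, 
while `generateBlockId` is a hypothetical helper added for the example.

```scala
import scala.util.Random

// Each stored block gets a freshly randomized unique id, so two stores in
// the same test run cannot collide on the same block name.
case class StreamBlockId(streamId: Int, uniqueId: Long) {
  def name: String = "input-" + streamId + "-" + uniqueId
}

def generateBlockId(streamId: Int): StreamBlockId =
  StreamBlockId(streamId, math.abs(Random.nextLong()))
```

Reusing a fixed ID such as `input-1-1000` would instead let a later store 
silently overwrite an earlier block and hide failures.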





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32358833
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => 
i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, 
Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, 
Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, 
Some(150))
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => 
i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, 
Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, 
Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, 
Some(150))
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+storageLevel = StorageLevel.MEMORY_ONLY
+blockManager = createBlockManager(12000)
+val block = List.fill(70)(new Array[Byte](100))
+// spark.storage.unrollFraction set to 0.4 for BlockManager
+// With 12000 * 0.4 = 4800 bytes of free space for unroll, there is 
not enough space to store
+// this block With MEMORY_ONLY StorageLevel. BlockManager will not be 
able to unroll this block
+// and hence it will not tryToPut this block, resulting the 
SparkException
+withBlockManagerBasedBlockHandler { handler =>
+  val thrown = intercept[SparkException] {
+val blockStoreResult = storeBlock(handler, 
IteratorBlock(block.iterator))
+  }
+  assert(thrown.getMessage ===
+"Could not store input-1-1000 to block manager with storage level 
" + storageLevel)
+}
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK - isFullyConsumed") {
+blockManager = createBlockManager(12000)
+// spark.storage.unrollFraction set to 0.4 for BlockManager
+// With 12000 * 0.4 = 4800 bytes of free space for unroll, there is 
not enough space to store
+// this block in MEMORY, But BlockManager will be able to sereliaze 
this block to DISK
+// and hence count returns correct value.
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), 
blockManager, Some(70))
+  }
+
+  test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+blockManager = createBlockManager(12000)
+// spark.storage.unrollFraction set to 0.4 for BlockManager
+// With 12000 * 0.4 = 4800 bytes of free space for unroll, there is 
not enough space to store
+// this block in MEMORY, But BlockM

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32358781
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => 
i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
--- End diff --

Please add MEMORY_ONLY_SER as well. That has a slightly different code path 
underneath, so it is best to check.





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32358598
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 ---
@@ -225,7 +225,7 @@ class ReceivedBlockTrackerSuite
   /** Generate blocks infos using random ids */
   def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
 List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
-  BlockManagerBasedStoreResult(StreamBlockId(streamId, 
math.abs(Random.nextInt)
+  BlockManagerBasedStoreResult(StreamBlockId(streamId, 
math.abs(Random.nextInt)), Some(0L
--- End diff --

Why change it to 0 from Random.nextInt? Let's not touch other test suites 
without understanding the implications.





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32358204
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => 
i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, 
Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, 
Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, 
Some(150))
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => 
i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, 
Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, 
Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, 
Some(150))
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+storageLevel = StorageLevel.MEMORY_ONLY
+blockManager = createBlockManager(12000)
+val block = List.fill(70)(new Array[Byte](100))
+// spark.storage.unrollFraction set to 0.4 for BlockManager
+// With 12000 * 0.4 = 4800 bytes of free space for unroll, there is 
not enough space to store
+// this block With MEMORY_ONLY StorageLevel. BlockManager will not be 
able to unroll this block
+// and hence it will not tryToPut this block, resulting the 
SparkException
+withBlockManagerBasedBlockHandler { handler =>
+  val thrown = intercept[SparkException] {
+val blockStoreResult = storeBlock(handler, 
IteratorBlock(block.iterator))
+  }
+  assert(thrown.getMessage ===
+"Could not store input-1-1000 to block manager with storage level 
" + storageLevel)
+}
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK - isFullyConsumed") {
+blockManager = createBlockManager(12000)
+// spark.storage.unrollFraction set to 0.4 for BlockManager
+// With 12000 * 0.4 = 4800 bytes of free space for unroll, there is 
not enough space to store
+// this block in MEMORY, But BlockManager will be able to sereliaze 
this block to DISK
+// and hence count returns correct value.
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), 
blockManager, Some(70))
+  }
+
+  test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+blockManager = createBlockManager(12000)
+// spark.storage.unrollFraction set to 0.4 for BlockManager
+// With 12000 * 0.4 = 4800 bytes of free space for unroll, there is 
not enough space to store
+// this block in MEMORY, But BlockM

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32358037
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => 
i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, 
Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, 
Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, 
Some(150))
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => 
i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, 
Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, 
Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, 
Some(150))
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+storageLevel = StorageLevel.MEMORY_ONLY
+blockManager = createBlockManager(12000)
+val block = List.fill(70)(new Array[Byte](100))
+// spark.storage.unrollFraction set to 0.4 for BlockManager
+// With 12000 * 0.4 = 4800 bytes of free space for unroll, there is 
not enough space to store
+// this block With MEMORY_ONLY StorageLevel. BlockManager will not be 
able to unroll this block
+// and hence it will not tryToPut this block, resulting the 
SparkException
+withBlockManagerBasedBlockHandler { handler =>
+  val thrown = intercept[SparkException] {
+val blockStoreResult = storeBlock(handler, 
IteratorBlock(block.iterator))
+  }
+  assert(thrown.getMessage ===
+"Could not store input-1-1000 to block manager with storage level 
" + storageLevel)
+}
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK - isFullyConsumed") {
+blockManager = createBlockManager(12000)
--- End diff --

You can merge these three tests as well into a single test. Then you can 
use one block manager to test all three cases.





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32357897
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => 
i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, 
Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(true, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, 
Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, 
Some(150))
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => 
i.toByte))), blockManager, None)
+// ArrayBufferBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+// ArrayBufferBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+// ArrayBufferBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+// IteratorBlock-MEMORY_ONLY
+testRecordcount(false, StorageLevel.MEMORY_ONLY,
+  IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, 
Some(100))
+// IteratorBlock-DISK_ONLY
+testRecordcount(false, StorageLevel.DISK_ONLY,
+  IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, 
Some(125))
+// IteratorBlock-MEMORY_AND_DISK
+testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
+  IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, 
Some(150))
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
+storageLevel = StorageLevel.MEMORY_ONLY
+blockManager = createBlockManager(12000)
--- End diff --

This is a pretty subtle leak: the BlockManager created here is never stopped. 
Rather than using `createBlockManager` directly, please make a 
`withBlockManager` function (similar to `withBlockManagerBasedBlockHandler`) 
that creates the BlockManager and stops it at the end in a `finally { }` 
block. That way, whenever there is an error, it gets cleaned up without 
affecting other tests.
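The loan pattern tdas suggests can be sketched as below. A stub stands in for 
Spark's `BlockManager` (which needs a full SparkEnv to construct), so the 
names here are illustrative rather than the suite's final implementation.

```scala
// Minimal stand-in for a resource that must always be stopped.
class StubBlockManager {
  var stopped = false
  def stop(): Unit = { stopped = true }
}

// Loan pattern: create the resource, lend it to the test body, and stop it
// in a finally block so cleanup runs on both success and failure.
def withBlockManager[A](body: StubBlockManager => A): A = {
  val bm = new StubBlockManager
  try body(bm)
  finally bm.stop()
}
```

Because `stop()` sits in `finally`, a test body that throws still leaves the 
manager stopped, and later tests are unaffected.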





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32357707
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -174,6 +186,120 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages") {
+// ByteBufferBlock-MEMORY_ONLY
+testRecordcount(true, StorageLevel.MEMORY_ONLY,
--- End diff --

This is so much better! But I realize there is still duplication: these two 
tests are identical line for line. So this set of lines can be put in a 
function that takes a single parameter, `isBlockManagerBasedBlockHandler`. 
You can keep the name `testRecordCounts`.





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32357254
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -62,6 +61,19 @@ class ReceivedBlockHandlerSuite
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
   var tempDirectory: File = null
+  var storageLevel = StorageLevel.MEMORY_ONLY_SER
--- End diff --

Why do you need to define a `storageLevel` var here? I don't see any usage 
that requires a global var.





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32354875
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -151,12 +165,17 @@ private[streaming] class 
WriteAheadLogBasedBlockHandler(
*/
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): 
ReceivedBlockStoreResult = {
 
+var numRecords = None: Option[Long]
 // Serialize the block so that it can be inserted into both
 val serializedBlock = block match {
   case ArrayBufferBlock(arrayBuffer) =>
-blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+  numRecords = Some(arrayBuffer.size.toLong)
--- End diff --

Incorrect formatting.





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread dibbhatt
Github user dibbhatt commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-111485838
  
Hi @tdas, I have implemented all your comments. Let me know if this is fine. 
I made the change to count a ByteBufferBlock as None; if there is further 
debate over it, I will address it.





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32305234
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -79,7 +93,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
   throw new SparkException(
 s"Could not store $blockId to block manager with storage level 
$storageLevel")
 }
-BlockManagerBasedStoreResult(blockId)
+BlockManagerBasedStoreResult(blockId, numRecords)
--- End diff --

Both are wrong, but counting it as zero suggests that no data was received. 
I personally think they should be counted as 1; we can document it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32298574
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -199,3 +223,16 @@ private[streaming] object 
WriteAheadLogBasedBlockHandler {
 new Path(checkpointDir, new Path("receivedData", 
streamId.toString)).toString
   }
 }
+
+/**
+ * A utility that will wrap the Iterator to get the count
+ */
+private class CountingIterator[T](iterator: Iterator[T]) extends 
Iterator[T] {
+   var count = 0
+   def hasNext(): Boolean = iterator.hasNext
+   def isFullyConsumed: Boolean = !iterator.hasNext
--- End diff --

Ok , will do 





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32298168
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -151,13 +167,21 @@ private[streaming] class 
WriteAheadLogBasedBlockHandler(
*/
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): 
ReceivedBlockStoreResult = {
 
+var numRecords = None: Option[Long]
 // Serialize the block so that it can be inserted into both
 val serializedBlock = block match {
   case ArrayBufferBlock(arrayBuffer) =>
-blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+val countIterator = new CountingIterator(arrayBuffer.iterator)
--- End diff --

OK, I will change it back to how it was earlier. But if we do not go through 
CountingIterator we may not know whether the block is fully consumed. Since 
we are not using isFullyConsumed at present, this should be fine.





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32297750
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -199,3 +223,16 @@ private[streaming] object 
WriteAheadLogBasedBlockHandler {
 new Path(checkpointDir, new Path("receivedData", 
streamId.toString)).toString
   }
 }
+
+/**
+ * A utility that will wrap the Iterator to get the count
+ */
+private class CountingIterator[T](iterator: Iterator[T]) extends 
Iterator[T] {
+   var count = 0
+   def hasNext(): Boolean = iterator.hasNext
+   def isFullyConsumed: Boolean = !iterator.hasNext
--- End diff --

As a good design pattern, could you make the count a private variable 
(renamed `_count`) and add a public `def count()` that checks whether the 
iterator is fully consumed and returns `Some(_count)` or `None`?
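The design tdas asks for can be sketched like this. It follows the names in 
the review comment (`_count`, `count()`); this is an illustrative sketch, not 
the final implementation that was merged into Spark.

```scala
// Wraps an iterator and counts how many elements were pulled through it.
class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
  private var _count = 0L

  private def isFullyConsumed: Boolean = !iterator.hasNext

  def hasNext: Boolean = iterator.hasNext

  def next(): T = {
    _count += 1
    iterator.next()
  }

  // Only report a count once the underlying iterator is exhausted; a count
  // taken earlier would under-report the number of records in the block.
  def count(): Option[Long] = if (isFullyConsumed) Some(_count) else None
}
```

Returning `None` while elements remain is what makes the count trustworthy: 
callers can only observe `Some(n)` when `n` reflects every record.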





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32297611
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -174,6 +186,183 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count 
messages") {
+storageLevel = StorageLevel.MEMORY_ONLY
+// Create a non-trivial (not all zeros) byte array
+val bytes = Array.tabulate(100)(i => i.toByte)
+val byteBufferBlock = ByteBuffer.wrap(bytes)
+withBlockManagerBasedBlockHandler { handler =>
+  val blockStoreResult = storeBlock(handler, 
ByteBufferBlock(byteBufferBlock))
+  // ByteBufferBlock is counted as single record
+  assert(blockStoreResult.numRecords === Some(1))
+}
+  }
+
+  test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count 
messages") {
+storageLevel = StorageLevel.MEMORY_ONLY
+// Create a non-trivial (not all zeros) byte array
+val bytes = Array.tabulate(100)(i => i.toByte)
+val byteBufferBlock = ByteBuffer.wrap(bytes)
+withWriteAheadLogBasedBlockHandler { handler =>
+  val blockStoreResult = storeBlock(handler, 
ByteBufferBlock(byteBufferBlock))
+  // ByteBufferBlock is counted as single record
+  assert(blockStoreResult.numRecords === Some(1))
+}
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ArrayBufferBlock - count 
messages") {
+storageLevel = StorageLevel.MEMORY_ONLY
--- End diff --

I had commented on this in the previous PR. This is a lot of duplicate 
code! You can put all of this in a simple function like 

```
def testRecordcount(
    isBlockManagerBasedBlockHandler: Boolean,
    storageLevel: StorageLevel,
    receivedBlock: ReceivedBlock,
    expectedNumRecords: Int): Unit = {
  if (isBlockManagerBasedBlockHandler) {
    // test received block with BlockManager based handler
  } else {
    // test received block with WAL based handler
  }
}
```

Then you can just use it in a loop and turn the whole thing into just a 
couple of unit tests: one with the BlockManager-based handler and another 
with the WAL-based handler.
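A self-contained toy showing the loop-driven shape being suggested here. Spark's handlers and block types are stubbed with hypothetical stand-ins (`Block`, `countRecords`), purely to illustrate the refactoring pattern:

```scala
// Toy stand-ins for Spark's received-block types (hypothetical names).
sealed trait Block
case class ArrayBlock(items: Seq[Int]) extends Block
case class IterBlock(items: Iterator[Int]) extends Block

// Stand-in for storeBlock: reports how many records the handler stored.
def countRecords(block: Block): Option[Long] = block match {
  case ArrayBlock(items) => Some(items.size.toLong)
  case IterBlock(it)     => Some(it.size.toLong)
}

// The single parameterized helper; the real one would also choose the
// BlockManager-based or WAL-based handler and apply storageLevel.
def testRecordcount(storageLevel: String, block: Block,
    expectedNumRecords: Option[Long]): Unit = {
  assert(countRecords(block) == expectedNumRecords, s"failed for $storageLevel")
}

// One loop replaces a dozen near-identical test bodies.
for (level <- Seq("MEMORY_ONLY", "DISK_ONLY", "MEMORY_AND_DISK")) {
  testRecordcount(level, ArrayBlock(Seq.fill(100)(0)), Some(100L))
  testRecordcount(level, IterBlock(Seq.fill(100)(0).iterator), Some(100L))
}
```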


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32297179
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -174,6 +186,183 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count 
messages") {
+storageLevel = StorageLevel.MEMORY_ONLY
+// Create a non-trivial (not all zeros) byte array
+val bytes = Array.tabulate(100)(i => i.toByte)
+val byteBufferBlock = ByteBuffer.wrap(bytes)
+withBlockManagerBasedBlockHandler { handler =>
+  val blockStoreResult = storeBlock(handler, 
ByteBufferBlock(byteBufferBlock))
+  // ByteBufferBlock is counted as single record
+  assert(blockStoreResult.numRecords === Some(1))
+}
+  }
+
+  test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count 
messages") {
+storageLevel = StorageLevel.MEMORY_ONLY
+// Create a non-trivial (not all zeros) byte array
+val bytes = Array.tabulate(100)(i => i.toByte)
+val byteBufferBlock = ByteBuffer.wrap(bytes)
+withWriteAheadLogBasedBlockHandler { handler =>
+  val blockStoreResult = storeBlock(handler, 
ByteBufferBlock(byteBufferBlock))
+  // ByteBufferBlock is counted as single record
+  assert(blockStoreResult.numRecords === Some(1))
+}
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ArrayBufferBlock - count 
messages") {
+storageLevel = StorageLevel.MEMORY_ONLY
+val block = ArrayBuffer.fill(100)(0)
+withBlockManagerBasedBlockHandler { handler =>
+  val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block))
+  assert(blockStoreResult.numRecords === Some(100))
+}
+  }
+
+  test("BlockManagerBasedBlockHandler-DISK_ONLY-ArrayBufferBlock - count 
messages") {
+storageLevel = StorageLevel.DISK_ONLY
+val block = ArrayBuffer.fill(100)(0)
+withBlockManagerBasedBlockHandler { handler =>
+  val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block))
+  assert(blockStoreResult.numRecords === Some(100))
+}
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK-ArrayBufferBlock - 
count messages") {
+storageLevel = StorageLevel.MEMORY_AND_DISK
+val block = ArrayBuffer.fill(100)(0)
+withBlockManagerBasedBlockHandler { handler =>
+  val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block))
+  assert(blockStoreResult.numRecords === Some(100))
+}
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY-IteratorBlock - count 
messages") {
+storageLevel = StorageLevel.MEMORY_ONLY
+val block = ArrayBuffer.fill(100)(0)
+withBlockManagerBasedBlockHandler { handler =>
+  val blockStoreResult = storeBlock(handler, 
IteratorBlock(block.iterator))
+  assert(blockStoreResult.numRecords === Some(100))
+}
+  }
+
+  test("BlockManagerBasedBlockHandler-DISK_ONLY-IteratorBlock - count 
messages") {
+storageLevel = StorageLevel.DISK_ONLY
+val block = ArrayBuffer.fill(100)(0)
+withBlockManagerBasedBlockHandler { handler =>
+  val blockStoreResult = storeBlock(handler, 
IteratorBlock(block.iterator))
+  assert(blockStoreResult.numRecords === Some(100))
+}
+  }
+
+  test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK-IteratorBlock - 
count messages") {
+storageLevel = StorageLevel.MEMORY_AND_DISK
+val block = ArrayBuffer.fill(100)(0)
+withBlockManagerBasedBlockHandler { handler =>
+  val blockStoreResult = storeBlock(handler, 
IteratorBlock(block.iterator))
+  assert(blockStoreResult.numRecords === Some(100))
+}
+  }
+
+  test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY-ArrayBufferBlock - 
count messages") {
+storageLevel = StorageLevel.MEMORY_ONLY
+val block = ArrayBuffer.fill(100)(0)
+withWriteAheadLogBasedBlockHandler { handler =>
+  val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block))
+  assert(blockStoreResult.numRecords === Some(100))
+}
+  }
+
+  test("WriteAheadLogBasedBlockHandler-DISK_ONLY-ArrayBufferBlock - count 
messages") {
+storageLevel = StorageLevel.DISK_ONLY
+val block = ArrayBuffer.fill(100)(0)
+withWriteAheadLogBasedBlockHandler { handler =>
+  val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block))
+  assert(blockStoreResult.numRecords === Some(100))
+}
+  }
+
+  test("WriteAheadLogBasedBlockHandler-MEMORY_AND_DISK-ArrayBufferBlock - 
count messages") {
+storageLevel = StorageLevel.MEMORY_AND_DISK
+val block =

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32297172
  

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32297159
  

[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32296915
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -151,13 +167,21 @@ private[streaming] class 
WriteAheadLogBasedBlockHandler(
*/
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): 
ReceivedBlockStoreResult = {
 
+var numRecords = None: Option[Long]
 // Serialize the block so that it can be inserted into both
 val serializedBlock = block match {
   case ArrayBufferBlock(arrayBuffer) =>
-blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+val countIterator = new CountingIterator(arrayBuffer.iterator)
--- End diff --

For ArrayBufferBlock, isn't it easier to just get the count directly 
rather than going through the CountingIterator? Why even add the slight 
inefficiency of the counting iterator?
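A sketch of the point being made: an `ArrayBuffer` already knows its size, so the count can be taken directly without a wrapping iterator (variable names here are illustrative, not Spark's):

```scala
import scala.collection.mutable.ArrayBuffer

val arrayBuffer = ArrayBuffer.fill(100)(0)

// Direct: the buffer's size is already known, so no wrapping iterator
// (and no extra traversal bookkeeping) is needed.
val numRecords: Option[Long] = Some(arrayBuffer.size.toLong)
```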


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32296746
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -79,7 +93,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
   throw new SparkException(
 s"Could not store $blockId to block manager with storage level 
$storageLevel")
 }
-BlockManagerBasedStoreResult(blockId)
+BlockManagerBasedStoreResult(blockId, numRecords)
--- End diff --

The ByteBuffer is expected to have multiple serialized records in it, so I 
think it is more wrong to count it as 1. None is better than Some(1).
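A sketch of the counting policy being argued for here (the helper name is hypothetical, not Spark's actual API): a `ByteBuffer` may hold any number of serialized records, so `None` ("unknown") is more honest than `Some(1)`:

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper encoding the policy: a ByteBuffer's record count
// is unknowable, while a collection's size is known up front.
def numRecordsFor(block: Any): Option[Long] = block match {
  case _: ByteBuffer       => None                  // record boundaries unknown
  case buf: ArrayBuffer[_] => Some(buf.size.toLong) // size known up front
  case _                   => None
}
```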


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r32296587
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -51,7 +54,8 @@ private[streaming] trait ReceivedBlockHandler {
  * that stores the metadata related to storage of blocks using
  * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
  */
-private[streaming] case class BlockManagerBasedStoreResult(blockId: 
StreamBlockId)
+private[streaming] case class BlockManagerBasedStoreResult(blockId: 
StreamBlockId,
--- End diff --

nit (only if there are other changes): can you move both params to the next 
line. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-12 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-111389446
  
This PR looks good to me. I feel a bit weird about counting ByteBufferBlock 
as 1, but I cannot find a better solution. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-11 Thread dibbhatt
Github user dibbhatt commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-111383946
  
Hi @tdas @zsxwing @harishreedharan, is this PR okay with you? Just 
following up in case there is anything that needs to be done. I know you all 
must be super busy with the 1.4 release.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-09 Thread dibbhatt
Github user dibbhatt commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-110552909
  
Made the changes @harishreedharan suggested, counting a ByteBufferBlock as 
1 record. Let me know if this looks fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31982518
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -79,7 +93,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
   throw new SparkException(
 s"Could not store $blockId to block manager with storage level 
$storageLevel")
 }
-BlockManagerBasedStoreResult(blockId)
+BlockManagerBasedStoreResult(blockId, numRecords)
--- End diff --

@tdas @zsxwing what do you think? Is it fine to count a ByteBufferBlock as 
1 record?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-110224537
  
Taken care of a couple of comments given by @harishreedharan.
Not sure what to do with the ByteBuffer case, as there is no way to count 
the number of messages in a ByteBufferBlock.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31981074
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -32,7 +32,10 @@ import org.apache.spark.{Logging, SparkConf, 
SparkException}
 
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
-  def blockId: StreamBlockId  // Any implementation of this trait will 
store a block id
+  // Any implementation of this trait will store a block id
+  def blockId: StreamBlockId
+  // Any implementation of this trait will have to return the number of 
records
+  def numRecords: Option[Long]
--- End diff --

Ah, ok. I just find the `num*` method calls weird, when it could be called 
`*count`. But if it is consistent with everything else, then it is fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31981048
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -79,7 +93,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
   throw new SparkException(
 s"Could not store $blockId to block manager with storage level 
$storageLevel")
 }
-BlockManagerBasedStoreResult(blockId)
+BlockManagerBasedStoreResult(blockId, numRecords)
--- End diff --

Well, technically it is a single record - though I agree that is not 
exactly right either, but it must count as at least 1, correct?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31976730
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -62,6 +61,19 @@ class ReceivedBlockHandlerSuite
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
   var tempDirectory: File = null
+  var storageLevel = StorageLevel.MEMORY_ONLY_SER
+
+  private def makeBlockManager(
--- End diff --

Sure, will change it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31976427
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -199,3 +221,16 @@ private[streaming] object 
WriteAheadLogBasedBlockHandler {
 new Path(checkpointDir, new Path("receivedData", 
streamId.toString)).toString
   }
 }
+
+/**
+ * A utility that will wrap the Iterator to get the count
+ */
+private class CountingIterator[T](iterator: Iterator[T]) extends 
Iterator[T] {
+   var count = 0
+   def hasNext(): Boolean = iterator.hasNext
+   def isFullyConsumed: Boolean = !iterator.hasNext
+   def next(): T = {
+count+=1
--- End diff --

Will change it, thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31976373
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -32,7 +32,10 @@ import org.apache.spark.{Logging, SparkConf, 
SparkException}
 
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
-  def blockId: StreamBlockId  // Any implementation of this trait will 
store a block id
+  // Any implementation of this trait will store a block id
+  def blockId: StreamBlockId
+  // Any implementation of this trait will have to return the number of 
records
+  def numRecords: Option[Long]
--- End diff --

Everywhere else the count is recorded (refer to this PR 
https://github.com/apache/spark/pull/6659/files), it is called numRecords. Just 
wanted to keep the naming consistent across all classes.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31976054
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -79,7 +93,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
   throw new SparkException(
 s"Could not store $blockId to block manager with storage level 
$storageLevel")
 }
-BlockManagerBasedStoreResult(blockId)
+BlockManagerBasedStoreResult(blockId, numRecords)
--- End diff --

But how can we count a ByteBufferBlock? If you count one block as 1 message, 
that is also wrong.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-110146521
  
This looks really good. I posted some comments - mostly just minor (except 
one, which relates to counts for `ByteBufferBlock`s)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31961427
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -62,6 +61,19 @@ class ReceivedBlockHandlerSuite
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
   var tempDirectory: File = null
+  var storageLevel = StorageLevel.MEMORY_ONLY_SER
+
+  private def makeBlockManager(
--- End diff --

Can we call `createBlockManager`?



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31961338
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -199,3 +221,16 @@ private[streaming] object 
WriteAheadLogBasedBlockHandler {
 new Path(checkpointDir, new Path("receivedData", 
streamId.toString)).toString
   }
 }
+
+/**
+ * A utility that will wrap the Iterator to get the count
+ */
+private class CountingIterator[T](iterator: Iterator[T]) extends 
Iterator[T] {
+   var count = 0
+   def hasNext(): Boolean = iterator.hasNext
+   def isFullyConsumed: Boolean = !iterator.hasNext
+   def next(): T = {
+count+=1
--- End diff --

nit: `count += 1`
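For reference, a runnable sketch of the wrapper under discussion with this spacing nit applied (illustrative only, not the merged Spark code; the `private` modifier is dropped so the snippet is self-contained):

```scala
// Sketch of the counting wrapper discussed in this thread,
// with the `count += 1` spacing nit applied.
class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
  private var _count = 0L

  def hasNext: Boolean = iterator.hasNext

  // Only meaningful once the wrapped iterator has been fully drained.
  def isFullyConsumed: Boolean = !iterator.hasNext

  def next(): T = {
    _count += 1
    iterator.next()
  }

  // Number of elements handed out so far.
  def count: Long = _count
}
```

Note that `count` is only the number of elements consumed so far, which is why the callers in the diffs quoted in this thread read it only after `putIterator`/`dataSerialize` has drained the iterator.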



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31961280
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -151,12 +166,19 @@ private[streaming] class 
WriteAheadLogBasedBlockHandler(
*/
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): 
ReceivedBlockStoreResult = {
 
+var numRecords = None: Option[Long]
 // Serialize the block so that it can be inserted into both
 val serializedBlock = block match {
   case ArrayBufferBlock(arrayBuffer) =>
-blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+val countIterator = new CountingIterator(arrayBuffer.iterator)
+val serializedBlock = blockManager.dataSerialize(blockId, 
countIterator)
+numRecords = Some(countIterator.count)
+serializedBlock
   case IteratorBlock(iterator) =>
-blockManager.dataSerialize(blockId, iterator)
+val countIterator = new CountingIterator(iterator)
+val serializedBlock = blockManager.dataSerialize(blockId, 
countIterator)
+numRecords = Some(countIterator.count)
+serializedBlock
   case ByteBufferBlock(byteBuffer) =>
--- End diff --

Same issue as before. How did we count `ByteBufferBlock`s?



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31961193
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -79,7 +93,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
   throw new SparkException(
 s"Could not store $blockId to block manager with storage level 
$storageLevel")
 }
-BlockManagerBasedStoreResult(blockId)
+BlockManagerBasedStoreResult(blockId, numRecords)
--- End diff --

If a `ByteBufferBlock` is added, none of the data ever gets counted as a 
record. That is a bit of an issue, since reporting zero records does not make 
sense.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread harishreedharan
Github user harishreedharan commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31960457
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -32,7 +32,10 @@ import org.apache.spark.{Logging, SparkConf, 
SparkException}
 
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
-  def blockId: StreamBlockId  // Any implementation of this trait will 
store a block id
+  // Any implementation of this trait will store a block id
+  def blockId: StreamBlockId
+  // Any implementation of this trait will have to return the number of 
records
+  def numRecords: Option[Long]
--- End diff --

This is just a preference thing I guess, but this would sound better if it 
were called `recordCount`, no?



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-110086361
  
Hi @zsxwing, I have incorporated all your changes.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31934898
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -64,11 +68,17 @@ private[streaming] class BlockManagerBasedBlockHandler(
   extends ReceivedBlockHandler with Logging {
 
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): 
ReceivedBlockStoreResult = {
+var numRecords = None: Option[Long]
+val countIterator = block match {
+  case ArrayBufferBlock(arrayBuffer) => new 
CountingIterator(arrayBuffer.iterator)
+  case IteratorBlock(iterator) => new CountingIterator(iterator)
+  case _ => null
+}
 val putResult: Seq[(BlockId, BlockStatus)] = block match {
   case ArrayBufferBlock(arrayBuffer) =>
-blockManager.putIterator(blockId, arrayBuffer.iterator, 
storageLevel, tellMaster = true)
+blockManager.putIterator(blockId, countIterator, storageLevel, 
tellMaster = true)
--- End diff --

that's true..



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31934707
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -64,11 +68,17 @@ private[streaming] class BlockManagerBasedBlockHandler(
   extends ReceivedBlockHandler with Logging {
 
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): 
ReceivedBlockStoreResult = {
+var numRecords = None: Option[Long]
+val countIterator = block match {
+  case ArrayBufferBlock(arrayBuffer) => new 
CountingIterator(arrayBuffer.iterator)
+  case IteratorBlock(iterator) => new CountingIterator(iterator)
+  case _ => null
+}
 val putResult: Seq[(BlockId, BlockStatus)] = block match {
   case ArrayBufferBlock(arrayBuffer) =>
-blockManager.putIterator(blockId, arrayBuffer.iterator, 
storageLevel, tellMaster = true)
+blockManager.putIterator(blockId, countIterator, storageLevel, 
tellMaster = true)
--- End diff --

If `SparkException` is thrown later, `numRecords` won't be used. Right?



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31933693
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -151,12 +165,18 @@ private[streaming] class 
WriteAheadLogBasedBlockHandler(
*/
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): 
ReceivedBlockStoreResult = {
 
+var numRecords = None: Option[Long]
+val countIterator = block match {
+  case ArrayBufferBlock(arrayBuffer) => new 
CountingIterator(arrayBuffer.iterator)
+  case IteratorBlock(iterator) => new CountingIterator(iterator)
+  case _ => null
+}
 // Serialize the block so that it can be inserted into both
 val serializedBlock = block match {
   case ArrayBufferBlock(arrayBuffer) =>
-blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+blockManager.dataSerialize(blockId, countIterator)
--- End diff --

Yes, this is fine. As for the WAL-based handler, the block count can be done 
here while it runs `dataSerialize`.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31933485
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -64,11 +68,17 @@ private[streaming] class BlockManagerBasedBlockHandler(
   extends ReceivedBlockHandler with Logging {
 
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): 
ReceivedBlockStoreResult = {
+var numRecords = None: Option[Long]
+val countIterator = block match {
+  case ArrayBufferBlock(arrayBuffer) => new 
CountingIterator(arrayBuffer.iterator)
+  case IteratorBlock(iterator) => new CountingIterator(iterator)
+  case _ => null
+}
 val putResult: Seq[(BlockId, BlockStatus)] = block match {
   case ArrayBufferBlock(arrayBuffer) =>
-blockManager.putIterator(blockId, arrayBuffer.iterator, 
storageLevel, tellMaster = true)
+blockManager.putIterator(blockId, countIterator, storageLevel, 
tellMaster = true)
--- End diff --

I think `numRecords` after `putIterator` will have an issue if the 
BlockManager is not able to unroll the block safely to memory. In that case 
the block id will come back as null and a `SparkException` will be thrown. We 
should count the number of records only after the block id is present in 
`putResult`. Do let me know what you think.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-110050559
  
Sure, will make the change.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31928282
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -61,7 +60,21 @@ class ReceivedBlockHandlerSuite
   var rpcEnv: RpcEnv = null
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
+  var handler: ReceivedBlockHandler = null
--- End diff --

`handler` is not used. Right?



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31927599
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 ---
@@ -174,6 +187,172 @@ class ReceivedBlockHandlerSuite
 }
   }
 
+  test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count 
messages") {
+storageLevel = StorageLevel.MEMORY_ONLY
+// Create a non-trivial (not all zeros) byte array
+var counter = 0.toByte
+def incr: Byte = {counter = (counter + 1).toByte; counter;}
+val bytes = Array.fill[Byte](100)(incr)
--- End diff --

These 3 lines can be replaced by `val bytes = Array.tabulate(100)(i => 
i.toByte)`
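A quick check of the suggested one-liner against the original three lines (note the values differ by one - the `incr` version yields 1..100 while `tabulate` yields 0..99 - but either gives the non-trivial, non-zero-filled array the test needs):

```scala
// Original approach: a stateful counter producing bytes 1, 2, ..., 100
var counter = 0.toByte
def incr: Byte = { counter = (counter + 1).toByte; counter }
val original = Array.fill[Byte](100)(incr)

// Suggested replacement: bytes 0, 1, ..., 99
val bytes = Array.tabulate(100)(i => i.toByte)

// Both arrays are "non-trivial": 100 distinct values each.
assert(original.distinct.length == 100)
assert(bytes.distinct.length == 100)
```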



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31927001
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -199,3 +222,16 @@ private[streaming] object 
WriteAheadLogBasedBlockHandler {
 new Path(checkpointDir, new Path("receivedData", 
streamId.toString)).toString
   }
 }
+
+/**
+ * A utility that will wrap the Iterator to get the count
+ */
+private class CountingIterator[T: Manifest](iterator: Iterator[T]) extends 
Iterator[T] {
--- End diff --

I think `Manifest` is redundant. 



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31926944
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -151,12 +165,18 @@ private[streaming] class 
WriteAheadLogBasedBlockHandler(
*/
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): 
ReceivedBlockStoreResult = {
 
+var numRecords = None: Option[Long]
+val countIterator = block match {
+  case ArrayBufferBlock(arrayBuffer) => new 
CountingIterator(arrayBuffer.iterator)
+  case IteratorBlock(iterator) => new CountingIterator(iterator)
+  case _ => null
+}
 // Serialize the block so that it can be inserted into both
 val serializedBlock = block match {
   case ArrayBufferBlock(arrayBuffer) =>
-blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+blockManager.dataSerialize(blockId, countIterator)
--- End diff --

`dataSerialize` will consume the iterator, so you could create the 
`CountingIterator` here and set `numRecords = Some(countIterator.count)` 
after `dataSerialize`.
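The ordering matters because Scala iterators are lazy; a minimal stand-alone illustration (using `foreach` as a stand-in for the draining that `dataSerialize` performs inside Spark):

```scala
// A counter attached via map is only bumped as elements are pulled,
// so the total is valid only after the iterator has been drained.
var count = 0L
val counting = Iterator("a", "b", "c").map { x => count += 1; x }

assert(count == 0L)        // nothing consumed yet
counting.foreach(_ => ())  // stand-in for dataSerialize draining the iterator
assert(count == 3L)        // now the count reflects all records
```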



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/6707#discussion_r31926597
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 ---
@@ -64,11 +68,17 @@ private[streaming] class BlockManagerBasedBlockHandler(
   extends ReceivedBlockHandler with Logging {
 
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): 
ReceivedBlockStoreResult = {
+var numRecords = None: Option[Long]
+val countIterator = block match {
+  case ArrayBufferBlock(arrayBuffer) => new 
CountingIterator(arrayBuffer.iterator)
+  case IteratorBlock(iterator) => new CountingIterator(iterator)
+  case _ => null
+}
 val putResult: Seq[(BlockId, BlockStatus)] = block match {
   case ArrayBufferBlock(arrayBuffer) =>
-blockManager.putIterator(blockId, arrayBuffer.iterator, 
storageLevel, tellMaster = true)
+blockManager.putIterator(blockId, countIterator, storageLevel, 
tellMaster = true)
--- End diff --

Could you create `CountingIterator` here and put `numRecords = 
Some(countIterator.count)` after `putIterator`? Then you could avoid matching 
`block` twice.
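The single-match shape being suggested can be sketched as follows, with a stubbed `putIterator` in place of the real `BlockManager` call and simplified block types (names borrowed from the diff, but this is not Spark's code):

```scala
// Simplified stand-ins for the block types in the diff above.
sealed trait ReceivedBlock
case class IteratorBlock(iterator: Iterator[Any]) extends ReceivedBlock
case class ByteBufferBlock(bytes: Array[Byte]) extends ReceivedBlock

// Stub for blockManager.putIterator: just drains the iterator.
def putIterator(it: Iterator[Any]): Unit = it.foreach(_ => ())

// One match arm wraps, stores, and reads the count,
// so `block` is pattern-matched exactly once.
def storeWithCount(block: ReceivedBlock): Option[Long] = block match {
  case IteratorBlock(iterator) =>
    var n = 0L
    putIterator(iterator.map { x => n += 1; x })
    Some(n)
  case ByteBufferBlock(_) =>
    None  // raw bytes carry no record boundaries, so there is no count
}
```

The `None` arm also mirrors the open question in this thread about `ByteBufferBlock`s: with no record boundaries in a raw buffer, there is nothing meaningful to count.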



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6707#issuecomment-110035520
  
Can one of the admins verify this patch?



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
GitHub user dibbhatt opened a pull request:

https://github.com/apache/spark/pull/6707

[SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct 
count at Spark UI

@tdas @zsxwing this is the new PR for SPARK-8080

I have merged https://github.com/apache/spark/pull/6659

Also to mention: for MEMORY_ONLY storage, when a block cannot be safely 
unrolled to memory because there is not enough space, the BlockManager won't 
try to put the block, and the ReceivedBlockHandler will throw a SparkException 
as it cannot find the block id in PutResult. Thus the number of records in the 
block won't be counted if the block failed to unroll in memory, which is fine.

For MEMORY_AND_DISK storage, if the BlockManager cannot unroll the block to 
memory, the block will still get serialized to disk. The same holds for the 
WAL-based store. So for those cases (storage level = memory + disk) the number 
of records will be counted even though the block could not be unrolled to 
memory.

Thus I added `isFullyConsumed` to the CountingIterator, but have not used it, 
since it can never happen that the block is not fully consumed and the 
ReceivedBlockHandler still gets the block id.

I have also added a few test cases to cover those block-unrolling scenarios.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dibbhatt/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/6707.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6707


commit 01e6dc8ad9ac6353ef8e073b93a96bffb6e46ca6
Author: U-PEROOT\UBHATD1 
Date:   2015-06-08T14:17:16Z

A

commit 4c5931d660c6d0642dbb63c2340b24f5493e19d3
Author: Dibyendu Bhattacharya 
Date:   2015-06-08T15:04:04Z

[SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct 
count at Spark UI





[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt closed the pull request at:

https://github.com/apache/spark/pull/6614



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-08 Thread dibbhatt
Github user dibbhatt commented on the pull request:

https://github.com/apache/spark/pull/6614#issuecomment-109984395
  
Closing this PR; #6614 had an issue with merging from upstream/master and a 
lot of unwanted files came into it. Will open a new one.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-07 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/6614#issuecomment-109875293
  
I've no idea why it's screwed up. Feel free to open a new one.



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-07 Thread dibbhatt
Github user dibbhatt commented on the pull request:

https://github.com/apache/spark/pull/6614#issuecomment-109862250
  
@tdas @zsxwing, if this PR is screwed up, should we close this one and create 
a fresh one? How do we proceed?



[GitHub] spark pull request: [SPARK-8080][STREAMING] Receiver.store with It...

2015-06-07 Thread dibbhatt
Github user dibbhatt commented on the pull request:

https://github.com/apache/spark/pull/6614#issuecomment-109857699
  
Hi @zsxwing... no, I have made only one commit for my changes. Not sure why 
it says 219 files changed; if you look at the commit details, you can see that 
only files related to this PR have changed. Just now I modified comments in a 
file to trigger the build once again.

The "219 files changed" appeared after I merged my repo from upstream/master 
to pick up your PR changes, and I merged only your changes with mine and 
committed those.

Below are my changes since the merge... and I have not committed all those 
219 files :(


https://github.com/dibbhatt/spark/commit/0892156a1f08e3092d924e57a77a871a4843b016

https://github.com/dibbhatt/spark/commit/c250fb5fb6a08d89b0aed05b5d3eec44b9991c01

https://github.com/apache/spark/commit/28225d58846e6c214fbc6f7e1ef09cda6b7cd94d


