[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19130571
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -227,6 +217,7 @@ private object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
+    logInfo(s"Unpersisting TorrentBroadcast $id")
--- End diff --

I don't feel super strongly about this one, but given that this is for 
debugging exceptional cases, I feel it should be at debug level. If your worry is that 
the broadcast cleaner might clean up stuff prematurely, then I think we should 
log in the cleaner instead.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19130611
  
--- Diff: core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala ---
@@ -84,6 +89,24 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
     assert(results.collect().toSet === (1 to numSlaves).map(x => (x, 10)).toSet)
   }
 
+  test("TorrentBroadcast's blockifyObject and unblockifyObject are inverses") {
+    import org.apache.spark.broadcast.TorrentBroadcast._
+    val blockSize = 1024
+    val conf = new SparkConf()
+    val compressionCodec = Some(new SnappyCompressionCodec(conf))
+    val serializer = new JavaSerializer(conf)
+    val objects = for (size <- Gen.choose(1, 1024 * 10)) yield {
--- End diff --

as discussed offline, maybe just use a random number generator here since 
Gen brings extra complexity but not much benefit in this specific case.
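
For reference, a minimal sketch of what input generation with a plain random number generator might look like (a hypothetical snippet, not the code in this PR; the generated arrays would then be fed through the blockify/unblockify helpers under test):

```scala
// Hypothetical sketch: replace Gen.choose with plain scala.util.Random.
import scala.util.Random

val rand = new Random(42)  // fixed seed keeps the test deterministic
val inputs: Seq[Array[Byte]] =
  Seq.fill(10)(Array.fill(1 + rand.nextInt(1024 * 10))(rand.nextInt().toByte))
```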





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-21 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19130957
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -227,6 +217,7 @@ private object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
+    logInfo(s"Unpersisting TorrentBroadcast $id")
--- End diff --

It's mostly for debugging which broadcasts have been removed and which have 
not. It can probably be made debug once we have a UI for this (#2851), but 
right now looking at the driver logs is the only way to figure out whether a 
broadcast variable has been removed.
Also, it's just one line per broadcast variable (we have 2-3 lines per 
variable when it is created).





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19131009
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -227,6 +217,7 @@ private object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
+    logInfo(s"Unpersisting TorrentBroadcast $id")
--- End diff --

I'll try to get #2851 merged this week; I'm in the middle of some 
significant UI code cleanup and I'm planning to merge most of the existing UI 
patches or to re-implement them myself.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59884782
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21974/consoleFull)
 for   PR 2844 at commit 
[`1e8268d`](https://github.com/apache/spark/commit/1e8268d6111e4ad45e2acfe47d837718f2170461).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-21 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59890238
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21974/
Test PASSed.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59890235
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21974/consoleFull)
 for   PR 2844 at commit 
[`1e8268d`](https://github.com/apache/spark/commit/1e8268d6111e4ad45e2acfe47d837718f2170461).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-21 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59891101
  
I've merged this into master.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/2844





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59803868
  
LGTM now, thanks!





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19118825
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -227,6 +217,7 @@ private object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
+    logInfo(s"Unpersisting TorrentBroadcast $id")
--- End diff --

This can be chatty. logDebug?





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19118878
  
--- Diff: 
core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala ---
@@ -17,13 +17,18 @@
 
 package org.apache.spark.broadcast
 
+import scala.util.Random
+
+import org.scalacheck.Gen
--- End diff --

This is from ScalaCheck; see 
http://www.scalatest.org/user_guide/generator_driven_property_checks





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19119141
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-
-      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
+      logDebug(s"Reading piece $pieceId of $broadcastId")
+      // First try getLocalBytes because there is a chance that previous attempts to fetch the
       // broadcast blocks have already fetched some of the blocks. In that case, some blocks
       // would be available locally (on this executor).
-      var blockOpt = bm.getLocalBytes(pieceId)
-      if (!blockOpt.isDefined) {
-        blockOpt = bm.getRemoteBytes(pieceId)
-        blockOpt match {
-          case Some(block) =>
-            // If we found the block from remote executors/driver's BlockManager, put the block
-            // in this executor's BlockManager.
-            SparkEnv.get.blockManager.putBytes(
-              pieceId,
-              block,
-              StorageLevel.MEMORY_AND_DISK_SER,
-              tellMaster = true)
-
-          case None =>
-            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
-        }
+      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
+        bm.getRemoteBytes(pieceId).map { block =>
--- End diff --

Given that this block is long, can we avoid using map/getOrElse? Just make it 
more explicit.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19119178
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-
-      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
+      logDebug(s"Reading piece $pieceId of $broadcastId")
+      // First try getLocalBytes because there is a chance that previous attempts to fetch the
       // broadcast blocks have already fetched some of the blocks. In that case, some blocks
       // would be available locally (on this executor).
-      var blockOpt = bm.getLocalBytes(pieceId)
-      if (!blockOpt.isDefined) {
-        blockOpt = bm.getRemoteBytes(pieceId)
-        blockOpt match {
-          case Some(block) =>
-            // If we found the block from remote executors/driver's BlockManager, put the block
-            // in this executor's BlockManager.
-            SparkEnv.get.blockManager.putBytes(
-              pieceId,
-              block,
-              StorageLevel.MEMORY_AND_DISK_SER,
-              tellMaster = true)
-
-          case None =>
-            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
-        }
+      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
+        bm.getRemoteBytes(pieceId).map { block =>
--- End diff --

To be more explicit: I'm suggesting the old style is easier to understand.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19119195
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -62,6 +59,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
    * blocks from the driver and/or other executors.
    */
   @transient private var _value: T = obj
+  /** The compression codec to use, or None if compression is disabled */
+  @transient private var compressionCodec: Option[CompressionCodec] = _
+  /** Size of each block. Default value is 4MB.  This value is only read by the broadcaster. */
+  @transient private var blockSize: Int = _
+
+  private def setConf(conf: SparkConf) {
+    compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
+      Some(CompressionCodec.createCodec(conf))
+    } else {
+      None
+    }
+    blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
+  }
+  setConf(SparkEnv.get.conf)
--- End diff --

Update the javadoc for this class to make it very obvious that, at init 
time, this class reads configuration from SparkEnv.get.conf.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19119727
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-
-      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
+      logDebug(s"Reading piece $pieceId of $broadcastId")
+      // First try getLocalBytes because there is a chance that previous attempts to fetch the
       // broadcast blocks have already fetched some of the blocks. In that case, some blocks
       // would be available locally (on this executor).
-      var blockOpt = bm.getLocalBytes(pieceId)
-      if (!blockOpt.isDefined) {
-        blockOpt = bm.getRemoteBytes(pieceId)
-        blockOpt match {
-          case Some(block) =>
-            // If we found the block from remote executors/driver's BlockManager, put the block
-            // in this executor's BlockManager.
-            SparkEnv.get.blockManager.putBytes(
-              pieceId,
-              block,
-              StorageLevel.MEMORY_AND_DISK_SER,
-              tellMaster = true)
-
-          case None =>
-            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
-        }
+      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
+        bm.getRemoteBytes(pieceId).map { block =>
--- End diff --

FWIW I agree with @rxin 





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19119774
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -227,6 +217,7 @@ private object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
+    logInfo(s"Unpersisting TorrentBroadcast $id")
--- End diff --

Actually this is useful for debugging. I'd suggest keeping this at info





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19120243
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-
-      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
+      logDebug(s"Reading piece $pieceId of $broadcastId")
+      // First try getLocalBytes because there is a chance that previous attempts to fetch the
      // broadcast blocks have already fetched some of the blocks. In that case, some blocks
       // would be available locally (on this executor).
-      var blockOpt = bm.getLocalBytes(pieceId)
-      if (!blockOpt.isDefined) {
-        blockOpt = bm.getRemoteBytes(pieceId)
-        blockOpt match {
-          case Some(block) =>
-            // If we found the block from remote executors/driver's BlockManager, put the block
-            // in this executor's BlockManager.
-            SparkEnv.get.blockManager.putBytes(
-              pieceId,
-              block,
-              StorageLevel.MEMORY_AND_DISK_SER,
-              tellMaster = true)
-
-          case None =>
-            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
-        }
+      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
+        bm.getRemoteBytes(pieceId).map { block =>
--- End diff --

Would you like me to revert to the old code layout, then?  FWIW, I 
prefer the style here to the old code, which used a `var` and had this "if we 
get here, the option is defined" comment.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19121626
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-
-      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
+      logDebug(s"Reading piece $pieceId of $broadcastId")
+      // First try getLocalBytes because there is a chance that previous attempts to fetch the
       // broadcast blocks have already fetched some of the blocks. In that case, some blocks
       // would be available locally (on this executor).
-      var blockOpt = bm.getLocalBytes(pieceId)
-      if (!blockOpt.isDefined) {
-        blockOpt = bm.getRemoteBytes(pieceId)
-        blockOpt match {
-          case Some(block) =>
-            // If we found the block from remote executors/driver's BlockManager, put the block
-            // in this executor's BlockManager.
-            SparkEnv.get.blockManager.putBytes(
-              pieceId,
-              block,
-              StorageLevel.MEMORY_AND_DISK_SER,
-              tellMaster = true)
-
-          case None =>
-            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
-        }
+      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
+        bm.getRemoteBytes(pieceId).map { block =>
--- End diff --

Hmm -- the thing I want the code to reflect is that there are three cases:
1. We get it locally.
2. If not, we get it from remote.
3. If that fails, we throw an exception.

Right now it looks like one big block instead of this three-way switch.
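
To make the three cases concrete, here is a small self-contained toy of the shape being described (the getLocal/getRemote stand-ins below are hypothetical, not the real BlockManager calls):

```scala
// Toy illustration of the three-way structure: local, then remote, then fail.
object ThreeWayFetch {
  def getLocal(id: String): Option[String] = None                    // 1. try the local store
  def getRemote(id: String): Option[String] = Some(s"bytes-for-$id") // 2. fall back to remote

  def fetch(id: String): String =
    getLocal(id) match {
      case Some(local) => local
      case None =>
        getRemote(id) match {
          case Some(remote) => remote
          case None =>                                               // 3. neither worked
            throw new RuntimeException(s"Failed to get $id")
        }
    }

  def main(args: Array[String]): Unit = println(fetch("piece0"))
}
```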





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19126060
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -227,6 +217,7 @@ private object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
+    logInfo(s"Unpersisting TorrentBroadcast $id")
--- End diff --

HttpBroadcast has info-level logging for this.  I'm going to leave this at 
info for now while we debug TorrentBroadcast issues; we can always revisit 
later as part of a larger log-level cleanup.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19126192
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -62,6 +59,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
    * blocks from the driver and/or other executors.
    */
   @transient private var _value: T = obj
+  /** The compression codec to use, or None if compression is disabled */
+  @transient private var compressionCodec: Option[CompressionCodec] = _
+  /** Size of each block. Default value is 4MB.  This value is only read by the broadcaster. */
+  @transient private var blockSize: Int = _
+
+  private def setConf(conf: SparkConf) {
+    compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
+      Some(CompressionCodec.createCodec(conf))
+    } else {
+      None
+    }
+    blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
+  }
+  setConf(SparkEnv.get.conf)
--- End diff --

Done.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19126189
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-
-      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
+      logDebug(s"Reading piece $pieceId of $broadcastId")
+      // First try getLocalBytes because there is a chance that previous attempts to fetch the
       // broadcast blocks have already fetched some of the blocks. In that case, some blocks
       // would be available locally (on this executor).
-      var blockOpt = bm.getLocalBytes(pieceId)
-      if (!blockOpt.isDefined) {
-        blockOpt = bm.getRemoteBytes(pieceId)
-        blockOpt match {
-          case Some(block) =>
-            // If we found the block from remote executors/driver's BlockManager, put the block
-            // in this executor's BlockManager.
-            SparkEnv.get.blockManager.putBytes(
-              pieceId,
-              block,
-              StorageLevel.MEMORY_AND_DISK_SER,
-              tellMaster = true)
-
-          case None =>
-            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
-        }
+      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
+        bm.getRemoteBytes(pieceId).map { block =>
--- End diff --

I pushed a new commit that simplifies this code.  I think that the problem 
was the use of nested getOrElse calls.  I replaced this with a series of `defs` 
that show how to get the bytes locally and remotely, followed by a non-nested 
`orElse` chain.  I think this is a lot cleaner now, since the core logic is a 
one-liner:

```scala
getLocal.orElse(getRemote).getOrElse(
  throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
```
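
As a rough, self-contained illustration of why the `def` + `orElse` combination short-circuits correctly (stand-in bodies, not the actual TorrentBroadcast code):

```scala
// getRemote is only evaluated if getLocal returns None, because orElse takes
// its argument by name and getLocal/getRemote are defs, not vals.
object OrElseChain {
  def main(args: Array[String]): Unit = {
    val pieceId = "broadcast_0_piece0"
    def getLocal: Option[Array[Byte]] = None
    def getRemote: Option[Array[Byte]] = Some(Array[Byte](1, 2, 3))
    val block = getLocal.orElse(getRemote).getOrElse(
      throw new RuntimeException(s"Failed to get $pieceId"))
    println(block.length)
  }
}
```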





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59868399
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21953/consoleFull)
 for   PR 2844 at commit 
[`2a9fdfd`](https://github.com/apache/spark/commit/2a9fdfd6bfb6ae1f952d52162d9687e058159282).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/2844

[SPARK-3958] TorrentBroadcast cleanup / debugging improvements.

This PR makes several changes to TorrentBroadcast in order to make
it easier to reason about, which should help when debugging SPARK-3958.
The key changes:

- Remove all state from the global TorrentBroadcast object.  This state
  consisted mainly of configuration options, like the block size and
  compression codec, and was read by the blockify / unblockify methods.
  Unfortunately, the use of `lazy val` for `BLOCK_SIZE` meant that the block
size was always determined by the first SparkConf that TorrentBroadcast was
  initialized with; as a result, unit tests could not properly test
  TorrentBroadcast with different block sizes.

  Instead, blockifyObject and unBlockifyObject now accept compression codecs
  and blockSizes as arguments.  These arguments are supplied at the call 
sites
  inside of TorrentBroadcast instances.  Each TorrentBroadcast instance
  determines these values from SparkEnv's SparkConf.  I was careful to 
ensure
  that we do not accidentally serialize CompressionCodec or SparkConf 
objects
  as part of the TorrentBroadcast object.

- Remove special-case handling of local-mode in TorrentBroadcast.  I don't
  think that broadcast implementations should know about whether we're 
running
  in local mode.  If we want to optimize the performance of broadcast in 
local
  mode, then we should detect this at a higher level and use a dummy
  LocalBroadcastFactory implementation instead.

  Removing this code fixes a subtle error condition: in the old local mode
  code, a failure to find the broadcast in the local BlockManager would lead
  to an attempt to deblockify zero blocks, which could lead to confusing
  deserialization or decompression errors when we attempted to decompress
  an empty byte array.  This should never have happened, though: a failure 
to
  find the block in local mode is evidence of some other error.  The changes
  here will make it easier to debug those errors if they ever happen.

- Add a check that throws an exception when attempting to deblockify an
  empty array.

- Use ScalaCheck to add a test to check that TorrentBroadcast's
  blockifyObject and unBlockifyObject methods are inverses (a rough sketch of
  this property appears after this list).

- Misc. cleanup and logging improvements.
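
A rough sketch of the inverse property being tested (the blockify/unblockify functions below are simplified stand-ins for TorrentBroadcast's helpers, which additionally take a serializer and an optional compression codec):

```scala
// Hypothetical, self-contained version of the round-trip property using ScalaCheck.
import org.scalacheck.{Gen, Prop}

object BlockifyRoundTrip {
  val blockSize = 1024
  def blockify(bytes: Array[Byte]): Seq[Array[Byte]] = bytes.grouped(blockSize).toSeq
  def unblockify(blocks: Seq[Array[Byte]]): Array[Byte] = blocks.flatten.toArray

  // For every input size in [1, 10240], splitting into blocks and re-joining
  // must give back the original bytes.
  val roundTrip: Prop = Prop.forAll(Gen.choose(1, 1024 * 10)) { size =>
    val input = Array.fill(size)(scala.util.Random.nextInt().toByte)
    unblockify(blockify(input)).sameElements(input)
  }

  def main(args: Array[String]): Unit = roundTrip.check
}
```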

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark torrentbroadcast-bugfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2844


commit 48c98c1996c87cebbd0669924f57527b8e81c35e
Author: Josh Rosen joshro...@databricks.com
Date:   2014-10-19T06:36:49Z

[SPARK-3958] TorrentBroadcast cleanup / debugging improvements.

This PR makes several changes to TorrentBroadcast in order to make
it easier to reason about, which should help when debugging SPARK-3958.
The key changes:

- Remove all state from the global TorrentBroadcast object.  This state
  consisted mainly of configuration options, like the block size and
  compression codec, and was read by the blockify / unblockify methods.
  Unfortunately, the use of `lazy val` for `BLOCK_SIZE` meant that the block
size was always determined by the first SparkConf that TorrentBroadcast was
  initialized with; as a result, unit tests could not properly test
  TorrentBroadcast with different block sizes.

  Instead, blockifyObject and unBlockifyObject now accept compression codecs
  and blockSizes as arguments.  These arguments are supplied at the call 
sites
  inside of TorrentBroadcast instances.  Each TorrentBroadcast instance
  determines these values from SparkEnv's SparkConf.  I was careful to 
ensure
  that we do not accidentally serialize CompressionCodec or SparkConf 
objects
  as part of the TorrentBroadcast object.

- Remove special-case handling of local-mode in TorrentBroadcast.  I don't
  think that broadcast implementations should know about whether we're 
running
  in local mode.  If we want to optimize the performance of broadcast in 
local
  mode, then we should detect this at a higher level and use a dummy
  LocalBroadcastFactory implementation instead.

  Removing this code fixes a subtle error condition: in the old local mode
  code, a failure to find the broadcast in the local BlockManager would lead
  to an attempt to deblockify zero blocks, which could lead to confusing
  deserialization or decompression errors when we attempted to decompress
  an empty 

[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59641699
  
/cc @rxin for review.  I'd like to apply this to `branch-1.1` as well, 
since I believe that it's also affected by current TorrentBroadcast bugs.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59641733
  
Also, /cc @davies, who helped me to spot the "local mode might deblockify 
an empty array" bug and who's been working on TorrentBroadcast optimizations.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59641757
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21882/consoleFull)
 for   PR 2844 at commit 
[`618a872`](https://github.com/apache/spark/commit/618a87260faaebf353c1d9b4abc17af9f0cfa472).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59642634
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21882/consoleFull)
 for   PR 2844 at commit 
[`618a872`](https://github.com/apache/spark/commit/618a87260faaebf353c1d9b4abc17af9f0cfa472).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59642636
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21882/
Test FAILed.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59643009
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21885/consoleFull)
 for   PR 2844 at commit 
[`33fc754`](https://github.com/apache/spark/commit/33fc75447c676a5fca1f6f7e7095562f3a1583d5).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59643838
  
It looks like this build is going to fail a ReplSuite test:

```scala
  test("broadcast vars") {
    // Test that the value that a broadcast var had when it was created is used,
    // even if that variable is then modified in the driver program
    // TODO: This doesn't actually work for arrays when we run in local mode!
    val output = runInterpreter("local",
      """
        |var array = new Array[Int](5)
        |val broadcastArray = sc.broadcast(array)
        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
        |array(0) = 5
        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
      """.stripMargin)
    assertDoesNotContain("error:", output)
    assertDoesNotContain("Exception", output)
    assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
    assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
  }
```

I see now that my change to remove the special local-mode handling 
inadvertently leads to a duplication of the variable in the driver program.  
This could maybe be a performance issue, since now we will use 2x the memory in 
the driver for each broadcast variable.  I'll restore the line that stores the 
local copy of the broadcast variable when it's created.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59643971
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21885/consoleFull)
 for   PR 2844 at commit 
[`33fc754`](https://github.com/apache/spark/commit/33fc75447c676a5fca1f6f7e7095562f3a1583d5).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59643974
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21885/
Test FAILed.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59644064
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21888/consoleFull)
 for   PR 2844 at commit 
[`5c22782`](https://github.com/apache/spark/commit/5c227825b3cf0bbe3826e20fe66370229bfc43a2).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59645141
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21888/
Test FAILed.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59645137
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21888/consoleFull)
 for   PR 2844 at commit 
[`5c22782`](https://github.com/apache/spark/commit/5c227825b3cf0bbe3826e20fe66370229bfc43a2).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59653126
  
This most recent test-failure is another side-effect of removing 
TorrentBroadcast's optimizations for local mode:

```
[info] - Unpersisting TorrentBroadcast on executors only in local mode *** 
FAILED ***
[info]   1 did not equal 0 (BroadcastSuite.scala:219)
[info] - Unpersisting TorrentBroadcast on executors and driver in local 
mode *** FAILED ***
[info]   1 did not equal 0 (BroadcastSuite.scala:219)
```

This time, the error is because there's a check that asserts that broadcast 
pieces are not stored into the driver's block manager when running in local 
mode.  I don't think that this optimization necessarily makes sense, since 
we'll have to store those blocks anyways when running in distributed mode.  
Therefore, I'm going to change these tests to remove this local-mode 
special-casing.







[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59653640
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21893/consoleFull)
 for   PR 2844 at commit 
[`c3b08f9`](https://github.com/apache/spark/commit/c3b08f93b61f0748b7c42fc32314bd92150e5b88).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59655954
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21893/consoleFull)
 for   PR 2844 at commit 
[`c3b08f9`](https://github.com/apache/spark/commit/c3b08f93b61f0748b7c42fc32314bd92150e5b88).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2844#issuecomment-59655958
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21893/
Test PASSed.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19063222
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -76,23 +87,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
    * @return number of blocks this broadcast variable is divided into
    */
   private def writeBlocks(): Int = {
-    // For local mode, just put the object in the BlockManager so we can find it later.
-    SparkEnv.get.blockManager.putSingle(
-      broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-
-    if (!isLocal) {
-      val blocks = TorrentBroadcast.blockifyObject(_value)
-      blocks.zipWithIndex.foreach { case (block, i) =>
-        SparkEnv.get.blockManager.putBytes(
-          BroadcastBlockId(id, "piece" + i),
-          block,
-          StorageLevel.MEMORY_AND_DISK_SER,
-          tellMaster = true)
-      }
-      blocks.length
-    } else {
-      0
+    // Store a copy of the broadcast variable in the driver so that tasks run on the driver
+    // do not create a duplicate copy of the broadcast variable's value.
+    SparkEnv.get.blockManager.putSingle(broadcastId, _value, StorageLevel.MEMORY_AND_DISK,
+      tellMaster = false)
--- End diff --

I wonder whether storing a serialized copy in local mode helps at all. 
If it fails to fetch the original copy of the value from the blockManager, it 
also won't be able to fetch the serialized copy.





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19063253
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -62,6 +59,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
    * blocks from the driver and/or other executors.
    */
   @transient private var _value: T = obj
+  /** The compression codec to use, or None if compression is disabled */
+  @transient private var compressionCodec: Option[CompressionCodec] = _
+  /** Size of each block. Default value is 4MB.  This value is only read by the broadcaster. */
+  @transient private var blockSize: Int = _
--- End diff --

How about moving these two into the constructor, reading the conf in 
TorrentBroadcastFactory?





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19063271
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -156,6 +158,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
   private def readObject(in: ObjectInputStream) {
     in.defaultReadObject()
     TorrentBroadcast.synchronized {
+      setConf(SparkEnv.get.conf)
--- End diff --

This looks weird; how can we make sure that this conf equals the one 
used when creating the Broadcast?





[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19063287
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -156,6 +158,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
   private def readObject(in: ObjectInputStream) {
 in.defaultReadObject()
 TorrentBroadcast.synchronized {
+  setConf(SparkEnv.get.conf)
--- End diff --

The conf is application-scoped.  The same conf should be present on this 
application's executors, where this task will be deserialized.  This assumption 
is used elsewhere, too.
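
To make the `setConf` call concrete, here is a rough sketch of what it could do, 
using only the `spark.broadcast.*` settings already visible elsewhere in this diff; 
the exact body in the patch may differ.

    import org.apache.spark.SparkConf
    import org.apache.spark.io.CompressionCodec

    /** Illustrative sketch: rebuild conf-derived state from the given SparkConf. */
    private def setConf(conf: SparkConf): Unit = {
      // Compression defaults to on, matching spark.broadcast.compress = true.
      compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
        Some(CompressionCodec.createCodec(conf))
      } else {
        None
      }
      // spark.broadcast.blockSize is expressed in KB, hence the * 1024.
      blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
    }

Since it only reads from whatever SparkConf is passed in, calling it with 
`SparkEnv.get.conf` in `readObject` yields the same values on every executor, as long 
as the conf really is application-scoped.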


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19063336
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -76,23 +87,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
* @return number of blocks this broadcast variable is divided into
*/
   private def writeBlocks(): Int = {
-    // For local mode, just put the object in the BlockManager so we can find it later.
-    SparkEnv.get.blockManager.putSingle(
-      broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-
-    if (!isLocal) {
-      val blocks = TorrentBroadcast.blockifyObject(_value)
-      blocks.zipWithIndex.foreach { case (block, i) =>
-        SparkEnv.get.blockManager.putBytes(
-          BroadcastBlockId(id, "piece" + i),
-          block,
-          StorageLevel.MEMORY_AND_DISK_SER,
-          tellMaster = true)
-      }
-      blocks.length
-    } else {
-      0
+    // Store a copy of the broadcast variable in the driver so that tasks run on the driver
+    // do not create a duplicate copy of the broadcast variable's value.
+    SparkEnv.get.blockManager.putSingle(broadcastId, _value, StorageLevel.MEMORY_AND_DISK,
+      tellMaster = false)
--- End diff --

The reason for this store is to avoid creating two copies of `_value` in 
the driver.  If we serialize and deserialize a broadcast variable on the driver 
and then attempt to access its value, then without this code we will end up 
going through the regular de-chunking code path, which will cause us to 
deserialize the serialized copy of `_value` and waste memory. 

I believe that this serialization and deserialization can take place when 
tasks are run in local mode, since we still serialize tasks in order to help 
users be aware of serialization issues that would impact them if they moved to 
a cluster.  This complexity is another reason why I'm in favor of just 
scrapping all local-mode special-casing and configuring Spark to use a dummy 
LocalBroadcastFactory for local mode instead of whichever setting the user 
specified.  That would be a larger, more-invasive change, which is why I opted 
for the simpler fix here.
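
For illustration, the read side that this store is meant to short-circuit could look 
roughly like the sketch below; `readBroadcastBlock` and `readBlocks` are stand-in 
names and the argument list of `unBlockifyObject` is assumed, so this is not 
necessarily the exact code in the patch.

    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.StorageLevel

    private def readBroadcastBlock(): T = {
      val bm = SparkEnv.get.blockManager
      bm.getLocal(broadcastId).map(_.data.next()) match {
        case Some(x) =>
          // Fast path: the driver (or a JVM that already assembled the value)
          // finds the deserialized object directly, so there is no de-chunking
          // and no second in-memory copy of _value.
          x.asInstanceOf[T]
        case None =>
          // Slow path: fetch the pieces, reassemble, and cache the result
          // locally so later reads on this JVM take the fast path.
          val obj = TorrentBroadcast.unBlockifyObject[T](
            readBlocks(), SparkEnv.get.serializer, compressionCodec)
          bm.putSingle(broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
          obj
      }
    }

With the driver-side putSingle in writeBlocks, the driver always hits the fast path, 
which is exactly the duplicate-copy problem described above.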


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19063363
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -62,6 +59,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
* blocks from the driver and/or other executors.
*/
   @transient private var _value: T = obj
+  /** The compression codec to use, or None if compression is disabled */
+  @transient private var compressionCodec: Option[CompressionCodec] = _
+  /** Size of each block. Default value is 4MB. This value is only read by the broadcaster. */
+  @transient private var blockSize: Int = _
--- End diff --

I thought about this and agree that it might be cleaner, but this will 
require more refactoring of other code.  One design goal here was to minimize 
the serialized size of TorrentBroadcast objects, so we can't serialize the 
SparkConf or CompressionCodec instances (which contain SparkConfs).  
SparkEnv.conf determines these values anyways.
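
Put differently, only the broadcast id needs to travel with each serialized 
TorrentBroadcast; everything conf-derived stays @transient and is rebuilt on the 
receiving side. A fragment-level sketch of the idea (the real readObject in the 
patch does more than this):

    import java.io.ObjectInputStream
    import org.apache.spark.SparkEnv
    import org.apache.spark.io.CompressionCodec

    // Conf-derived state is transient, so it never inflates the serialized broadcast.
    @transient private var compressionCodec: Option[CompressionCodec] = _
    @transient private var blockSize: Int = _

    private def readObject(in: ObjectInputStream): Unit = {
      in.defaultReadObject()
      TorrentBroadcast.synchronized {
        // Rebuild codec and block size from the executor-side conf instead of
        // shipping a SparkConf (or a codec holding one) with every broadcast stub.
        setConf(SparkEnv.get.conf)
        // ... the existing logic that re-reads or reassembles _value follows here ...
      }
    }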


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19063455
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -179,43 +183,29 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
 
 private object TorrentBroadcast extends Logging {
-  /** Size of each block. Default value is 4MB. */
-  private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
-  private var initialized = false
-  private var conf: SparkConf = null
-  private var compress: Boolean = false
-  private var compressionCodec: CompressionCodec = null
-
-  def initialize(_isDriver: Boolean, conf: SparkConf) {
-    TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
-    synchronized {
-      if (!initialized) {
-        compress = conf.getBoolean("spark.broadcast.compress", true)
-        compressionCodec = CompressionCodec.createCodec(conf)
-        initialized = true
-      }
-    }
-  }
 
-  def stop() {
-    initialized = false
-  }
-
-  def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {
-    val bos = new ByteArrayChunkOutputStream(BLOCK_SIZE)
-    val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
-    val ser = SparkEnv.get.serializer.newInstance()
+  def blockifyObject[T: ClassTag](
--- End diff --

The conf has been moved into `class Broadcast`; maybe blockifyObject and 
unBlockifyObject should also be moved.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

2014-10-19 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2844#discussion_r19066407
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -179,43 +183,29 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
 
 private object TorrentBroadcast extends Logging {
-  /** Size of each block. Default value is 4MB. */
-  private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
-  private var initialized = false
-  private var conf: SparkConf = null
-  private var compress: Boolean = false
-  private var compressionCodec: CompressionCodec = null
-
-  def initialize(_isDriver: Boolean, conf: SparkConf) {
-    TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
-    synchronized {
-      if (!initialized) {
-        compress = conf.getBoolean("spark.broadcast.compress", true)
-        compressionCodec = CompressionCodec.createCodec(conf)
-        initialized = true
-      }
-    }
-  }
 
-  def stop() {
-    initialized = false
-  }
-
-  def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {
-    val bos = new ByteArrayChunkOutputStream(BLOCK_SIZE)
-    val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
-    val ser = SparkEnv.get.serializer.newInstance()
+  def blockifyObject[T: ClassTag](
--- End diff --

These two methods, `blockifyObject` and `unBlockifyObject`, now accept all 
of their dependencies directly, which makes it easier to unit-test them.
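
Assuming the new signatures take the object, block size, serializer, and optional 
codec as plain arguments (the parameter order here is illustrative), a round-trip 
check inside Spark's own test sources needs no SparkEnv or global initialization 
at all:

    import org.apache.spark.SparkConf
    import org.apache.spark.broadcast.TorrentBroadcast.{blockifyObject, unBlockifyObject}
    import org.apache.spark.io.CompressionCodec
    import org.apache.spark.serializer.JavaSerializer

    val conf = new SparkConf()
    val serializer = new JavaSerializer(conf)
    val codec = Some(CompressionCodec.createCodec(conf))
    val input: Array[Byte] = Array.fill(10000)(1.toByte)

    // Chunk into 1 KB blocks and reassemble; the two helpers should be inverses.
    val blocks = blockifyObject(input, 1024, serializer, codec)
    val output = unBlockifyObject[Array[Byte]](blocks, serializer, codec)
    assert(output.toSeq == input.toSeq)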


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org