[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161148920
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
     TorrentBroadcast.synchronized {
-      setConf(SparkEnv.get.conf)
-      val blockManager = SparkEnv.get.blockManager
-      blockManager.getLocalValues(broadcastId) match {
-        case Some(blockResult) =>
-          if (blockResult.data.hasNext) {
-            val x = blockResult.data.next().asInstanceOf[T]
-            releaseLock(broadcastId)
-            x
-          } else {
-            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
-          }
-        case None =>
-          logInfo("Started reading broadcast variable " + id)
-          val startTimeMs = System.currentTimeMillis()
-          val blocks = readBlocks()
-          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
-
-          try {
-            val obj = TorrentBroadcast.unBlockifyObject[T](
-              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
-            // Store the merged copy in BlockManager so other tasks on this executor don't
-            // need to re-fetch it.
-            val storageLevel = StorageLevel.MEMORY_AND_DISK
-            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
-              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
--- End diff --

No, `ReferenceMap` is not thread safe. However, all operations on 
`broadcastCache` occur within a synchronized block 
(TorrentBroadcast.scala, lines 208-254).
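
The locking argument above can be sketched in isolation. This is a minimal illustration, not the code from the PR: `GuardedCache`, `getOrLoad` and `loadCount` are hypothetical names, and a plain `mutable.HashMap` stands in for `ReferenceMap`. The point is only that a map which is not thread safe can be shared safely when the lookup and the update both happen under one lock.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the broadcast cache: a plain, non-thread-safe map
// whose every access goes through one shared lock.
object GuardedCache {
  private val lock = new Object
  private val values = mutable.HashMap.empty[Long, String]
  private var loads = 0

  // The lookup and the update on a miss happen inside the same synchronized
  // block, so no other thread can touch the map in between.
  def getOrLoad(id: Long)(load: => String): String = lock.synchronized {
    values.getOrElseUpdate(id, { loads += 1; load })
  }

  def loadCount: Int = lock.synchronized(loads)
}

object GuardedCacheDemo {
  def main(args: Array[String]): Unit = {
    val threads = (1 to 8).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = { GuardedCache.getOrLoad(42L)("value-42"); () }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    // Every call is serialized by the lock, so the loader runs once.
    println(GuardedCache.loadCount)
  }
}
```

If any read or write of the map happened outside the lock, this guarantee would no longer hold, which is why the line range matters.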


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161135870
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
     TorrentBroadcast.synchronized {
-      setConf(SparkEnv.get.conf)
-      val blockManager = SparkEnv.get.blockManager
-      blockManager.getLocalValues(broadcastId) match {
-        case Some(blockResult) =>
-          if (blockResult.data.hasNext) {
-            val x = blockResult.data.next().asInstanceOf[T]
-            releaseLock(broadcastId)
-            x
-          } else {
-            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
-          }
-        case None =>
-          logInfo("Started reading broadcast variable " + id)
-          val startTimeMs = System.currentTimeMillis()
-          val blocks = readBlocks()
-          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
-
-          try {
-            val obj = TorrentBroadcast.unBlockifyObject[T](
-              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
-            // Store the merged copy in BlockManager so other tasks on this executor don't
-            // need to re-fetch it.
-            val storageLevel = StorageLevel.MEMORY_AND_DISK
-            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
-              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
+        setConf(SparkEnv.get.conf)
--- End diff --

No, sorry - the cache update takes place within that block.  With the 
exception of those blocks (lines 220-222 and lines 244-246), yes.


---




[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161133496
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

This is the state of an executor at some point in time:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance1(broadcastId = IdInstance1, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

After some time Thread1 finishes processing the partition it's working on 
and starts on the next - the state becomes:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

At some point the GC destroys TorrentBroadcastInstance1, and with it 
IdInstance1.  Now, if the key is a weak reference, the state becomes:

Cache: Empty
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

The next thread to finish processing a partition (Thread3 here) then creates 
a new instance of the broadcast value:

Cache: IdInstance6 => ValueInstance2
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance2)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

On the other hand, if the key is a strong reference and the value is weak, 
the cached value isn't eligible for GC above, because other threads still hold 
it.  As such, when Thread3 finishes processing its partition and starts the 
next, the state becomes:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)
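
The strong-key / weak-value case can be sketched with standard library pieces only. This is an illustration under stated assumptions, not the PR's implementation: `BlockId`, `Payload` and `WeakValueCache` are hypothetical names, and a `mutable.HashMap` holding `java.lang.ref.WeakReference` values stands in for `ReferenceMap(HARD, WEAK)`.

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

// Hypothetical stand-ins; these are not the Spark or commons-collections classes.
final case class BlockId(id: Long)
final class Payload(val data: Array[Int])

// Strong keys, weak values: an entry stays usable for as long as some task
// still holds a strong reference to the value.
final class WeakValueCache {
  private val entries = mutable.HashMap.empty[BlockId, WeakReference[Payload]]

  def getOrLoad(key: BlockId)(load: => Payload): Payload = synchronized {
    // WeakReference.get returns null once the referent has been collected,
    // in which case the value is loaded again and the entry replaced.
    entries.get(key).flatMap(ref => Option(ref.get)).getOrElse {
      val fresh = load
      entries.update(key, new WeakReference(fresh))
      fresh
    }
  }
}

object WeakValueCacheDemo {
  def main(args: Array[String]): Unit = {
    val cache = new WeakValueCache
    var loads = 0
    def load(): Payload = { loads += 1; new Payload(Array(1, 2, 3)) }

    // One task reads the value through one key instance...
    val first = cache.getOrLoad(BlockId(1L))(load())
    // ...and a later task arrives with a different but equal key instance.
    val second = cache.getOrLoad(BlockId(1L))(load())

    // `first` is still strongly held here, so the cached value is reused.
    println(first eq second)
    println(loads)
  }
}
```

The lifetime of the entry follows the value rather than any particular key instance, which is the property the walk-through relies on.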


---




[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161132399
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
     TorrentBroadcast.synchronized {
-      setConf(SparkEnv.get.conf)
-      val blockManager = SparkEnv.get.blockManager
-      blockManager.getLocalValues(broadcastId) match {
-        case Some(blockResult) =>
-          if (blockResult.data.hasNext) {
-            val x = blockResult.data.next().asInstanceOf[T]
-            releaseLock(broadcastId)
-            x
-          } else {
-            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
-          }
-        case None =>
-          logInfo("Started reading broadcast variable " + id)
-          val startTimeMs = System.currentTimeMillis()
-          val blocks = readBlocks()
-          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
-
-          try {
-            val obj = TorrentBroadcast.unBlockifyObject[T](
-              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
-            // Store the merged copy in BlockManager so other tasks on this executor don't
-            // need to re-fetch it.
-            val storageLevel = StorageLevel.MEMORY_AND_DISK
-            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
-              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
--- End diff --

Fixed, thanks!


---




[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161132203
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
     TorrentBroadcast.synchronized {
-      setConf(SparkEnv.get.conf)
-      val blockManager = SparkEnv.get.blockManager
-      blockManager.getLocalValues(broadcastId) match {
-        case Some(blockResult) =>
-          if (blockResult.data.hasNext) {
-            val x = blockResult.data.next().asInstanceOf[T]
-            releaseLock(broadcastId)
-            x
-          } else {
-            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
-          }
-        case None =>
-          logInfo("Started reading broadcast variable " + id)
-          val startTimeMs = System.currentTimeMillis()
-          val blocks = readBlocks()
-          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
-
-          try {
-            val obj = TorrentBroadcast.unBlockifyObject[T](
-              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
-            // Store the merged copy in BlockManager so other tasks on this executor don't
-            // need to re-fetch it.
-            val storageLevel = StorageLevel.MEMORY_AND_DISK
-            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
-              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
+        setConf(SparkEnv.get.conf)
--- End diff --

Yes - everything within the getOrElse block is unchanged.


---




[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161132057
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

Suppose the first thread to request the broadcast variable's value 
destroyed its instance of the broadcast variable (which, I believe, is what 
will happen when that thread finishes processing its partition) - if the key 
were a weak reference in the above cache, it would become eligible for GC at 
that point.  I'm reasonably certain the associated key/value pair would then 
be removed from the cache; in other words, if the key were a weak reference, 
the key/value pair would be removed as soon as the key **or** value was 
garbage collected.

Note that I haven't used ReferenceMap extensively, so I could be wrong 
about the above - feel free to correct me if that's the case.
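
The weak-key concern can be illustrated with `java.util.WeakHashMap` from the JDK, which is used here as a stand-in and is an assumption of this sketch; it is not `ReferenceMap`, although both look keys up by equality while tracking the reachability of the key instance that was inserted. `BlockId` is a hypothetical key type. No output is claimed for the final line, because `System.gc()` is only a hint.

```scala
import java.util.WeakHashMap

object WeakKeyDemo {
  final case class BlockId(id: Long) // hypothetical key type

  def main(args: Array[String]): Unit = {
    val cache = new WeakHashMap[BlockId, String]()
    var key = BlockId(1L)
    cache.put(key, "value")

    // While the inserted key instance is strongly reachable, a different but
    // equal key instance still finds the entry.
    println(cache.containsKey(BlockId(1L)))

    // Drop the only strong reference to the key instance that was inserted.
    key = null
    System.gc() // a hint only; collection is not guaranteed to run

    // The entry may now be gone, even if other tasks still want the value.
    println(cache.size())
  }
}
```

This is the scenario described above: the entry's lifetime is tied to the first requester's key instance rather than to the value that the remaining tasks still share.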


---




[GitHub] spark issue #20183: [SPARK-22986][Core] Use a cache to avoid instantiating m...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on the issue:

https://github.com/apache/spark/pull/20183
  
Updated the above, thanks!


---




[GitHub] spark pull request #20183: [SPARK-22986][Core] Fix/cache broadcast values

2018-01-07 Thread ho3rexqj
GitHub user ho3rexqj opened a pull request:

https://github.com/apache/spark/pull/20183

[SPARK-22986][Core] Fix/cache broadcast values



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ho3rexqj/spark fix/cache-broadcast-values

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20183.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20183


commit 45d5024106c49b38dd5f913685fc587cfdd4c66e
Author: ho3rexqj 
Date:   2018-01-08T01:44:21Z

Adding tests to illustrate the problem we were having with broadcast 
variables.

commit c3e2f422a8274b1dded044c046e47338508d88a0
Author: ho3rexqj 
Date:   2018-01-08T01:45:20Z

Adding broadcast value cache to avoid instantiating broadcast values 
multiple times on executors when memory is constrained.




---
