[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...
Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161148920

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       private def readBroadcastBlock(): T = Utils.tryOrIOException {
         TorrentBroadcast.synchronized {
    -      setConf(SparkEnv.get.conf)
    -      val blockManager = SparkEnv.get.blockManager
    -      blockManager.getLocalValues(broadcastId) match {
    -        case Some(blockResult) =>
    -          if (blockResult.data.hasNext) {
    -            val x = blockResult.data.next().asInstanceOf[T]
    -            releaseLock(broadcastId)
    -            x
    -          } else {
    -            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
    -          }
    -        case None =>
    -          logInfo("Started reading broadcast variable " + id)
    -          val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks()
    -          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    -
    -          try {
    -            val obj = TorrentBroadcast.unBlockifyObject[T](
    -              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    -            // Store the merged copy in BlockManager so other tasks on this executor don't
    -            // need to re-fetch it.
    -            val storageLevel = StorageLevel.MEMORY_AND_DISK
    -            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    +
    +      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
    --- End diff --

No, `ReferenceMap` is not thread safe. However, all operations on `broadcastCache` occur inside a synchronized block (`TorrentBroadcast.synchronized`, TorrentBroadcast.scala lines 208-254).

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
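A minimal Scala sketch of the locking arrangement described in the comment above: a map that is not thread safe on its own stays consistent as long as every lookup and update happens while holding one lock. `SynchronizedValueCache` and `getOrLoad` are hypothetical names, a `scala.collection.mutable.HashMap` stands in for `ReferenceMap`, and a private lock object stands in for `TorrentBroadcast.synchronized`; this is an illustration, not Spark's code.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical cache: mutable.HashMap is not thread safe, so every read and write
// below happens inside `lock.synchronized`, the same idea as keeping all
// `broadcastCache` operations inside one synchronized block.
final class SynchronizedValueCache[K, V] {
  private val lock = new Object
  private val values = mutable.HashMap.empty[K, V]

  /** Returns the cached value for `key`, or evaluates `load` once and caches the result. */
  def getOrLoad(key: K)(load: => V): V = lock.synchronized {
    values.get(key) match {
      case Some(cached) => cached
      case None =>
        val loaded = load // stands in for the expensive fetch-and-deserialize path
        values.put(key, loaded)
        loaded
    }
  }
}

object SynchronizedValueCacheDemo {
  def main(args: Array[String]): Unit = {
    val cache = new SynchronizedValueCache[Long, Array[Byte]]
    val loads = new AtomicInteger(0)
    // Eight "tasks" ask for the same broadcast id concurrently.
    val threads = (1 to 8).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = {
          cache.getOrLoad(0L) { loads.incrementAndGet(); new Array[Byte](1024) }
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(s"loads = ${loads.get}") // prints "loads = 1"
  }
}
```

Because the check and the insert happen under the same lock, the first thread to acquire it performs the load and every later thread sees the cached value.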
Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161135870

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       private def readBroadcastBlock(): T = Utils.tryOrIOException {
         TorrentBroadcast.synchronized {
    -      setConf(SparkEnv.get.conf)
    -      val blockManager = SparkEnv.get.blockManager
    -      blockManager.getLocalValues(broadcastId) match {
    -        case Some(blockResult) =>
    -          if (blockResult.data.hasNext) {
    -            val x = blockResult.data.next().asInstanceOf[T]
    -            releaseLock(broadcastId)
    -            x
    -          } else {
    -            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
    -          }
    -        case None =>
    -          logInfo("Started reading broadcast variable " + id)
    -          val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks()
    -          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    -
    -          try {
    -            val obj = TorrentBroadcast.unBlockifyObject[T](
    -              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    -            // Store the merged copy in BlockManager so other tasks on this executor don't
    -            // need to re-fetch it.
    -            val storageLevel = StorageLevel.MEMORY_AND_DISK
    -            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    +
    +      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
    +        setConf(SparkEnv.get.conf)
    --- End diff --

No, sorry: the cache update takes place within that block. Apart from those blocks (lines 220-222 and lines 244-246), yes.
Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161133496

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)
    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

This is the state of an executor at some point in time:

    Cache:   IdInstance1 => ValueInstance1
    Thread1: TorrentBroadcastInstance1(broadcastId = IdInstance1, value = ValueInstance1)
    Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
    Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
    Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

After some time, Thread1 finishes processing the partition it's working on and starts on the next one. The state becomes:

    Cache:   IdInstance1 => ValueInstance1
    Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
    Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
    Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
    Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

At some point the GC collects TorrentBroadcastInstance1. Now, if the key is a weak reference, IdInstance1 becomes unreachable along with it and its entry is removed from the cache, so the state becomes:

    Cache:   Empty
    Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
    Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
    Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
    Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

The next thread to finish processing a partition then creates a new instance of the broadcast value:

    Cache:   IdInstance6 => ValueInstance2
    Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
    Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
    Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance2)
    Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

On the other hand, if the key is a strong reference and the value is weak, the cached entry isn't eligible for GC in the scenario above, because ValueInstance1 is still referenced by the running threads. As such, when Thread3 finishes processing its partition and starts the next, the state becomes:

    Cache:   IdInstance1 => ValueInstance1
    Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
    Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
    Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance1)
    Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)
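The walkthrough above argues for strong keys and weak values. The Scala sketch below illustrates that policy without commons-collections, under stated assumptions: keys are held strongly and matched by `equals`, values sit behind `java.lang.ref.WeakReference`, and entries whose values have been collected are purged through a `ReferenceQueue`. `HardKeyWeakValueMap` and `DemoBroadcastId` are hypothetical names; `DemoBroadcastId` is a case class so that, as with IdInstance1 and IdInstance6 in the walkthrough, two distinct id instances compare equal. This is not `ReferenceMap`'s implementation.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// Hypothetical key type; distinct instances with the same id are equal.
final case class DemoBroadcastId(id: Long)

// Sketch of a HARD-key / WEAK-value map: the map never keeps a value alive by itself,
// so an entry lasts only as long as some caller (e.g. a running task) still references
// that value. Not thread safe: guard it with a lock, as the PR does.
final class HardKeyWeakValueMap[K, V <: AnyRef] {
  // Remembers its key so the purge step knows which entry to drop.
  private final class ValueRef(val key: K, value: V, queue: ReferenceQueue[V])
      extends WeakReference[V](value, queue)

  private val collected = new ReferenceQueue[V]
  private val entries = mutable.HashMap.empty[K, ValueRef]

  // Removes entries whose values the GC has cleared and enqueued.
  private def purge(): Unit = {
    var ref = collected.poll()
    while (ref != null) {
      val stale = ref.asInstanceOf[ValueRef]
      // Only drop the entry if it still points at this stale reference.
      if (entries.get(stale.key).exists(_ eq stale)) entries.remove(stale.key)
      ref = collected.poll()
    }
  }

  def put(key: K, value: V): Unit = {
    purge()
    entries.put(key, new ValueRef(key, value, collected))
  }

  def get(key: K): Option[V] = {
    purge()
    entries.get(key).flatMap(ref => Option(ref.get()))
  }

  def size: Int = { purge(); entries.size }
}

object HardKeyWeakValueDemo {
  def main(args: Array[String]): Unit = {
    val cache = new HardKeyWeakValueMap[DemoBroadcastId, Array[Int]]
    val value1 = Array(1, 2, 3)           // plays ValueInstance1, held by running tasks
    cache.put(DemoBroadcastId(1), value1) // cached under one id instance (IdInstance1)
    // A later task looks up with its own, distinct id instance (IdInstance6)...
    val hit = cache.get(DemoBroadcastId(1))
    // ...and gets the same value instance back instead of building a new copy.
    println(hit.exists(_ eq value1))      // prints "true"
  }
}
```

Dropping the last strong reference to `value1` would let the GC clear the weak reference, after which the next `get`, `put`, or `size` call purges the entry; collection timing is up to the JVM.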
Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161132399

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       private def readBroadcastBlock(): T = Utils.tryOrIOException {
         TorrentBroadcast.synchronized {
    -      setConf(SparkEnv.get.conf)
    -      val blockManager = SparkEnv.get.blockManager
    -      blockManager.getLocalValues(broadcastId) match {
    -        case Some(blockResult) =>
    -          if (blockResult.data.hasNext) {
    -            val x = blockResult.data.next().asInstanceOf[T]
    -            releaseLock(broadcastId)
    -            x
    -          } else {
    -            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
    -          }
    -        case None =>
    -          logInfo("Started reading broadcast variable " + id)
    -          val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks()
    -          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    -
    -          try {
    -            val obj = TorrentBroadcast.unBlockifyObject[T](
    -              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    -            // Store the merged copy in BlockManager so other tasks on this executor don't
    -            // need to re-fetch it.
    -            val storageLevel = StorageLevel.MEMORY_AND_DISK
    -            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    +
    +      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
    --- End diff --

Fixed, thanks!
Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161132203

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       private def readBroadcastBlock(): T = Utils.tryOrIOException {
         TorrentBroadcast.synchronized {
    -      setConf(SparkEnv.get.conf)
    -      val blockManager = SparkEnv.get.blockManager
    -      blockManager.getLocalValues(broadcastId) match {
    -        case Some(blockResult) =>
    -          if (blockResult.data.hasNext) {
    -            val x = blockResult.data.next().asInstanceOf[T]
    -            releaseLock(broadcastId)
    -            x
    -          } else {
    -            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
    -          }
    -        case None =>
    -          logInfo("Started reading broadcast variable " + id)
    -          val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks()
    -          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    -
    -          try {
    -            val obj = TorrentBroadcast.unBlockifyObject[T](
    -              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    -            // Store the merged copy in BlockManager so other tasks on this executor don't
    -            // need to re-fetch it.
    -            val storageLevel = StorageLevel.MEMORY_AND_DISK
    -            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    +
    +      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
    +        setConf(SparkEnv.get.conf)
    --- End diff --

Yes, everything within the `getOrElse` block is unchanged.
Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161132057

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)
    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

Suppose the first thread to request the broadcast variable's value destroyed its instance of the broadcast variable (which, I believe, is what happens when that thread finishes processing its partition). If the key in the above cache were a weak reference, it would become eligible for GC at that point. I'm reasonably certain the associated key/value pair would then be removed from the cache; in other words, if the key were a weak reference, the key/value pair would be removed as soon as the key **or** the value was garbage collected.

Note that I haven't used ReferenceMap extensively, so I could be wrong about the above; feel free to correct me if that's the case.
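To make the weak-key concern above concrete, here is a small Scala sketch using `java.util.WeakHashMap`, a JDK map with weak keys, used here only as an analogue of a `ReferenceMap` configured with `WEAK` keys (it holds values strongly, so it does not model the weak-value side). Lookups match keys by `equals`, so an equal key owned by a later task finds the entry, but the entry's lifetime is tied to the one key instance that was inserted. `DemoBroadcastId` is a hypothetical stand-in for the broadcast id, and because `System.gc()` is only a hint, the final size printed is not deterministic.

```scala
import java.util.WeakHashMap

// Hypothetical key type; distinct instances with the same id are equal.
final case class DemoBroadcastId(id: Long)

object WeakKeyDemo {
  def main(args: Array[String]): Unit = {
    val cache = new WeakHashMap[DemoBroadcastId, String]()

    var firstTaskId = DemoBroadcastId(0) // key instance owned by the first task
    val laterTaskId = DemoBroadcastId(0) // equal but distinct instance owned by a later task
    cache.put(firstTaskId, "broadcast value")

    // Lookups compare keys with equals, so the later task's id finds the entry.
    println(cache.get(laterTaskId))     // prints "broadcast value"
    println(firstTaskId == laterTaskId) // prints "true"
    println(firstTaskId eq laterTaskId) // prints "false"

    // The map only weakly references the firstTaskId instance itself. Once the first
    // task drops it, the GC may clear that reference and the map then expunges the
    // entry, even though an equal key instance was created elsewhere. System.gc() is
    // only a hint, so this may print 1 or 0.
    firstTaskId = null
    System.gc()
    println(cache.size())
  }
}
```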
[GitHub] spark issue #20183: [SPARK-22986][Core] Use a cache to avoid instantiating m...
Github user ho3rexqj commented on the issue:

    https://github.com/apache/spark/pull/20183

Updated the above, thanks!
[GitHub] spark pull request #20183: [SPARK-22986][Core] Fix/cache broadcast values
GitHub user ho3rexqj opened a pull request:

    https://github.com/apache/spark/pull/20183

    [SPARK-22986][Core] Fix/cache broadcast values

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ho3rexqj/spark fix/cache-broadcast-values

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20183.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20183

commit 45d5024106c49b38dd5f913685fc587cfdd4c66e
Author: ho3rexqj
Date:   2018-01-08T01:44:21Z

    Adding tests to illustrate the problem we were having with broadcast variables.

commit c3e2f422a8274b1dded044c046e47338508d88a0
Author: ho3rexqj
Date:   2018-01-08T01:45:20Z

    Adding broadcast value cache to avoid instantiating broadcast values multiple times on executors when memory is constrained.