spark git commit: [SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of broadcast variable values
Repository: spark Updated Branches: refs/heads/branch-2.3 55695c712 -> 3ae3e1bb7 [SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of broadcast variable values When resources happen to be constrained on an executor the first time a broadcast variable is instantiated it is persisted to disk by the BlockManager. Consequently, every subsequent call to TorrentBroadcast::readBroadcastBlock from other instances of that broadcast variable spawns another instance of the underlying value. That is, broadcast variables are spawned once per executor **unless** memory is constrained, in which case every instance of a broadcast variable is provided with a unique copy of the underlying value. This patch fixes the above by explicitly caching the underlying values using weak references in a ReferenceMap. Author: ho3rexqj Closes #20183 from ho3rexqj/fix/cache-broadcast-values. (cherry picked from commit cbe7c6fbf9dc2fc422b93b3644c40d449a869eea) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ae3e1bb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ae3e1bb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ae3e1bb Branch: refs/heads/branch-2.3 Commit: 3ae3e1bb71aa88be1c963b4416986ef679d7c8a2 Parents: 55695c7 Author: ho3rexqj Authored: Fri Jan 12 15:27:00 2018 +0800 Committer: Wenchen Fan Committed: Fri Jan 12 15:27:31 2018 +0800 -- .../spark/broadcast/BroadcastManager.scala | 6 ++ .../spark/broadcast/TorrentBroadcast.scala | 72 .../apache/spark/broadcast/BroadcastSuite.scala | 34 + 3 files changed, 83 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3ae3e1bb/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala index e88988f..8d7a4a3 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala @@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong import scala.reflect.ClassTag +import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap} + import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging @@ -52,6 +54,10 @@ private[spark] class BroadcastManager( private val nextBroadcastId = new AtomicLong(0) + private[broadcast] val cachedValues = { +new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK) + } + def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = { broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement()) } http://git-wip-us.apache.org/repos/asf/spark/blob/3ae3e1bb/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala -- diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 7aecd3c..e125095 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) private def readBroadcastBlock(): T = Utils.tryOrIOException { TorrentBroadcast.synchronized { - setConf(SparkEnv.get.conf) - val blockManager = SparkEnv.get.blockManager - blockManager.getLocalValues(broadcastId) match { -case Some(blockResult) => - if (blockResult.data.hasNext) { -val x = blockResult.data.next().asInstanceOf[T] -releaseLock(broadcastId) -x - } else { -throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId") - } -case None => - logInfo("Started reading broadcast variable " + id) - val startTimeMs = System.currentTimeMillis() - val blocks = readBlocks() - logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs)) - - try { -val obj = TorrentBroadcast.unBlockifyObject[T]( - blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec) -// Store the merged copy in BlockManager so other tasks on this executor don't -// need to re-fetch it. -val storageLevel = StorageLevel.MEMORY_AND_DISK -if (!blockManager.put
spark git commit: [SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of broadcast variable values
Repository: spark Updated Branches: refs/heads/master b5042d75c -> cbe7c6fbf [SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of broadcast variable values When resources happen to be constrained on an executor the first time a broadcast variable is instantiated it is persisted to disk by the BlockManager. Consequently, every subsequent call to TorrentBroadcast::readBroadcastBlock from other instances of that broadcast variable spawns another instance of the underlying value. That is, broadcast variables are spawned once per executor **unless** memory is constrained, in which case every instance of a broadcast variable is provided with a unique copy of the underlying value. This patch fixes the above by explicitly caching the underlying values using weak references in a ReferenceMap. Author: ho3rexqj Closes #20183 from ho3rexqj/fix/cache-broadcast-values. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbe7c6fb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbe7c6fb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbe7c6fb Branch: refs/heads/master Commit: cbe7c6fbf9dc2fc422b93b3644c40d449a869eea Parents: b5042d7 Author: ho3rexqj Authored: Fri Jan 12 15:27:00 2018 +0800 Committer: Wenchen Fan Committed: Fri Jan 12 15:27:00 2018 +0800 -- .../spark/broadcast/BroadcastManager.scala | 6 ++ .../spark/broadcast/TorrentBroadcast.scala | 72 .../apache/spark/broadcast/BroadcastSuite.scala | 34 + 3 files changed, 83 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cbe7c6fb/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala index e88988f..8d7a4a3 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala @@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong import scala.reflect.ClassTag +import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap} + import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging @@ -52,6 +54,10 @@ private[spark] class BroadcastManager( private val nextBroadcastId = new AtomicLong(0) + private[broadcast] val cachedValues = { +new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK) + } + def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = { broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement()) } http://git-wip-us.apache.org/repos/asf/spark/blob/cbe7c6fb/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala -- diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 7aecd3c..e125095 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) private def readBroadcastBlock(): T = Utils.tryOrIOException { TorrentBroadcast.synchronized { - setConf(SparkEnv.get.conf) - val blockManager = SparkEnv.get.blockManager - blockManager.getLocalValues(broadcastId) match { -case Some(blockResult) => - if (blockResult.data.hasNext) { -val x = blockResult.data.next().asInstanceOf[T] -releaseLock(broadcastId) -x - } else { -throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId") - } -case None => - logInfo("Started reading broadcast variable " + id) - val startTimeMs = System.currentTimeMillis() - val blocks = readBlocks() - logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs)) - - try { -val obj = TorrentBroadcast.unBlockifyObject[T]( - blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec) -// Store the merged copy in BlockManager so other tasks on this executor don't -// need to re-fetch it. -val storageLevel = StorageLevel.MEMORY_AND_DISK -if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) { - throw new SparkException(s"F