Repository: spark
Updated Branches:
  refs/heads/master b5042d75c -> cbe7c6fbf


[SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of 
broadcast variable values

When resources happen to be constrained on an executor the first time a 
broadcast variable is instantiated it is persisted to disk by the BlockManager. 
Consequently, every subsequent call to TorrentBroadcast::readBroadcastBlock 
from other instances of that broadcast variable spawns another instance of the 
underlying value. That is, broadcast variables are spawned once per executor 
**unless** memory is constrained, in which case every instance of a broadcast 
variable is provided with a unique copy of the underlying value.

This patch fixes the above by explicitly caching the underlying values using 
weak references in a ReferenceMap.

Author: ho3rexqj <ho3re...@gmail.com>

Closes #20183 from ho3rexqj/fix/cache-broadcast-values.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbe7c6fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbe7c6fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbe7c6fb

Branch: refs/heads/master
Commit: cbe7c6fbf9dc2fc422b93b3644c40d449a869eea
Parents: b5042d7
Author: ho3rexqj <ho3re...@gmail.com>
Authored: Fri Jan 12 15:27:00 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Jan 12 15:27:00 2018 +0800

----------------------------------------------------------------------
 .../spark/broadcast/BroadcastManager.scala      |  6 ++
 .../spark/broadcast/TorrentBroadcast.scala      | 72 ++++++++++++--------
 .../apache/spark/broadcast/BroadcastSuite.scala | 34 +++++++++
 3 files changed, 83 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cbe7c6fb/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala 
b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index e88988f..8d7a4a3 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.reflect.ClassTag
 
+import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}
+
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
 
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
+  }
+
   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
     broadcastFactory.newBroadcast[T](value_, isLocal, 
nextBroadcastId.getAndIncrement())
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cbe7c6fb/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 7aecd3c..e125095 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: 
T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
     TorrentBroadcast.synchronized {
-      setConf(SparkEnv.get.conf)
-      val blockManager = SparkEnv.get.blockManager
-      blockManager.getLocalValues(broadcastId) match {
-        case Some(blockResult) =>
-          if (blockResult.data.hasNext) {
-            val x = blockResult.data.next().asInstanceOf[T]
-            releaseLock(broadcastId)
-            x
-          } else {
-            throw new SparkException(s"Failed to get locally stored broadcast 
data: $broadcastId")
-          }
-        case None =>
-          logInfo("Started reading broadcast variable " + id)
-          val startTimeMs = System.currentTimeMillis()
-          val blocks = readBlocks()
-          logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-          try {
-            val obj = TorrentBroadcast.unBlockifyObject[T](
-              blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-            // Store the merged copy in BlockManager so other tasks on this 
executor don't
-            // need to re-fetch it.
-            val storageLevel = StorageLevel.MEMORY_AND_DISK
-            if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-              throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse 
{
+        setConf(SparkEnv.get.conf)
+        val blockManager = SparkEnv.get.blockManager
+        blockManager.getLocalValues(broadcastId) match {
+          case Some(blockResult) =>
+            if (blockResult.data.hasNext) {
+              val x = blockResult.data.next().asInstanceOf[T]
+              releaseLock(broadcastId)
+
+              if (x != null) {
+                broadcastCache.put(broadcastId, x)
+              }
+
+              x
+            } else {
+              throw new SparkException(s"Failed to get locally stored 
broadcast data: $broadcastId")
             }
-            obj
-          } finally {
-            blocks.foreach(_.dispose())
-          }
+          case None =>
+            logInfo("Started reading broadcast variable " + id)
+            val startTimeMs = System.currentTimeMillis()
+            val blocks = readBlocks()
+            logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
+
+            try {
+              val obj = TorrentBroadcast.unBlockifyObject[T](
+                blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
+              // Store the merged copy in BlockManager so other tasks on this 
executor don't
+              // need to re-fetch it.
+              val storageLevel = StorageLevel.MEMORY_AND_DISK
+              if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
+                throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+              }
+
+              if (obj != null) {
+                broadcastCache.put(broadcastId, obj)
+              }
+
+              obj
+            } finally {
+              blocks.foreach(_.dispose())
+            }
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cbe7c6fb/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala 
b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 1596298..9ad2e9a 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -153,6 +153,40 @@ class BroadcastSuite extends SparkFunSuite with 
LocalSparkContext with Encryptio
     assert(broadcast.value.sum === 10)
   }
 
+  test("One broadcast value instance per executor") {
+    val conf = new SparkConf()
+      .setMaster("local[4]")
+      .setAppName("test")
+
+    sc = new SparkContext(conf)
+    val list = List[Int](1, 2, 3, 4)
+    val broadcast = sc.broadcast(list)
+    val instances = sc.parallelize(1 to 10)
+      .map(x => System.identityHashCode(broadcast.value))
+      .collect()
+      .toSet
+
+    assert(instances.size === 1)
+  }
+
+  test("One broadcast value instance per executor when memory is constrained") 
{
+    val conf = new SparkConf()
+      .setMaster("local[4]")
+      .setAppName("test")
+      .set("spark.memory.useLegacyMode", "true")
+      .set("spark.storage.memoryFraction", "0.0")
+
+    sc = new SparkContext(conf)
+    val list = List[Int](1, 2, 3, 4)
+    val broadcast = sc.broadcast(list)
+    val instances = sc.parallelize(1 to 10)
+      .map(x => System.identityHashCode(broadcast.value))
+      .collect()
+      .toSet
+
+    assert(instances.size === 1)
+  }
+
   /**
    * Verify the persistence of state associated with a TorrentBroadcast in a 
local-cluster.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to