Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161133496

--- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(

   private val nextBroadcastId = new AtomicLong(0)

+  private[broadcast] val cachedValues = {
+    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

This is the state of an executor at some point in time:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance1(broadcastId = IdInstance1, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

After some time Thread1 finishes processing the partition it's working on and starts on the next one. The state becomes:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

At some point the GC collects TorrentBroadcastInstance1.
Now, if the key is a weak reference, the state becomes:

Cache: Empty
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

Each TorrentBroadcast instance holds its own broadcastId object, so once TorrentBroadcastInstance1 is collected nothing strongly references IdInstance1 anymore, and the cache entry is cleared even though ValueInstance1 is still in use by every thread. The next thread to finish processing a partition then creates a new instance of the broadcast value:

Cache: IdInstance6 => ValueInstance2
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance2)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

On the other hand, if the key is a strong reference and the value is weak, the cache entry above isn't eligible for GC: ValueInstance1 is still strongly referenced by the remaining TorrentBroadcast instances. As such, when Thread3 finishes processing its partition and starts the next one, it reuses the cached value and the state becomes:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)
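The strong-key/weak-value case can be replayed with a minimal Scala sketch. Assumptions: it uses a plain scala.collection.mutable.HashMap with java.lang.ref.WeakReference values as a stand-in for Commons Collections' ReferenceMap(HARD, WEAK), and BroadcastId, BroadcastValue, TaskBroadcast, WeakValueCache and getOrLoad are hypothetical names, not Spark APIs. It shows that a task arriving with a new, equal broadcastId still finds the cached value, because the entry survives as long as some task holds the value.

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

// Hypothetical stand-ins for the objects in the state diagrams (not Spark classes).
// Case-class equality makes two BroadcastId objects with the same id equal,
// even though each task holds its own distinct instance.
final case class BroadcastId(id: Long)
final class BroadcastValue(val payload: String)
final class TaskBroadcast(val broadcastId: BroadcastId, val value: BroadcastValue)

// Hypothetical cache with strong keys and weak values, the same policy as
// ReferenceMap(HARD, WEAK). An entry stays usable while any task still holds
// the value strongly, no matter which BroadcastId object it was cached under.
final class WeakValueCache {
  private val entries = mutable.HashMap.empty[BroadcastId, WeakReference[BroadcastValue]]
  private var loadCount = 0

  def loads: Int = synchronized(loadCount)

  def getOrLoad(key: BroadcastId)(load: => BroadcastValue): BroadcastValue = synchronized {
    // Read the referent once; a cleared WeakReference returns null.
    entries.get(key).flatMap(ref => Option(ref.get)) match {
      case Some(value) => value
      case None =>
        val fresh = load
        loadCount += 1
        entries.put(key, new WeakReference[BroadcastValue](fresh))
        fresh
    }
  }
}

// Replays the timeline: four tasks start, then task 1 is replaced by task 5,
// which comes with its own, freshly created BroadcastId(1).
val cache = new WeakValueCache
def newTask(): TaskBroadcast = {
  val id = BroadcastId(1L)
  new TaskBroadcast(id, cache.getOrLoad(id)(new BroadcastValue("deserialized broadcast")))
}

val running = mutable.ArrayBuffer.fill(4)(newTask())
running(0) = newTask() // the old task object (TorrentBroadcastInstance1) becomes unreachable
System.gc()            // only a hint; the result below does not depend on it

val stillShared = running.forall(_.value eq running(1).value)
println(s"loads = ${cache.loads}, all tasks share one value = $stillShared")
```

Run as a Scala script, this should report a single load with all four tasks sharing one value, matching the last state above. Unlike ReferenceMap, the sketch never purges entries whose values have been collected; a longer-lived cache would need to drop them (for example by polling a ReferenceQueue).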