Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161133496
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
     
       private val nextBroadcastId = new AtomicLong(0)
     
    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --
    
    This is the state of an executor at some point in time:
    
    Cache: IdInstance1 => ValueInstance1
    Thread1: TorrentBroadcastInstance1(broadcastId = IdInstance1, value = ValueInstance1)
    Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
    Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
    Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)
    
    After some time Thread1 finishes processing the partition it's working on and starts on the next. The state becomes:
    
    Cache: IdInstance1 => ValueInstance1
    Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
    Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
    Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
    Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)
    
    At some point the GC destroys TorrentBroadcastInstance1, which leaves IdInstance1 with no remaining strong references.  Now, if the key is a weak reference, the cache entry is cleared and the state becomes:
    
    Cache: Empty
    Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
    Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
    Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
    Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)
    
    The next thread to finish processing a partition (Thread3 here) finds the cache empty and creates a new instance of the broadcast value:
    
    Cache: IdInstance6 => ValueInstance2
    Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
    Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
    Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance2)
    Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)
    
    On the other hand, if the key is a strong reference and the value is weak, the cached value isn't eligible for GC above, because the other threads still hold strong references to ValueInstance1.  As such, when Thread3 finishes processing its partition and starts the next, the state becomes:
    
    Cache: IdInstance1 => ValueInstance1
    Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
    Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
    Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance1)
    Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)
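
    The hard-key / weak-value behaviour described above can be sketched with only the JDK's WeakReference. This is a minimal illustration, not the code in the pull request: BlockId, Value, WeakValueCache and getOrLoad are hypothetical names, and the sketch assumes the key type has value equality (as a case class does), so that a new but equal key instance finds the existing entry.

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

// Hypothetical stand-ins for the broadcast id and the deserialized value.
final case class BlockId(id: Long)
final class Value(val payload: Array[Byte])

// Strong keys, weak values: an entry stays usable for as long as some
// caller still holds a strong reference to the cached value.
final class WeakValueCache[K, V <: AnyRef] {
  private val entries = mutable.HashMap.empty[K, WeakReference[V]]

  def getOrLoad(key: K)(load: => V): V = synchronized {
    // Option(ref.get) is None once the GC has cleared the weak reference.
    entries.get(key).flatMap(ref => Option(ref.get)) match {
      case Some(cached) => cached
      case None =>
        val fresh = load
        entries.update(key, new WeakReference(fresh))
        fresh
    }
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val cache = new WeakValueCache[BlockId, Value]
    var loads = 0
    def load(): Value = { loads += 1; new Value(new Array[Byte](16)) }

    // First task: nothing cached yet, so the value is loaded.
    val first = cache.getOrLoad(BlockId(1))(load())
    // Later task: a new but equal key instance. `first` is still strongly
    // reachable, so the weak reference cannot have been cleared.
    val second = cache.getOrLoad(BlockId(1))(load())

    assert(first eq second)
    assert(loads == 1)
  }
}
```

    Unlike a real reference map, this sketch never removes entries whose values have been cleared, so stale keys accumulate until they are overwritten.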


---
