Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/126#discussion_r10550266
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -181,15 +186,49 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
       }
     }
     
    +/**
    + * MapOutputTracker for the workers. This uses BoundedHashMap to keep track of
    + * a limited number of the most recently used map output information.
    + */
    +private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
    +
    +  /**
    +   * Bounded HashMap for storing serialized statuses in the worker. This allows
    +   * the HashMap to stay bounded in memory usage. Things dropped from this HashMap will be
    +   * automatically repopulated by fetching them again from the driver.
    +   */
    +  protected val MAX_MAP_STATUSES = 100
    +  protected val mapStatuses = new BoundedHashMap[Int, Array[MapStatus]](MAX_MAP_STATUSES, true)
    +}
    +
    +/**
    + * MapOutputTracker for the driver. This uses TimeStampedHashMap to keep track of map
    + * output information, which allows old output information to be dropped based on a TTL.
    + */
     private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       extends MapOutputTracker(conf) {
     
      // Cache a serialized version of the output statuses for each shuffle to send them out faster
       private var cacheEpoch = epoch
    -  private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
    +
    +  /**
    +   * Timestamp-based HashMap for storing mapStatuses in the master, so that statuses are dropped
    +   * only by explicit deregistering or by ttl-based cleaning (if set). Other than these two
    +   * scenarios, nothing should be dropped from this HashMap.
    +   */
    +  protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
    +
    +  /**
    +   * Bounded HashMap for storing serialized statuses in the master. This allows
    +   * the HashMap to stay bounded in memory usage. Things dropped from this HashMap will be
    +   * automatically repopulated by serializing the lost statuses again.
    +   */
    +  protected val MAX_SERIALIZED_STATUSES = 100
    +  private val cachedSerializedStatuses =
    +    new BoundedHashMap[Int, Array[Byte]](MAX_SERIALIZED_STATUSES, true)
    --- End diff --
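
    For readers unfamiliar with the BoundedHashMap used in the diff, here is a
    minimal sketch of its presumed LRU semantics. The actual class is not shown
    in this diff, so this version, built on java.util.LinkedHashMap with
    access-order iteration, is an assumption about how it behaves:

    ```scala
    import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

    // Assumed sketch of BoundedHashMap: an LRU map that evicts the
    // least-recently-used entry once more than `bound` entries are present.
    class BoundedHashMap[K, V](bound: Int, useLRU: Boolean) {
      // accessOrder = useLRU makes iteration (and hence eviction) order
      // reflect recency of access rather than insertion.
      private val underlying = new JLinkedHashMap[K, V](bound + 1, 0.75f, useLRU) {
        override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
          size() > bound
      }
      def put(k: K, v: V): Unit = synchronized { underlying.put(k, v) }
      def get(k: K): Option[V] = synchronized { Option(underlying.get(k)) }
      def size: Int = synchronized { underlying.size() }
    }
    ```

    Under this sketch, once a map status or serialized status is evicted, a
    later lookup misses and the caller must refetch or reserialize it, which
    matches the "automatically repopulated" comments in the diff.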
    
    I'd prefer if this just used a normal TimeStampedHashMap or even just a 
normal HashMap and then we added an explicit message to the MapOutputTracker to 
clean up the statuses when a given shuffle dependency goes out of scope. I 
think it's fine to relay this through the block manager for now since the 
driver doesn't have a direct path to the MapOutputTracker.
    
    The main motivation is to simplify this patch and avoid introducing another 
mechanism for deciding when to keep/evict things (this bounded-size map). We 
can just have the invariant that if a shuffle is in scope, its statuses are 
still present everywhere; if it goes out of scope, its statuses go away. If 
people want behavior other than this, they can use the timestamped map.
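
    A minimal sketch of this explicit-cleanup alternative (the names 
CleanupShuffle and unregisterShuffle are hypothetical, not from the patch, and 
MapStatus is replaced by a String placeholder to keep the example 
self-contained):

    ```scala
    import scala.collection.mutable

    // Hypothetical cleanup message, e.g. relayed through the block manager
    // when the driver notices a shuffle dependency has gone out of scope.
    case class CleanupShuffle(shuffleId: Int)

    class SimpleMapOutputTracker {
      // Plain map: entries live exactly as long as their shuffle is in scope,
      // so no size bound or TTL eviction policy is needed.
      private val mapStatuses = new mutable.HashMap[Int, Array[String]]()

      def registerShuffle(shuffleId: Int, statuses: Array[String]): Unit =
        synchronized { mapStatuses(shuffleId) = statuses }

      def getStatuses(shuffleId: Int): Option[Array[String]] =
        synchronized { mapStatuses.get(shuffleId) }

      // Explicit deregistration: the only way entries are ever removed.
      def unregisterShuffle(msg: CleanupShuffle): Unit =
        synchronized { mapStatuses.remove(msg.shuffleId) }
    }
    ```

    The invariant is then exactly the one described above: a shuffle's statuses 
are present iff the shuffle is in scope, with no second eviction mechanism to 
reason about.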

