GitHub user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/126#discussion_r11266861
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala ---
    @@ -51,49 +50,37 @@ import org.apache.spark._
      * @tparam T Type of the data contained in the broadcast variable.
      */
     abstract class Broadcast[T](val id: Long) extends Serializable {
    -  def value: T
    -
    -  // We cannot have an abstract readObject here due to some weird issues with
    -  // readObject having to be 'private' in sub-classes.
    -
    -  override def toString = "Broadcast(" + id + ")"
    -}
    -
    -private[spark]
    -class BroadcastManager(val _isDriver: Boolean, conf: SparkConf, securityManager: SecurityManager)
    -    extends Logging with Serializable {
    -
    -  private var initialized = false
    -  private var broadcastFactory: BroadcastFactory = null
     
    -  initialize()
    +  protected var _isValid: Boolean = true
     
    -  // Called by SparkContext or Executor before using Broadcast
    -  private def initialize() {
    -    synchronized {
    -      if (!initialized) {
    -        val broadcastFactoryClass = conf.get(
    -          "spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
    +  /**
    +   * Whether this Broadcast is actually usable. This should be false once persisted state is
    +   * removed from the driver.
    +   */
    +  def isValid: Boolean = _isValid
     
    -        broadcastFactory =
    -          Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
    -
    -        // Initialize appropriate BroadcastFactory and BroadcastObject
    -        broadcastFactory.initialize(isDriver, conf, securityManager)
    +  def value: T
     
    -        initialized = true
    -      }
    +  /**
    +   * Remove all persisted state associated with this broadcast on the executors. The next use
    +   * of this broadcast on the executors will trigger a remote fetch.
    +   */
    +  def unpersist()
    +
    +  /**
    +   * Remove all persisted state associated with this broadcast on both the executors and the
    +   * driver. Overriding implementations should set isValid to false.
    +   */
    +  private[spark] def destroy()
    --- End diff --
    
    You could also make `destroy` concrete and add a method called `onDestroy` that subclasses are required to implement. It could then look like this:
    
    ```
    private[spark] def destroy() {
      _isValid = false
      onDestroy()
    }
    ```
    This way the base class always clears the valid bit, and you don't have to count on subclasses to do it.
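
To make the suggestion concrete, here is a minimal, self-contained sketch of that template-method arrangement. It is not the code from the pull request: `SketchBroadcast`, `InMemoryBroadcast`, and `cleanedUp` are hypothetical names used only for illustration, and marking `destroy` as `final` and `_isValid` as `private` are assumptions that go slightly beyond what the comment proposes (the diff declares `_isValid` as `protected` and `destroy` as `private[spark]`).

```scala
// Sketch only: a simplified stand-in for the Broadcast class in the diff.
abstract class SketchBroadcast[T](val id: Long) extends Serializable {
  // The validity flag is owned by the base class; subclasses never write it.
  // (Assumption: private here, whereas the diff uses `protected var`.)
  private var _isValid: Boolean = true

  def isValid: Boolean = _isValid

  def value: T

  // Concrete entry point: always clears the flag, then delegates cleanup.
  // (Assumption: `final`, so subclasses cannot bypass the flag update.)
  final def destroy(): Unit = {
    _isValid = false
    onDestroy()
  }

  // Hook that every subclass must implement for its own cleanup.
  protected def onDestroy(): Unit
}

// Hypothetical subclass that keeps its value in memory.
class InMemoryBroadcast[T](broadcastId: Long, initial: T)
    extends SketchBroadcast[T](broadcastId) {

  private var data: Option[T] = Some(initial)

  // Exposed only so the example can show that the hook ran.
  var cleanedUp: Boolean = false

  def value: T =
    data.getOrElse(throw new IllegalStateException("Broadcast(" + id + ") was destroyed"))

  protected def onDestroy(): Unit = {
    data = None
    cleanedUp = true
  }
}
```

With this shape, a subclass only describes how to release its own state; calling `destroy()` on any instance flips `isValid` to `false` before `onDestroy()` runs, so a subclass that forgets about the flag cannot leave a destroyed broadcast looking valid.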

