Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/126#discussion_r11266861

--- Diff: core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala ---
@@ -51,49 +50,37 @@ import org.apache.spark._
  * @tparam T Type of the data contained in the broadcast variable.
  */
 abstract class Broadcast[T](val id: Long) extends Serializable {
-  def value: T
-
-  // We cannot have an abstract readObject here due to some weird issues with
-  // readObject having to be 'private' in sub-classes.
-
-  override def toString = "Broadcast(" + id + ")"
-}
-
-private[spark]
-class BroadcastManager(val _isDriver: Boolean, conf: SparkConf, securityManager: SecurityManager)
-  extends Logging with Serializable {
-
-  private var initialized = false
-  private var broadcastFactory: BroadcastFactory = null
-
-  initialize()
+  protected var _isValid: Boolean = true
-  // Called by SparkContext or Executor before using Broadcast
-  private def initialize() {
-    synchronized {
-      if (!initialized) {
-        val broadcastFactoryClass = conf.get(
-          "spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
+  /**
+   * Whether this Broadcast is actually usable. This should be false once persisted state is
+   * removed from the driver.
+   */
+  def isValid: Boolean = _isValid
-        broadcastFactory =
-          Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
-
-        // Initialize appropriate BroadcastFactory and BroadcastObject
-        broadcastFactory.initialize(isDriver, conf, securityManager)
+  def value: T
-        initialized = true
-      }
+  /**
+   * Remove all persisted state associated with this broadcast on the executors. The next use
+   * of this broadcast on the executors will trigger a remote fetch.
+   */
+  def unpersist()
+
+  /**
+   * Remove all persisted state associated with this broadcast on both the executors and the
+   * driver. Overriding implementations should set isValid to false.
+   */
+  private[spark] def destroy()
--- End diff --

You could also make `destroy()` concrete and add an `onDestroy` hook that sub-classes are required to implement. It would then look like this:

```scala
private[spark] def destroy() {
  _isValid = false
  onDestroy()
}
```

That way you wouldn't be counting on subclasses to handle the valid bit correctly.
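For context, here is a minimal sketch of how the suggested template-method shape could fit into the abstract class, with a hypothetical subclass to show that overriders never touch `_isValid`. The `ExampleBroadcast` class and its method bodies are illustrative assumptions, not code from this PR:

```scala
// Sketch of the suggestion above: destroy() is concrete and owns the valid
// bit, while subclasses implement only the onDestroy() cleanup hook.
abstract class Broadcast[T](val id: Long) extends Serializable {
  protected var _isValid: Boolean = true

  def isValid: Boolean = _isValid

  def value: T

  def unpersist()

  // Concrete template method: invalidates first, then delegates cleanup.
  private[spark] def destroy() {
    _isValid = false
    onDestroy()
  }

  // Required hook: remove persisted state on executors and the driver.
  protected def onDestroy()
}

// Hypothetical subclass (illustration only): note it never sets _isValid.
class ExampleBroadcast[T](id: Long, @transient private val data: T)
  extends Broadcast[T](id) {

  def value: T = data

  def unpersist() {
    // Would drop executor-side copies here.
  }

  protected def onDestroy() {
    // Would drop driver-side state here as well.
  }
}
```

The design point is the one the comment makes: the valid-bit bookkeeping lives in exactly one place, so a forgetful subclass cannot leave a destroyed broadcast marked valid.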