Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11229496
  
    --- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala 
---
    @@ -17,44 +17,80 @@
     
     package org.apache.spark.streaming.util
     
    +import org.apache.spark.Logging
    +
     private[streaming]
    -class RecurringTimer(val clock: Clock, val period: Long, val callback: 
(Long) => Unit) {
    +class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, 
name: String)
    +  extends Logging {
       
    -  private val thread = new Thread("RecurringTimer") {
    +  private val thread = new Thread("RecurringTimer - " + name) {
    +    setDaemon(true)
         override def run() { loop }    
       }
    -  
    -  private var nextTime = 0L
     
    +  private var prevTime = -1L
    +  private var nextTime = -1L
    +  private var stopped = false
    +
    +  /**
    +   * Get the earliest time when this timer can be started. The time must 
be a
    +   * multiple of this timer's period and more than current time.
    +   */
       def getStartTime(): Long = {
         (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
       }
     
    +  /**
    +   * Get the earliest time when this timer can be restarted, based on the 
earlier start time.
    +   * The time must be a multiple of this timer's period and more than 
current time.
    +   */
       def getRestartTime(originalStartTime: Long): Long = {
         val gap = clock.currentTime - originalStartTime
         (math.floor(gap.toDouble / period).toLong + 1) * period + 
originalStartTime
       }
     
    -  def start(startTime: Long): Long = {
    +  /**
    +   * Start at the given start time.
    +   */
    +  def start(startTime: Long): Long = synchronized {
         nextTime = startTime
         thread.start()
    +    logInfo("Started timer for " + name + " at time " + nextTime)
         nextTime
       }
     
    +  /**
    +   * Start at the earliest time it can start based on the period.
    +   */
       def start(): Long = {
         start(getStartTime())
       }
     
    -  def stop() {
    -    thread.interrupt() 
    +  /**
    +   * Stop the timer. stopAfterNextCallback = true make it wait for next 
callback to be completed.
    +   * Returns the last time when it had called back.
    +   */
    +  def stop(stopAfterNextCallback: Boolean = false): Long = synchronized {
    +    if (!stopped) {
    --- End diff --
    
    Does `stopped` need to be `volatile` here to work?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to