Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/247#discussion_r11284020 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala --- @@ -17,44 +17,80 @@ package org.apache.spark.streaming.util +import org.apache.spark.Logging + private[streaming] -class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) { +class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String) + extends Logging { - private val thread = new Thread("RecurringTimer") { + private val thread = new Thread("RecurringTimer - " + name) { + setDaemon(true) override def run() { loop } } - - private var nextTime = 0L + private var prevTime = -1L + private var nextTime = -1L + private var stopped = false + + /** + * Get the earliest time when this timer can be started. The time must be a + * multiple of this timer's period and more than current time. + */ def getStartTime(): Long = { (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period } + /** + * Get the earliest time when this timer can be restarted, based on the earlier start time. + * The time must be a multiple of this timer's period and more than current time. + */ def getRestartTime(originalStartTime: Long): Long = { val gap = clock.currentTime - originalStartTime (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime } - def start(startTime: Long): Long = { + /** + * Start at the given start time. + */ + def start(startTime: Long): Long = synchronized { nextTime = startTime thread.start() + logInfo("Started timer for " + name + " at time " + nextTime) nextTime } + /** + * Start at the earliest time it can start based on the period. + */ def start(): Long = { start(getStartTime()) } - def stop() { - thread.interrupt() + /** + * Stop the timer. stopAfterNextCallback = true make it wait for next callback to be completed. + * Returns the last time when it had called back. 
+ */ + def stop(stopAfterNextCallback: Boolean = false): Long = synchronized { --- End diff -- I renamed the parameter to interruptTimer to make it even more generic, and made it a non-default param.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---