anishshri-db commented on code in PR #45709: URL: https://github.com/apache/spark/pull/45709#discussion_r1540147961
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ########## @@ -188,17 +187,20 @@ class TimerStateImpl( /** * Function to get all the registered timers for all grouping keys + * @param expiryTimestampMs Threshold for expired timestamp in milliseconds, this function + * will return every timer that has a (strictly) smaller timestamp * @return - iterator of all the registered timers for all grouping keys */ - def getExpiredTimers(): Iterator[(Any, Long)] = { + def getExpiredTimers(expiryTimestampMs: Long): Iterator[(Any, Long)] = { + // this iterator is sorted in increasing timestamp order val iter = store.iterator(tsToKeyCFName) new NextIterator[(Any, Long)] { override protected def getNext(): (Any, Long) = { - if (iter.hasNext) { - val rowPair = iter.next() - val keyRow = rowPair.key - val result = getTimerRowFromSecIndex(keyRow) + val rowPair = if (iter.hasNext) iter.next() else null + val result: (Any, Long) = + if (rowPair != null) getTimerRowFromSecIndex(rowPair.key) else null + if (result != null && result._2 < expiryTimestampMs) { Review Comment: Let's do `<=` comparison so that the timer doesn't have to wait for the next batch to fire? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org