anishshri-db commented on code in PR #45709:
URL: https://github.com/apache/spark/pull/45709#discussion_r1540147961


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##########
@@ -188,17 +187,20 @@ class TimerStateImpl(
 
   /**
    * Function to get all the registered timers for all grouping keys
+   * @param expiryTimestampMs Threshold for expired timestamp in milliseconds; this
+   *                          function will return every timer that has a (strictly)
+   *                          smaller timestamp
    * @return - iterator of all the registered timers for all grouping keys
    */
-  def getExpiredTimers(): Iterator[(Any, Long)] = {
+  def getExpiredTimers(expiryTimestampMs: Long): Iterator[(Any, Long)] = {
+    // this iterator is sorted in increasing order of timestamp
     val iter = store.iterator(tsToKeyCFName)
 
     new NextIterator[(Any, Long)] {
       override protected def getNext(): (Any, Long) = {
-        if (iter.hasNext) {
-          val rowPair = iter.next()
-          val keyRow = rowPair.key
-          val result = getTimerRowFromSecIndex(keyRow)
+        val rowPair = if (iter.hasNext) iter.next() else null
+        val result: (Any, Long) =
+          if (rowPair != null) getTimerRowFromSecIndex(rowPair.key) else null
+        if (result != null && result._2 < expiryTimestampMs) {

Review Comment:
   Let's do a `<=` comparison so that the timer doesn't have to wait for the next batch to fire?
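   For illustration, here is a minimal standalone sketch of the scan pattern under discussion: draining a timestamp-sorted timer index up to a threshold, using the suggested `<=` comparison. The object and parameter names are hypothetical and deliberately avoid Spark internals (`TimerStateImpl`, `NextIterator`, the state store); the sketch only assumes, as the diff's comment does, that the input iterator is sorted in increasing timestamp order.

   ```scala
   // Hypothetical sketch, not Spark's actual implementation.
   object ExpiredTimersSketch {
     // `sortedTimers` yields (groupingKey, expiryTimestampMs) pairs in
     // increasing timestamp order, mirroring the tsToKeyCFName index.
     def getExpiredTimers(
         sortedTimers: Iterator[(String, Long)],
         expiryTimestampMs: Long): Iterator[(String, Long)] = {
       // Because the input is sorted, we can stop at the first timer past
       // the threshold. Using `<=` (per the review suggestion) fires timers
       // whose timestamp equals the threshold in the current batch, instead
       // of deferring them to the next batch as a strict `<` would.
       sortedTimers.takeWhile { case (_, ts) => ts <= expiryTimestampMs }
     }

     def main(args: Array[String]): Unit = {
       val timers = Iterator(("a", 100L), ("b", 200L), ("c", 300L))
       // With `<=`, the timer at exactly 200 ms fires now.
       println(getExpiredTimers(timers, 200L).toList)
     }
   }
   ```

   With a strict `<`, the `("b", 200L)` timer above would be excluded and could only fire once a later batch advanced the threshold past 200 ms, which is the latency the review comment is trying to avoid.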



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

