anishshri-db commented on code in PR #36620:
URL: https://github.com/apache/spark/pull/36620#discussion_r878569698


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala:
##########
@@ -445,7 +445,17 @@ abstract class StreamExecution(
         false
       } else {
         val source = sources(sourceIndex)
-        !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
+        // SPARK-39242 For numeric increasing offsets, we could have called awaitOffset
+        // after the stream has moved past the expected newOffset or if committedOffsets
+        // changed after notify. In this case, its safe to exit, since at-least the given
+        // Offset has been reached and the equality condition might never be met.
+        if (!localCommittedOffsets.contains(source)) {
+          true
+        } else if (newOffset.isInstanceOf[LongOffset]) {
+          localCommittedOffsets(source).toString.toLong < newOffset.toString.toLong

Review Comment:
   Right, yeah.
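
   For reference, here is a minimal, self-contained sketch of the wait condition the hunk above describes. The `Offset`, `LongOffset`, and `OtherOffset` types and the `notDone` helper are simplified stand-ins invented for illustration, not the real Spark classes. The hunk is cut off before its final branch, so the fallback to the old `!=` check is an assumption. The sketch also pattern matches on the offset value instead of going through `toString.toLong` as the diff does.

```scala
// Simplified stand-ins for illustration only; NOT the real Spark classes.
sealed trait Offset
final case class LongOffset(offset: Long) extends Offset
final case class OtherOffset(json: String) extends Offset

object AwaitOffsetSketch {
  // Returns true while the caller should keep waiting for `newOffset`.
  def notDone(committed: Option[Offset], newOffset: Offset): Boolean =
    (committed, newOffset) match {
      // Nothing committed for this source yet: keep waiting.
      case (None, _) => true
      // Numeric offsets: stop once the stream has reached or passed the target.
      case (Some(LongOffset(c)), LongOffset(n)) => c < n
      // Assumed fallback (not visible in the hunk): the original equality check.
      case (Some(c), n) => c != n
    }

  def main(args: Array[String]): Unit = {
    println(notDone(None, LongOffset(5)))                // true
    println(notDone(Some(LongOffset(3)), LongOffset(5))) // true
    println(notDone(Some(LongOffset(7)), LongOffset(5))) // false
  }
}
```

   The last call is the case the code comment is about: with the old `!=` check, a committed offset of 7 against an expected 5 would never compare equal, so the waiter could block indefinitely.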



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

