anishshri-db commented on code in PR #36620: URL: https://github.com/apache/spark/pull/36620#discussion_r878569698
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala:
##########
@@ -445,7 +445,17 @@ abstract class StreamExecution(
           false
         } else {
           val source = sources(sourceIndex)
-          !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
+          // SPARK-39242 For numeric increasing offsets, we could have called awaitOffset
+          // after the stream has moved past the expected newOffset or if committedOffsets
+          // changed after notify. In this case, its safe to exit, since at-least the given
+          // Offset has been reached and the equality condition might never be met.
+          if (!localCommittedOffsets.contains(source)) {
+            true
+          } else if (newOffset.isInstanceOf[LongOffset]) {
+            localCommittedOffsets(source).toString.toLong < newOffset.toString.toLong

Review Comment:
   Right yea
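
For readers following the hunk above, here is a minimal standalone sketch of the wait condition it describes. It is not the Spark implementation: `Offset`, `LongOffset`, `OtherOffset`, `AwaitOffsetSketch`, and `notDone` are hypothetical stand-ins defined only for this example, the numeric comparison uses the wrapped `Long` directly rather than `toString.toLong`, and the fallback to the equality check for non-numeric offsets is an assumption based on the removed line, since the hunk is cut off before its final branch.

```scala
// Hypothetical stand-ins for the offset types; these are NOT Spark's classes.
sealed trait Offset
final case class LongOffset(offset: Long) extends Offset
final case class OtherOffset(json: String) extends Offset

object AwaitOffsetSketch {
  /** Returns true while the caller should keep waiting for `target` on `source`. */
  def notDone(committed: Map[String, Offset], source: String, target: Offset): Boolean =
    committed.get(source) match {
      // Nothing has been committed for this source yet, so keep waiting.
      case None => true
      case Some(current) =>
        (current, target) match {
          // Numeric, increasing offsets: stop waiting once the committed
          // offset has reached or moved past the target.
          case (LongOffset(c), LongOffset(t)) => c < t
          // Assumed fallback (not shown in the hunk): the original equality check.
          case _ => current != target
        }
    }

  def main(args: Array[String]): Unit = {
    val committed = Map[String, Offset]("src" -> LongOffset(7L))
    println(notDone(committed, "src", LongOffset(5L))) // false: stream already past 5
    println(notDone(committed, "src", LongOffset(7L))) // false: target reached
    println(notDone(committed, "src", LongOffset(9L))) // true: still behind 9
    println(notDone(Map.empty, "src", LongOffset(1L))) // true: nothing committed yet
  }
}
```

With a plain equality check, the first call would report `true` indefinitely once the stream had moved past offset 5, which is the hang the SPARK-39242 comment refers to.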