Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20096#discussion_r160007884

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -418,11 +418,16 @@ abstract class StreamExecution(
        * Blocks the current thread until processing for data from the given `source` has reached at
        * least the given `Offset`. This method is intended for use primarily when writing tests.
        */
    -  private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
    +  private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit = {
         assertAwaitThread()
         def notDone = {
           val localCommittedOffsets = committedOffsets
    -      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
    +      if (sources.length <= sourceIndex) {
    +        false
    --- End diff --

    The race condition is present because `sources` is initialized to `Seq.empty` and only later assigned the actual sources. You can instead initialize `sources` to `null`, and then return `notDone = false` when `sources` is `null`. Any other mismatch should throw an error. I don't like the current code, which hides erroneous situations.
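
    To make the suggestion concrete, here is a minimal, self-contained sketch of the null-sentinel idea. It is not the actual `StreamExecution` code: `Offset`, `StreamExecutionSketch`, `initSources`, and `commit` are hypothetical stand-ins, sources are modeled as plain strings, and the use of `require` to reject a bad index is an assumption about what "throw an error" could look like.

```scala
// Simplified stand-in for Spark's Offset type (hypothetical, for illustration only).
final case class Offset(value: Long)

// Hypothetical model of the relevant state; sources are plain strings here.
class StreamExecutionSketch {
  // null means "sources have not been assigned yet", as opposed to Seq.empty,
  // which cannot be told apart from a query that really has no sources.
  @volatile private var sources: Seq[String] = null
  @volatile private var committedOffsets: Map[String, Offset] = Map.empty

  def initSources(s: Seq[String]): Unit = { sources = s }

  def commit(source: String, offset: Offset): Unit = {
    committedOffsets = committedOffsets + (source -> offset)
  }

  // True while the caller should keep waiting for `newOffset` to be committed.
  def notDone(sourceIndex: Int, newOffset: Offset): Boolean = {
    // Read each shared field once so both checks see the same snapshot.
    val localSources = sources
    val localCommittedOffsets = committedOffsets
    if (localSources == null) {
      // Sources are not initialized yet: report "done" instead of failing.
      false
    } else {
      // Any other mismatch is a caller error and should surface loudly.
      require(
        sourceIndex >= 0 && sourceIndex < localSources.length,
        s"sourceIndex $sourceIndex is out of range for ${localSources.length} source(s)")
      val source = localSources(sourceIndex)
      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
    }
  }
}
```

    With this shape, only the uninitialized state is tolerated; an out-of-range index after initialization fails with an `IllegalArgumentException` rather than silently returning `false`.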