GitHub user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20096#discussion_r159797164
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -418,11 +418,16 @@ abstract class StreamExecution(
        * Blocks the current thread until processing for data from the given `source` has reached at
        * least the given `Offset`. This method is intended for use primarily when writing tests.
        */
    -  private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
    +  private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit = {
         assertAwaitThread()
         def notDone = {
           val localCommittedOffsets = committedOffsets
    -      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
    +      if (sources.length <= sourceIndex) {
    +        false
    --- End diff --
    
    `sources` is a var which might not be populated yet. (This race condition showed up in AddKafkaData in my tests.)
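
To illustrate the race, here is a minimal standalone sketch of the guard, not the actual `StreamExecution` code. `OffsetTracker`, the `String` source names, and the `Long` offsets are hypothetical stand-ins, and the `else` branch is an assumption, since the quoted diff stops at `false`.

```scala
// Hypothetical model of the check under review; not Spark's implementation.
final class OffsetTracker {
  // Assigned later by another thread, so a reader may still see it empty.
  @volatile var sources: Seq[String] = Seq.empty
  @volatile var committedOffsets: Map[String, Long] = Map.empty

  def notDone(sourceIndex: Int, newOffset: Long): Boolean = {
    // Read each var once so the check works on a consistent snapshot.
    val localSources = sources
    val localCommittedOffsets = committedOffsets
    if (localSources.length <= sourceIndex) {
      // Same guard as in the diff: indexing here would throw,
      // because the sources have not been populated yet.
      false
    } else {
      // Assumed continuation: resolve the source by index, then apply
      // the comparison from the removed line of the diff.
      val source = localSources(sourceIndex)
      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
    }
  }
}
```

Without the length check, calling `notDone(0, ...)` before `sources` is assigned would fail with an index error instead of returning a result.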

