Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22449114
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
    @@ -104,6 +106,77 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
     }
     
     /**
    + * This is an interface that can be used to block until certain events occur, such as
    + * the start/completion of batches.  This is much less brittle than waiting on wall-clock time.
    + * Internally, this is implemented using a StreamingListener.  Constructing a new instance of this
    + * class automatically registers a StreamingListener on the given StreamingContext.
    + */
    +class StreamingTestWaiter(ssc: StreamingContext) {
    +
    +  // All access to this state should be guarded by `StreamingTestWaiter.this.synchronized`
    +  private var numCompletedBatches = 0
    +  private var numStartedBatches = 0
    +
    +  private val listener = new StreamingListener {
    +    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit =
    +      StreamingTestWaiter.this.synchronized {
    +        numStartedBatches += 1
    +        StreamingTestWaiter.this.notifyAll()
    +      }
    +    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
    +      StreamingTestWaiter.this.synchronized {
    +        numCompletedBatches += 1
    +        StreamingTestWaiter.this.notifyAll()
    +      }
    +  }
    +  ssc.addStreamingListener(listener)
    +
    +  def getNumCompletedBatches: Int = this.synchronized {
    +    numCompletedBatches
    +  }
    +
    +  def getNumStartedBatches: Int = this.synchronized {
    +    numStartedBatches
    +  }
    +
    +  /**
    +   * Block until the number of completed batches reaches the given threshold.
    +   */
    +  def waitForTotalBatchesCompleted(
    --- End diff --
    
    It occurred to me that this might be misleadingly named, since it waits until _at least_ that many batches have been processed.  To avoid this naming issue, plus a proliferation of similar methods, I might be able to just introduce a helper class that encapsulates this "synchronize on an object and wait for a condition involving it to become true" pattern.
    
    I'm imagining that it could look vaguely like this:
    
    ```scala
    def waitUntil[T](obj: T, condition: T => Boolean, timeout: Long): Unit = {
      obj.synchronized {
        while (!condition(obj)) {
          [...] // do the wait() logic here
        }
      }
    }
    ```
    
    This helper would encapsulate the wait / notify pattern, so we could write something like
    
    ```scala
    waitUntil(waiter, _.completedBatches > 2, timeout = seconds(10))
    ```
    
    Or, with an implicit conversion, something like
    
    ```scala
    waiter.waitUntil(_.completedBatches > 2, timeout = seconds(10))
    ```
    
    which is a nice-looking syntax and avoids the issue of having to encode the inequality in the method name.
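    
    To make that concrete, here's a rough sketch of the enriched version with the timed wait filled in.  The `WaitConditions` / `ConditionWaiter` names, the use of `scala.concurrent.duration` for the timeout, and the deadline bookkeeping are just illustrative assumptions, not a final design:
    
    ```scala
    import java.util.concurrent.TimeoutException
    import scala.concurrent.duration._
    
    object WaitConditions {
      implicit class ConditionWaiter[T <: AnyRef](obj: T) {
        /**
         * Block until `condition(obj)` holds.  Relies on the object calling notifyAll()
         * on itself whenever the relevant state changes (as StreamingTestWaiter's
         * listener already does).
         */
        def waitUntil(condition: T => Boolean, timeout: FiniteDuration): Unit = obj.synchronized {
          val deadline = System.currentTimeMillis() + timeout.toMillis
          while (!condition(obj)) {
            val remaining = deadline - System.currentTimeMillis()
            if (remaining <= 0) {
              throw new TimeoutException(s"Condition not met within $timeout")
            }
            obj.wait(remaining)
          }
        }
      }
    }
    
    // Usage, assuming the accessors from the diff above:
    // import WaitConditions._
    // waiter.waitUntil(_.getNumCompletedBatches > 2, timeout = 10.seconds)
    ```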
    
    Similar to [your suggestion](https://github.com/apache/spark/pull/3868/files#r22447509) on another PR, we could add a `pollUntil` method that works for objects that don't support monitor notification / synchronization.
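    
    For comparison, a `pollUntil` along those lines could just be a sleep-and-retry loop; the 10 ms polling interval and the standalone object are arbitrary choices for illustration:
    
    ```scala
    import java.util.concurrent.TimeoutException
    import scala.concurrent.duration._
    
    object PollConditions {
      /**
       * Repeatedly evaluate `condition(obj)`, sleeping between checks, until it holds
       * or the timeout expires.  No monitor notification is required, so this works
       * for objects that don't call notifyAll() on state changes.
       */
      def pollUntil[T](
          obj: T,
          condition: T => Boolean,
          timeout: FiniteDuration,
          interval: FiniteDuration = 10.millis): Unit = {
        val deadline = System.currentTimeMillis() + timeout.toMillis
        while (!condition(obj)) {
          if (System.currentTimeMillis() >= deadline) {
            throw new TimeoutException(s"Condition not met within $timeout")
          }
          Thread.sleep(interval.toMillis)
        }
      }
    }
    ```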

