Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/3801#discussion_r22449114

--- Diff: streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
@@ -104,6 +106,77 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
 }
 
 /**
+ * This is an interface that can be used to block until certain events occur, such as
+ * the start/completion of batches. This is much less brittle than waiting on wall-clock time.
+ * Internally, this is implemented using a StreamingListener. Constructing a new instance of this
+ * class automatically registers a StreamingListener on the given StreamingContext.
+ */
+class StreamingTestWaiter(ssc: StreamingContext) {
+
+  // All access to this state should be guarded by `StreamingTestWaiter.this.synchronized`
+  private var numCompletedBatches = 0
+  private var numStartedBatches = 0
+
+  private val listener = new StreamingListener {
+    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit =
+      StreamingTestWaiter.this.synchronized {
+        numStartedBatches += 1
+        StreamingTestWaiter.this.notifyAll()
+      }
+    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
+      StreamingTestWaiter.this.synchronized {
+        numCompletedBatches += 1
+        StreamingTestWaiter.this.notifyAll()
+      }
+  }
+  ssc.addStreamingListener(listener)
+
+  def getNumCompletedBatches: Int = this.synchronized {
+    numCompletedBatches
+  }
+
+  def getNumStartedBatches: Int = this.synchronized {
+    numStartedBatches
+  }
+
+  /**
+   * Block until the number of completed batches reaches the given threshold.
+   */
+  def waitForTotalBatchesCompleted(
--- End diff --

It occurred to me that this might be misleadingly named, since it waits until _at least_ that many batches have been processed.
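The "at least" behavior can be seen in a minimal, Spark-free sketch of the same synchronize/`notifyAll()` pattern the diff uses. `BatchCounter`, `recordBatchCompleted`, and `awaitAtLeast` are hypothetical names chosen for illustration, not part of the PR, and the timeout is assumed to be in milliseconds. Because the wait loop only checks `numCompleted < n`, batches that finish before the caller starts waiting still count, so the method returns whatever count it observes, which can exceed the requested threshold.

```scala
import java.util.concurrent.TimeoutException

// Hypothetical, Spark-free stand-in for the counter inside StreamingTestWaiter.
class BatchCounter {
  // Guarded by this.synchronized, mirroring the locking discipline in the diff.
  private var numCompleted = 0

  // Plays the role of the listener's onBatchCompleted callback.
  def recordBatchCompleted(): Unit = this.synchronized {
    numCompleted += 1
    this.notifyAll() // wake every thread blocked in awaitAtLeast
  }

  // Blocks until numCompleted >= n (a lower bound, not an exact match)
  // and returns the count actually observed; times out after timeoutMs.
  def awaitAtLeast(n: Int, timeoutMs: Long): Int = this.synchronized {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (numCompleted < n) {
      val remaining = deadline - System.currentTimeMillis()
      if (remaining <= 0) {
        throw new TimeoutException(s"only $numCompleted of $n batches completed")
      }
      this.wait(remaining) // releases the monitor until notifyAll() or timeout
    }
    numCompleted
  }
}

object BatchCounterDemo {
  def main(args: Array[String]): Unit = {
    val counter = new BatchCounter
    // Five batches finish before the test starts waiting...
    (1 to 5).foreach(_ => counter.recordBatchCompleted())
    // ...so waiting for 3 returns immediately and reports 5.
    println(counter.awaitAtLeast(3, timeoutMs = 1000)) // prints 5
  }
}
```

A name such as `awaitAtLeast` makes the lower-bound semantics explicit at the call site, which is exactly the ambiguity the review comment points out.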
To avoid this naming issue, plus a proliferation of similar methods, I might be able to just introduce a helper that encapsulates this "synchronize on an object and wait for a condition involving it to become true" pattern. I'm imagining that it could look something vaguely like

```scala
// Separate parameter list so Scala 2 can infer T before typing the condition lambda.
def waitUntil[T <: AnyRef](obj: T)(condition: T => Boolean, timeout: Long): Unit = {
  val deadline = System.currentTimeMillis() + timeout // timeout in milliseconds
  obj.synchronized {
    while (!condition(obj)) {
      val remaining = deadline - System.currentTimeMillis()
      if (remaining <= 0) {
        throw new java.util.concurrent.TimeoutException(s"Condition not met within $timeout ms")
      }
      obj.wait(remaining) // releases obj's monitor until notifyAll() or the timeout
    }
  }
}
```

so we could write something like

```scala
waitUntil(waiter)(_.completedBatches > 2, timeout = seconds(10))
```

Or, with an implicit conversion, something like

```scala
waiter.waitUntil(_.completedBatches > 2, timeout = seconds(10))
```

which is a nice-looking syntax and avoids having to encode inequalities in method names. Similar to [your suggestion](https://github.com/apache/spark/pull/3868/files#r22447509) on another PR, we could add a `pollUntil` method that works for objects that don't support monitor notification / synchronization.
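Both the implicit-conversion call style and the polling fallback can be sketched in a self-contained way. This is a minimal illustration under stated assumptions, not code from either PR: `WaitSyntax`, `Waitable`, `Counter`, the `timeoutMs`/`intervalMs` parameters (assumed to be milliseconds), and the default 10 ms poll interval are all hypothetical. The implicit class supplies the `obj.waitUntil(condition, ...)` form, while `pollUntil` drops the `notifyAll()` requirement in favor of periodic re-checking.

```scala
import java.util.concurrent.TimeoutException

object WaitSyntax {
  // Hypothetical implicit wrapper adding waitUntil/pollUntil to any reference type.
  implicit class Waitable[T <: AnyRef](obj: T) {

    // Monitor-based: correct only if obj calls notifyAll() (while holding its own
    // monitor) whenever state that `condition` reads changes.
    def waitUntil(condition: T => Boolean, timeoutMs: Long): Unit = {
      val deadline = System.currentTimeMillis() + timeoutMs
      obj.synchronized {
        while (!condition(obj)) {
          val remaining = deadline - System.currentTimeMillis()
          if (remaining <= 0) {
            throw new TimeoutException(s"condition not met within $timeoutMs ms")
          }
          obj.wait(remaining) // releases obj's monitor while blocked
        }
      }
    }

    // Polling-based: re-checks `condition` every intervalMs without touching obj's
    // monitor, so `condition` must read state that is already thread-safe.
    def pollUntil(condition: T => Boolean, timeoutMs: Long, intervalMs: Long = 10): Unit = {
      val deadline = System.currentTimeMillis() + timeoutMs
      while (!condition(obj)) {
        if (System.currentTimeMillis() >= deadline) {
          throw new TimeoutException(s"condition not met within $timeoutMs ms")
        }
        Thread.sleep(intervalMs)
      }
    }
  }
}

// Hypothetical counter that notifies waiters, similar in spirit to StreamingTestWaiter.
class Counter {
  private var n = 0
  def increment(): Unit = this.synchronized { n += 1; this.notifyAll() }
  def value: Int = this.synchronized { n }
}

object WaitSyntaxDemo {
  import WaitSyntax._

  def main(args: Array[String]): Unit = {
    val counter = new Counter
    // Background thread standing in for batches completing over time.
    val producer = new Thread(new Runnable {
      def run(): Unit = (1 to 3).foreach { _ => Thread.sleep(20); counter.increment() }
    })
    producer.start()
    counter.waitUntil(_.value > 2, timeoutMs = 5000)  // blocks on counter's monitor
    counter.pollUntil(_.value == 3, timeoutMs = 5000) // same kind of check, by polling
    producer.join()
    println(counter.value) // prints 3
  }
}
```

The monitor-based version wakes up promptly but silently depends on every state change calling `notifyAll()`; the polling version works with any object at the cost of up to `intervalMs` of extra latency per check.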