Repository: spark
Updated Branches:
  refs/heads/master b6e71032d -> dad499f32


[SPARK-20209][SS] Execute next trigger immediately if previous batch took longer than trigger interval

## What changes were proposed in this pull request?

For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, the 
executor will currently wait another 9 minutes before starting the next batch. 
This does not make sense. The processing-time-based trigger policy should be to 
process batches as fast as possible, but no faster than one per trigger 
interval. If batches are already taking longer than the trigger interval, there 
is no point in waiting out an extra trigger interval.
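
In effect, the next trigger time is computed from the start of the current 
trigger, before the batch runs, and the executor waits only if that deadline 
has not already passed. Here is a minimal, self-contained sketch of the policy 
(illustration only: it uses a plain wall clock and a stand-in `runBatch` 
function, whereas the actual change below goes through Spark's `Clock` 
abstraction):

```scala
// Sketch of the processing-time trigger policy, assuming intervalMs > 0.
// `runBatch` returns false to terminate the loop.
def runLoop(intervalMs: Long, runBatch: () => Boolean): Unit = {
  var continue = true
  while (continue) {
    val triggerTimeMs = System.currentTimeMillis()
    // The deadline is fixed from the trigger *start*, aligned to the interval.
    val nextTriggerTimeMs = triggerTimeMs / intervalMs * intervalMs + intervalMs
    continue = runBatch()
    val remainingMs = nextTriggerTimeMs - System.currentTimeMillis()
    // If the batch overran the interval, remainingMs <= 0: trigger immediately.
    if (continue && remainingMs > 0) Thread.sleep(remainingMs)
  }
}
```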

In this PR, I modified the ProcessingTimeExecutor to do exactly that. Another 
minor change was to extract StreamManualClock into a separate class so that it 
can be used outside subclasses of StreamTest. For example, 
ProcessingTimeExecutorSuite does not need to create any Spark context for 
testing; it just needs the StreamManualClock, as sketched below.
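
As a rough illustration of that setup (a hypothetical test body, not code from 
this patch):

```scala
import org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.streaming.util.StreamManualClock

// Drive the executor with a manual clock; no SparkSession or StreamTest needed.
val clock = new StreamManualClock()
val executor = ProcessingTimeExecutor(ProcessingTime("100 milliseconds"), clock)
// Returning false terminates after the first batch, so the clock never has to advance.
executor.execute(() => false)
assert(clock.getTimeMillis() === 0)
```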

## How was this patch tested?
Added new unit tests to comprehensively test this behavior.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #17525 from tdas/SPARK-20209.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dad499f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dad499f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dad499f3

Branch: refs/heads/master
Commit: dad499f324c6a93650aecfeb8cde10a405372930
Parents: b6e7103
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Apr 4 23:20:17 2017 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Apr 4 23:20:17 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |  1 +
 .../execution/streaming/TriggerExecutor.scala   | 17 ++--
 .../streaming/ProcessingTimeExecutorSuite.scala | 83 +++++++++++++++++--
 .../sql/streaming/FileStreamSourceSuite.scala   |  1 +
 .../streaming/FlatMapGroupsWithStateSuite.scala |  3 +-
 .../spark/sql/streaming/StreamSuite.scala       |  1 +
 .../apache/spark/sql/streaming/StreamTest.scala | 20 +----
 .../streaming/StreamingAggregationSuite.scala   |  1 +
 .../streaming/StreamingQueryListenerSuite.scala |  1 +
 .../sql/streaming/StreamingQuerySuite.scala     | 87 ++++++++++++--------
 .../sql/streaming/util/StreamManualClock.scala  | 51 ++++++++++++
 11 files changed, 194 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 6391d62..0046ba7 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index 02996ac..d188566 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -47,21 +47,22 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock =
   extends TriggerExecutor with Logging {
 
   private val intervalMs = processingTime.intervalMs
+  require(intervalMs >= 0)
 
-  override def execute(batchRunner: () => Boolean): Unit = {
+  override def execute(triggerHandler: () => Boolean): Unit = {
     while (true) {
-      val batchStartTimeMs = clock.getTimeMillis()
-      val terminated = !batchRunner()
+      val triggerTimeMs = clock.getTimeMillis
+      val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
+      val terminated = !triggerHandler()
       if (intervalMs > 0) {
-        val batchEndTimeMs = clock.getTimeMillis()
-        val batchElapsedTimeMs = batchEndTimeMs - batchStartTimeMs
+        val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
         if (batchElapsedTimeMs > intervalMs) {
           notifyBatchFallingBehind(batchElapsedTimeMs)
         }
         if (terminated) {
           return
         }
-        clock.waitTillTime(nextBatchTime(batchEndTimeMs))
+        clock.waitTillTime(nextTriggerTimeMs)
       } else {
         if (terminated) {
           return
@@ -70,7 +71,7 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock =
     }
   }
 
-  /** Called when a batch falls behind. Expose for test only */
+  /** Called when a batch falls behind */
   def notifyBatchFallingBehind(realElapsedTimeMs: Long): Unit = {
     logWarning("Current batch is falling behind. The trigger interval is " +
       s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} 
milliseconds")
@@ -83,6 +84,6 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock =
   * an interval of `100 ms`, `nextBatchTime(nextBatchTime(0)) = 200` rather than `0`).
    */
   def nextBatchTime(now: Long): Long = {
-    now / intervalMs * intervalMs + intervalMs
+    if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs
   }
 }
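
To make the `nextBatchTime` rounding concrete, here are a few worked values for 
a 100 ms interval (REPL-style; these mirror the assertions in the updated 
suite):

```scala
val executor = ProcessingTimeExecutor(ProcessingTime(100))
executor.nextBatchTime(0)    // 100: the next trigger is one full interval away
executor.nextBatchTime(1)    // 100: mid-interval times round up to the boundary
executor.nextBatchTime(100)  // 200: nextBatchTime(nextBatchTime(0)) advances a whole interval
executor.nextBatchTime(150)  // 200
```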

http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
index 00d5e05..007554a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
@@ -17,14 +17,24 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.mutable
+
+import org.eclipse.jetty.util.ConcurrentHashSet
+import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.streaming.ProcessingTime
-import org.apache.spark.util.{Clock, ManualClock, SystemClock}
+import org.apache.spark.sql.streaming.util.StreamManualClock
 
 class ProcessingTimeExecutorSuite extends SparkFunSuite {
 
+  val timeout = 10.seconds
+
   test("nextBatchTime") {
     val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100))
     assert(processingTimeExecutor.nextBatchTime(0) === 100)
@@ -35,6 +45,57 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite {
     assert(processingTimeExecutor.nextBatchTime(150) === 200)
   }
 
+  test("trigger timing") {
+    val triggerTimes = new ConcurrentHashSet[Int]
+    val clock = new StreamManualClock()
+    @volatile var continueExecuting = true
+    @volatile var clockIncrementInTrigger = 0L
+    val executor = ProcessingTimeExecutor(ProcessingTime("1000 milliseconds"), 
clock)
+    val executorThread = new Thread() {
+      override def run(): Unit = {
+        executor.execute(() => {
+          // Record the trigger time and increment the clock if the test requested it
+          triggerTimes.add(clock.getTimeMillis.toInt)
+          clock.advance(clockIncrementInTrigger)
+          clockIncrementInTrigger = 0 // reset this so that there are no runaway triggers
+          continueExecuting
+        })
+      }
+    }
+    executorThread.start()
+    // First batch should execute immediately, then executor should wait for next one
+    eventually {
+      assert(triggerTimes.contains(0))
+      assert(clock.isStreamWaitingAt(0))
+      assert(clock.isStreamWaitingFor(1000))
+    }
+
+    // Second batch should execute when clock reaches the next trigger time.
+    // If next trigger takes less than the trigger interval, executor should wait for next one
+    clockIncrementInTrigger = 500
+    clock.setTime(1000)
+    eventually {
+      assert(triggerTimes.contains(1000))
+      assert(clock.isStreamWaitingAt(1500))
+      assert(clock.isStreamWaitingFor(2000))
+    }
+
+    // If the next trigger takes longer than the trigger interval, executor should immediately
+    // execute another one
+    clockIncrementInTrigger = 1500
+    clock.setTime(2000)   // allow another trigger by setting clock to 2000
+    eventually {
+      // Since the next trigger will take 1500 (which is more than trigger interval of 1000)
+      // executor will immediately execute another trigger
+      assert(triggerTimes.contains(2000) && triggerTimes.contains(3500))
+      assert(clock.isStreamWaitingAt(3500))
+      assert(clock.isStreamWaitingFor(4000))
+    }
+    continueExecuting = false
+    clock.advance(1000)
+    waitForThreadJoin(executorThread)
+  }
+
   test("calling nextBatchTime with the result of a previous call should return 
the next interval") {
     val intervalMS = 100
     val processingTimeExecutor = 
ProcessingTimeExecutor(ProcessingTime(intervalMS))
@@ -54,7 +115,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite {
    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs))
     processingTimeExecutor.execute(() => {
       batchCounts += 1
-      // If the batch termination works well, batchCounts should be 3 after `execute`
+      // If the batch termination works correctly, batchCounts should be 3 after `execute`
       batchCounts < 3
     })
     assert(batchCounts === 3)
@@ -66,9 +127,8 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite {
   }
 
   test("notifyBatchFallingBehind") {
-    val clock = new ManualClock()
+    val clock = new StreamManualClock()
     @volatile var batchFallingBehindCalled = false
-    val latch = new CountDownLatch(1)
     val t = new Thread() {
       override def run(): Unit = {
        val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTime(100), clock) {
@@ -77,7 +137,6 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite {
           }
         }
         processingTimeExecutor.execute(() => {
-          latch.countDown()
           clock.waitTillTime(200)
           false
         })
@@ -85,9 +144,17 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite {
     }
     t.start()
    // Wait until the batch is running so that we don't call `advance` too early
-    assert(latch.await(10, TimeUnit.SECONDS), "the batch has not yet started in 10 seconds")
+    eventually { assert(clock.isStreamWaitingFor(200)) }
     clock.advance(200)
-    t.join()
+    waitForThreadJoin(t)
     assert(batchFallingBehindCalled === true)
   }
+
+  private def eventually(body: => Unit): Unit = {
+    Eventually.eventually(Timeout(timeout)) { body }
+  }
+
+  private def waitForThreadJoin(thread: Thread): Unit = {
+    failAfter(timeout) { thread.join() }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 171877a..2696778 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._
+import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index c8e31e3..85aa7db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -21,8 +21,6 @@ import java.sql.Date
 import java.util.concurrent.ConcurrentHashMap
 
 import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.Eventually.eventually
-import org.scalatest.concurrent.PatienceConfiguration.Timeout
 
 import org.apache.spark.SparkException
 import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
@@ -35,6 +33,7 @@ import org.apache.spark.sql.execution.RDDScanExec
 import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, GroupStateImpl, MemoryStream}
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreId, StoreUpdate}
 import org.apache.spark.sql.streaming.FlatMapGroupsWithStateSuite.MemoryStateStore
+import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.types.{DataType, IntegerType}
 
 /** Class to check custom state types */

http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 388f154..5ab9dc2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
+import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 951ff2c..03aa45b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -214,24 +214,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
       AssertOnQuery(query => { func(query); true })
   }
 
-  class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
-    private var waitStartTime: Option[Long] = None
-
-    override def waitTillTime(targetTime: Long): Long = synchronized {
-      try {
-        waitStartTime = Some(getTimeMillis())
-        super.waitTillTime(targetTime)
-      } finally {
-        waitStartTime = None
-      }
-    }
-
-    def isStreamWaitingAt(time: Long): Boolean = synchronized {
-      waitStartTime == Some(time)
-    }
-  }
-
-
   /**
   * Executes the specified actions on the given streaming DataFrame and provides helpful
    * error messages in the case of failures or incorrect answers.
@@ -242,6 +224,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   def testStream(
       _stream: Dataset[_],
      outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = synchronized {
+    import org.apache.spark.sql.streaming.util.StreamManualClock
+
    // `synchronized` is added to prevent the user from calling multiple `testStream`s concurrently
    // because this method assumes there is only one active query in its `StreamingQueryListener`
    // and it may not work correctly when multiple `testStream`s run concurrently.

http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 600c039..e5d5b4f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming.OutputMode._
+import org.apache.spark.sql.streaming.util.StreamManualClock
 
 object FailureSinglton {
   var firstTime = true

http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 03dad8a..b8a694c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.{Encoder, SparkSession}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryListener._
+import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.util.JsonProtocol
 
 class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {

http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 1172531..2ebbfcd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider}
+import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
 import org.apache.spark.util.ManualClock
 
 
@@ -207,46 +207,53 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
     /** Custom MemoryStream that waits for manual clock to reach a time */
     val inputData = new MemoryStream[Int](0, sqlContext) {
-      // Wait for manual clock to be 100 first time there is data
+      // getOffset should take 50 ms the first time it is called
       override def getOffset: Option[Offset] = {
         val offset = super.getOffset
         if (offset.nonEmpty) {
-          clock.waitTillTime(300)
+          clock.waitTillTime(1050)
         }
         offset
       }
 
-      // Wait for manual clock to be 300 first time there is data
+      // getBatch should take 100 ms the first time it is called
       override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-        clock.waitTillTime(600)
+        if (start.isEmpty) clock.waitTillTime(1150)
         super.getBatch(start, end)
       }
     }
 
-    // This is to make sure thatquery waits for manual clock to be 600 first time there is data
-    val mapped = inputData.toDS().as[Long].map { x =>
-      clock.waitTillTime(1100)
+    // query execution should take 350 ms the first time it is called
+    val mapped = inputData.toDS.coalesce(1).as[Long].map { x =>
+      clock.waitTillTime(1500)  // this will only wait the first time when clock < 1500
       10 / x
     }.agg(count("*")).as[Long]
 
-    case class AssertStreamExecThreadToWaitForClock()
+    case class AssertStreamExecThreadIsWaitingForTime(targetTime: Long)
       extends AssertOnQuery(q => {
         eventually(Timeout(streamingTimeout)) {
           if (q.exception.isEmpty) {
-            assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis))
+            assert(clock.isStreamWaitingFor(targetTime))
           }
         }
         if (q.exception.isDefined) {
           throw q.exception.get
         }
         true
-      }, "")
+      }, "") {
+      override def toString: String = s"AssertStreamExecThreadIsWaitingForTime($targetTime)"
+    }
+
+    case class AssertClockTime(time: Long)
+      extends AssertOnQuery(q => clock.getTimeMillis() === time, "") {
+      override def toString: String = s"AssertClockTime($time)"
+    }
 
     var lastProgressBeforeStop: StreamingQueryProgress = null
 
     testStream(mapped, OutputMode.Complete)(
-      StartStream(ProcessingTime(100), triggerClock = clock),
-      AssertStreamExecThreadToWaitForClock(),
+      StartStream(ProcessingTime(1000), triggerClock = clock),
+      AssertStreamExecThreadIsWaitingForTime(1000),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
       AssertOnQuery(_.status.message === "Waiting for next trigger"),
@@ -254,33 +261,37 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
       // Test status and progress while offset is being fetched
       AddData(inputData, 1, 2),
-      AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset
-      AssertStreamExecThreadToWaitForClock(),
+      AdvanceManualClock(1000), // time = 1000 to start new trigger, will block on getOffset
+      AssertStreamExecThreadIsWaitingForTime(1050),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
       AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
 
       // Test status and progress while batch is being fetched
-      AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
-      AssertStreamExecThreadToWaitForClock(),
+      AdvanceManualClock(50), // time = 1050 to unblock getOffset
+      AssertClockTime(1050),
+      AssertStreamExecThreadIsWaitingForTime(1150),      // will block on getBatch that needs 1150
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message === "Processing new data"),
       AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
 
       // Test status and progress while batch is being processed
-      AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
+      AdvanceManualClock(100), // time = 1150 to unblock getBatch
+      AssertClockTime(1150),
+      AssertStreamExecThreadIsWaitingForTime(1500), // will block in Spark job that needs 1500
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message === "Processing new data"),
       AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
 
       // Test status and progress while batch processing has completed
-      AdvanceManualClock(500), // time = 1100 to unblock job
-      AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
+      AssertOnQuery { _ => clock.getTimeMillis() === 1150 },
+      AdvanceManualClock(350), // time = 1500 to unblock job
+      AssertClockTime(1500),
       CheckAnswer(2),
-      AssertStreamExecThreadToWaitForClock(),
+      AssertStreamExecThreadIsWaitingForTime(2000),
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === false),
       AssertOnQuery(_.status.message === "Waiting for next trigger"),
@@ -293,21 +304,21 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         assert(progress.id === query.id)
         assert(progress.name === query.name)
         assert(progress.batchId === 0)
-        assert(progress.timestamp === "1970-01-01T00:00:00.100Z") // 100 ms in 
UTC
+        assert(progress.timestamp === "1970-01-01T00:00:01.000Z") // 100 ms in 
UTC
         assert(progress.numInputRows === 2)
-        assert(progress.processedRowsPerSecond === 2.0)
+        assert(progress.processedRowsPerSecond === 4.0)
 
-        assert(progress.durationMs.get("getOffset") === 200)
-        assert(progress.durationMs.get("getBatch") === 300)
+        assert(progress.durationMs.get("getOffset") === 50)
+        assert(progress.durationMs.get("getBatch") === 100)
         assert(progress.durationMs.get("queryPlanning") === 0)
         assert(progress.durationMs.get("walCommit") === 0)
-        assert(progress.durationMs.get("triggerExecution") === 1000)
+        assert(progress.durationMs.get("triggerExecution") === 500)
 
         assert(progress.sources.length === 1)
         assert(progress.sources(0).description contains "MemoryStream")
         assert(progress.sources(0).startOffset === null)
         assert(progress.sources(0).endOffset !== null)
-        assert(progress.sources(0).processedRowsPerSecond === 2.0)
+        assert(progress.sources(0).processedRowsPerSecond === 4.0)  // 2 rows processed in 500 ms
 
         assert(progress.stateOperators.length === 1)
         assert(progress.stateOperators(0).numRowsUpdated === 1)
@@ -317,9 +328,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         true
       },
 
+      // Test whether input rate is updated after two batches
+      AssertStreamExecThreadIsWaitingForTime(2000),  // blocked waiting for next trigger time
       AddData(inputData, 1, 2),
-      AdvanceManualClock(100), // allow another trigger
-      AssertStreamExecThreadToWaitForClock(),
+      AdvanceManualClock(500), // allow another trigger
+      AssertClockTime(2000),
+      AssertStreamExecThreadIsWaitingForTime(3000),  // will block waiting for next trigger time
       CheckAnswer(4),
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === false),
@@ -327,13 +341,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       AssertOnQuery { query =>
         assert(query.recentProgress.last.eq(query.lastProgress))
         assert(query.lastProgress.batchId === 1)
-        assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818)
+        assert(query.lastProgress.inputRowsPerSecond === 2.0)
+        assert(query.lastProgress.sources(0).inputRowsPerSecond === 2.0)
         true
       },
 
       // Test status and progress after data is not available for a trigger
-      AdvanceManualClock(100), // allow another trigger
-      AssertStreamExecThreadToWaitForClock(),
+      AdvanceManualClock(1000), // allow another trigger
+      AssertStreamExecThreadIsWaitingForTime(4000),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
       AssertOnQuery(_.status.message === "Waiting for next trigger"),
@@ -350,10 +365,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       AssertOnQuery(_.status.message === "Stopped"),
 
       // Test status and progress after query terminated with error
-      StartStream(ProcessingTime(100), triggerClock = clock),
-      AdvanceManualClock(100), // ensure initial trigger completes before 
AddData
+      StartStream(ProcessingTime(1000), triggerClock = clock),
+      AdvanceManualClock(1000), // ensure initial trigger completes before 
AddData
       AddData(inputData, 0),
-      AdvanceManualClock(100), // allow another trigger
+      AdvanceManualClock(1000), // allow another trigger
       ExpectFailure[SparkException](),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
@@ -678,5 +693,5 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
 object StreamingQuerySuite {
   // Singleton reference to clock that does not get serialized in task closures
-  var clock: ManualClock = null
+  var clock: StreamManualClock = null
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/StreamManualClock.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/StreamManualClock.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/StreamManualClock.scala
new file mode 100644
index 0000000..c769a79
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/StreamManualClock.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import org.apache.spark.util.ManualClock
+
+/**
+ * ManualClock used for streaming tests that allows checking whether the stream is waiting
+ * on the clock at expected times.
+ */
+class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
+  private var waitStartTime: Option[Long] = None
+  private var waitTargetTime: Option[Long] = None
+
+  override def waitTillTime(targetTime: Long): Long = synchronized {
+    try {
+      waitStartTime = Some(getTimeMillis())
+      waitTargetTime = Some(targetTime)
+      super.waitTillTime(targetTime)
+    } finally {
+      waitStartTime = None
+      waitTargetTime = None
+    }
+  }
+
+  /** Is the streaming thread waiting for the clock to advance when it is at the given time */
+  def isStreamWaitingAt(time: Long): Boolean = synchronized {
+    waitStartTime == Some(time)
+  }
+
+  /** Is the streaming thread waiting for clock to advance to the given time */
+  def isStreamWaitingFor(target: Long): Boolean = synchronized {
+    waitTargetTime == Some(target)
+  }
+}
+
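
A quick usage sketch for the two probes above (hypothetical test-body code; the 
polling loop is only for illustration):

```scala
import org.apache.spark.sql.streaming.util.StreamManualClock

// One thread blocks on the clock while the test thread observes where it is
// waiting, then advances the clock to release it.
val clock = new StreamManualClock()
val waiter = new Thread() {
  override def run(): Unit = clock.waitTillTime(1000)
}
waiter.start()
// Poll until the waiter has actually blocked inside waitTillTime.
while (!clock.isStreamWaitingFor(1000)) Thread.sleep(10)
assert(clock.isStreamWaitingAt(0))  // it began waiting when the clock read 0
clock.advance(1000)                 // clock reaches 1000; waitTillTime returns
waiter.join()
```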

