Repository: spark Updated Branches: refs/heads/master b6e71032d -> dad499f32
[SPARK-20209][SS] Execute next trigger immediately if previous batch took longer than trigger interval ## What changes were proposed in this pull request? For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, then it will wait for 9 mins before starting the next batch. This does not make sense. The processing time based trigger policy should be to do process batches as fast as possible, but no faster than 1 in every trigger interval. If batches are taking longer than trigger interval anyways, then no point waiting extra trigger interval. In this PR, I modified the ProcessingTimeExecutor to do so. Another minor change I did was to extract our StreamManualClock into a separate class so that it can be used outside subclasses of StreamTest. For example, ProcessingTimeExecutorSuite does not need to create any context for testing, just needs the StreamManualClock. ## How was this patch tested? Added new unit tests to comprehensively test this behavior. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #17525 from tdas/SPARK-20209. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dad499f3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dad499f3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dad499f3 Branch: refs/heads/master Commit: dad499f324c6a93650aecfeb8cde10a405372930 Parents: b6e7103 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Tue Apr 4 23:20:17 2017 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Tue Apr 4 23:20:17 2017 -0700 ---------------------------------------------------------------------- .../spark/sql/kafka010/KafkaSourceSuite.scala | 1 + .../execution/streaming/TriggerExecutor.scala | 17 ++-- .../streaming/ProcessingTimeExecutorSuite.scala | 83 +++++++++++++++++-- .../sql/streaming/FileStreamSourceSuite.scala | 1 + .../streaming/FlatMapGroupsWithStateSuite.scala | 3 +- .../spark/sql/streaming/StreamSuite.scala | 1 + .../apache/spark/sql/streaming/StreamTest.scala | 20 +---- .../streaming/StreamingAggregationSuite.scala | 1 + .../streaming/StreamingQueryListenerSuite.scala | 1 + .../sql/streaming/StreamingQuerySuite.scala | 87 ++++++++++++-------- .../sql/streaming/util/StreamManualClock.scala | 51 ++++++++++++ 11 files changed, 194 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 6391d62..0046ba7 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} +import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala index 02996ac..d188566 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala @@ -47,21 +47,22 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = extends TriggerExecutor with Logging { private val intervalMs = processingTime.intervalMs + require(intervalMs >= 0) - override def execute(batchRunner: () => Boolean): Unit = { + override def execute(triggerHandler: () => Boolean): Unit = { while (true) { - val batchStartTimeMs = clock.getTimeMillis() - val terminated = !batchRunner() + val triggerTimeMs = clock.getTimeMillis + val nextTriggerTimeMs = nextBatchTime(triggerTimeMs) + val terminated = !triggerHandler() if (intervalMs > 0) { - val batchEndTimeMs = clock.getTimeMillis() - val batchElapsedTimeMs = batchEndTimeMs - batchStartTimeMs + val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs if (batchElapsedTimeMs > intervalMs) { notifyBatchFallingBehind(batchElapsedTimeMs) } if (terminated) { return } - clock.waitTillTime(nextBatchTime(batchEndTimeMs)) + clock.waitTillTime(nextTriggerTimeMs) } else { if (terminated) { return @@ -70,7 +71,7 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = } } - /** Called when a batch falls behind. Expose for test only */ + /** Called when a batch falls behind */ def notifyBatchFallingBehind(realElapsedTimeMs: Long): Unit = { logWarning("Current batch is falling behind. The trigger interval is " + s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} milliseconds") @@ -83,6 +84,6 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = * an interval of `100 ms`, `nextBatchTime(nextBatchTime(0)) = 200` rather than `0`). */ def nextBatchTime(now: Long): Long = { - now / intervalMs * intervalMs + intervalMs + if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs } } http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala index 00d5e05..007554a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala @@ -17,14 +17,24 @@ package org.apache.spark.sql.execution.streaming -import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.mutable + +import org.eclipse.jetty.util.ConcurrentHashSet +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.concurrent.Timeouts._ +import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.streaming.ProcessingTime -import org.apache.spark.util.{Clock, ManualClock, SystemClock} +import org.apache.spark.sql.streaming.util.StreamManualClock class ProcessingTimeExecutorSuite extends SparkFunSuite { + val timeout = 10.seconds + test("nextBatchTime") { val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100)) assert(processingTimeExecutor.nextBatchTime(0) === 100) @@ -35,6 +45,57 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite { assert(processingTimeExecutor.nextBatchTime(150) === 200) } + test("trigger timing") { + val triggerTimes = new ConcurrentHashSet[Int] + val clock = new StreamManualClock() + @volatile var continueExecuting = true + @volatile var clockIncrementInTrigger = 0L + val executor = ProcessingTimeExecutor(ProcessingTime("1000 milliseconds"), clock) + val executorThread = new Thread() { + override def run(): Unit = { + executor.execute(() => { + // Record the trigger time, increment clock if needed and + triggerTimes.add(clock.getTimeMillis.toInt) + clock.advance(clockIncrementInTrigger) + clockIncrementInTrigger = 0 // reset this so that there are no runaway triggers + continueExecuting + }) + } + } + executorThread.start() + // First batch should execute immediately, then executor should wait for next one + eventually { + assert(triggerTimes.contains(0)) + assert(clock.isStreamWaitingAt(0)) + assert(clock.isStreamWaitingFor(1000)) + } + + // Second batch should execute when clock reaches the next trigger time. + // If next trigger takes less than the trigger interval, executor should wait for next one + clockIncrementInTrigger = 500 + clock.setTime(1000) + eventually { + assert(triggerTimes.contains(1000)) + assert(clock.isStreamWaitingAt(1500)) + assert(clock.isStreamWaitingFor(2000)) + } + + // If next trigger takes less than the trigger interval, executor should immediately execute + // another one + clockIncrementInTrigger = 1500 + clock.setTime(2000) // allow another trigger by setting clock to 2000 + eventually { + // Since the next trigger will take 1500 (which is more than trigger interval of 1000) + // executor will immediately execute another trigger + assert(triggerTimes.contains(2000) && triggerTimes.contains(3500)) + assert(clock.isStreamWaitingAt(3500)) + assert(clock.isStreamWaitingFor(4000)) + } + continueExecuting = false + clock.advance(1000) + waitForThreadJoin(executorThread) + } + test("calling nextBatchTime with the result of a previous call should return the next interval") { val intervalMS = 100 val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMS)) @@ -54,7 +115,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite { val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs)) processingTimeExecutor.execute(() => { batchCounts += 1 - // If the batch termination works well, batchCounts should be 3 after `execute` + // If the batch termination works correctly, batchCounts should be 3 after `execute` batchCounts < 3 }) assert(batchCounts === 3) @@ -66,9 +127,8 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite { } test("notifyBatchFallingBehind") { - val clock = new ManualClock() + val clock = new StreamManualClock() @volatile var batchFallingBehindCalled = false - val latch = new CountDownLatch(1) val t = new Thread() { override def run(): Unit = { val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTime(100), clock) { @@ -77,7 +137,6 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite { } } processingTimeExecutor.execute(() => { - latch.countDown() clock.waitTillTime(200) false }) @@ -85,9 +144,17 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite { } t.start() // Wait until the batch is running so that we don't call `advance` too early - assert(latch.await(10, TimeUnit.SECONDS), "the batch has not yet started in 10 seconds") + eventually { assert(clock.isStreamWaitingFor(200)) } clock.advance(200) - t.join() + waitForThreadJoin(t) assert(batchFallingBehindCalled === true) } + + private def eventually(body: => Unit): Unit = { + Eventually.eventually(Timeout(timeout)) { body } + } + + private def waitForThreadJoin(thread: Thread): Unit = { + failAfter(timeout) { thread.join() } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 171877a..2696778 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._ +import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index c8e31e3..85aa7db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -21,8 +21,6 @@ import java.sql.Date import java.util.concurrent.ConcurrentHashMap import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.Eventually.eventually -import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.apache.spark.SparkException import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction @@ -35,6 +33,7 @@ import org.apache.spark.sql.execution.RDDScanExec import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, GroupStateImpl, MemoryStream} import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreId, StoreUpdate} import org.apache.spark.sql.streaming.FlatMapGroupsWithStateSuite.MemoryStateStore +import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.types.{DataType, IntegerType} /** Class to check custom state types */ http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 388f154..5ab9dc2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.StreamSourceProvider +import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 951ff2c..03aa45b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -214,24 +214,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { AssertOnQuery(query => { func(query); true }) } - class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable { - private var waitStartTime: Option[Long] = None - - override def waitTillTime(targetTime: Long): Long = synchronized { - try { - waitStartTime = Some(getTimeMillis()) - super.waitTillTime(targetTime) - } finally { - waitStartTime = None - } - } - - def isStreamWaitingAt(time: Long): Boolean = synchronized { - waitStartTime == Some(time) - } - } - - /** * Executes the specified actions on the given streaming DataFrame and provides helpful * error messages in the case of failures or incorrect answers. @@ -242,6 +224,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { def testStream( _stream: Dataset[_], outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = synchronized { + import org.apache.spark.sql.streaming.util.StreamManualClock + // `synchronized` is added to prevent the user from calling multiple `testStream`s concurrently // because this method assumes there is only one active query in its `StreamingQueryListener` // and it may not work correctly when multiple `testStream`s run concurrently. http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 600c039..e5d5b4f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.streaming.state.StateStore import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.OutputMode._ +import org.apache.spark.sql.streaming.util.StreamManualClock object FailureSinglton { var firstTime = true http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 03dad8a..b8a694c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.{Encoder, SparkSession} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamingQueryListener._ +import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.util.JsonProtocol class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 1172531..2ebbfcd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider} +import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock} import org.apache.spark.util.ManualClock @@ -207,46 +207,53 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi /** Custom MemoryStream that waits for manual clock to reach a time */ val inputData = new MemoryStream[Int](0, sqlContext) { - // Wait for manual clock to be 100 first time there is data + // getOffset should take 50 ms the first time it is called override def getOffset: Option[Offset] = { val offset = super.getOffset if (offset.nonEmpty) { - clock.waitTillTime(300) + clock.waitTillTime(1050) } offset } - // Wait for manual clock to be 300 first time there is data + // getBatch should take 100 ms the first time it is called override def getBatch(start: Option[Offset], end: Offset): DataFrame = { - clock.waitTillTime(600) + if (start.isEmpty) clock.waitTillTime(1150) super.getBatch(start, end) } } - // This is to make sure thatquery waits for manual clock to be 600 first time there is data - val mapped = inputData.toDS().as[Long].map { x => - clock.waitTillTime(1100) + // query execution should take 350 ms the first time it is called + val mapped = inputData.toDS.coalesce(1).as[Long].map { x => + clock.waitTillTime(1500) // this will only wait the first time when clock < 1500 10 / x }.agg(count("*")).as[Long] - case class AssertStreamExecThreadToWaitForClock() + case class AssertStreamExecThreadIsWaitingForTime(targetTime: Long) extends AssertOnQuery(q => { eventually(Timeout(streamingTimeout)) { if (q.exception.isEmpty) { - assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis)) + assert(clock.isStreamWaitingFor(targetTime)) } } if (q.exception.isDefined) { throw q.exception.get } true - }, "") + }, "") { + override def toString: String = s"AssertStreamExecThreadIsWaitingForTime($targetTime)" + } + + case class AssertClockTime(time: Long) + extends AssertOnQuery(q => clock.getTimeMillis() === time, "") { + override def toString: String = s"AssertClockTime($time)" + } var lastProgressBeforeStop: StreamingQueryProgress = null testStream(mapped, OutputMode.Complete)( - StartStream(ProcessingTime(100), triggerClock = clock), - AssertStreamExecThreadToWaitForClock(), + StartStream(ProcessingTime(1000), triggerClock = clock), + AssertStreamExecThreadIsWaitingForTime(1000), AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === false), AssertOnQuery(_.status.message === "Waiting for next trigger"), @@ -254,33 +261,37 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi // Test status and progress while offset is being fetched AddData(inputData, 1, 2), - AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset - AssertStreamExecThreadToWaitForClock(), + AdvanceManualClock(1000), // time = 1000 to start new trigger, will block on getOffset + AssertStreamExecThreadIsWaitingForTime(1050), AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message.startsWith("Getting offsets from")), AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch is being fetched - AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch - AssertStreamExecThreadToWaitForClock(), + AdvanceManualClock(50), // time = 1050 to unblock getOffset + AssertClockTime(1050), + AssertStreamExecThreadIsWaitingForTime(1150), // will block on getBatch that needs 1150 AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message === "Processing new data"), AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch is being processed - AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job + AdvanceManualClock(100), // time = 1150 to unblock getBatch + AssertClockTime(1150), + AssertStreamExecThreadIsWaitingForTime(1500), // will block in Spark job that needs 1500 AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message === "Processing new data"), AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch processing has completed - AdvanceManualClock(500), // time = 1100 to unblock job - AssertOnQuery { _ => clock.getTimeMillis() === 1100 }, + AssertOnQuery { _ => clock.getTimeMillis() === 1150 }, + AdvanceManualClock(350), // time = 1500 to unblock job + AssertClockTime(1500), CheckAnswer(2), - AssertStreamExecThreadToWaitForClock(), + AssertStreamExecThreadIsWaitingForTime(2000), AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === false), AssertOnQuery(_.status.message === "Waiting for next trigger"), @@ -293,21 +304,21 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(progress.id === query.id) assert(progress.name === query.name) assert(progress.batchId === 0) - assert(progress.timestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC + assert(progress.timestamp === "1970-01-01T00:00:01.000Z") // 100 ms in UTC assert(progress.numInputRows === 2) - assert(progress.processedRowsPerSecond === 2.0) + assert(progress.processedRowsPerSecond === 4.0) - assert(progress.durationMs.get("getOffset") === 200) - assert(progress.durationMs.get("getBatch") === 300) + assert(progress.durationMs.get("getOffset") === 50) + assert(progress.durationMs.get("getBatch") === 100) assert(progress.durationMs.get("queryPlanning") === 0) assert(progress.durationMs.get("walCommit") === 0) - assert(progress.durationMs.get("triggerExecution") === 1000) + assert(progress.durationMs.get("triggerExecution") === 500) assert(progress.sources.length === 1) assert(progress.sources(0).description contains "MemoryStream") assert(progress.sources(0).startOffset === null) assert(progress.sources(0).endOffset !== null) - assert(progress.sources(0).processedRowsPerSecond === 2.0) + assert(progress.sources(0).processedRowsPerSecond === 4.0) // 2 rows processed in 500 ms assert(progress.stateOperators.length === 1) assert(progress.stateOperators(0).numRowsUpdated === 1) @@ -317,9 +328,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi true }, + // Test whether input rate is updated after two batches + AssertStreamExecThreadIsWaitingForTime(2000), // blocked waiting for next trigger time AddData(inputData, 1, 2), - AdvanceManualClock(100), // allow another trigger - AssertStreamExecThreadToWaitForClock(), + AdvanceManualClock(500), // allow another trigger + AssertClockTime(2000), + AssertStreamExecThreadIsWaitingForTime(3000), // will block waiting for next trigger time CheckAnswer(4), AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === false), @@ -327,13 +341,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AssertOnQuery { query => assert(query.recentProgress.last.eq(query.lastProgress)) assert(query.lastProgress.batchId === 1) - assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818) + assert(query.lastProgress.inputRowsPerSecond === 2.0) + assert(query.lastProgress.sources(0).inputRowsPerSecond === 2.0) true }, // Test status and progress after data is not available for a trigger - AdvanceManualClock(100), // allow another trigger - AssertStreamExecThreadToWaitForClock(), + AdvanceManualClock(1000), // allow another trigger + AssertStreamExecThreadIsWaitingForTime(4000), AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === false), AssertOnQuery(_.status.message === "Waiting for next trigger"), @@ -350,10 +365,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AssertOnQuery(_.status.message === "Stopped"), // Test status and progress after query terminated with error - StartStream(ProcessingTime(100), triggerClock = clock), - AdvanceManualClock(100), // ensure initial trigger completes before AddData + StartStream(ProcessingTime(1000), triggerClock = clock), + AdvanceManualClock(1000), // ensure initial trigger completes before AddData AddData(inputData, 0), - AdvanceManualClock(100), // allow another trigger + AdvanceManualClock(1000), // allow another trigger ExpectFailure[SparkException](), AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === false), @@ -678,5 +693,5 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi object StreamingQuerySuite { // Singleton reference to clock that does not get serialized in task closures - var clock: ManualClock = null + var clock: StreamManualClock = null } http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/StreamManualClock.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/StreamManualClock.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/StreamManualClock.scala new file mode 100644 index 0000000..c769a79 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/StreamManualClock.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.util + +import org.apache.spark.util.ManualClock + +/** + * ManualClock used for streaming tests that allows checking whether the stream is waiting + * on the clock at expected times. + */ +class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable { + private var waitStartTime: Option[Long] = None + private var waitTargetTime: Option[Long] = None + + override def waitTillTime(targetTime: Long): Long = synchronized { + try { + waitStartTime = Some(getTimeMillis()) + waitTargetTime = Some(targetTime) + super.waitTillTime(targetTime) + } finally { + waitStartTime = None + waitTargetTime = None + } + } + + /** Is the streaming thread waiting for the clock to advance when it is at the given time */ + def isStreamWaitingAt(time: Long): Boolean = synchronized { + waitStartTime == Some(time) + } + + /** Is the streaming thread waiting for clock to advance to the given time */ + def isStreamWaitingFor(target: Long): Boolean = synchronized { + waitTargetTime == Some(target) + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org