Repository: spark Updated Branches: refs/heads/master a985dd8e9 -> 56a503df5
[SPARK-18670][SS] Limit the number of StreamingQueryListener.StreamProgressEvent when there is no data ## What changes were proposed in this pull request? This PR adds an internal SQL conf `spark.sql.streaming.noDataProgressEventInterval` to control how long to wait before outputting the next QueryProgressEvent when there is no data. ## How was this patch tested? The added unit test. Author: Shixiong Zhu <shixi...@databricks.com> Closes #16108 from zsxwing/SPARK-18670. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56a503df Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56a503df Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56a503df Branch: refs/heads/master Commit: 56a503df5ccbb233ad6569e22002cc989e676337 Parents: a985dd8 Author: Shixiong Zhu <shixi...@databricks.com> Authored: Fri Dec 2 12:42:47 2016 -0800 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Fri Dec 2 12:42:47 2016 -0800 ---------------------------------------------------------------------- .../execution/streaming/StreamExecution.scala | 18 +++++++- .../org/apache/spark/sql/internal/SQLConf.scala | 10 +++++ .../streaming/StreamingQueryListenerSuite.scala | 44 ++++++++++++++++++++ 3 files changed, 71 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/56a503df/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 6d0e269..8804c64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@
-63,6 +63,9 @@ class StreamExecution( private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay + private val noDataProgressEventInterval = + sparkSession.sessionState.conf.streamingNoDataProgressEventInterval + /** * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation. */ @@ -196,6 +199,9 @@ class StreamExecution( // While active, repeatedly attempt to run batches. SparkSession.setActiveSession(sparkSession) + // The timestamp we report an event that has no input data + var lastNoDataProgressEventTime = Long.MinValue + triggerExecutor.execute(() => { startTrigger() @@ -218,7 +224,17 @@ class StreamExecution( // Report trigger as finished and construct progress object. finishTrigger(dataAvailable) - postEvent(new QueryProgressEvent(lastProgress)) + if (dataAvailable) { + // Reset noDataEventTimestamp if we processed any data + lastNoDataProgressEventTime = Long.MinValue + postEvent(new QueryProgressEvent(lastProgress)) + } else { + val now = triggerClock.getTimeMillis() + if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) { + lastNoDataProgressEventTime = now + postEvent(new QueryProgressEvent(lastProgress)) + } + } if (dataAvailable) { // We'll increase currentBatchId after we complete processing current batch's data http://git-wip-us.apache.org/repos/asf/spark/blob/56a503df/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 200f060..5b45df6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -603,6 +603,13 @@ object SQLConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(10L) + val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL = + 
SQLConfigBuilder("spark.sql.streaming.noDataProgressEventInterval") + .internal() + .doc("How long to wait between two progress events when there is no data") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(10000L) + val STREAMING_METRICS_ENABLED = SQLConfigBuilder("spark.sql.streaming.metricsEnabled") .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.") @@ -684,6 +691,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY) + def streamingNoDataProgressEventInterval: Long = + getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL) + def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED) def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION) http://git-wip-us.apache.org/repos/asf/spark/blob/56a503df/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 07a13a4..3086abf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -31,6 +31,7 @@ import org.scalatest.PrivateMethodTester._ import org.apache.spark.SparkException import org.apache.spark.scheduler._ import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamingQueryListener._ import org.apache.spark.util.JsonProtocol @@ -46,6 +47,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(spark.streams.active.isEmpty) assert(addedListeners.isEmpty) // Make sure we don't leak any events to 
the next test + spark.sparkContext.listenerBus.waitUntilEmpty(10000) } testQuietly("single listener, check trigger events are generated correctly") { @@ -191,6 +193,48 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(queryQueryTerminated.exception === newQueryTerminated.exception) } + test("only one progress event per interval when no data") { + // This test will start a query but not push any data, and then check if we push too many events + withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100ms") { + @volatile var numProgressEvent = 0 + val listener = new StreamingQueryListener { + override def onQueryStarted(event: QueryStartedEvent): Unit = {} + override def onQueryProgress(event: QueryProgressEvent): Unit = { + numProgressEvent += 1 + } + override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {} + } + spark.streams.addListener(listener) + try { + val input = new MemoryStream[Int](0, sqlContext) { + @volatile var numTriggers = 0 + override def getOffset: Option[Offset] = { + numTriggers += 1 + super.getOffset + } + } + val clock = new StreamManualClock() + val actions = mutable.ArrayBuffer[StreamAction]() + actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock) + for (_ <- 1 to 100) { + actions += AdvanceManualClock(10) + } + actions += AssertOnQuery { _ => + eventually(timeout(streamingTimeout)) { + assert(input.numTriggers > 100) // at least 100 triggers have occurred + } + true + } + testStream(input.toDS)(actions: _*) + spark.sparkContext.listenerBus.waitUntilEmpty(10000) + // 11 is the max value of the possible numbers of events. + assert(numProgressEvent > 1 && numProgressEvent <= 11) + } finally { + spark.streams.removeListener(listener) + } + } + } + testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") { // query-event-logs-version-2.0.0.txt has all types of events generated by // Structured Streaming in Spark 2.0.0. 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org