This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4f96670 [SPARK-31953][SS] Add Spark Structured Streaming History Server Support 4f96670 is described below commit 4f9667035886a67e6c9a4e8fad2efa390e87ca68 Author: uncleGen <husty...@gmail.com> AuthorDate: Wed Dec 2 17:11:51 2020 -0800 [SPARK-31953][SS] Add Spark Structured Streaming History Server Support ### What changes were proposed in this pull request? Add Spark Structured Streaming History Server Support. ### Why are the changes needed? Add a streaming query history server plugin. ![image](https://user-images.githubusercontent.com/7402327/84248291-d26cfe80-ab3b-11ea-86d2-98205fa2bcc4.png) ![image](https://user-images.githubusercontent.com/7402327/84248347-e44ea180-ab3b-11ea-81de-eefe207656f2.png) ![image](https://user-images.githubusercontent.com/7402327/84248396-f0d2fa00-ab3b-11ea-9b0d-e410115471b0.png) - Follow-ups - Query duration should not update in history UI. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Update UT. Closes #28781 from uncleGen/SPARK-31953. 
Lead-authored-by: uncleGen <husty...@gmail.com> Co-authored-by: Genmao Yu <husty...@gmail.com> Co-authored-by: Yuanjian Li <yuanjian...@databricks.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> --- dev/.rat-excludes | 1 + .../org.apache.spark.status.AppHistoryServerPlugin | 1 + .../streaming/StreamingQueryListenerBus.scala | 26 +++- .../ui/StreamingQueryHistoryServerPlugin.scala | 43 ++++++ .../execution/ui/StreamingQueryStatusStore.scala | 53 +++++++ .../apache/spark/sql/internal/SharedState.scala | 8 +- .../sql/streaming/StreamingQueryManager.scala | 3 +- .../sql/streaming/ui/StreamingQueryPage.scala | 44 +++--- .../ui/StreamingQueryStatisticsPage.scala | 27 ++-- .../ui/StreamingQueryStatusListener.scala | 166 +++++++++++++-------- .../spark/sql/streaming/ui/StreamingQueryTab.scala | 3 +- .../apache/spark/sql/streaming/ui/UIUtils.scala | 12 +- .../resources/spark-events/local-1596020211915 | 160 ++++++++++++++++++++ .../org/apache/spark/deploy/history/Utils.scala} | 39 ++--- .../streaming/ui/StreamingQueryHistorySuite.scala | 63 ++++++++ .../sql/streaming/ui/StreamingQueryPageSuite.scala | 42 +++--- .../ui/StreamingQueryStatusListenerSuite.scala | 159 ++++++++++++++++---- 17 files changed, 673 insertions(+), 177 deletions(-) diff --git a/dev/.rat-excludes b/dev/.rat-excludes index 7da330d..167cf22 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -123,6 +123,7 @@ SessionHandler.java GangliaReporter.java application_1578436911597_0052 config.properties +local-1596020211915 app-20200706201101-0003 py.typed _metadata diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin index 0bba2f8..6771eef 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin @@ -1 +1,2 @@ 
org.apache.spark.sql.execution.ui.SQLHistoryServerPlugin +org.apache.spark.sql.execution.ui.StreamingQueryHistoryServerPlugin diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 1b8d69f..4b98acd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -31,16 +31,21 @@ import org.apache.spark.util.ListenerBus * Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them * to StreamingQueryListeners. * - * Note that each bus and its registered listeners are associated with a single SparkSession + * Note 1: Each bus and its registered listeners are associated with a single SparkSession * and StreamingQueryManager. So this bus will dispatch events to registered listeners for only * those queries that were started in the associated SparkSession. + * + * Note 2: To rebuild the Structured Streaming UI in the Spark History Server (SHS), this bus is + * registered with [[org.apache.spark.scheduler.ReplayListenerBus]]. Whether `sparkListenerBus` is + * defined determines how [[StreamingQueryListener.Event]]s are processed: if it is not defined + * (None), this bus is being used to replay all streaming query events from the event log. 
*/ -class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) +class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus]) extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] { import StreamingQueryListener._ - sparkListenerBus.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY) + sparkListenerBus.foreach(_.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY)) /** * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus @@ -67,11 +72,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) event match { case s: QueryStartedEvent => activeQueryRunIds.synchronized { activeQueryRunIds += s.runId } - sparkListenerBus.post(s) + sparkListenerBus.foreach(bus => bus.post(s)) // post to local listeners to trigger callbacks postToAll(s) case _ => - sparkListenerBus.post(event) + sparkListenerBus.foreach(bus => bus.post(event)) } } @@ -95,7 +100,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) // synchronously and the ones attached to LiveListenerBus asynchronously. Therefore, // we need to ignore QueryStartedEvent if this method is called within SparkListenerBus // thread - if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) { + // + // When loaded by the Spark History Server, we should process all events coming from the + // replay listener bus. + if (sparkListenerBus.isEmpty || !LiveListenerBus.withinListenerThread.value || + !e.isInstanceOf[QueryStartedEvent]) { postToAll(e) } case _ => @@ -110,7 +119,10 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) listener: StreamingQueryListener, event: StreamingQueryListener.Event): Unit = { def shouldReport(runId: UUID): Boolean = { - activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) } + // When loaded by the Spark History Server, we should process all events coming from the + // replay listener bus. 
+ sparkListenerBus.isEmpty || + activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) } } event match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala new file mode 100644 index 0000000..a127fa5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.ui + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus +import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab} +import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore} +import org.apache.spark.ui.SparkUI + +class StreamingQueryHistoryServerPlugin extends AppHistoryServerPlugin { + + override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = { + val listenerBus = new StreamingQueryListenerBus(None) + listenerBus.addListener(new StreamingQueryStatusListener(conf, store)) + Seq(listenerBus) + } + + override def setupUI(ui: SparkUI): Unit = { + val streamingQueryStatusStore = new StreamingQueryStatusStore(ui.store.store) + if (streamingQueryStatusStore.allQueryUIData.nonEmpty) { + new StreamingQueryTab(streamingQueryStatusStore, ui) + } + } + + override def displayOrder: Int = 1 +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala new file mode 100644 index 0000000..9eb14a6a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import java.util.UUID + +import org.apache.spark.sql.streaming.ui.{StreamingQueryData, StreamingQueryProgressWrapper, StreamingQueryUIData} +import org.apache.spark.status.KVUtils +import org.apache.spark.util.kvstore.KVStore + +/** + * Provides a view of a KVStore with methods that make it easy to query Streaming Query state. + * There's no state kept in this class, so it's ok to have multiple instances of it in an + * application. + */ +class StreamingQueryStatusStore(store: KVStore) { + + def allQueryUIData: Seq[StreamingQueryUIData] = { + val view = store.view(classOf[StreamingQueryData]).index("startTimestamp").first(0L) + KVUtils.viewToSeq(view, Int.MaxValue)(_ => true).map(makeUIData) + } + + // visible for test + private[sql] def getQueryProgressData(runId: UUID): Seq[StreamingQueryProgressWrapper] = { + val view = store.view(classOf[StreamingQueryProgressWrapper]) + .index("runId").first(runId.toString).last(runId.toString) + KVUtils.viewToSeq(view, Int.MaxValue)(_ => true) + } + + private def makeUIData(summary: StreamingQueryData): StreamingQueryUIData = { + val runId = summary.runId.toString + val view = store.view(classOf[StreamingQueryProgressWrapper]) + .index("runId").first(runId).last(runId) + val recentProgress = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true) + .map(_.progress).sortBy(_.timestamp).toArray + StreamingQueryUIData(summary, recentProgress) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 89aceac..ea430db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.streaming.StreamExecution -import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab} +import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab, StreamingQueryStatusStore} import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab} import org.apache.spark.status.ElementTrackingStore @@ -111,9 +111,9 @@ private[sql] class SharedState( lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = { sparkContext.ui.flatMap { ui => if (conf.get(STREAMING_UI_ENABLED)) { - val statusListener = new StreamingQueryStatusListener(conf) - new StreamingQueryTab(statusListener, ui) - Some(statusListener) + val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore] + new StreamingQueryTab(new StreamingQueryStatusStore(kvStore), ui) + Some(new StreamingQueryStatusListener(conf, kvStore)) } else { None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index ffdbe9d..b66037d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -49,7 +49,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo private[sql] val stateStoreCoordinator = 
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env) - private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus) + private val listenerBus = + new StreamingQueryListenerBus(Some(sparkSession.sparkContext.listenerBus)) @GuardedBy("activeQueriesSharedLock") private val activeQueries = new mutable.HashMap[UUID, StreamingQuery] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala index b98fdf1..96e4989 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala @@ -40,8 +40,8 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab) } private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = { - val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus - .partition(_.isActive) + val (activeQueries, inactiveQueries) = + parent.store.allQueryUIData.partition(_.summary.isActive) val content = mutable.ListBuffer[Node]() // show active queries table only if there is at least one active query @@ -176,7 +176,7 @@ class StreamingQueryPagedTable( val streamingQuery = query.streamingUIData val statisticsLink = "%s/%s/statistics?id=%s" .format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, - streamingQuery.runId) + streamingQuery.summary.runId) def details(detail: Any): Seq[Node] = { if (isActive) { @@ -194,14 +194,14 @@ class StreamingQueryPagedTable( <tr> <td>{UIUtils.getQueryName(streamingQuery)}</td> <td>{UIUtils.getQueryStatus(streamingQuery)}</td> - <td>{streamingQuery.id}</td> - <td><a href={statisticsLink}>{streamingQuery.runId}</a></td> - <td>{SparkUIUtils.formatDate(streamingQuery.startTimestamp)}</td> + <td>{streamingQuery.summary.id}</td> + <td><a 
href={statisticsLink}>{streamingQuery.summary.runId}</a></td> + <td>{SparkUIUtils.formatDate(streamingQuery.summary.startTimestamp)}</td> <td>{SparkUIUtils.formatDurationVerbose(query.duration)}</td> <td>{withNoProgress(streamingQuery, {query.avgInput.formatted("%.2f")}, "NaN")}</td> <td>{withNoProgress(streamingQuery, {query.avgProcess.formatted("%.2f")}, "NaN")}</td> <td>{withNoProgress(streamingQuery, {streamingQuery.lastProgress.batchId}, "NaN")}</td> - {details(streamingQuery.exception.getOrElse("-"))} + {details(streamingQuery.summary.exception.getOrElse("-"))} </tr> } } @@ -222,32 +222,32 @@ class StreamingQueryDataSource(uiData: Seq[StreamingQueryUIData], sortColumn: St override def sliceData(from: Int, to: Int): Seq[StructuredStreamingRow] = data.slice(from, to) - private def streamingRow(query: StreamingQueryUIData): StructuredStreamingRow = { + private def streamingRow(uiData: StreamingQueryUIData): StructuredStreamingRow = { val duration = if (isActive) { - System.currentTimeMillis() - query.startTimestamp + System.currentTimeMillis() - uiData.summary.startTimestamp } else { - withNoProgress(query, { - val endTimeMs = query.lastProgress.timestamp - parseProgressTimestamp(endTimeMs) - query.startTimestamp + withNoProgress(uiData, { + val endTimeMs = uiData.lastProgress.timestamp + parseProgressTimestamp(endTimeMs) - uiData.summary.startTimestamp }, 0) } - val avgInput = (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum / - query.recentProgress.length) + val avgInput = (uiData.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum / + uiData.recentProgress.length) - val avgProcess = (query.recentProgress.map(p => - withNumberInvalid(p.processedRowsPerSecond)).sum / query.recentProgress.length) + val avgProcess = (uiData.recentProgress.map(p => + withNumberInvalid(p.processedRowsPerSecond)).sum / uiData.recentProgress.length) - StructuredStreamingRow(duration, avgInput, avgProcess, query) + 
StructuredStreamingRow(duration, avgInput, avgProcess, uiData) } private def ordering(sortColumn: String, desc: Boolean): Ordering[StructuredStreamingRow] = { val ordering: Ordering[StructuredStreamingRow] = sortColumn match { - case "Name" => Ordering.by(q => UIUtils.getQueryName(q.streamingUIData)) - case "Status" => Ordering.by(q => UIUtils.getQueryStatus(q.streamingUIData)) - case "ID" => Ordering.by(_.streamingUIData.id) - case "Run ID" => Ordering.by(_.streamingUIData.runId) - case "Start Time" => Ordering.by(_.streamingUIData.startTimestamp) + case "Name" => Ordering.by(row => UIUtils.getQueryName(row.streamingUIData)) + case "Status" => Ordering.by(row => UIUtils.getQueryStatus(row.streamingUIData)) + case "ID" => Ordering.by(_.streamingUIData.summary.id) + case "Run ID" => Ordering.by(_.streamingUIData.summary.runId) + case "Start Time" => Ordering.by(_.streamingUIData.summary.startTimestamp) case "Duration" => Ordering.by(_.duration) case "Avg Input /sec" => Ordering.by(_.avgInput) case "Avg Process /sec" => Ordering.by(_.avgProcess) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala index 24709ba..97691d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala @@ -58,8 +58,8 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) val parameterId = request.getParameter("id") require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") - val query = parent.statusListener.allQueryStatus.find { case q => - q.runId.equals(UUID.fromString(parameterId)) + val query = parent.store.allQueryUIData.find { uiData => + uiData.summary.runId.equals(UUID.fromString(parameterId)) }.getOrElse(throw new IllegalArgumentException(s"Failed to 
find streaming query $parameterId")) val resources = generateLoadResources(request) @@ -109,34 +109,35 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) <script>{Unparsed(js)}</script> } - def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = { - val duration = if (query.isActive) { - SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp) + def generateBasicInfo(uiData: StreamingQueryUIData): Seq[Node] = { + val duration = if (uiData.summary.isActive) { + val durationMs = System.currentTimeMillis() - uiData.summary.startTimestamp + SparkUIUtils.formatDurationVerbose(durationMs) } else { - withNoProgress(query, { - val end = query.lastProgress.timestamp - val start = query.recentProgress.head.timestamp + withNoProgress(uiData, { + val end = uiData.lastProgress.timestamp + val start = uiData.recentProgress.head.timestamp SparkUIUtils.formatDurationVerbose( parseProgressTimestamp(end) - parseProgressTimestamp(start)) }, "-") } - val name = UIUtils.getQueryName(query) - val numBatches = withNoProgress(query, { query.lastProgress.batchId + 1L }, 0) + val name = UIUtils.getQueryName(uiData) + val numBatches = withNoProgress(uiData, { uiData.lastProgress.batchId + 1L }, 0) <div>Running batches for <strong> {duration} </strong> since <strong> - {SparkUIUtils.formatDate(query.startTimestamp)} + {SparkUIUtils.formatDate(uiData.summary.startTimestamp)} </strong> (<strong>{numBatches}</strong> completed batches) </div> <br /> <div><strong>Name: </strong>{name}</div> - <div><strong>Id: </strong>{query.id}</div> - <div><strong>RunId: </strong>{query.runId}</div> + <div><strong>Id: </strong>{uiData.summary.id}</div> + <div><strong>RunId: </strong>{uiData.summary.runId}</div> <br /> } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala index e331083..fdd3754 100644 
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala @@ -20,102 +20,144 @@ package org.apache.spark.sql.streaming.ui import java.util.UUID import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConverters._ import scala.collection.mutable +import com.fasterxml.jackson.annotation.JsonIgnore + import org.apache.spark.SparkConf import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress} +import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._ import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp +import org.apache.spark.status.{ElementTrackingStore, KVUtils} +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.util.kvstore.KVIndex /** * A customized StreamingQueryListener used in structured streaming UI, which contains all * UI data for both active and inactive query. - * TODO: Add support for history server. */ -private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener { - - /** - * We use runId as the key here instead of id in active query status map, - * because the runId is unique for every started query, even it its a restart. 
- */ - private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]() - private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]() +private[sql] class StreamingQueryStatusListener( + conf: SparkConf, + store: ElementTrackingStore) extends StreamingQueryListener { private val streamingProgressRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES) private val inactiveQueryStatusRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES) + store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention) { count => + cleanupInactiveQueries(count) + } + + // Events from the same query run will never be processed concurrently, so it's safe to + // access `progressIds` without any protection. + private val queryToProgress = new ConcurrentHashMap[UUID, mutable.Queue[String]]() + + private def cleanupInactiveQueries(count: Long): Unit = { + val view = store.view(classOf[StreamingQueryData]).index("active").first(false).last(false) + val inactiveQueries = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true) + val numInactiveQueries = inactiveQueries.size + if (numInactiveQueries <= inactiveQueryStatusRetention) { + return + } + val toDelete = inactiveQueries.sortBy(_.endTimestamp.get) + .take(numInactiveQueries - inactiveQueryStatusRetention) + val runIds = toDelete.map { e => + store.delete(e.getClass, e.runId) + e.runId.toString + } + // Delete wrappers in one pass, as deleting them for each summary is slow + store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper], "runId", runIds) + } + override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { val startTimestamp = parseProgressTimestamp(event.timestamp) - activeQueryStatus.putIfAbsent(event.runId, - new StreamingQueryUIData(event.name, event.id, event.runId, startTimestamp)) + store.write(new StreamingQueryData( + event.name, + event.id, + event.runId, + isActive = true, + None, + startTimestamp + ), 
checkTriggers = true) } override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { - val batchTimestamp = parseProgressTimestamp(event.progress.timestamp) - val queryStatus = activeQueryStatus.getOrDefault( - event.progress.runId, - new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId, - batchTimestamp)) - queryStatus.updateProcess(event.progress, streamingProgressRetention) - } - - override def onQueryTerminated( - event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized { - val queryStatus = activeQueryStatus.remove(event.runId) - if (queryStatus != null) { - queryStatus.queryTerminated(event) - inactiveQueryStatus += queryStatus - while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) { - inactiveQueryStatus.dequeue() - } + val runId = event.progress.runId + val batchId = event.progress.batchId + val timestamp = event.progress.timestamp + if (!queryToProgress.containsKey(runId)) { + queryToProgress.put(runId, mutable.Queue.empty[String]) + } + val progressIds = queryToProgress.get(runId) + progressIds.enqueue(getUniqueId(runId, batchId, timestamp)) + store.write(new StreamingQueryProgressWrapper(event.progress)) + while (progressIds.length > streamingProgressRetention) { + val uniqueId = progressIds.dequeue + store.delete(classOf[StreamingQueryProgressWrapper], uniqueId) } } - def allQueryStatus: Seq[StreamingQueryUIData] = synchronized { - activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus + override def onQueryTerminated( + event: StreamingQueryListener.QueryTerminatedEvent): Unit = { + val querySummary = store.read(classOf[StreamingQueryData], event.runId) + val curTime = System.currentTimeMillis() + store.write(new StreamingQueryData( + querySummary.name, + querySummary.id, + querySummary.runId, + isActive = false, + querySummary.exception, + querySummary.startTimestamp, + Some(curTime) + ), checkTriggers = true) + queryToProgress.remove(event.runId) } } 
+private[sql] class StreamingQueryData( + val name: String, + val id: UUID, + @KVIndexParam val runId: UUID, + @KVIndexParam("active") val isActive: Boolean, + val exception: Option[String], + @KVIndexParam("startTimestamp") val startTimestamp: Long, + val endTimestamp: Option[Long] = None) + /** * This class contains all message related to UI display, each instance corresponds to a single * [[org.apache.spark.sql.streaming.StreamingQuery]]. */ -private[ui] class StreamingQueryUIData( - val name: String, - val id: UUID, - val runId: UUID, - val startTimestamp: Long) { - - /** Holds the most recent query progress updates. */ - private val progressBuffer = new mutable.Queue[StreamingQueryProgress]() - - private var _isActive = true - private var _exception: Option[String] = None - - def isActive: Boolean = synchronized { _isActive } - - def exception: Option[String] = synchronized { _exception } - - def queryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized { - _isActive = false - _exception = event.exception - } - - def updateProcess( - newProgress: StreamingQueryProgress, retentionNum: Int): Unit = progressBuffer.synchronized { - progressBuffer += newProgress - while (progressBuffer.length >= retentionNum) { - progressBuffer.dequeue() +private[sql] case class StreamingQueryUIData( + summary: StreamingQueryData, + recentProgress: Array[StreamingQueryProgress]) { + + def lastProgress: StreamingQueryProgress = { + if (recentProgress.nonEmpty) { + recentProgress.last + } else { + null } } +} - def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized { - progressBuffer.toArray - } +private[sql] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) { + @JsonIgnore @KVIndex + private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, progress.timestamp) - def lastProgress: StreamingQueryProgress = progressBuffer.synchronized { - progressBuffer.lastOption.orNull + @JsonIgnore 
@KVIndex("runId") + private def runIdIndex: String = progress.runId.toString +} + +private[sql] object StreamingQueryProgressWrapper { + /** + * Adding `timestamp` into unique id to support reporting `empty` query progress + * in which no data comes but with the same batchId. + */ + def getUniqueId( + runId: UUID, + batchId: Long, + timestamp: String): String = { + s"${runId}_${batchId}_$timestamp" } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala index bb097ff..65cad8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.streaming.ui import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore import org.apache.spark.ui.{SparkUI, SparkUITab} private[sql] class StreamingQueryTab( - val statusListener: StreamingQueryStatusListener, + val store: StreamingQueryStatusStore, sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging { override val name = "Structured Streaming" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala index 1f7e65d..88a110f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala @@ -46,19 +46,19 @@ private[ui] object UIUtils { } } - def getQueryName(query: StreamingQueryUIData): String = { - if (query.name == null || query.name.isEmpty) { + def getQueryName(uiData: StreamingQueryUIData): String = { + if (uiData.summary.name == null || uiData.summary.name.isEmpty) { "<no name>" } else { - query.name + uiData.summary.name } } - def getQueryStatus(query: 
StreamingQueryUIData): String = { - if (query.isActive) { + def getQueryStatus(uiData: StreamingQueryUIData): String = { + if (uiData.summary.isActive) { "RUNNING" } else { - query.exception.map(_ => "FAILED").getOrElse("FINISHED") + uiData.summary.exception.map(_ => "FAILED").getOrElse("FINISHED") } } diff --git a/sql/core/src/test/resources/spark-events/local-1596020211915 b/sql/core/src/test/resources/spark-events/local-1596020211915 new file mode 100644 index 0000000..ff34bbc --- /dev/null +++ b/sql/core/src/test/resources/spark-events/local-1596020211915 @@ -0,0 +1,160 @@ +{"Event":"SparkListenerLogStart","Spark Version":"3.1.0-SNAPSHOT"} +{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":0,"Executor Resource Requests":{"cores":{"Resource Name":"cores","Amount":1,"Discovery Script":"","Vendor":""},"memory":{"Resource Name":"memory","Amount":1024,"Discovery Script":"","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource Name":"cpus","Amount":1.0}}} +{"Event":"SparkListenerExecutorAdded","Timestamp":1596020212090,"Executor ID":"driver","Executor Info":{"Host":"iZbp19vpr16ix621sdw476Z","Total Cores":4,"Log Urls":{},"Attributes":{},"Resources":{},"Resource Profile Id":0}} +{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Port":39845},"Maximum Memory":384093388,"Timestamp":1596020212109,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":0} +{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.252.b09-2.el7_8.x86_64/jre","Java Version":"1.8.0_252 (Oracle Corporation)","Scala Version":"version 2.12.10"},"Spark Properties":{"spark.driver.host":"iZbp19vpr16ix621sdw476Z","spark.eventLog.enabled":"true","spark.driver.port":"46309","spark.jars":"file:/root/spark-3.1.0-SNAPSHOT-bin-hadoop2.8/./examples/jars/spark-examples_2.12-3.1.0-SNAPSHOT.jar","spark.app.name":"Structure [...] 
+{"Event":"SparkListenerApplicationStart","App Name":"StructuredKafkaWordCount","App ID":"local-1596020211915","Timestamp":1596020210919,"User":"root"} +{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryStartedEvent","id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:56:55.947Z"} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":0,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 0","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":1,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 0","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...] +{"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1596020221633,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":6,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"8\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[5],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of [...] 
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":6,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"8\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[5],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Partiti [...] +{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1596020221738,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1596020221738,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020222649,"Failed":false,"Killed":false,"Accumulables":[{"ID":21,"Name":"shuffle write time","Update":"9599308","Value":"9599308","Interna [...] +{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":6,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"8\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[5],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Partiti [...] 
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":1,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":11,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"0\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[10],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number of [...] +{"Event":"SparkListenerTaskStart","Stage ID":1,"Stage Attempt ID":0,"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1596020222709,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskStart","Stage ID":1,"Stage Attempt ID":0,"Task Info":{"Task ID":2,"Index":0,"Attempt":0,"Launch Time":1596020222713,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":1,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":2,"Index":0,"Attempt":0,"Launch Time":1596020222713,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020222954,"Failed":false,"Killed":false,"Accumulables":[{"ID":44,"Name":"duration","Update":"19","Value":"19","Internal":true,"Count Failed Va [...] 
+{"Event":"SparkListenerTaskEnd","Stage ID":1,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1596020222709,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020222965,"Failed":false,"Killed":false,"Accumulables":[{"ID":44,"Name":"duration","Update":"33","Value":"52","Internal":true,"Count Failed Value [...] +{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":1,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":11,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"0\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[10],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number of [...] +{"Event":"SparkListenerJobEnd","Job ID":0,"Completion Time":1596020222973,"Job Result":{"Result":"JobSucceeded"}} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":2,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 0","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...] 
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":2,"time":1596020223062} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":1,"time":1596020223069} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":0,"time":1596020223069} +{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:56:56.015Z","batchId":0,"batchDuration":7110,"durationMs":{"triggerExecution":7109,"queryPlanning":439,"getBatch":21,"latestOffset":3524,"addBatch":3011,"walCommit":35},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":776,"numL [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":3,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 1","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":4,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 1","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...] 
+{"Event":"SparkListenerJobStart","Job ID":1,"Submission Time":1596020223482,"Stage Infos":[{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":18,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"41\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[17],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":18,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"41\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[17],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Part [...] +{"Event":"SparkListenerTaskStart","Stage ID":2,"Stage Attempt ID":0,"Task Info":{"Task ID":3,"Index":0,"Attempt":0,"Launch Time":1596020223493,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":2,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":3,"Index":0,"Attempt":0,"Launch Time":1596020223493,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020223601,"Failed":false,"Killed":false,"Accumulables":[{"ID":178,"Name":"shuffle write time","Update":"837580","Value":"837580","Internal [...] 
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":18,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"41\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[17],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Part [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":3,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":23,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"33\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[22],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...] +{"Event":"SparkListenerTaskStart","Stage ID":3,"Stage Attempt ID":0,"Task Info":{"Task ID":4,"Index":0,"Attempt":0,"Launch Time":1596020223625,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskStart","Stage ID":3,"Stage Attempt ID":0,"Task Info":{"Task ID":5,"Index":1,"Attempt":0,"Launch Time":1596020223626,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":3,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":4,"Index":0,"Attempt":0,"Launch Time":1596020223625,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result 
Time":0,"Finish Time":1596020223717,"Failed":false,"Killed":false,"Accumulables":[{"ID":201,"Name":"duration","Update":"4","Value":"4","Internal":true,"Count Failed Val [...] +{"Event":"SparkListenerTaskEnd","Stage ID":3,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":5,"Index":1,"Attempt":0,"Launch Time":1596020223626,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020223720,"Failed":false,"Killed":false,"Accumulables":[{"ID":201,"Name":"duration","Update":"4","Value":"8","Internal":true,"Count Failed Val [...] +{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":3,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":23,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"33\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[22],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...] +{"Event":"SparkListenerJobEnd","Job ID":1,"Completion Time":1596020223725,"Job Result":{"Result":"JobSucceeded"}} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":5,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 1","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...] 
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":5,"time":1596020223761} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":4,"time":1596020223762} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":3,"time":1596020223762} +{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:03.168Z","batchId":1,"batchDuration":622,"durationMs":{"triggerExecution":622,"queryPlanning":47,"getBatch":0,"latestOffset":7,"addBatch":478,"walCommit":59},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1216,"numLateInpu [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":6,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 2","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":7,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 2","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...] 
+{"Event":"SparkListenerJobStart","Job ID":2,"Submission Time":1596020224100,"Stage Infos":[{"Stage ID":5,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":35,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"66\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[34],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":f [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":4,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":30,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"74\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[29],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Part [...] +{"Event":"SparkListenerTaskStart","Stage ID":4,"Stage Attempt ID":0,"Task Info":{"Task ID":6,"Index":0,"Attempt":0,"Launch Time":1596020224113,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":4,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":6,"Index":0,"Attempt":0,"Launch Time":1596020224113,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224174,"Failed":false,"Killed":false,"Accumulables":[{"ID":335,"Name":"shuffle write time","Update":"686296","Value":"686296","Internal [...] 
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":4,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":30,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"74\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[29],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Part [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":5,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":35,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"66\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[34],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...] +{"Event":"SparkListenerTaskStart","Stage ID":5,"Stage Attempt ID":0,"Task Info":{"Task ID":7,"Index":0,"Attempt":0,"Launch Time":1596020224187,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskStart","Stage ID":5,"Stage Attempt ID":0,"Task Info":{"Task ID":8,"Index":1,"Attempt":0,"Launch Time":1596020224187,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":5,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":7,"Index":0,"Attempt":0,"Launch Time":1596020224187,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result 
Time":0,"Finish Time":1596020224256,"Failed":false,"Killed":false,"Accumulables":[{"ID":358,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed Val [...] +{"Event":"SparkListenerTaskEnd","Stage ID":5,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":8,"Index":1,"Attempt":0,"Launch Time":1596020224187,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224257,"Failed":false,"Killed":false,"Accumulables":[{"ID":358,"Name":"duration","Update":"4","Value":"7","Internal":true,"Count Failed Val [...] +{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":5,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":35,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"66\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[34],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...] +{"Event":"SparkListenerJobEnd","Job ID":2,"Completion Time":1596020224259,"Job Result":{"Result":"JobSucceeded"}} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":8,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 2","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...] 
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":8,"time":1596020224287} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":7,"time":1596020224287} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":6,"time":1596020224288} +{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:03.793Z","batchId":2,"batchDuration":522,"durationMs":{"triggerExecution":522,"queryPlanning":41,"getBatch":1,"latestOffset":3,"addBatch":421,"walCommit":27},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":9,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 3","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":10,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 3","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] 
+{"Event":"SparkListenerJobStart","Job ID":3,"Submission Time":1596020224533,"Stage Infos":[{"Stage ID":6,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":42,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"107\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[41],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":6,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":42,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"107\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[41],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Par [...] +{"Event":"SparkListenerTaskStart","Stage ID":6,"Stage Attempt ID":0,"Task Info":{"Task ID":9,"Index":0,"Attempt":0,"Launch Time":1596020224541,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":6,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":9,"Index":0,"Attempt":0,"Launch Time":1596020224541,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224581,"Failed":false,"Killed":false,"Accumulables":[{"ID":492,"Name":"shuffle write time","Update":"643278","Value":"643278","Internal [...] 
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":6,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":42,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"107\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[41],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Par [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":7,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":47,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"99\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[46],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...] +{"Event":"SparkListenerTaskStart","Stage ID":7,"Stage Attempt ID":0,"Task Info":{"Task ID":10,"Index":0,"Attempt":0,"Launch Time":1596020224596,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskStart","Stage ID":7,"Stage Attempt ID":0,"Task Info":{"Task ID":11,"Index":1,"Attempt":0,"Launch Time":1596020224597,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":7,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":10,"Index":0,"Attempt":0,"Launch Time":1596020224596,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish Time":1596020224670,"Failed":false,"Killed":false,"Accumulables":[{"ID":515,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed Va [...] +{"Event":"SparkListenerTaskEnd","Stage ID":7,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":11,"Index":1,"Attempt":0,"Launch Time":1596020224597,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224687,"Failed":false,"Killed":false,"Accumulables":[{"ID":515,"Name":"duration","Update":"4","Value":"7","Internal":true,"Count Failed Va [...] +{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":7,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":47,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"99\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[46],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...] +{"Event":"SparkListenerJobEnd","Job ID":3,"Completion Time":1596020224689,"Job Result":{"Result":"JobSucceeded"}} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":11,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 3","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] 
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":11,"time":1596020224713} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":10,"time":1596020224714} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":9,"time":1596020224714} +{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:04.317Z","batchId":3,"batchDuration":415,"durationMs":{"triggerExecution":415,"queryPlanning":38,"getBatch":1,"latestOffset":3,"addBatch":332,"walCommit":21},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":12,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 4","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":13,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 4","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] 
+{"Event":"SparkListenerJobStart","Job ID":4,"Submission Time":1596020224928,"Stage Infos":[{"Stage ID":9,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":59,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"132\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[58],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier": [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":8,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":54,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"140\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[53],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Par [...] +{"Event":"SparkListenerTaskStart","Stage ID":8,"Stage Attempt ID":0,"Task Info":{"Task ID":12,"Index":0,"Attempt":0,"Launch Time":1596020224941,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":8,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":12,"Index":0,"Attempt":0,"Launch Time":1596020224941,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224979,"Failed":false,"Killed":false,"Accumulables":[{"ID":649,"Name":"shuffle write time","Update":"572754","Value":"572754","Interna [...] 
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":8,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":54,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"140\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[53],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Par [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":9,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":59,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"132\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[58],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...] +{"Event":"SparkListenerTaskStart","Stage ID":9,"Stage Attempt ID":0,"Task Info":{"Task ID":13,"Index":0,"Attempt":0,"Launch Time":1596020224994,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskStart","Stage ID":9,"Stage Attempt ID":0,"Task Info":{"Task ID":14,"Index":1,"Attempt":0,"Launch Time":1596020224994,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":9,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":14,"Index":1,"Attempt":0,"Launch Time":1596020224994,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish Time":1596020225056,"Failed":false,"Killed":false,"Accumulables":[{"ID":672,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed Va [...] +{"Event":"SparkListenerTaskEnd","Stage ID":9,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":13,"Index":0,"Attempt":0,"Launch Time":1596020224994,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225058,"Failed":false,"Killed":false,"Accumulables":[{"ID":672,"Name":"duration","Update":"2","Value":"5","Internal":true,"Count Failed Va [...] +{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":9,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":59,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"132\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[58],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...] +{"Event":"SparkListenerJobEnd","Job ID":4,"Completion Time":1596020225059,"Job Result":{"Result":"JobSucceeded"}} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":14,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 4","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] 
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":14,"time":1596020225087} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":13,"time":1596020225087} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":12,"time":1596020225087} +{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:04.734Z","batchId":4,"batchDuration":387,"durationMs":{"triggerExecution":387,"queryPlanning":30,"getBatch":1,"latestOffset":3,"addBatch":306,"walCommit":12},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":15,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 5","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":16,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 5","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] 
+{"Event":"SparkListenerJobStart","Job ID":5,"Submission Time":1596020225342,"Stage Infos":[{"Stage ID":10,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":66,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"173\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[65],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Numbe [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":10,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":66,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"173\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[65],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...] +{"Event":"SparkListenerTaskStart","Stage ID":10,"Stage Attempt ID":0,"Task Info":{"Task ID":15,"Index":0,"Attempt":0,"Launch Time":1596020225359,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":10,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":15,"Index":0,"Attempt":0,"Launch Time":1596020225359,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225400,"Failed":false,"Killed":false,"Accumulables":[{"ID":806,"Name":"shuffle write time","Update":"530930","Value":"530930","Intern [...] 
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":10,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":66,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"173\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[65],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":11,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":71,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"165\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[70],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...] +{"Event":"SparkListenerTaskStart","Stage ID":11,"Stage Attempt ID":0,"Task Info":{"Task ID":16,"Index":0,"Attempt":0,"Launch Time":1596020225417,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskStart","Stage ID":11,"Stage Attempt ID":0,"Task Info":{"Task ID":17,"Index":1,"Attempt":0,"Launch Time":1596020225417,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":11,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":17,"Index":1,"Attempt":0,"Launch Time":1596020225417,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish Time":1596020225498,"Failed":false,"Killed":false,"Accumulables":[{"ID":829,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed V [...] +{"Event":"SparkListenerTaskEnd","Stage ID":11,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":16,"Index":0,"Attempt":0,"Launch Time":1596020225417,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225509,"Failed":false,"Killed":false,"Accumulables":[{"ID":829,"Name":"duration","Update":"2","Value":"5","Internal":true,"Count Failed V [...] +{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":11,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":71,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"165\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[70],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...] +{"Event":"SparkListenerJobEnd","Job ID":5,"Completion Time":1596020225514,"Job Result":{"Result":"JobSucceeded"}} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":17,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 5","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] 
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":17,"time":1596020225541} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":16,"time":1596020225542} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":15,"time":1596020225542} +{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:05.123Z","batchId":5,"batchDuration":437,"durationMs":{"triggerExecution":437,"queryPlanning":35,"getBatch":1,"latestOffset":3,"addBatch":361,"walCommit":18},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":18,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 6","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":19,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 6","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] 
+{"Event":"SparkListenerJobStart","Job ID":6,"Submission Time":1596020225759,"Stage Infos":[{"Stage ID":12,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":78,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"206\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[77],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Numbe [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":12,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":78,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"206\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[77],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...] +{"Event":"SparkListenerTaskStart","Stage ID":12,"Stage Attempt ID":0,"Task Info":{"Task ID":18,"Index":0,"Attempt":0,"Launch Time":1596020225766,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":12,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":18,"Index":0,"Attempt":0,"Launch Time":1596020225766,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225796,"Failed":false,"Killed":false,"Accumulables":[{"ID":963,"Name":"shuffle write time","Update":"543836","Value":"543836","Intern [...] 
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":12,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":78,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"206\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[77],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":13,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":83,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"198\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[82],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...] +{"Event":"SparkListenerTaskStart","Stage ID":13,"Stage Attempt ID":0,"Task Info":{"Task ID":19,"Index":0,"Attempt":0,"Launch Time":1596020225808,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskStart","Stage ID":13,"Stage Attempt ID":0,"Task Info":{"Task ID":20,"Index":1,"Attempt":0,"Launch Time":1596020225809,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":13,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":19,"Index":0,"Attempt":0,"Launch Time":1596020225808,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish Time":1596020225868,"Failed":false,"Killed":false,"Accumulables":[{"ID":986,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed V [...] +{"Event":"SparkListenerTaskEnd","Stage ID":13,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":20,"Index":1,"Attempt":0,"Launch Time":1596020225809,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225874,"Failed":false,"Killed":false,"Accumulables":[{"ID":986,"Name":"duration","Update":"2","Value":"5","Internal":true,"Count Failed V [...] +{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":13,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":83,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"198\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[82],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...] +{"Event":"SparkListenerJobEnd","Job ID":6,"Completion Time":1596020225875,"Job Result":{"Result":"JobSucceeded"}} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":20,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 6","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] 
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":20,"time":1596020225896} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":19,"time":1596020225897} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":18,"time":1596020225897} +{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:05.562Z","batchId":6,"batchDuration":351,"durationMs":{"triggerExecution":351,"queryPlanning":28,"getBatch":1,"latestOffset":6,"addBatch":273,"walCommit":25},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":21,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 7","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":22,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 7","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] 
+{"Event":"SparkListenerJobStart","Job ID":7,"Submission Time":1596020226076,"Stage Infos":[{"Stage ID":15,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":95,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"231\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[94],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier" [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":14,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":90,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"239\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[89],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...] +{"Event":"SparkListenerTaskStart","Stage ID":14,"Stage Attempt ID":0,"Task Info":{"Task ID":21,"Index":0,"Attempt":0,"Launch Time":1596020226086,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":14,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":21,"Index":0,"Attempt":0,"Launch Time":1596020226086,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020226116,"Failed":false,"Killed":false,"Accumulables":[{"ID":1120,"Name":"shuffle write time","Update":"543034","Value":"543034","Inter [...] 
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":14,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":90,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"239\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[89],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...] +{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":15,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":95,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"231\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[94],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...] +{"Event":"SparkListenerTaskStart","Stage ID":15,"Stage Attempt ID":0,"Task Info":{"Task ID":22,"Index":0,"Attempt":0,"Launch Time":1596020226128,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskStart","Stage ID":15,"Stage Attempt ID":0,"Task Info":{"Task ID":23,"Index":1,"Attempt":0,"Launch Time":1596020226129,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":15,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":23,"Index":1,"Attempt":0,"Launch Time":1596020226129,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish Time":1596020226196,"Failed":false,"Killed":false,"Accumulables":[{"ID":1143,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed [...] +{"Event":"SparkListenerTaskEnd","Stage ID":15,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":22,"Index":0,"Attempt":0,"Launch Time":1596020226128,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020226204,"Failed":false,"Killed":false,"Accumulables":[{"ID":1143,"Name":"duration","Update":"2","Value":"5","Internal":true,"Count Failed [...] +{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":15,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":95,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"231\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[94],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...] +{"Event":"SparkListenerJobEnd","Job ID":7,"Completion Time":1596020226204,"Job Result":{"Result":"JobSucceeded"}} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":23,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 7","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...] 
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":23,"time":1596020226230} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":22,"time":1596020226231} +{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":21,"time":1596020226231} +{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:05.916Z","batchId":7,"batchDuration":341,"durationMs":{"triggerExecution":341,"queryPlanning":24,"getBatch":0,"latestOffset":3,"addBatch":271,"walCommit":14},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...] +{"Event":"SparkListenerApplicationEnd","Timestamp":1596020226301} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala b/sql/core/src/test/scala/org/apache/spark/deploy/history/Utils.scala similarity index 53% copy from sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala copy to sql/core/src/test/scala/org/apache/spark/deploy/history/Utils.scala index bb097ff..f73305b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala +++ b/sql/core/src/test/scala/org/apache/spark/deploy/history/Utils.scala @@ -14,26 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.sql.streaming.ui -import org.apache.spark.internal.Logging -import org.apache.spark.ui.{SparkUI, SparkUITab} +package org.apache.spark.deploy.history -private[sql] class StreamingQueryTab( - val statusListener: StreamingQueryStatusListener, - sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging { +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.History.HISTORY_LOG_DIR +import org.apache.spark.util.ManualClock - override val name = "Structured Streaming" - - val parent = sparkUI - - attachPage(new StreamingQueryPage(this)) - attachPage(new StreamingQueryStatisticsPage(this)) - parent.attachTab(this) - - parent.addStaticHandler(StreamingQueryTab.STATIC_RESOURCE_DIR, "/static/sql") -} - -private[sql] object StreamingQueryTab { - private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static" +object Utils { + def withFsHistoryProvider(logDir: String)(fn: FsHistoryProvider => Unit): Unit = { /* Builds an FsHistoryProvider whose HISTORY_LOG_DIR is logDir (driven by a ManualClock), calls checkForLogs(), hands the provider to fn, and stops the provider in the finally block even if fn throws. */ + var provider: FsHistoryProvider = null + try { + val clock = new ManualClock() + val conf = new SparkConf().set(HISTORY_LOG_DIR, logDir) + provider = new FsHistoryProvider(conf, clock) /* assign the outer var; a local `val provider` here would shadow it and leave the finally block with null, so stop() would never run */ + provider.checkForLogs() + fn(provider) + } finally { + if (provider != null) { + provider.stop() + provider = null + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryHistorySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryHistorySuite.scala new file mode 100644 index 0000000..160535e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryHistorySuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.ui + +import java.util.Locale +import javax.servlet.http.HttpServletRequest + +import org.mockito.Mockito.{mock, when} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.deploy.history.{Utils => HsUtils} +import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore +import org.apache.spark.sql.test.SharedSparkSession + +class StreamingQueryHistorySuite extends SharedSparkSession with BeforeAndAfter { + + test("support streaming query events") { + val logDir = Thread.currentThread().getContextClassLoader.getResource("spark-events").toString + HsUtils.withFsHistoryProvider(logDir) { provider => + val appUi = provider.getAppUI("local-1596020211915", None).getOrElse { + assert(false, "Failed to load event log of local-1596020211915.") + null + } + assert(appUi.ui.appName == "StructuredKafkaWordCount") + assert(appUi.ui.store.store.count(classOf[StreamingQueryData]) == 1) + assert(appUi.ui.store.store.count(classOf[StreamingQueryProgressWrapper]) == 8) + + val store = new StreamingQueryStatusStore(appUi.ui.store.store) + val tab = new StreamingQueryTab(store, appUi.ui) + val request = mock(classOf[HttpServletRequest]) + var html = new StreamingQueryPage(tab).render(request) + .toString().toLowerCase(Locale.ROOT) + // 81.39: Avg Input /sec + assert(html.contains("81.39")) + // 157.05: Avg Process /sec + 
assert(html.contains("157.05")) + + val id = "8d268dc2-bc9c-4be8-97a9-b135d2943028" + val runId = "e225d92f-2545-48f8-87a2-9c0309580f8a" + when(request.getParameter("id")).thenReturn(runId) + html = new StreamingQueryStatisticsPage(tab).render(request) + .toString().toLowerCase(Locale.ROOT) + assert(html.contains("<strong>8</strong> completed batches")) + assert(html.contains(id)) + assert(html.contains(runId)) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala index c2b6688..246fa1f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala @@ -20,11 +20,13 @@ package org.apache.spark.sql.streaming.ui import java.util.{Locale, UUID} import javax.servlet.http.HttpServletRequest +import scala.xml.Node + import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} import org.scalatest.BeforeAndAfter -import scala.xml.Node import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore import org.apache.spark.sql.streaming.StreamingQueryProgress import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.ui.SparkUI @@ -35,26 +37,26 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter { val id = UUID.randomUUID() val request = mock(classOf[HttpServletRequest]) val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS) - val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS) + val store = mock(classOf[StreamingQueryStatusStore], RETURNS_SMART_NULLS) when(tab.appName).thenReturn("testing") when(tab.headerTabs).thenReturn(Seq.empty) - when(tab.statusListener).thenReturn(statusListener) + when(tab.store).thenReturn(store) val streamQuery = createStreamQueryUIData(id) - 
when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery)) + when(store.allQueryUIData).thenReturn(Seq(streamQuery)) var html = renderStreamingQueryPage(request, tab) .toString().toLowerCase(Locale.ROOT) assert(html.contains("active streaming queries (1)")) - when(streamQuery.isActive).thenReturn(false) - when(streamQuery.exception).thenReturn(None) + when(streamQuery.summary.isActive).thenReturn(false) + when(streamQuery.summary.exception).thenReturn(None) html = renderStreamingQueryPage(request, tab) .toString().toLowerCase(Locale.ROOT) assert(html.contains("completed streaming queries (1)")) assert(html.contains("finished")) - when(streamQuery.isActive).thenReturn(false) - when(streamQuery.exception).thenReturn(Option("exception in query")) + when(streamQuery.summary.isActive).thenReturn(false) + when(streamQuery.summary.exception).thenReturn(Option("exception in query")) html = renderStreamingQueryPage(request, tab) .toString().toLowerCase(Locale.ROOT) assert(html.contains("completed streaming queries (1)")) @@ -66,17 +68,20 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter { val id = UUID.randomUUID() val request = mock(classOf[HttpServletRequest]) val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS) - val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS) + val store = mock(classOf[StreamingQueryStatusStore], RETURNS_SMART_NULLS) + when(request.getParameter("id")).thenReturn(id.toString) + when(tab.appName).thenReturn("testing") + when(tab.headerTabs).thenReturn(Seq.empty) + when(tab.store).thenReturn(store) val ui = mock(classOf[SparkUI]) when(request.getParameter("id")).thenReturn(id.toString) when(tab.appName).thenReturn("testing") when(tab.headerTabs).thenReturn(Seq.empty) - when(tab.statusListener).thenReturn(statusListener) when(ui.conf).thenReturn(new SparkConf()) when(tab.parent).thenReturn(ui) val streamQuery = createStreamQueryUIData(id) - 
when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery)) + when(store.allQueryUIData).thenReturn(Seq(streamQuery)) val html = renderStreamingQueryStatisticsPage(request, tab) .toString().toLowerCase(Locale.ROOT) @@ -94,15 +99,18 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter { when(progress.batchId).thenReturn(2) when(progress.prettyJson).thenReturn("""{"a":1}""") + val summary = mock(classOf[StreamingQueryData], RETURNS_SMART_NULLS) + when(summary.isActive).thenReturn(true) + when(summary.name).thenReturn("query") + when(summary.id).thenReturn(id) + when(summary.runId).thenReturn(id) + when(summary.startTimestamp).thenReturn(1L) + when(summary.exception).thenReturn(None) + val streamQuery = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS) - when(streamQuery.isActive).thenReturn(true) - when(streamQuery.name).thenReturn("query") - when(streamQuery.id).thenReturn(id) - when(streamQuery.runId).thenReturn(id) - when(streamQuery.startTimestamp).thenReturn(1L) + when(streamQuery.summary).thenReturn(summary) when(streamQuery.lastProgress).thenReturn(progress) when(streamQuery.recentProgress).thenReturn(Array(progress)) - when(streamQuery.exception).thenReturn(None) streamQuery } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala index 6aa440e..91c55d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala @@ -17,19 +17,28 @@ package org.apache.spark.sql.streaming.ui -import java.util.UUID +import java.text.SimpleDateFormat +import java.util.{Date, UUID} import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} +import org.scalatest.time.SpanSugar._ +import 
org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone +import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore +import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest} import org.apache.spark.sql.streaming +import org.apache.spark.status.ElementTrackingStore +import org.apache.spark.util.kvstore.InMemoryStore class StreamingQueryStatusListenerSuite extends StreamTest { test("onQueryStarted, onQueryProgress, onQueryTerminated") { - val listener = new StreamingQueryStatusListener(spark.sparkContext.conf) + val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf) + val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore) + val queryStore = new StreamingQueryStatusStore(kvStore) - // hanlde query started event + // handle query started event val id = UUID.randomUUID() val runId = UUID.randomUUID() val startEvent = new StreamingQueryListener.QueryStartedEvent( @@ -37,8 +46,9 @@ class StreamingQueryStatusListenerSuite extends StreamTest { listener.onQueryStarted(startEvent) // result checking - assert(listener.activeQueryStatus.size() == 1) - assert(listener.activeQueryStatus.get(runId).name == "test") + assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1) + assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData => + uiData.summary.runId == runId && uiData.summary.name.equals("test"))) // handle query progress event val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS) @@ -53,28 +63,32 @@ class StreamingQueryStatusListenerSuite extends StreamTest { listener.onQueryProgress(processEvent) // result checking - val activeQuery = listener.activeQueryStatus.get(runId) - assert(activeQuery.isActive) - assert(activeQuery.recentProgress.length == 1) - assert(activeQuery.lastProgress.id == id) - assert(activeQuery.lastProgress.runId == runId) - 
assert(activeQuery.lastProgress.timestamp == "2001-10-01T01:00:00.100Z") - assert(activeQuery.lastProgress.inputRowsPerSecond == 10.0) - assert(activeQuery.lastProgress.processedRowsPerSecond == 12.0) - assert(activeQuery.lastProgress.batchId == 2) - assert(activeQuery.lastProgress.prettyJson == """{"a":1}""") + val activeQuery = + queryStore.allQueryUIData.filter(_.summary.isActive).find(_.summary.runId == runId) + assert(activeQuery.isDefined) + assert(activeQuery.get.summary.isActive) + assert(activeQuery.get.recentProgress.length == 1) + assert(activeQuery.get.lastProgress.id == id) + assert(activeQuery.get.lastProgress.runId == runId) + assert(activeQuery.get.lastProgress.timestamp == "2001-10-01T01:00:00.100Z") + assert(activeQuery.get.lastProgress.inputRowsPerSecond == 10.0) + assert(activeQuery.get.lastProgress.processedRowsPerSecond == 12.0) + assert(activeQuery.get.lastProgress.batchId == 2) + assert(activeQuery.get.lastProgress.prettyJson == """{"a":1}""") // handle terminate event val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None) listener.onQueryTerminated(terminateEvent) - assert(!listener.inactiveQueryStatus.head.isActive) - assert(listener.inactiveQueryStatus.head.runId == runId) - assert(listener.inactiveQueryStatus.head.id == id) + assert(!queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.isActive) + assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == runId) + assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id == id) } test("same query start multiple times") { - val listener = new StreamingQueryStatusListener(spark.sparkContext.conf) + val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf) + val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore) + val queryStore = new StreamingQueryStatusStore(kvStore) // handle first time start val id = UUID.randomUUID() @@ -94,11 +108,106 @@ class 
StreamingQueryStatusListenerSuite extends StreamTest { listener.onQueryStarted(startEvent1) // result checking - assert(listener.activeQueryStatus.size() == 1) - assert(listener.inactiveQueryStatus.length == 1) - assert(listener.activeQueryStatus.containsKey(runId1)) - assert(listener.activeQueryStatus.get(runId1).id == id) - assert(listener.inactiveQueryStatus.head.runId == runId0) - assert(listener.inactiveQueryStatus.head.id == id) + assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1) + assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).length == 1) + assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(_.summary.runId == runId1)) + assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData => + uiData.summary.runId == runId1 && uiData.summary.id == id)) + assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == runId0) + assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id == id) + } + + test("test small retained queries") { + val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf) + val conf = spark.sparkContext.conf + conf.set(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES.key, "2") + val listener = new StreamingQueryStatusListener(conf, kvStore) + val queryStore = new StreamingQueryStatusStore(kvStore) + + def addNewQuery(): (UUID, UUID) = { + val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + format.setTimeZone(getTimeZone("UTC")) + val id = UUID.randomUUID() + val runId = UUID.randomUUID() + val startEvent = new StreamingQueryListener.QueryStartedEvent( + id, runId, "test1", format.format(new Date(System.currentTimeMillis()))) + listener.onQueryStarted(startEvent) + (id, runId) + } + + def checkInactiveQueryStatus(numInactives: Int, targetInactives: Seq[UUID]): Unit = { + eventually(timeout(10.seconds)) { + val inactiveQueries = queryStore.allQueryUIData.filter(!_.summary.isActive) + assert(inactiveQueries.size 
== numInactives) + assert(inactiveQueries.map(_.summary.id).toSet == targetInactives.toSet) + } + } + + val (id1, runId1) = addNewQuery() + val (id2, runId2) = addNewQuery() + val (id3, runId3) = addNewQuery() + assert(queryStore.allQueryUIData.count(!_.summary.isActive) == 0) + + val terminateEvent1 = new StreamingQueryListener.QueryTerminatedEvent(id1, runId1, None) + listener.onQueryTerminated(terminateEvent1) + checkInactiveQueryStatus(1, Seq(id1)) + val terminateEvent2 = new StreamingQueryListener.QueryTerminatedEvent(id2, runId2, None) + listener.onQueryTerminated(terminateEvent2) + checkInactiveQueryStatus(2, Seq(id1, id2)) + val terminateEvent3 = new StreamingQueryListener.QueryTerminatedEvent(id3, runId3, None) + listener.onQueryTerminated(terminateEvent3) + checkInactiveQueryStatus(2, Seq(id2, id3)) + } + + test("test small retained progress") { + val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf) + val conf = spark.sparkContext.conf + conf.set(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES.key, "5") + val listener = new StreamingQueryStatusListener(conf, kvStore) + val queryStore = new StreamingQueryStatusStore(kvStore) + + val id = UUID.randomUUID() + val runId = UUID.randomUUID() + val startEvent = new StreamingQueryListener.QueryStartedEvent( + id, runId, "test", "2016-12-05T20:54:20.827Z") + listener.onQueryStarted(startEvent) + + var batchId: Int = 0 + + def addQueryProgress(): Unit = { + val progress = mockProgressData(id, runId) + val processEvent = new streaming.StreamingQueryListener.QueryProgressEvent(progress) + listener.onQueryProgress(processEvent) + } + + def mockProgressData(id: UUID, runId: UUID): StreamingQueryProgress = { + val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + format.setTimeZone(getTimeZone("UTC")) + + val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS) + when(progress.id).thenReturn(id) + when(progress.runId).thenReturn(runId) + 
when(progress.timestamp).thenReturn(format.format(new Date(System.currentTimeMillis()))) + when(progress.inputRowsPerSecond).thenReturn(10.0) + when(progress.processedRowsPerSecond).thenReturn(12.0) + when(progress.batchId).thenReturn(batchId) + when(progress.prettyJson).thenReturn("""{"a":1}""") + + batchId += 1 + progress + } + + def checkQueryProcessData(targetNum: Int): Unit = { + eventually(timeout(10.seconds)) { + assert(queryStore.getQueryProgressData(runId).size == targetNum) + } + } + + Array.tabulate(4) { _ => addQueryProgress() } + checkQueryProcessData(4) + addQueryProgress() + checkQueryProcessData(5) + addQueryProgress() + checkQueryProcessData(5) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org