This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 9c583b8 [SPARK-30984][SS] Add UI test for Structured Streaming UI 9c583b8 is described below commit 9c583b8aff4f3d5799524619f4997281ae428da5 Author: Shixiong Zhu <zsxw...@gmail.com> AuthorDate: Wed Mar 4 13:55:34 2020 +0800 [SPARK-30984][SS] Add UI test for Structured Streaming UI ### What changes were proposed in this pull request? - Add a UI test for Structured Streaming UI - Fix the unsafe usages of `SimpleDateFormat` by using a ThreadLocal shared object. - Use `start` to replace `submission` to be consistent with the API `StreamingQuery.start()`. ### Why are the changes needed? Structured Streaming UI is missing UI tests. ### Does this PR introduce any user-facing change? No ### How was this patch tested? The new test. Closes #27732 from zsxwing/ss-ui-test. Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit ebfff7af6a9b2d672871317d30161cdafaa32ca4) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- sql/core/pom.xml | 10 ++ .../sql/execution/streaming/ProgressReporter.scala | 2 +- .../sql/execution/streaming/StreamExecution.scala | 4 +- .../sql/streaming/StreamingQueryListener.scala | 4 +- .../sql/streaming/ui/StreamingQueryPage.scala | 22 ++-- .../ui/StreamingQueryStatisticsPage.scala | 38 +++--- .../ui/StreamingQueryStatusListener.scala | 13 +- .../apache/spark/sql/streaming/ui/UIUtils.scala | 17 +++ .../streaming/StreamingQueryListenerSuite.scala | 6 +- .../sql/streaming/ui/StreamingQueryPageSuite.scala | 2 +- .../ui/StreamingQueryStatusListenerSuite.scala | 9 +- .../spark/sql/streaming/ui/UISeleniumSuite.scala | 145 +++++++++++++++++++++ 12 files changed, 221 insertions(+), 51 deletions(-) diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 0e664ec..37da614 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -150,6 +150,16 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.seleniumhq.selenium</groupId> + <artifactId>selenium-java</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.seleniumhq.selenium</groupId> + <artifactId>selenium-htmlunit-driver</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index f20291e..feb151a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -349,7 +349,7 @@ trait ProgressReporter extends Logging { result } - private def formatTimestamp(millis: Long): String = { + protected def formatTimestamp(millis: Long): String = { timestampFormat.format(new Date(millis)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 8b3534b..8006437 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -307,8 +307,8 @@ abstract class StreamExecution( } // `postEvent` does not throw non fatal exception. - val submissionTime = triggerClock.getTimeMillis() - postEvent(new QueryStartedEvent(id, runId, name, submissionTime)) + val startTimestamp = triggerClock.getTimeMillis() + postEvent(new QueryStartedEvent(id, runId, name, formatTimestamp(startTimestamp))) // Unblock starting thread startLatch.countDown() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index dd842cd..7ae38c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -82,7 +82,7 @@ object StreamingQueryListener { * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`. * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. * @param name User-specified name of the query, null if not specified. - * @param submissionTime The timestamp to start a query. + * @param timestamp The timestamp to start a query. * @since 2.1.0 */ @Evolving @@ -90,7 +90,7 @@ object StreamingQueryListener { val id: UUID, val runId: UUID, val name: String, - val submissionTime: Long) extends Event + val timestamp: String) extends Event /** * Event representing any progress updates in a query. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala index 650f64f..7336765 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala @@ -17,22 +17,18 @@ package org.apache.spark.sql.streaming.ui -import java.text.SimpleDateFormat import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.commons.lang3.StringEscapeUtils +import org.apache.commons.text.StringEscapeUtils import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone import org.apache.spark.sql.streaming.ui.UIUtils._ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage} private[ui] class StreamingQueryPage(parent: StreamingQueryTab) extends WebUIPage("") with Logging { - val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - df.setTimeZone(getTimeZone("UTC")) override def render(request: HttpServletRequest): Seq[Node] = { val content = generateStreamingQueryTable(request) @@ -61,11 +57,11 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab) val name = UIUtils.getQueryName(query) val status = UIUtils.getQueryStatus(query) val duration = if (queryActive) { - SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime) + SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp) } else { withNoProgress(query, { val endTimeMs = query.lastProgress.timestamp - SparkUIUtils.formatDurationVerbose(df.parse(endTimeMs).getTime - query.submissionTime) + SparkUIUtils.formatDurationVerbose(parseProgressTimestamp(endTimeMs) - query.startTimestamp) }, "-") } @@ -74,7 +70,7 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab) <td> {status} </td> <td> {query.id} </td> <td> <a href={statisticsLink}> {query.runId} </a> </td> - <td> {SparkUIUtils.formatDate(query.submissionTime)} </td> + <td> {SparkUIUtils.formatDate(query.startTimestamp)} </td> <td> {duration} </td> <td> {withNoProgress(query, { (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum / @@ -94,29 +90,29 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab) .partition(_.isActive) val activeQueryTables = if (activeQueries.nonEmpty) { val headerRow = Seq( - "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec", + "Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec", "Avg Process /sec", "Lastest Batch") Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = true), - activeQueries, true, None, Seq(null), false)) + activeQueries, true, Some("activeQueries-table"), Seq(null), false)) } else { None } val inactiveQueryTables = if (inactiveQueries.nonEmpty) { val headerRow = Seq( - "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec", + "Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec", "Avg Process /sec", "Lastest Batch", "Error") Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = false), - inactiveQueries, true, None, Seq(null), false)) + inactiveQueries, true, Some("completedQueries-table"), Seq(null), false)) } else { None } // scalastyle:off val content = - <span id="completed" class="collapse-aggregated-activeQueries collapse-table" + <span id="active" class="collapse-aggregated-activeQueries collapse-table" onClick="collapseTable('collapse-aggregated-activeQueries','aggregated-activeQueries')"> <h5 id="activequeries"> <span class="collapse-table-arrow arrow-open"></span> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala index fa9896e..65052d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala @@ -19,21 +19,17 @@ package org.apache.spark.sql.streaming.ui import java.{util => ju} import java.lang.{Long => JLong} -import java.text.SimpleDateFormat import java.util.UUID import javax.servlet.http.HttpServletRequest import scala.xml.{Node, Unparsed} import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone import org.apache.spark.sql.streaming.ui.UIUtils._ import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage} private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) extends WebUIPage("statistics") with Logging { - val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - df.setTimeZone(getTimeZone("UTC")) def generateLoadResources(request: HttpServletRequest): Seq[Node] = { // scalastyle:off @@ -101,13 +97,13 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = { val duration = if (query.isActive) { - SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime) + SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp) } else { withNoProgress(query, { val end = query.lastProgress.timestamp val start = query.recentProgress.head.timestamp SparkUIUtils.formatDurationVerbose( - df.parse(end).getTime - df.parse(start).getTime) + parseProgressTimestamp(end) - parseProgressTimestamp(start)) }, "-") } @@ -119,7 +115,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) </strong> since <strong> - {SparkUIUtils.formatDate(query.submissionTime)} + {SparkUIUtils.formatDate(query.startTimestamp)} </strong> (<strong>{numBatches}</strong> completed batches) </div> @@ -132,13 +128,13 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) def generateStatTable(query: StreamingQueryUIData): Seq[Node] = { val batchToTimestamps = withNoProgress(query, - query.recentProgress.map(p => (p.batchId, df.parse(p.timestamp).getTime)), + query.recentProgress.map(p => (p.batchId, parseProgressTimestamp(p.timestamp))), Array.empty[(Long, Long)]) val batchTimes = batchToTimestamps.map(_._2) val minBatchTime = - withNoProgress(query, df.parse(query.recentProgress.head.timestamp).getTime, 0L) + withNoProgress(query, parseProgressTimestamp(query.recentProgress.head.timestamp), 0L) val maxBatchTime = - withNoProgress(query, df.parse(query.lastProgress.timestamp).getTime, 0L) + withNoProgress(query, parseProgressTimestamp(query.lastProgress.timestamp), 0L) val maxRecordRate = withNoProgress(query, query.recentProgress.map(_.inputRowsPerSecond).max, 0L) val minRecordRate = 0L @@ -152,22 +148,26 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) val minBatchDuration = 0L val inputRateData = withNoProgress(query, - query.recentProgress.map(p => (df.parse(p.timestamp).getTime, + query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp), withNumberInvalid { p.inputRowsPerSecond })), Array.empty[(Long, Double)]) val processRateData = withNoProgress(query, - query.recentProgress.map(p => (df.parse(p.timestamp).getTime, + query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp), withNumberInvalid { p.processedRowsPerSecond })), Array.empty[(Long, Double)]) val inputRowsData = withNoProgress(query, - query.recentProgress.map(p => (df.parse(p.timestamp).getTime, + query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp), withNumberInvalid { p.numInputRows })), Array.empty[(Long, Double)]) val batchDurations = withNoProgress(query, - query.recentProgress.map(p => (df.parse(p.timestamp).getTime, + query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp), withNumberInvalid { p.batchDuration })), Array.empty[(Long, Double)]) - val operationDurationData = withNoProgress(query, query.recentProgress.map { p => - val durationMs = p.durationMs - // remove "triggerExecution" as it count the other operation duration. - durationMs.remove("triggerExecution") - (df.parse(p.timestamp).getTime, durationMs)}, Array.empty[(Long, ju.Map[String, JLong])]) + val operationDurationData = withNoProgress( + query, + query.recentProgress.map { p => + val durationMs = p.durationMs + // remove "triggerExecution" as it count the other operation duration. + durationMs.remove("triggerExecution") + (parseProgressTimestamp(p.timestamp), durationMs) + }, + Array.empty[(Long, ju.Map[String, JLong])]) val jsCollector = new JsCollector val graphUIDataForInputRate = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala index 9181511..e331083 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.streaming.ui -import java.text.SimpleDateFormat import java.util.UUID import java.util.concurrent.ConcurrentHashMap @@ -25,9 +24,9 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.SparkConf -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress} +import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp /** * A customized StreamingQueryListener used in structured streaming UI, which contains all @@ -36,9 +35,6 @@ import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryPro */ private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener { - private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 - timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) - /** * We use runId as the key here instead of id in active query status map, * because the runId is unique for every started query, even it its a restart. @@ -51,12 +47,13 @@ private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends Streami private val inactiveQueryStatusRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES) override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { + val startTimestamp = parseProgressTimestamp(event.timestamp) activeQueryStatus.putIfAbsent(event.runId, - new StreamingQueryUIData(event.name, event.id, event.runId, event.submissionTime)) + new StreamingQueryUIData(event.name, event.id, event.runId, startTimestamp)) } override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { - val batchTimestamp = timestampFormat.parse(event.progress.timestamp).getTime + val batchTimestamp = parseProgressTimestamp(event.progress.timestamp) val queryStatus = activeQueryStatus.getOrDefault( event.progress.runId, new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId, @@ -89,7 +86,7 @@ private[ui] class StreamingQueryUIData( val name: String, val id: UUID, val runId: UUID, - val submissionTime: Long) { + val startTimestamp: Long) { /** Holds the most recent query progress updates. */ private val progressBuffer = new mutable.Queue[StreamingQueryProgress]() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala index 57b9dec..cdad5ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala @@ -17,6 +17,11 @@ package org.apache.spark.sql.streaming.ui +import java.text.SimpleDateFormat +import java.util.Locale + +import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone + private[ui] object UIUtils { /** @@ -57,4 +62,16 @@ private[ui] object UIUtils { query.exception.map(_ => "FAILED").getOrElse("FINISHED") } } + + private val progressTimestampFormat = new ThreadLocal[SimpleDateFormat]() { + override def initialValue(): SimpleDateFormat = { + val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + format.setTimeZone(getTimeZone("UTC")) + format + } + } + + def parseProgressTimestamp(timestamp: String): Long = { + progressTimestampFormat.get.parse(timestamp).getTime + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 6bb1646..e585b8a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -254,8 +254,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(newEvent.name === event.name) } - testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name", 1L)) - testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null, 1L)) + testSerialization( + new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name", "2016-12-05T20:54:20.827Z")) + testSerialization( + new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null, "2016-12-05T20:54:20.827Z")) } test("QueryProgressEvent serialization") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala index de43e47..2a1e18a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala @@ -97,7 +97,7 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter { when(streamQuery.name).thenReturn("query") when(streamQuery.id).thenReturn(id) when(streamQuery.runId).thenReturn(id) - when(streamQuery.submissionTime).thenReturn(1L) + when(streamQuery.startTimestamp).thenReturn(1L) when(streamQuery.lastProgress).thenReturn(progress) when(streamQuery.recentProgress).thenReturn(Array(progress)) when(streamQuery.exception).thenReturn(None) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala index adbb501..6aa440e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala @@ -32,7 +32,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest { // hanlde query started event val id = UUID.randomUUID() val runId = UUID.randomUUID() - val startEvent = new StreamingQueryListener.QueryStartedEvent(id, runId, "test", 1L) + val startEvent = new StreamingQueryListener.QueryStartedEvent( + id, runId, "test", "2016-12-05T20:54:20.827Z") listener.onQueryStarted(startEvent) // result checking @@ -78,7 +79,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest { // handle first time start val id = UUID.randomUUID() val runId0 = UUID.randomUUID() - val startEvent0 = new StreamingQueryListener.QueryStartedEvent(id, runId0, "test", 1L) + val startEvent0 = new StreamingQueryListener.QueryStartedEvent( + id, runId0, "test", "2016-12-05T20:54:20.827Z") listener.onQueryStarted(startEvent0) // handle terminate event @@ -87,7 +89,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest { // handle second time start val runId1 = UUID.randomUUID() - val startEvent1 = new StreamingQueryListener.QueryStartedEvent(id, runId1, "test", 1L) + val startEvent1 = new StreamingQueryListener.QueryStartedEvent( + id, runId1, "test", "2016-12-05T20:54:20.827Z") listener.onQueryStarted(startEvent1) // result checking diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala new file mode 100644 index 0000000..fdf4c66 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.ui + +import org.openqa.selenium.WebDriver +import org.openqa.selenium.htmlunit.HtmlUnitDriver +import org.scalatest._ +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ +import org.scalatestplus.selenium.WebBrowser + +import org.apache.spark._ +import org.apache.spark.internal.config.UI.{UI_ENABLED, UI_PORT} +import org.apache.spark.sql.LocalSparkSession.withSparkSession +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.util.quietly +import org.apache.spark.sql.streaming.StreamingQueryException +import org.apache.spark.ui.SparkUICssErrorHandler + +class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with BeforeAndAfterAll { + + implicit var webDriver: WebDriver = _ + + override def beforeAll(): Unit = { + super.beforeAll() + webDriver = new HtmlUnitDriver { + getWebClient.setCssErrorHandler(new SparkUICssErrorHandler) + } + } + + private def newSparkSession( + master: String = "local", + additionalConfs: Map[String, String] = Map.empty): SparkSession = { + val conf = new SparkConf() + .setMaster(master) + .setAppName("ui-test") + .set(UI_ENABLED, true) + .set(UI_PORT, 0) + additionalConfs.foreach { case (k, v) => conf.set(k, v) } + val spark = SparkSession.builder().master(master).config(conf).getOrCreate() + assert(spark.sparkContext.ui.isDefined) + spark + } + + def goToUi(spark: SparkSession, path: String): Unit = { + go to (spark.sparkContext.ui.get.webUrl.stripSuffix("/") + path) + } + + test("SPARK-30984: Structured Streaming UI should be activated when running a streaming query") { + quietly { + withSparkSession(newSparkSession()) { spark => + import spark.implicits._ + try { + spark.range(1, 10).count() + + goToUi(spark, "/StreamingQuery") + + val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq + h3Text should not contain ("Streaming Query") + + val activeQuery = + spark.readStream.format("rate").load().writeStream.format("noop").start() + val completedQuery = + spark.readStream.format("rate").load().writeStream.format("noop").start() + completedQuery.stop() + val failedQuery = spark.readStream.format("rate").load().select("value").as[Long] + .map(_ / 0).writeStream.format("noop").start() + try { + failedQuery.awaitTermination() + } catch { + case _: StreamingQueryException => + } + + eventually(timeout(30.seconds), interval(100.milliseconds)) { + // Check the query list page + goToUi(spark, "/StreamingQuery") + + findAll(cssSelector("h3")).map(_.text).toSeq should contain("Streaming Query") + findAll(cssSelector("""#activeQueries-table th""")).map(_.text).toSeq should be { + List("Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec", + "Avg Process /sec", "Lastest Batch") + } + val activeQueries = + findAll(cssSelector("""#activeQueries-table td""")).map(_.text).toSeq + activeQueries should contain(activeQuery.id.toString) + activeQueries should contain(activeQuery.runId.toString) + findAll(cssSelector("""#completedQueries-table th""")) + .map(_.text).toSeq should be { + List("Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec", + "Avg Process /sec", "Lastest Batch", "Error") + } + val completedQueries = + findAll(cssSelector("""#completedQueries-table td""")).map(_.text).toSeq + completedQueries should contain(completedQuery.id.toString) + completedQueries should contain(completedQuery.runId.toString) + completedQueries should contain(failedQuery.id.toString) + completedQueries should contain(failedQuery.runId.toString) + + // Check the query statistics page + val activeQueryLink = + findAll(cssSelector("""#activeQueries-table a""")).flatMap(_.attribute("href")).next + go to activeQueryLink + + findAll(cssSelector("h3")) + .map(_.text).toSeq should contain("Streaming Query Statistics") + val summaryText = findAll(cssSelector("div strong")).map(_.text).toSeq + summaryText should contain ("Name:") + summaryText should contain ("Id:") + summaryText should contain ("RunId:") + findAll(cssSelector("""#stat-table th""")).map(_.text).toSeq should be { + List("", "Timelines", "Histograms") + } + } + } finally { + spark.streams.active.foreach(_.stop()) + } + } + } + } + + override def afterAll(): Unit = { + try { + if (webDriver != null) { + webDriver.quit() + } + } finally { + super.afterAll() + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org