This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 77725c1acbc [SPARK-41895][SS][UI] Add tests for streaming UI with RocksDB backend 77725c1acbc is described below commit 77725c1acbc278ec78e100ed1099de418322800e Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Fri Jan 6 11:16:54 2023 -0800 [SPARK-41895][SS][UI] Add tests for streaming UI with RocksDB backend ### What changes were proposed in this pull request? Add tests for streaming UI with RocksDB backend ### Why are the changes needed? Better test coverage when live UI with RocksDB backend is enabled. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? GA tests Closes #39415 from gengliangwang/streamingTest. Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../sql/StreamingQueryDataSerializer.scala | 2 +- .../ui/StreamingQueryStatusListenerSuite.scala | 91 ++++++++++++++++------ .../spark/sql/streaming/ui/UISeleniumSuite.scala | 29 ++++++- 3 files changed, 93 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala index f63ea80c77e..f05b186fea5 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala @@ -30,10 +30,10 @@ class StreamingQueryDataSerializer extends ProtobufSerDe { override def serialize(input: Any): Array[Byte] = { val data = input.asInstanceOf[StreamingQueryData] val builder = StoreTypes.StreamingQueryData.newBuilder() - .setName(data.name) .setId(data.id.toString) .setRunId(data.runId) .setIsActive(data.isActive) + Option(data.name).foreach(builder.setName) data.exception.foreach(builder.setException) 
builder.setStartTimestamp(data.startTimestamp) data.endTimestamp.foreach(builder.setEndTimestamp) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala index 7781c4276c7..ad4c1ff28d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.streaming.ui +import java.io.File import java.text.SimpleDateFormat import java.util.{Date, UUID} @@ -28,7 +29,7 @@ import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, Hybr import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore import org.apache.spark.sql.internal.StaticSQLConf -import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest} +import org.apache.spark.sql.streaming.{SinkProgress, SourceProgress, StreamingQueryListener, StreamingQueryProgress, StreamTest} import org.apache.spark.sql.streaming import org.apache.spark.status.{ElementTrackingStore, KVUtils} import org.apache.spark.util.Utils @@ -36,8 +37,17 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} class StreamingQueryStatusListenerSuite extends StreamTest { + protected def createStore(): KVStore = new InMemoryStore() + + protected def useInMemoryStore: Boolean = true + + private val sourceProgresses = Array( + new SourceProgress("s1", "", "", "", 10, 4.0, 5.0), + new SourceProgress("s2", "", "", "", 10, 6.0, 7.0) + ) + test("onQueryStarted, onQueryProgress, onQueryTerminated") { - val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf) + val kvStore = new ElementTrackingStore(createStore(), sparkConf) val listener = new 
StreamingQueryStatusListener(spark.sparkContext.conf, kvStore) val queryStore = new StreamingQueryStatusStore(kvStore) @@ -62,6 +72,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest { when(progress.processedRowsPerSecond).thenReturn(12.0) when(progress.batchId).thenReturn(2) when(progress.prettyJson).thenReturn("""{"a":1}""") + when(progress.sink).thenReturn(new SinkProgress("mock query", 1)) + when(progress.sources).thenReturn(sourceProgresses) val processEvent = new streaming.StreamingQueryListener.QueryProgressEvent(progress) listener.onQueryProgress(processEvent) @@ -77,7 +89,19 @@ class StreamingQueryStatusListenerSuite extends StreamTest { assert(activeQuery.get.lastProgress.inputRowsPerSecond == 10.0) assert(activeQuery.get.lastProgress.processedRowsPerSecond == 12.0) assert(activeQuery.get.lastProgress.batchId == 2) - assert(activeQuery.get.lastProgress.prettyJson == """{"a":1}""") + if (useInMemoryStore) { + assert(activeQuery.get.lastProgress.prettyJson == """{"a":1}""") + } else { + // When using disk-based KV Store, the mock progress object will be written to KV Store + // and read back as an instance of StreamingQueryProgress. Here we can simply check if + // the json value contains the id and runId fields. 
+ val jsonFragment = + s""" + | "id" : "$id", + | "runId" : "$runId", + |""".stripMargin + assert(activeQuery.get.lastProgress.prettyJson.contains(jsonFragment)) + } // handle terminate event val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None) @@ -90,7 +114,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest { } test("same query start multiple times") { - val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf) + val kvStore = new ElementTrackingStore(createStore(), sparkConf) val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore) val queryStore = new StreamingQueryStatusStore(kvStore) @@ -124,7 +148,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest { } test("test small retained queries") { - val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf) + val kvStore = new ElementTrackingStore(createStore(), sparkConf) val conf = spark.sparkContext.conf conf.set(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES.key, "2") val listener = new StreamingQueryStatusListener(conf, kvStore) @@ -166,7 +190,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest { } test("test small retained progress") { - val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf) + val kvStore = new ElementTrackingStore(createStore(), sparkConf) val conf = spark.sparkContext.conf conf.set(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES.key, "5") val listener = new StreamingQueryStatusListener(conf, kvStore) @@ -189,7 +213,6 @@ class StreamingQueryStatusListenerSuite extends StreamTest { def mockProgressData(id: UUID, runId: UUID): StreamingQueryProgress = { val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 format.setTimeZone(getTimeZone("UTC")) - val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS) when(progress.id).thenReturn(id) when(progress.runId).thenReturn(runId) @@ -198,6 +221,8 @@ class 
StreamingQueryStatusListenerSuite extends StreamTest { when(progress.processedRowsPerSecond).thenReturn(12.0) when(progress.batchId).thenReturn(batchId) when(progress.prettyJson).thenReturn("""{"a":1}""") + when(progress.sink).thenReturn(new SinkProgress("mock query", 1)) + when(progress.sources).thenReturn(sourceProgresses) batchId += 1 progress @@ -218,7 +243,41 @@ class StreamingQueryStatusListenerSuite extends StreamTest { } test("SPARK-38056: test writing StreamingQueryData to an in-memory store") { - testStreamingQueryData(new InMemoryStore()) + testStreamingQueryData(createStore()) + } + + protected def testStreamingQueryData(kvStore: KVStore): Unit = { + val id = UUID.randomUUID() + val testData = new StreamingQueryData( + "some-query", + id, + id.toString, + isActive = false, + None, + 1L, + None + ) + val store = new ElementTrackingStore(kvStore, sparkConf) + store.write(testData) + store.close(closeParent = false) + } +} + +class StreamingQueryStatusListenerWithDiskStoreSuite extends StreamingQueryStatusListenerSuite { + private var storePath: File = _ + + override def createStore(): KVStore = { + storePath = Utils.createTempDir() + KVUtils.createKVStore(Some(storePath), live = true, sparkConf) + } + + override def useInMemoryStore: Boolean = false + + override def afterEach(): Unit = { + super.afterEach() + if (storePath != null && storePath.exists()) { + Utils.deleteRecursively(storePath) + } } test("SPARK-38056: test writing StreamingQueryData to a LevelDB store") { @@ -247,20 +306,4 @@ class StreamingQueryStatusListenerSuite extends StreamTest { Utils.deleteRecursively(testDir) } } - - private def testStreamingQueryData(kvStore: KVStore): Unit = { - val id = UUID.randomUUID() - val testData = new StreamingQueryData( - "some-query", - id, - id.toString, - isActive = false, - None, - 1L, - None - ) - val store = new ElementTrackingStore(kvStore, sparkConf) - store.write(testData) - store.close(closeParent = false) - } } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala index 83f4ff15eb4..c3de44c3ba1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala @@ -26,6 +26,7 @@ import org.scalatest.time.SpanSugar._ import org.scalatestplus.selenium.WebBrowser import org.apache.spark._ +import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR import org.apache.spark.internal.config.UI.{UI_ENABLED, UI_PORT} import org.apache.spark.sql.LocalSparkSession.withSparkSession import org.apache.spark.sql.SparkSession @@ -35,6 +36,7 @@ import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS import org.apache.spark.sql.internal.StaticSQLConf.ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger} import org.apache.spark.ui.SparkUICssErrorHandler +import org.apache.spark.util.Utils class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers { @@ -47,16 +49,20 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers { } } - private def newSparkSession( - master: String = "local", - additionalConfs: Map[String, String] = Map.empty): SparkSession = { - val conf = new SparkConf() + def getSparkConf(master: String): SparkConf = { + new SparkConf() .setMaster(master) .setAppName("ui-test") .set(SHUFFLE_PARTITIONS, 5) .set(UI_ENABLED, true) .set(UI_PORT, 0) .set(ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST, Seq("stateOnCurrentVersionSizeBytes")) + } + + private def newSparkSession( + master: String = "local", + additionalConfs: Map[String, String] = Map.empty): SparkSession = { + val conf = getSparkConf(master) additionalConfs.foreach { case (k, v) => conf.set(k, v) } val spark = SparkSession.builder().master(master).config(conf).getOrCreate() 
assert(spark.sparkContext.ui.isDefined) @@ -172,3 +178,18 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers { } } } + +class UISeleniumWithRocksDBBackendSuite extends UISeleniumSuite { + private val storePath = Utils.createTempDir() + + override def getSparkConf(master: String): SparkConf = { + super.getSparkConf(master).set(LIVE_UI_LOCAL_STORE_DIR, storePath.getCanonicalPath) + } + + override def afterAll(): Unit = { + super.afterAll() + if (storePath.exists()) { + Utils.deleteRecursively(storePath) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org