This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 77725c1acbc [SPARK-41895][SS][UI] Add tests for streaming UI with RocksDB backend
77725c1acbc is described below

commit 77725c1acbc278ec78e100ed1099de418322800e
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Fri Jan 6 11:16:54 2023 -0800

    [SPARK-41895][SS][UI] Add tests for streaming UI with RocksDB backend
    
    ### What changes were proposed in this pull request?
    
    Add tests for streaming UI with RocksDB backend
    
    ### Why are the changes needed?
    
    Better test coverage when the live UI with the RocksDB backend is enabled.
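
    For context, the live UI's RocksDB backend is enabled by pointing the UI store
    at a local directory. A minimal sketch, assuming any writable scratch path (the
    directory below is only illustrative; `LIVE_UI_LOCAL_STORE_DIR` corresponds to
    the `spark.ui.store.path` config exercised by the new suite):

    ```scala
    import org.apache.spark.sql.SparkSession

    // Illustrative path: with spark.ui.store.path set, the live UI keeps its
    // state in a disk-backed KV store instead of in memory.
    val spark = SparkSession.builder()
      .master("local")
      .appName("rocksdb-ui-demo")
      .config("spark.ui.store.path", "/tmp/spark-ui-store")
      .getOrCreate()
    ```
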
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    GitHub Actions (GA) tests
    
    Closes #39415 from gengliangwang/streamingTest.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../sql/StreamingQueryDataSerializer.scala         |  2 +-
 .../ui/StreamingQueryStatusListenerSuite.scala     | 91 ++++++++++++++++------
 .../spark/sql/streaming/ui/UISeleniumSuite.scala   | 29 ++++++-
 3 files changed, 93 insertions(+), 29 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
index f63ea80c77e..f05b186fea5 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
@@ -30,10 +30,10 @@ class StreamingQueryDataSerializer extends ProtobufSerDe {
   override def serialize(input: Any): Array[Byte] = {
     val data = input.asInstanceOf[StreamingQueryData]
     val builder = StoreTypes.StreamingQueryData.newBuilder()
-      .setName(data.name)
       .setId(data.id.toString)
       .setRunId(data.runId)
       .setIsActive(data.isActive)
+    Option(data.name).foreach(builder.setName)
     data.exception.foreach(builder.setException)
     builder.setStartTimestamp(data.startTimestamp)
     data.endTimestamp.foreach(builder.setEndTimestamp)
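
The reordering in the hunk above is a null guard: generated protobuf builder
setters throw a NullPointerException on null input, and a streaming query may
be started without a name. A minimal sketch of the pattern, using a
hypothetical stand-in for the generated builder:

```scala
object NullGuardDemo {
  // Hypothetical stand-in for a generated protobuf builder, whose setters
  // reject null just like the real StoreTypes builders do.
  class QueryDataBuilder {
    private var name: Option[String] = None
    def setName(n: String): QueryDataBuilder = {
      require(n != null, "protobuf setters throw on null")
      name = Some(n)
      this
    }
  }

  def main(args: Array[String]): Unit = {
    val builder = new QueryDataBuilder
    val queryName: String = null                 // e.g. an unnamed query
    Option(queryName).foreach(builder.setName)   // no-op instead of an NPE
  }
}
```
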
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index 7781c4276c7..ad4c1ff28d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.streaming.ui
 
+import java.io.File
 import java.text.SimpleDateFormat
 import java.util.{Date, UUID}
 
@@ -28,7 +29,7 @@ import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, Hybr
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
 import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
 import org.apache.spark.sql.internal.StaticSQLConf
-import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest}
+import org.apache.spark.sql.streaming.{SinkProgress, SourceProgress, StreamingQueryListener, StreamingQueryProgress, StreamTest}
 import org.apache.spark.sql.streaming
 import org.apache.spark.status.{ElementTrackingStore, KVUtils}
 import org.apache.spark.util.Utils
@@ -36,8 +37,17 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
 
 class StreamingQueryStatusListenerSuite extends StreamTest {
 
+  protected def createStore(): KVStore = new InMemoryStore()
+
+  protected def useInMemoryStore: Boolean = true
+
+  private val sourceProgresses = Array(
+    new SourceProgress("s1", "", "", "", 10, 4.0, 5.0),
+    new SourceProgress("s2", "", "", "", 10, 6.0, 7.0)
+  )
+
   test("onQueryStarted, onQueryProgress, onQueryTerminated") {
-    val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+    val kvStore = new ElementTrackingStore(createStore(), sparkConf)
     val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore)
     val queryStore = new StreamingQueryStatusStore(kvStore)
 
@@ -62,6 +72,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     when(progress.processedRowsPerSecond).thenReturn(12.0)
     when(progress.batchId).thenReturn(2)
     when(progress.prettyJson).thenReturn("""{"a":1}""")
+    when(progress.sink).thenReturn(new SinkProgress("mock query", 1))
+    when(progress.sources).thenReturn(sourceProgresses)
     val processEvent = new streaming.StreamingQueryListener.QueryProgressEvent(progress)
     listener.onQueryProgress(processEvent)
 
@@ -77,7 +89,19 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     assert(activeQuery.get.lastProgress.inputRowsPerSecond == 10.0)
     assert(activeQuery.get.lastProgress.processedRowsPerSecond == 12.0)
     assert(activeQuery.get.lastProgress.batchId == 2)
-    assert(activeQuery.get.lastProgress.prettyJson == """{"a":1}""")
+    if (useInMemoryStore) {
+      assert(activeQuery.get.lastProgress.prettyJson == """{"a":1}""")
+    } else {
+      // When using a disk-based KV store, the mock progress object is written to the
+      // KV store and read back as an instance of StreamingQueryProgress, so here we
+      // simply check that the JSON value contains the id and runId fields.
+      val jsonFragment =
+        s"""
+           |  "id" : "$id",
+           |  "runId" : "$runId",
+           |""".stripMargin
+      assert(activeQuery.get.lastProgress.prettyJson.contains(jsonFragment))
+    }
 
     // handle terminate event
     val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None)
@@ -90,7 +114,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
   }
 
   test("same query start multiple times") {
-    val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+    val kvStore = new ElementTrackingStore(createStore(), sparkConf)
     val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore)
     val queryStore = new StreamingQueryStatusStore(kvStore)
 
@@ -124,7 +148,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
   }
 
   test("test small retained queries") {
-    val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+    val kvStore = new ElementTrackingStore(createStore(), sparkConf)
     val conf = spark.sparkContext.conf
     conf.set(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES.key, "2")
     val listener = new StreamingQueryStatusListener(conf, kvStore)
@@ -166,7 +190,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
   }
 
   test("test small retained progress") {
-    val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+    val kvStore = new ElementTrackingStore(createStore(), sparkConf)
     val conf = spark.sparkContext.conf
     conf.set(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES.key, "5")
     val listener = new StreamingQueryStatusListener(conf, kvStore)
@@ -189,7 +213,6 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     def mockProgressData(id: UUID, runId: UUID): StreamingQueryProgress = {
       val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
       format.setTimeZone(getTimeZone("UTC"))
-
       val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
       when(progress.id).thenReturn(id)
       when(progress.runId).thenReturn(runId)
@@ -198,6 +221,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
       when(progress.processedRowsPerSecond).thenReturn(12.0)
       when(progress.batchId).thenReturn(batchId)
       when(progress.prettyJson).thenReturn("""{"a":1}""")
+      when(progress.sink).thenReturn(new SinkProgress("mock query", 1))
+      when(progress.sources).thenReturn(sourceProgresses)
 
       batchId += 1
       progress
@@ -218,7 +243,41 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
   }
 
   test("SPARK-38056: test writing StreamingQueryData to an in-memory store") {
-    testStreamingQueryData(new InMemoryStore())
+    testStreamingQueryData(createStore())
+  }
+
+  protected def testStreamingQueryData(kvStore: KVStore): Unit = {
+    val id = UUID.randomUUID()
+    val testData = new StreamingQueryData(
+      "some-query",
+      id,
+      id.toString,
+      isActive = false,
+      None,
+      1L,
+      None
+    )
+    val store = new ElementTrackingStore(kvStore, sparkConf)
+    store.write(testData)
+    store.close(closeParent = false)
+  }
+}
+
+class StreamingQueryStatusListenerWithDiskStoreSuite extends StreamingQueryStatusListenerSuite {
+  private var storePath: File = _
+
+  override def createStore(): KVStore = {
+    storePath = Utils.createTempDir()
+    KVUtils.createKVStore(Some(storePath), live = true, sparkConf)
+  }
+
+  override def useInMemoryStore: Boolean = false
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    if (storePath != null && storePath.exists()) {
+      Utils.deleteRecursively(storePath)
+    }
   }
 
   test("SPARK-38056: test writing StreamingQueryData to a LevelDB store") {
@@ -247,20 +306,4 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
       Utils.deleteRecursively(testDir)
     }
   }
-
-  private def testStreamingQueryData(kvStore: KVStore): Unit = {
-    val id = UUID.randomUUID()
-    val testData = new StreamingQueryData(
-      "some-query",
-      id,
-      id.toString,
-      isActive = false,
-      None,
-      1L,
-      None
-    )
-    val store = new ElementTrackingStore(kvStore, sparkConf)
-    store.write(testData)
-    store.close(closeParent = false)
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
index 83f4ff15eb4..c3de44c3ba1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.time.SpanSugar._
 import org.scalatestplus.selenium.WebBrowser
 
 import org.apache.spark._
+import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
 import org.apache.spark.internal.config.UI.{UI_ENABLED, UI_PORT}
 import org.apache.spark.sql.LocalSparkSession.withSparkSession
 import org.apache.spark.sql.SparkSession
@@ -35,6 +36,7 @@ import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
 import org.apache.spark.sql.internal.StaticSQLConf.ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST
 import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger}
 import org.apache.spark.ui.SparkUICssErrorHandler
+import org.apache.spark.util.Utils
 
 class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers {
 
@@ -47,16 +49,20 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers {
     }
   }
 
-  private def newSparkSession(
-      master: String = "local",
-      additionalConfs: Map[String, String] = Map.empty): SparkSession = {
-    val conf = new SparkConf()
+  def getSparkConf(master: String): SparkConf = {
+    new SparkConf()
       .setMaster(master)
       .setAppName("ui-test")
       .set(SHUFFLE_PARTITIONS, 5)
       .set(UI_ENABLED, true)
       .set(UI_PORT, 0)
       .set(ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST, Seq("stateOnCurrentVersionSizeBytes"))
+  }
+
+  private def newSparkSession(
+      master: String = "local",
+      additionalConfs: Map[String, String] = Map.empty): SparkSession = {
+    val conf = getSparkConf(master)
     additionalConfs.foreach { case (k, v) => conf.set(k, v) }
     val spark = SparkSession.builder().master(master).config(conf).getOrCreate()
     assert(spark.sparkContext.ui.isDefined)
@@ -172,3 +178,18 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers {
     }
   }
 }
+
+class UISeleniumWithRocksDBBackendSuite extends UISeleniumSuite {
+  private val storePath = Utils.createTempDir()
+
+  override def getSparkConf(master: String): SparkConf = {
+    super.getSparkConf(master).set(LIVE_UI_LOCAL_STORE_DIR, storePath.getCanonicalPath)
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    if (storePath.exists()) {
+      Utils.deleteRecursively(storePath)
+    }
+  }
+}
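
This subclassing is how the new coverage works: `StreamingQueryStatusListenerWithDiskStoreSuite`
and `UISeleniumWithRocksDBBackendSuite` inherit every test registered in their parent
suites, so all existing cases re-run against the disk-backed store. A minimal ScalaTest
sketch of the pattern, with hypothetical suite names:

```scala
import org.scalatest.funsuite.AnyFunSuite

// The base suite registers the tests; the subclass swaps the backend, and
// ScalaTest re-runs every inherited test against it.
class InMemoryBackendSuite extends AnyFunSuite {
  protected def backendName: String = "in-memory"

  test("backend is usable") {
    assert(backendName.nonEmpty)
  }
}

class RocksDbBackendSuite extends InMemoryBackendSuite {
  override protected def backendName: String = "rocksdb"
}
```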

