This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new dee294b [SPARK-38056][WEB UI] Fix issue of Structured streaming not working in history server when using LevelDB dee294b is described below commit dee294b453b550471028fdbd9e17952963504a3a Author: kuwii <kuwii.some...@gmail.com> AuthorDate: Wed Feb 9 16:59:38 2022 +0900 [SPARK-38056][WEB UI] Fix issue of Structured streaming not working in history server when using LevelDB ### What changes were proposed in this pull request? Change the type of `org.apache.spark.sql.streaming.ui.StreamingQueryData.runId` from `UUID` to `String`. ### Why are the changes needed? In [SPARK-31953](https://github.com/apache/spark/commit/4f9667035886a67e6c9a4e8fad2efa390e87ca68), structured streaming support was added to the history server. However, this does not work when the history server uses LevelDB instead of the in-memory KV store. - LevelDB does not support `UUID` as a key type. - If `spark.history.store.path` is set in the history server so that it uses LevelDB, an error occurs when info is written to the store while replaying events. - `StreamingQueryStatusListener` throws an exception when writing info, with the message `java.lang.IllegalArgumentException: Type java.util.UUID not allowed as key.` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added tests in `StreamingQueryStatusListenerSuite` that check whether `StreamingQueryData` can be successfully written to the in-memory store, LevelDB, and RocksDB. Closes #35356 from kuwii/hs-streaming-fix. 
Authored-by: kuwii <kuwii.some...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../ui/StreamingQueryStatisticsPage.scala | 4 +- .../ui/StreamingQueryStatusListener.scala | 6 +- .../sql/streaming/ui/StreamingQueryPageSuite.scala | 2 +- .../ui/StreamingQueryStatusListenerSuite.scala | 64 +++++++++++++++++++--- 4 files changed, 62 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala index 97691d9..e13ac4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.ui import java.{util => ju} import java.lang.{Long => JLong} -import java.util.{Locale, UUID} +import java.util.Locale import javax.servlet.http.HttpServletRequest import scala.collection.JavaConverters._ @@ -59,7 +59,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") val query = parent.store.allQueryUIData.find { uiData => - uiData.summary.runId.equals(UUID.fromString(parameterId)) + uiData.summary.runId.equals(parameterId) }.getOrElse(throw new IllegalArgumentException(s"Failed to find streaming query $parameterId")) val resources = generateLoadResources(request) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala index fdd3754..b59ec04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala @@ -75,7 +75,7 @@ 
private[sql] class StreamingQueryStatusListener( store.write(new StreamingQueryData( event.name, event.id, - event.runId, + event.runId.toString, isActive = true, None, startTimestamp @@ -100,7 +100,7 @@ private[sql] class StreamingQueryStatusListener( override def onQueryTerminated( event: StreamingQueryListener.QueryTerminatedEvent): Unit = { - val querySummary = store.read(classOf[StreamingQueryData], event.runId) + val querySummary = store.read(classOf[StreamingQueryData], event.runId.toString) val curTime = System.currentTimeMillis() store.write(new StreamingQueryData( querySummary.name, @@ -118,7 +118,7 @@ private[sql] class StreamingQueryStatusListener( private[sql] class StreamingQueryData( val name: String, val id: UUID, - @KVIndexParam val runId: UUID, + @KVIndexParam val runId: String, @KVIndexParam("active") val isActive: Boolean, val exception: Option[String], @KVIndexParam("startTimestamp") val startTimestamp: Long, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala index 246fa1f..78ade6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala @@ -103,7 +103,7 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter { when(summary.isActive).thenReturn(true) when(summary.name).thenReturn("query") when(summary.id).thenReturn(id) - when(summary.runId).thenReturn(id) + when(summary.runId).thenReturn(id.toString) when(summary.startTimestamp).thenReturn(1L) when(summary.exception).thenReturn(None) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala index 91c55d5..eee1a7c 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala @@ -28,8 +28,9 @@ import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest} import org.apache.spark.sql.streaming -import org.apache.spark.status.ElementTrackingStore -import org.apache.spark.util.kvstore.InMemoryStore +import org.apache.spark.status.{ElementTrackingStore, KVUtils} +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, RocksDB} class StreamingQueryStatusListenerSuite extends StreamTest { @@ -48,7 +49,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest { // result checking assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1) assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData => - uiData.summary.runId == runId && uiData.summary.name.equals("test"))) + uiData.summary.runId == runId.toString && uiData.summary.name.equals("test"))) // handle query progress event val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS) @@ -64,7 +65,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest { // result checking val activeQuery = - queryStore.allQueryUIData.filter(_.summary.isActive).find(_.summary.runId == runId) + queryStore.allQueryUIData.filter(_.summary.isActive).find(_.summary.runId == runId.toString) assert(activeQuery.isDefined) assert(activeQuery.get.summary.isActive) assert(activeQuery.get.recentProgress.length == 1) @@ -81,7 +82,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest { listener.onQueryTerminated(terminateEvent) assert(!queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.isActive) - 
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == runId) + assert( + queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == runId.toString) assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id == id) } @@ -110,10 +112,12 @@ class StreamingQueryStatusListenerSuite extends StreamTest { // result checking assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1) assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).length == 1) - assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(_.summary.runId == runId1)) + assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists( + _.summary.runId == runId1.toString)) assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData => - uiData.summary.runId == runId1 && uiData.summary.id == id)) - assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == runId0) + uiData.summary.runId == runId1.toString && uiData.summary.id == id)) + assert( + queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == runId0.toString) assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id == id) } @@ -210,4 +214,48 @@ class StreamingQueryStatusListenerSuite extends StreamTest { addQueryProgress() checkQueryProcessData(5) } + + test("SPARK-38056: test writing StreamingQueryData to an in-memory store") { + testStreamingQueryData(new InMemoryStore()) + } + + test("SPARK-38056: test writing StreamingQueryData to a LevelDB store") { + assume(!Utils.isMacOnAppleSilicon) + val testDir = Utils.createTempDir() + val kvStore = KVUtils.open(testDir, getClass.getName) + try { + testStreamingQueryData(kvStore) + } finally { + kvStore.close() + Utils.deleteRecursively(testDir) + } + } + + test("SPARK-38056: test writing StreamingQueryData to a RocksDB store") { + assume(!Utils.isMacOnAppleSilicon) + val testDir = Utils.createTempDir() + val kvStore = new 
RocksDB(testDir) + try { + testStreamingQueryData(kvStore) + } finally { + kvStore.close() + Utils.deleteRecursively(testDir) + } + } + + private def testStreamingQueryData(kvStore: KVStore): Unit = { + val id = UUID.randomUUID() + val testData = new StreamingQueryData( + "some-query", + id, + id.toString, + isActive = false, + None, + 1L, + None + ) + val store = new ElementTrackingStore(kvStore, sparkConf) + store.write(testData) + store.close(closeParent = false) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org