This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 47de971 [SPARK-38056][WEB UI][3.1] Fix issue of Structured streaming
not working in history server when using LevelDB
47de971 is described below
commit 47de971c8b2111a75665baad46a7f82ed0fc022c
Author: kuwii <[email protected]>
AuthorDate: Wed Feb 9 09:53:11 2022 -0800
[SPARK-38056][WEB UI][3.1] Fix issue of Structured streaming not working in
history server when using LevelDB
### What changes were proposed in this pull request?
Change type of `org.apache.spark.sql.streaming.ui.StreamingQueryData.runId`
from `UUID` to `String`.
### Why are the changes needed?
In
[SPARK-31953](https://github.com/apache/spark/commit/4f9667035886a67e6c9a4e8fad2efa390e87ca68),
structured streaming support was added in the history server. However, this does not
work when the history server is using LevelDB instead of an in-memory KV store.
- LevelDB does not support `UUID` as a key.
- If `spark.history.store.path` is set in the history server to use LevelDB,
an error will occur when writing info to the store while replaying events.
- `StreamingQueryStatusListener` will throw an exception when writing info,
saying `java.lang.IllegalArgumentException: Type java.util.UUID not allowed as
key.`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added tests in `StreamingQueryStatusListenerSuite` to test whether
`StreamingQueryData` can be successfully written to in-memory store and
LevelDB.
Closes #35463 from kuwii/hs-streaming-fix-3.1.
Authored-by: kuwii <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../ui/StreamingQueryStatisticsPage.scala | 4 +-
.../ui/StreamingQueryStatusListener.scala | 6 +--
.../sql/streaming/ui/StreamingQueryPageSuite.scala | 2 +-
.../ui/StreamingQueryStatusListenerSuite.scala | 51 ++++++++++++++++++----
4 files changed, 49 insertions(+), 14 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
index 97691d9..e13ac4e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.ui
import java.{util => ju}
import java.lang.{Long => JLong}
-import java.util.{Locale, UUID}
+import java.util.Locale
import javax.servlet.http.HttpServletRequest
import scala.collection.JavaConverters._
@@ -59,7 +59,7 @@ private[ui] class StreamingQueryStatisticsPage(parent:
StreamingQueryTab)
require(parameterId != null && parameterId.nonEmpty, "Missing id
parameter")
val query = parent.store.allQueryUIData.find { uiData =>
- uiData.summary.runId.equals(UUID.fromString(parameterId))
+ uiData.summary.runId.equals(parameterId)
}.getOrElse(throw new IllegalArgumentException(s"Failed to find streaming
query $parameterId"))
val resources = generateLoadResources(request)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index fdd3754..b59ec04 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -75,7 +75,7 @@ private[sql] class StreamingQueryStatusListener(
store.write(new StreamingQueryData(
event.name,
event.id,
- event.runId,
+ event.runId.toString,
isActive = true,
None,
startTimestamp
@@ -100,7 +100,7 @@ private[sql] class StreamingQueryStatusListener(
override def onQueryTerminated(
event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
- val querySummary = store.read(classOf[StreamingQueryData], event.runId)
+ val querySummary = store.read(classOf[StreamingQueryData],
event.runId.toString)
val curTime = System.currentTimeMillis()
store.write(new StreamingQueryData(
querySummary.name,
@@ -118,7 +118,7 @@ private[sql] class StreamingQueryStatusListener(
private[sql] class StreamingQueryData(
val name: String,
val id: UUID,
- @KVIndexParam val runId: UUID,
+ @KVIndexParam val runId: String,
@KVIndexParam("active") val isActive: Boolean,
val exception: Option[String],
@KVIndexParam("startTimestamp") val startTimestamp: Long,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
index 246fa1f..78ade6a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
@@ -103,7 +103,7 @@ class StreamingQueryPageSuite extends SharedSparkSession
with BeforeAndAfter {
when(summary.isActive).thenReturn(true)
when(summary.name).thenReturn("query")
when(summary.id).thenReturn(id)
- when(summary.runId).thenReturn(id)
+ when(summary.runId).thenReturn(id.toString)
when(summary.startTimestamp).thenReturn(1L)
when(summary.exception).thenReturn(None)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index 91c55d5..a6fd9f4 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -28,8 +28,9 @@ import
org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener,
StreamingQueryProgress, StreamTest}
import org.apache.spark.sql.streaming
-import org.apache.spark.status.ElementTrackingStore
-import org.apache.spark.util.kvstore.InMemoryStore
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
class StreamingQueryStatusListenerSuite extends StreamTest {
@@ -48,7 +49,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
// result checking
assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1)
assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData
=>
- uiData.summary.runId == runId && uiData.summary.name.equals("test")))
+ uiData.summary.runId == runId.toString &&
uiData.summary.name.equals("test")))
// handle query progress event
val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
@@ -64,7 +65,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
// result checking
val activeQuery =
-
queryStore.allQueryUIData.filter(_.summary.isActive).find(_.summary.runId ==
runId)
+
queryStore.allQueryUIData.filter(_.summary.isActive).find(_.summary.runId ==
runId.toString)
assert(activeQuery.isDefined)
assert(activeQuery.get.summary.isActive)
assert(activeQuery.get.recentProgress.length == 1)
@@ -81,7 +82,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
listener.onQueryTerminated(terminateEvent)
assert(!queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.isActive)
-
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId
== runId)
+ assert(
+
queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId ==
runId.toString)
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id
== id)
}
@@ -110,10 +112,12 @@ class StreamingQueryStatusListenerSuite extends
StreamTest {
// result checking
assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1)
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).length == 1)
-
assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(_.summary.runId
== runId1))
+ assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(
+ _.summary.runId == runId1.toString))
assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData
=>
- uiData.summary.runId == runId1 && uiData.summary.id == id))
-
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId
== runId0)
+ uiData.summary.runId == runId1.toString && uiData.summary.id == id))
+ assert(
+
queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId ==
runId0.toString)
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id
== id)
}
@@ -210,4 +214,35 @@ class StreamingQueryStatusListenerSuite extends StreamTest
{
addQueryProgress()
checkQueryProcessData(5)
}
+
+ test("SPARK-38056: test writing StreamingQueryData to an in-memory store") {
+ testStreamingQueryData(new InMemoryStore())
+ }
+
+ test("SPARK-38056: test writing StreamingQueryData to a LevelDB store") {
+ val testDir = Utils.createTempDir()
+ val kvStore = KVUtils.open(testDir, getClass.getName)
+ try {
+ testStreamingQueryData(kvStore)
+ } finally {
+ kvStore.close()
+ Utils.deleteRecursively(testDir)
+ }
+ }
+
+ private def testStreamingQueryData(kvStore: KVStore): Unit = {
+ val id = UUID.randomUUID()
+ val testData = new StreamingQueryData(
+ "some-query",
+ id,
+ id.toString,
+ isActive = false,
+ None,
+ 1L,
+ None
+ )
+ val store = new ElementTrackingStore(kvStore, sparkConf)
+ store.write(testData)
+ store.close(closeParent = false)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]