This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 24cdae8f3dc [SPARK-42754][SQL][UI] Fix backward compatibility issue in nested SQL execution 24cdae8f3dc is described below commit 24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b Author: Linhong Liu <linhong....@databricks.com> AuthorDate: Tue Mar 14 09:06:53 2023 -0700 [SPARK-42754][SQL][UI] Fix backward compatibility issue in nested SQL execution ### What changes were proposed in this pull request? https://github.com/apache/spark/pull/39268 / [SPARK-41752](https://issues.apache.org/jira/browse/SPARK-41752) added a new non-optional `rootExecutionId: Long` field to the SparkListenerSQLExecutionStart case class. When JsonProtocol deserializes this event it uses the "ignore missing properties" Jackson deserialization option, causing the rootExecutionId field to be initialized with a default value of 0. The value 0 is a legitimate execution ID, so in the deserialized event we have no ability to distinguish between the absence of a value and a case where all queries have the first query as the root. Thanks to JoshRosen for reporting and investigating this issue. ### Why are the changes needed? Bug fix ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #40403 from linhongliu-db/fix-nested-execution. 
Authored-by: Linhong Liu <linhong....@databricks.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit 4db8e7b7944302a3929dd6a1197ea1385eecc46a) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../apache/spark/sql/execution/SQLExecution.scala | 2 +- .../spark/sql/execution/ui/AllExecutionsPage.scala | 2 +- .../sql/execution/ui/SQLAppStatusListener.scala | 2 +- .../spark/sql/execution/ui/SQLListener.scala | 5 +- .../spark/sql/execution/SQLJsonProtocolSuite.scala | 82 ++++++++++++---------- .../history/SQLEventFilterBuilderSuite.scala | 2 +- .../history/SQLLiveEntitiesEventFilterSuite.scala | 8 +-- .../sql/execution/ui/AllExecutionsPageSuite.scala | 47 +++++++++++-- .../execution/ui/MetricsAggregationBenchmark.scala | 2 +- .../execution/ui/SQLAppStatusListenerSuite.scala | 20 +++--- 10 files changed, 111 insertions(+), 61 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 56fc9d946df..eeca1669e74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -106,7 +106,7 @@ object SQLExecution { try { sc.listenerBus.post(SparkListenerSQLExecutionStart( executionId = executionId, - rootExecutionId = rootExecutionId, + rootExecutionId = Some(rootExecutionId), description = desc, details = callSite.longForm, physicalPlanDescription = queryExecution.explainString(planDescriptionMode), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index cd8f31b3c21..058ecbbb1cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -260,7 +260,7 @@ private[ui] 
class ExecutionPagedTable( private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, executionTag)}" - private val showSubExecutions = subExecutions.nonEmpty + private val showSubExecutions = subExecutions.exists(_._2.nonEmpty) override def tableId: String = s"$executionTag-table" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 32b215b1c2e..7b9f877bdef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -356,7 +356,7 @@ class SQLAppStatusListener( kvstore.write(graphToStore) val exec = getOrCreateExecution(executionId) - exec.rootExecutionId = rootExecutionId + exec.rootExecutionId = rootExecutionId.getOrElse(executionId) exec.description = description exec.details = details exec.physicalPlanDescription = physicalPlanDescription diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index b931b4fcde1..d4c8f600a4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -44,7 +44,10 @@ case class SparkListenerSQLAdaptiveSQLMetricUpdates( case class SparkListenerSQLExecutionStart( executionId: Long, // if the execution is a root, then rootExecutionId == executionId - rootExecutionId: Long, + // if the event is parsed from the event log that generated by Spark not support + // nested execution, then rootExecutionId = None + @JsonDeserialize(contentAs = classOf[java.lang.Long]) + rootExecutionId: Option[Long], description: String, details: String, physicalPlanDescription: String, diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala index e9d98ee9715..49cca666d1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala @@ -30,43 +30,53 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession { test("SparkPlanGraph backward compatibility: metadata") { Seq(true, false).foreach { newExecutionStartEvent => - val event = if (newExecutionStartEvent) { - "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart" - } else { - "org.apache.spark.sql.execution.OldVersionSQLExecutionStart" - } - val SQLExecutionStartJsonString = - s""" - |{ - | "Event":"$event", - | "executionId":0, - | "description":"test desc", - | "details":"test detail", - | "physicalPlanDescription":"test plan", - | "sparkPlanInfo": { - | "nodeName":"TestNode", - | "simpleString":"test string", - | "children":[], - | "metadata":{}, - | "metrics":[] - | }, - | "time":0, - | "modifiedConfigs": { - | "k1":"v1" - | } - |} - """.stripMargin + Seq(true, false).foreach { newExecutionStartJson => + val event = if (newExecutionStartEvent) { + "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart" + } else { + "org.apache.spark.sql.execution.OldVersionSQLExecutionStart" + } + + val SQLExecutionStartJsonString = + s""" + |{ + | "Event":"$event", + | ${if (newExecutionStartJson) """"rootExecutionId": "1",""" else ""} + | "executionId":0, + | "description":"test desc", + | "details":"test detail", + | "physicalPlanDescription":"test plan", + | "sparkPlanInfo": { + | "nodeName":"TestNode", + | "simpleString":"test string", + | "children":[], + | "metadata":{}, + | "metrics":[] + | }, + | "time":0, + | "modifiedConfigs": { + | "k1":"v1" + | } + |} + """.stripMargin - val reconstructedEvent = 
JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString) - if (newExecutionStartEvent) { - val expectedEvent = SparkListenerSQLExecutionStart(0, 0, "test desc", "test detail", - "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0, - Map("k1" -> "v1")) - assert(reconstructedEvent == expectedEvent) - } else { - val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail", - "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0) - assert(reconstructedEvent == expectedOldEvent) + val reconstructedEvent = JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString) + if (newExecutionStartEvent) { + val expectedEvent = if (newExecutionStartJson) { + SparkListenerSQLExecutionStart(0, Some(1), "test desc", "test detail", + "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0, + Map("k1" -> "v1")) + } else { + SparkListenerSQLExecutionStart(0, None, "test desc", "test detail", + "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0, + Map("k1" -> "v1")) + } + assert(reconstructedEvent == expectedEvent) + } else { + val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail", + "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0) + assert(reconstructedEvent == expectedOldEvent) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala index 42b27bd9f28..87ac58dbc3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala @@ -57,7 +57,7 @@ class SQLEventFilterBuilderSuite extends SparkFunSuite { } // Start SQL Execution - listener.onOtherEvent(SparkListenerSQLExecutionStart(1, 1, "desc1", "details1", 
"plan", + listener.onOtherEvent(SparkListenerSQLExecutionStart(1, Some(1), "desc1", "details1", "plan", new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, Map.empty)) time += 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala index f1b77e502df..f9eea3816fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala @@ -41,8 +41,8 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite { val acceptFn = filter.acceptFn().lift // Verifying with finished SQL execution 1 - assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, 1, "description1", "details1", - "plan", null, 0, Map.empty))) + assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, Some(1), + "description1", "details1", "plan", null, 0, Map.empty))) assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0))) assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null))) assert(Some(false) === acceptFn(SparkListenerDriverAccumUpdates(1, Seq.empty))) @@ -88,8 +88,8 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite { } // Verifying with live SQL execution 2 - assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, 2, "description2", "details2", - "plan", null, 0, Map.empty))) + assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, Some(2), + "description2", "details2", "plan", null, 0, Map.empty))) assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0))) assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", null))) assert(Some(true) === acceptFn(SparkListenerDriverAccumUpdates(2, Seq.empty))) diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala index 7af58867f33..d1cd32f3621 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala @@ -86,7 +86,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( 0, - 0, + Some(0), "test", "test", df.queryExecution.toString, @@ -142,7 +142,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( 0, - 0, + Some(0), "test", "test", df.queryExecution.toString, @@ -150,7 +150,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA System.currentTimeMillis())) listener.onOtherEvent(SparkListenerSQLExecutionStart( 1, - 0, + Some(0), "test", "test", df.queryExecution.toString, @@ -159,7 +159,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA // sub execution has a missing root execution listener.onOtherEvent(SparkListenerSQLExecutionStart( 2, - 100, + Some(100), "test", "test", df.queryExecution.toString, @@ -171,6 +171,43 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA assert(html.contains("id=2")) } + test("SPARK-42754: group sub executions - backward compatibility") { + val statusStore = createStatusStore + val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS) + val request = mock(classOf[HttpServletRequest]) + + val sparkConf = new SparkConf(false).set(UI_SQL_GROUP_SUB_EXECUTION_ENABLED, true) + when(tab.conf).thenReturn(sparkConf) + when(tab.sqlStore).thenReturn(statusStore) + when(tab.appName).thenReturn("testing") + 
when(tab.headerTabs).thenReturn(Seq.empty) + + val listener = statusStore.listener.get + val page = new AllExecutionsPage(tab) + val df = createTestDataFrame + // testing compatibility with old event logs for which rootExecutionId = None + // because the field is missing when generated by a Spark version not support + // nested execution + listener.onOtherEvent(SparkListenerSQLExecutionStart( + 0, + None, + "test", + "test", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis())) + listener.onOtherEvent(SparkListenerSQLExecutionStart( + 1, + None, + "test", + "test", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis())) + val html = page.render(request).toString().toLowerCase(Locale.ROOT) + assert(!html.contains("sub execution ids") && !html.contains("sub-execution-list")) + } + protected def createStatusStore: SQLAppStatusStore private def createTestDataFrame: DataFrame = { @@ -196,7 +233,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala index 3b9efb18057..252bcea8b8e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala @@ -75,7 +75,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase { val executionId = idgen.incrementAndGet() val executionStart = SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), getClass().getName(), 
getClass().getName(), getClass().getName(), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 81c745029fd..fdc633f3556 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -191,7 +191,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, @@ -382,7 +382,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, @@ -413,7 +413,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, @@ -455,7 +455,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, @@ -486,7 +486,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, @@ -518,7 +518,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes val df = createTestDataFrame 
listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, @@ -659,7 +659,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes time += 1 listener.onOtherEvent(SparkListenerSQLExecutionStart( 1, - 1, + Some(1), "test", "test", df.queryExecution.toString, @@ -669,7 +669,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes time += 1 listener.onOtherEvent(SparkListenerSQLExecutionStart( 2, - 2, + Some(2), "test", "test", df.queryExecution.toString, @@ -687,7 +687,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes time += 1 listener.onOtherEvent(SparkListenerSQLExecutionStart( 3, - 3, + Some(3), "test", "test", df.queryExecution.toString, @@ -724,7 +724,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org