This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 62fd133  [SPARK-27019][SQL][WEBUI] onJobStart happens after onExecutionEnd shouldn't overwrite kvstore
62fd133 is described below

commit 62fd133f744ab2d1aa3c409165914b5940e4d328
Author:     Shahid <shahidk...@gmail.com>
AuthorDate: Wed Mar 6 14:02:30 2019 -0800

    [SPARK-27019][SQL][WEBUI] onJobStart happens after onExecutionEnd shouldn't overwrite kvstore

    ## What changes were proposed in this pull request?

    Currently, when listener events are reordered, in particular when an `onJobStart` event arrives after the corresponding `onExecutionEnd` event, the SQL page in the UI displays incorrectly. (For an example, see the test mentioned in the JIRA; the issue also occurs randomly when a TPCDS query fails due to a broadcast timeout, etc.)

    The reason is that `SQLAppStatusListener` removes the `liveExecutions` entry once the execution ends. If a job start event arrives after that, the listener creates a new live execution entry for the execId, which eventually overwrites the existing kvstore entry and makes the UI display confusing data. The fix is to rebuild the live entry from the data already persisted in the kvstore instead of creating a blank one, as sketched below.
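The core of the fix is the lookup order in `onJobStart`: consult the live-executions map first, fall back to the entry already persisted in the kvstore, and only create a fresh entry when neither exists. Here is a minimal, self-contained sketch of that pattern; `ExecutionData` and the map/store values are simplified stand-ins for illustration, not the real `SQLAppStatusListener` internals:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

object LookupOrderSketch {
  // Simplified stand-in for Spark's LiveExecutionData.
  final case class ExecutionData(executionId: Long)

  // Stand-ins for the listener's live-executions map and the persistent kvstore.
  private val liveExecutions = new ConcurrentHashMap[Long, ExecutionData]()
  private val kvstore = mutable.Map.empty[Long, ExecutionData]

  def onJobStart(executionId: Long): ExecutionData = {
    // 1. Prefer the in-flight entry if the execution is still live.
    Option(liveExecutions.get(executionId))
      // 2. Otherwise restore the entry already persisted in the kvstore,
      //    instead of creating a fresh one that would later overwrite it.
      .orElse {
        kvstore.get(executionId).map { stored =>
          liveExecutions.put(executionId, stored)
          stored
        }
      }
      // 3. Only a genuinely new execution gets a fresh, blank entry.
      .getOrElse {
        val fresh = ExecutionData(executionId)
        liveExecutions.put(executionId, fresh)
        fresh
      }
  }
}
```

In the actual patch the fallback is `kvstore.read(classOf[SQLExecutionUIData], executionId)` wrapped in a try/catch for `NoSuchElementException`, as shown in the diff below.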
    ## How was this patch tested?

    Added a unit test. Also manually tested with the event log of the failed query provided in the JIRA.

    Before fix:
    ![screenshot from 2019-03-03 03-05-52](https://user-images.githubusercontent.com/23054875/53687929-53e2b800-3d61-11e9-9dca-620fa41e605c.png)

    After fix:
    ![screenshot from 2019-03-03 02-40-18](https://user-images.githubusercontent.com/23054875/53687928-4f1e0400-3d61-11e9-86aa-584646ac68f9.png)

    Closes #23939 from shahidki31/SPARK-27019.

    Authored-by: Shahid <shahidk...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
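One detail worth calling out in the diff below: the restored entry sets `endEvents = sqlStoreData.jobs.size + 1`, because the listener treats an execution as complete once it has seen one end event per job plus the execution-end event itself. The late job's `onJobStart` then grows `jobs` by one, reopening the execution until its `onJobEnd` arrives. A toy illustration of that accounting, using a simplified stand-in class rather than the real `LiveExecutionData`:

```scala
// Toy model of the listener's completeness check; the field names mirror
// LiveExecutionData, but this class is a stand-in, not Spark code.
final class ExecState(var jobs: Set[Int], var endEvents: Int) {
  // Complete after one end event per job, plus the execution end itself.
  def isComplete: Boolean = endEvents >= jobs.size + 1
}

object EndEventsSketch extends App {
  // Restored from the kvstore: one finished job, endEvents = jobs.size + 1 = 2.
  val exec = new ExecState(jobs = Set(0), endEvents = 2)
  assert(exec.isComplete)

  exec.jobs += 1           // late onJobStart for job 1 reopens the execution
  assert(!exec.isComplete)

  exec.endEvents += 1      // job 1's onJobEnd arrives
  assert(exec.isComplete)  // safe to flush to the kvstore and drop the live entry
}
```

With this accounting, the final `update()` sees a complete execution, writes it back, and removes it from `liveExecutions`, which is what the new test asserts via `listener.noLiveData()`.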
---
 .../sql/execution/ui/SQLAppStatusListener.scala    | 45 +++++++++++++++++-----
 .../execution/ui/SQLAppStatusListenerSuite.scala   | 30 +++++++++++++++
 2 files changed, 66 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 45954f2..daf7f2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.ui
 
-import java.util.Date
+import java.util.{Date, NoSuchElementException}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Function
 
@@ -77,7 +77,29 @@ class SQLAppStatusListener(
     val executionId = executionIdString.toLong
     val jobId = event.jobId
-    val exec = getOrCreateExecution(executionId)
+    val exec = Option(liveExecutions.get(executionId))
+      .orElse {
+        try {
+          // Should not overwrite the kvstore with new entry, if it already has the SQLExecution
+          // data corresponding to the execId.
+          val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId)
+          val executionData = new LiveExecutionData(executionId)
+          executionData.description = sqlStoreData.description
+          executionData.details = sqlStoreData.details
+          executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
+          executionData.metrics = sqlStoreData.metrics
+          executionData.submissionTime = sqlStoreData.submissionTime
+          executionData.completionTime = sqlStoreData.completionTime
+          executionData.jobs = sqlStoreData.jobs
+          executionData.stages = sqlStoreData.stages
+          executionData.metricsValues = sqlStoreData.metricValues
+          executionData.endEvents = sqlStoreData.jobs.size + 1
+          liveExecutions.put(executionId, executionData)
+          Some(executionData)
+        } catch {
+          case _: NoSuchElementException => None
+        }
+      }.getOrElse(getOrCreateExecution(executionId))
 
     // Record the accumulator IDs for the stages of this job, so that the code that keeps
     // track of the metrics knows which accumulators to look at.
@@ -275,16 +297,20 @@ class SQLAppStatusListener(
       exec.endEvents += 1
       update(exec)
-      // Remove stale LiveStageMetrics objects for stages that are not active anymore.
-      val activeStages = liveExecutions.values().asScala.flatMap { other =>
-        if (other != exec) other.stages else Nil
-      }.toSet
-      stageMetrics.keySet().asScala
-        .filter(!activeStages.contains(_))
-        .foreach(stageMetrics.remove)
+      removeStaleMetricsData(exec)
     }
   }
 
+  private def removeStaleMetricsData(exec: LiveExecutionData): Unit = {
+    // Remove stale LiveStageMetrics objects for stages that are not active anymore.
+    val activeStages = liveExecutions.values().asScala.flatMap { other =>
+      if (other != exec) other.stages else Nil
+    }.toSet
+    stageMetrics.keySet().asScala
+      .filter(!activeStages.contains(_))
+      .foreach(stageMetrics.remove)
+  }
+
   private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
     val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
     Option(liveExecutions.get(executionId)).foreach { exec =>
@@ -311,6 +337,7 @@ class SQLAppStatusListener(
     val now = System.nanoTime()
     if (exec.endEvents >= exec.jobs.size + 1) {
       exec.write(kvstore, now)
+      removeStaleMetricsData(exec)
       liveExecutions.remove(exec.executionId)
     } else if (force) {
       exec.write(kvstore, now)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index c8d862c..f19bf5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -384,6 +384,36 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     assertJobs(statusStore.execution(executionId), failed = Seq(0))
   }
 
+  test("onJobStart happens after onExecutionEnd shouldn't overwrite kvstore") {
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
+    val executionId = 0
+    val df = createTestDataFrame
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+    listener.onJobStart(SparkListenerJobStart(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      stageInfos = Seq(createStageInfo(0, 0)),
+      createProperties(executionId)))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+    listener.onJobEnd(SparkListenerJobEnd(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      JobFailed(new RuntimeException("Oops"))))
+
+    assert(listener.noLiveData())
+    assert(statusStore.execution(executionId).get.completionTime.nonEmpty)
+  }
+
   test("handle one execution with multiple jobs") {
     val statusStore = createStatusStore()
     val listener = statusStore.listener.get

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org