This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 89d1d17e57a [SPARK-42391][CORE][TESTS] Close live `AppStatusStore` in the finally block for test cases 89d1d17e57a is described below commit 89d1d17e57abc4c825cb9259f7a319b80e0d854a Author: yangjie01 <yangji...@baidu.com> AuthorDate: Sun Feb 12 20:14:52 2023 -0800 [SPARK-42391][CORE][TESTS] Close live `AppStatusStore` in the finally block for test cases ### What changes were proposed in this pull request? `AppStatusStore.createLiveStore` returns a `RocksDB`-backed `AppStatusStore` when the `LIVE_UI_LOCAL_STORE_DIR` config or the `LIVE_UI_LOCAL_STORE_DIR` environment variable is set, so test cases should close it in a `finally` block to release its resources. There are 4 test suites that use the `AppStatusStore.createLiveStore` function: - `AppStatusStoreSuite`: one case does not close its `AppStatusStore` - `StagePageSuite`: already calls `close` in a `finally` block - `AllExecutionsPageSuite`: already calls `close` in `after` - `SQLAppStatusListenerSuite`: already calls `close` in `after` Only `AppStatusStoreSuite` has an `AppStatusStore` that is not closed manually, so this PR makes the following changes: - For `SPARK-36038: speculation summary should not be present if there are no speculative tasks` in `AppStatusStoreSuite`, close the `AppStatusStore` in a `finally` block - For `SPARK-26260: summary should contain only successful tasks' metrics (store = $hint)`, move the existing `AppStatusStore.close` call into a `finally` block ### Why are the changes needed? Calling `AppStatusStore.close` in a `finally` block releases any RocksDB resources even when an assertion in the test fails. ### Does this PR introduce _any_ user-facing change? No, this is a test-only change. ### How was this patch tested? - Pass GitHub Actions - Manual test ``` export LIVE_UI_LOCAL_STORE_DIR=/tmp/spark-ui build/sbt clean "core/testOnly org.apache.spark.status.AppStatusStoreSuite" ``` All tests passed Closes #39961 from LuciferYang/SPARK-42391. 
Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../apache/spark/status/AppStatusStoreSuite.scala | 173 +++++++++++---------- 1 file changed, 90 insertions(+), 83 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index d38b0857e57..ccf6c9184cc 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -133,78 +133,81 @@ class AppStatusStoreSuite extends SparkFunSuite { cases.foreach { case (hint, appStore) => test(s"SPARK-26260: summary should contain only successful tasks' metrics (store = $hint)") { assume(appStore != null) - val store = appStore.store - - // Success and failed tasks metrics - for (i <- 0 to 5) { - if (i % 2 == 0) { - writeTaskDataToStore(i, store, "FAILED") - } else { - writeTaskDataToStore(i, store, "SUCCESS") + try { + val store = appStore.store + + // Success and failed tasks metrics + for (i <- 0 to 5) { + if (i % 2 == 0) { + writeTaskDataToStore(i, store, "FAILED") + } else { + writeTaskDataToStore(i, store, "SUCCESS") + } } - } - // Running tasks metrics (-1 = no metrics reported, positive = metrics have been reported) - Seq(-1, 6).foreach { metric => - writeTaskDataToStore(metric, store, "RUNNING") - } + // Running tasks metrics (-1 = no metrics reported, positive = metrics have been reported) + Seq(-1, 6).foreach { metric => + writeTaskDataToStore(metric, store, "RUNNING") + } - /** - * Following are the tasks metrics, - * 1, 3, 5 => Success - * 0, 2, 4 => Failed - * -1, 6 => Running - * - * Task summary will consider (1, 3, 5) only - */ - val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get - val successfulTasks = Array(getTaskMetrics(1), getTaskMetrics(3), getTaskMetrics(5)) - - def assertQuantiles(metricGetter: TaskMetrics => Double, - 
actualQuantiles: Seq[Double]): Unit = { - val values = successfulTasks.map(metricGetter) - val expectedQuantiles = new Distribution(values, 0, values.length) - .getQuantiles(uiQuantiles.sorted) - - assert(actualQuantiles === expectedQuantiles) - } + /** + * Following are the tasks metrics, + * 1, 3, 5 => Success + * 0, 2, 4 => Failed + * -1, 6 => Running + * + * Task summary will consider (1, 3, 5) only + */ + val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get + val successfulTasks = Array(getTaskMetrics(1), getTaskMetrics(3), getTaskMetrics(5)) + + def assertQuantiles(metricGetter: TaskMetrics => Double, + actualQuantiles: Seq[Double]): Unit = { + val values = successfulTasks.map(metricGetter) + val expectedQuantiles = new Distribution(values, 0, values.length) + .getQuantiles(uiQuantiles.sorted) + + assert(actualQuantiles === expectedQuantiles) + } - assertQuantiles(_.executorDeserializeTime, summary.executorDeserializeTime) - assertQuantiles(_.executorDeserializeCpuTime, summary.executorDeserializeCpuTime) - assertQuantiles(_.executorRunTime, summary.executorRunTime) - assertQuantiles(_.executorRunTime, summary.executorRunTime) - assertQuantiles(_.executorCpuTime, summary.executorCpuTime) - assertQuantiles(_.resultSize, summary.resultSize) - assertQuantiles(_.jvmGCTime, summary.jvmGcTime) - assertQuantiles(_.resultSerializationTime, summary.resultSerializationTime) - assertQuantiles(_.memoryBytesSpilled, summary.memoryBytesSpilled) - assertQuantiles(_.diskBytesSpilled, summary.diskBytesSpilled) - assertQuantiles(_.peakExecutionMemory, summary.peakExecutionMemory) - assertQuantiles(_.inputMetrics.bytesRead, summary.inputMetrics.bytesRead) - assertQuantiles(_.inputMetrics.recordsRead, summary.inputMetrics.recordsRead) - assertQuantiles(_.outputMetrics.bytesWritten, summary.outputMetrics.bytesWritten) - assertQuantiles(_.outputMetrics.recordsWritten, summary.outputMetrics.recordsWritten) - 
assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched, - summary.shuffleReadMetrics.remoteBlocksFetched) - assertQuantiles(_.shuffleReadMetrics.localBlocksFetched, - summary.shuffleReadMetrics.localBlocksFetched) - assertQuantiles(_.shuffleReadMetrics.fetchWaitTime, summary.shuffleReadMetrics.fetchWaitTime) - assertQuantiles(_.shuffleReadMetrics.remoteBytesRead, - summary.shuffleReadMetrics.remoteBytesRead) - assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk, - summary.shuffleReadMetrics.remoteBytesReadToDisk) - assertQuantiles( - t => t.shuffleReadMetrics.localBytesRead + t.shuffleReadMetrics.remoteBytesRead, - summary.shuffleReadMetrics.readBytes) - assertQuantiles( - t => t.shuffleReadMetrics.localBlocksFetched + t.shuffleReadMetrics.remoteBlocksFetched, - summary.shuffleReadMetrics.totalBlocksFetched) - assertQuantiles(_.shuffleWriteMetrics.bytesWritten, summary.shuffleWriteMetrics.writeBytes) - assertQuantiles(_.shuffleWriteMetrics.writeTime, summary.shuffleWriteMetrics.writeTime) - assertQuantiles(_.shuffleWriteMetrics.recordsWritten, - summary.shuffleWriteMetrics.writeRecords) - - appStore.close() + assertQuantiles(_.executorDeserializeTime, summary.executorDeserializeTime) + assertQuantiles(_.executorDeserializeCpuTime, summary.executorDeserializeCpuTime) + assertQuantiles(_.executorRunTime, summary.executorRunTime) + assertQuantiles(_.executorRunTime, summary.executorRunTime) + assertQuantiles(_.executorCpuTime, summary.executorCpuTime) + assertQuantiles(_.resultSize, summary.resultSize) + assertQuantiles(_.jvmGCTime, summary.jvmGcTime) + assertQuantiles(_.resultSerializationTime, summary.resultSerializationTime) + assertQuantiles(_.memoryBytesSpilled, summary.memoryBytesSpilled) + assertQuantiles(_.diskBytesSpilled, summary.diskBytesSpilled) + assertQuantiles(_.peakExecutionMemory, summary.peakExecutionMemory) + assertQuantiles(_.inputMetrics.bytesRead, summary.inputMetrics.bytesRead) + assertQuantiles(_.inputMetrics.recordsRead, 
summary.inputMetrics.recordsRead) + assertQuantiles(_.outputMetrics.bytesWritten, summary.outputMetrics.bytesWritten) + assertQuantiles(_.outputMetrics.recordsWritten, summary.outputMetrics.recordsWritten) + assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched, + summary.shuffleReadMetrics.remoteBlocksFetched) + assertQuantiles(_.shuffleReadMetrics.localBlocksFetched, + summary.shuffleReadMetrics.localBlocksFetched) + assertQuantiles(_.shuffleReadMetrics.fetchWaitTime, + summary.shuffleReadMetrics.fetchWaitTime) + assertQuantiles(_.shuffleReadMetrics.remoteBytesRead, + summary.shuffleReadMetrics.remoteBytesRead) + assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk, + summary.shuffleReadMetrics.remoteBytesReadToDisk) + assertQuantiles( + t => t.shuffleReadMetrics.localBytesRead + t.shuffleReadMetrics.remoteBytesRead, + summary.shuffleReadMetrics.readBytes) + assertQuantiles( + t => t.shuffleReadMetrics.localBlocksFetched + t.shuffleReadMetrics.remoteBlocksFetched, + summary.shuffleReadMetrics.totalBlocksFetched) + assertQuantiles(_.shuffleWriteMetrics.bytesWritten, summary.shuffleWriteMetrics.writeBytes) + assertQuantiles(_.shuffleWriteMetrics.writeTime, summary.shuffleWriteMetrics.writeTime) + assertQuantiles(_.shuffleWriteMetrics.recordsWritten, + summary.shuffleWriteMetrics.writeRecords) + } finally { + appStore.close() + } } } @@ -230,22 +233,26 @@ class AppStatusStoreSuite extends SparkFunSuite { val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L) val statusStore = AppStatusStore.createLiveStore(conf) - val listener = statusStore.listener.get - - // Simulate a stage in job progress listener - val stageInfo = new StageInfo(stageId = 0, attemptId = 0, name = "dummy", numTasks = 1, - rddInfos = Seq.empty, parentIds = Seq.empty, details = "details", - resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) - (1 to 2).foreach { - taskId => - val taskInfo = new TaskInfo( - taskId, taskId, 0, taskId, 0, "0", "localhost", 
TaskLocality.ANY, - false) - listener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo)) - listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo)) - } + try { + val listener = statusStore.listener.get + + // Simulate a stage in job progress listener + val stageInfo = new StageInfo(stageId = 0, attemptId = 0, name = "dummy", numTasks = 1, + rddInfos = Seq.empty, parentIds = Seq.empty, details = "details", + resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + (1 to 2).foreach { + taskId => + val taskInfo = new TaskInfo( + taskId, taskId, 0, taskId, 0, "0", "localhost", TaskLocality.ANY, + false) + listener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo)) + listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo)) + } - assert(statusStore.speculationSummary(0, 0).isEmpty) + assert(statusStore.speculationSummary(0, 0).isEmpty) + } finally { + statusStore.close() + } } private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org