This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 89d1d17e57a [SPARK-42391][CORE][TESTS] Close live `AppStatusStore` in 
the finally block for test cases
89d1d17e57a is described below

commit 89d1d17e57abc4c825cb9259f7a319b80e0d854a
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sun Feb 12 20:14:52 2023 -0800

    [SPARK-42391][CORE][TESTS] Close live `AppStatusStore` in the finally block 
for test cases
    
    ### What changes were proposed in this pull request?
    `AppStatusStore.createLiveStore` returns a `RocksDB`-backed 
`AppStatusStore` when `LIVE_UI_LOCAL_STORE_DIR` is configured, so test cases 
should close it in a `finally` block to release its resources.
    
    Four test suites use the `AppStatusStore.createLiveStore` function:
    
    - `AppStatusStoreSuite`: one case does not close the `AppStatusStore`
    - `StagePageSuite`: already calls close in a `finally` block
    - `AllExecutionsPageSuite`: already calls close in `after`
    - `SQLAppStatusListenerSuite`: already calls close in `after`
    
    Only `AppStatusStoreSuite` has an `AppStatusStore` that is not closed 
manually, so this PR makes the following changes:
    
    - For `SPARK-36038: speculation summary should not be present if there are 
no speculative tasks` in `AppStatusStoreSuite`, close the `AppStatusStore` in 
a `finally` block
    - For `SPARK-26260: summary should contain only successful tasks' metrics 
(store = $hint)`, move the existing `AppStatusStore.close` to the finally block
    
    ### Why are the changes needed?
    Call `AppStatusStore.close` in the finally block to release possible 
RocksDB resources.
    
    ### Does this PR introduce _any_ user-facing change?
    No, this is a test-only change.
    
    ### How was this patch tested?
    - Pass GitHub Actions
    - Manual test
    
    ```
    export LIVE_UI_LOCAL_STORE_DIR=/tmp/spark-ui
    build/sbt clean "core/testOnly org.apache.spark.status.AppStatusStoreSuite"
    ```
    All tests passed
    
    Closes #39961 from LuciferYang/SPARK-42391.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../apache/spark/status/AppStatusStoreSuite.scala  | 173 +++++++++++----------
 1 file changed, 90 insertions(+), 83 deletions(-)

diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index d38b0857e57..ccf6c9184cc 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -133,78 +133,81 @@ class AppStatusStoreSuite extends SparkFunSuite {
   cases.foreach { case (hint, appStore) =>
     test(s"SPARK-26260: summary should contain only successful tasks' metrics 
(store = $hint)") {
       assume(appStore != null)
-      val store = appStore.store
-
-      // Success and failed tasks metrics
-      for (i <- 0 to 5) {
-        if (i % 2 == 0) {
-          writeTaskDataToStore(i, store, "FAILED")
-        } else {
-          writeTaskDataToStore(i, store, "SUCCESS")
+      try {
+        val store = appStore.store
+
+        // Success and failed tasks metrics
+        for (i <- 0 to 5) {
+          if (i % 2 == 0) {
+            writeTaskDataToStore(i, store, "FAILED")
+          } else {
+            writeTaskDataToStore(i, store, "SUCCESS")
+          }
         }
-      }
 
-      // Running tasks metrics (-1 = no metrics reported, positive = metrics 
have been reported)
-      Seq(-1, 6).foreach { metric =>
-        writeTaskDataToStore(metric, store, "RUNNING")
-      }
+        // Running tasks metrics (-1 = no metrics reported, positive = metrics 
have been reported)
+        Seq(-1, 6).foreach { metric =>
+          writeTaskDataToStore(metric, store, "RUNNING")
+        }
 
-      /**
-       * Following are the tasks metrics,
-       * 1, 3, 5 => Success
-       * 0, 2, 4 => Failed
-       * -1, 6 => Running
-       *
-       * Task summary will consider (1, 3, 5) only
-       */
-      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
-      val successfulTasks = Array(getTaskMetrics(1), getTaskMetrics(3), 
getTaskMetrics(5))
-
-      def assertQuantiles(metricGetter: TaskMetrics => Double,
-        actualQuantiles: Seq[Double]): Unit = {
-        val values = successfulTasks.map(metricGetter)
-        val expectedQuantiles = new Distribution(values, 0, values.length)
-          .getQuantiles(uiQuantiles.sorted)
-
-        assert(actualQuantiles === expectedQuantiles)
-      }
+        /**
+         * Following are the tasks metrics,
+         * 1, 3, 5 => Success
+         * 0, 2, 4 => Failed
+         * -1, 6 => Running
+         *
+         * Task summary will consider (1, 3, 5) only
+         */
+        val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
+        val successfulTasks = Array(getTaskMetrics(1), getTaskMetrics(3), 
getTaskMetrics(5))
+
+        def assertQuantiles(metricGetter: TaskMetrics => Double,
+          actualQuantiles: Seq[Double]): Unit = {
+          val values = successfulTasks.map(metricGetter)
+          val expectedQuantiles = new Distribution(values, 0, values.length)
+            .getQuantiles(uiQuantiles.sorted)
+
+          assert(actualQuantiles === expectedQuantiles)
+        }
 
-      assertQuantiles(_.executorDeserializeTime, 
summary.executorDeserializeTime)
-      assertQuantiles(_.executorDeserializeCpuTime, 
summary.executorDeserializeCpuTime)
-      assertQuantiles(_.executorRunTime, summary.executorRunTime)
-      assertQuantiles(_.executorRunTime, summary.executorRunTime)
-      assertQuantiles(_.executorCpuTime, summary.executorCpuTime)
-      assertQuantiles(_.resultSize, summary.resultSize)
-      assertQuantiles(_.jvmGCTime, summary.jvmGcTime)
-      assertQuantiles(_.resultSerializationTime, 
summary.resultSerializationTime)
-      assertQuantiles(_.memoryBytesSpilled, summary.memoryBytesSpilled)
-      assertQuantiles(_.diskBytesSpilled, summary.diskBytesSpilled)
-      assertQuantiles(_.peakExecutionMemory, summary.peakExecutionMemory)
-      assertQuantiles(_.inputMetrics.bytesRead, summary.inputMetrics.bytesRead)
-      assertQuantiles(_.inputMetrics.recordsRead, 
summary.inputMetrics.recordsRead)
-      assertQuantiles(_.outputMetrics.bytesWritten, 
summary.outputMetrics.bytesWritten)
-      assertQuantiles(_.outputMetrics.recordsWritten, 
summary.outputMetrics.recordsWritten)
-      assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched,
-        summary.shuffleReadMetrics.remoteBlocksFetched)
-      assertQuantiles(_.shuffleReadMetrics.localBlocksFetched,
-        summary.shuffleReadMetrics.localBlocksFetched)
-      assertQuantiles(_.shuffleReadMetrics.fetchWaitTime, 
summary.shuffleReadMetrics.fetchWaitTime)
-      assertQuantiles(_.shuffleReadMetrics.remoteBytesRead,
-        summary.shuffleReadMetrics.remoteBytesRead)
-      assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk,
-        summary.shuffleReadMetrics.remoteBytesReadToDisk)
-      assertQuantiles(
-        t => t.shuffleReadMetrics.localBytesRead + 
t.shuffleReadMetrics.remoteBytesRead,
-        summary.shuffleReadMetrics.readBytes)
-      assertQuantiles(
-        t => t.shuffleReadMetrics.localBlocksFetched + 
t.shuffleReadMetrics.remoteBlocksFetched,
-        summary.shuffleReadMetrics.totalBlocksFetched)
-      assertQuantiles(_.shuffleWriteMetrics.bytesWritten, 
summary.shuffleWriteMetrics.writeBytes)
-      assertQuantiles(_.shuffleWriteMetrics.writeTime, 
summary.shuffleWriteMetrics.writeTime)
-      assertQuantiles(_.shuffleWriteMetrics.recordsWritten,
-        summary.shuffleWriteMetrics.writeRecords)
-
-      appStore.close()
+        assertQuantiles(_.executorDeserializeTime, 
summary.executorDeserializeTime)
+        assertQuantiles(_.executorDeserializeCpuTime, 
summary.executorDeserializeCpuTime)
+        assertQuantiles(_.executorRunTime, summary.executorRunTime)
+        assertQuantiles(_.executorRunTime, summary.executorRunTime)
+        assertQuantiles(_.executorCpuTime, summary.executorCpuTime)
+        assertQuantiles(_.resultSize, summary.resultSize)
+        assertQuantiles(_.jvmGCTime, summary.jvmGcTime)
+        assertQuantiles(_.resultSerializationTime, 
summary.resultSerializationTime)
+        assertQuantiles(_.memoryBytesSpilled, summary.memoryBytesSpilled)
+        assertQuantiles(_.diskBytesSpilled, summary.diskBytesSpilled)
+        assertQuantiles(_.peakExecutionMemory, summary.peakExecutionMemory)
+        assertQuantiles(_.inputMetrics.bytesRead, 
summary.inputMetrics.bytesRead)
+        assertQuantiles(_.inputMetrics.recordsRead, 
summary.inputMetrics.recordsRead)
+        assertQuantiles(_.outputMetrics.bytesWritten, 
summary.outputMetrics.bytesWritten)
+        assertQuantiles(_.outputMetrics.recordsWritten, 
summary.outputMetrics.recordsWritten)
+        assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched,
+          summary.shuffleReadMetrics.remoteBlocksFetched)
+        assertQuantiles(_.shuffleReadMetrics.localBlocksFetched,
+          summary.shuffleReadMetrics.localBlocksFetched)
+        assertQuantiles(_.shuffleReadMetrics.fetchWaitTime,
+          summary.shuffleReadMetrics.fetchWaitTime)
+        assertQuantiles(_.shuffleReadMetrics.remoteBytesRead,
+          summary.shuffleReadMetrics.remoteBytesRead)
+        assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk,
+          summary.shuffleReadMetrics.remoteBytesReadToDisk)
+        assertQuantiles(
+          t => t.shuffleReadMetrics.localBytesRead + 
t.shuffleReadMetrics.remoteBytesRead,
+          summary.shuffleReadMetrics.readBytes)
+        assertQuantiles(
+          t => t.shuffleReadMetrics.localBlocksFetched + 
t.shuffleReadMetrics.remoteBlocksFetched,
+          summary.shuffleReadMetrics.totalBlocksFetched)
+        assertQuantiles(_.shuffleWriteMetrics.bytesWritten, 
summary.shuffleWriteMetrics.writeBytes)
+        assertQuantiles(_.shuffleWriteMetrics.writeTime, 
summary.shuffleWriteMetrics.writeTime)
+        assertQuantiles(_.shuffleWriteMetrics.recordsWritten,
+          summary.shuffleWriteMetrics.writeRecords)
+      } finally {
+        appStore.close()
+      }
     }
   }
 
@@ -230,22 +233,26 @@ class AppStatusStoreSuite extends SparkFunSuite {
     val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
     val statusStore = AppStatusStore.createLiveStore(conf)
 
-    val listener = statusStore.listener.get
-
-    // Simulate a stage in job progress listener
-    val stageInfo = new StageInfo(stageId = 0, attemptId = 0, name = "dummy", 
numTasks = 1,
-      rddInfos = Seq.empty, parentIds = Seq.empty, details = "details",
-      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
-    (1 to 2).foreach {
-      taskId =>
-        val taskInfo = new TaskInfo(
-          taskId, taskId, 0, taskId, 0, "0", "localhost", TaskLocality.ANY,
-          false)
-        listener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
-        listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
-    }
+    try {
+      val listener = statusStore.listener.get
+
+      // Simulate a stage in job progress listener
+      val stageInfo = new StageInfo(stageId = 0, attemptId = 0, name = 
"dummy", numTasks = 1,
+        rddInfos = Seq.empty, parentIds = Seq.empty, details = "details",
+        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+      (1 to 2).foreach {
+        taskId =>
+          val taskInfo = new TaskInfo(
+            taskId, taskId, 0, taskId, 0, "0", "localhost", TaskLocality.ANY,
+            false)
+          listener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
+          listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
+      }
 
-    assert(statusStore.speculationSummary(0, 0).isEmpty)
+      assert(statusStore.speculationSummary(0, 0).isEmpty)
+    } finally {
+      statusStore.close()
+    }
   }
 
   private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to