This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fb5cf5f90c6 [SPARK-41694][CORE] Isolate RocksDB path for Live UI and automatically cleanup when `SparkContext.stop()`
fb5cf5f90c6 is described below

commit fb5cf5f90c6fe0860c811c0f7e06b9d8255d1772
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Wed Jan 4 20:32:43 2023 -0800

    [SPARK-41694][CORE] Isolate RocksDB path for Live UI and automatically cleanup when `SparkContext.stop()`
    
    ### What changes were proposed in this pull request?
    This PR brings two fixes:
    
    - Add a sub-directory with a `spark-ui` prefix under `spark.ui.store.path` for each Spark App, so that multiple Spark Apps can run normally using the same Spark Client with the same `spark.ui.store.path` configuration (a configuration sketch follows below)
    
    - Automatically clean up the Live UI data when `SparkContext.stop()` is called
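
    For context (and not part of the diff below), enabling the RocksDB-backed Live UI store is just a matter of pointing `spark.ui.store.path` at a local directory. A minimal sketch, with a hypothetical path:

    ```scala
    import org.apache.spark.SparkConf

    // Hypothetical example: every app started with this root directory now gets its
    // own "spark-ui*" sub-directory underneath it, instead of sharing the root itself.
    val conf = new SparkConf()
      .setAppName("live-ui-store-example")
      .set("spark.ui.store.path", "/tmp/spark-ui-store")
    ```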
    
    ### Why are the changes needed?
    There are two issues before this PR:
    
    1.  Multiple Spark Apps can't run normally using the same Spark Client with the same `spark.ui.store.path` configuration; the following exception occurs:
    
    ```
    org.rocksdb.RocksDBException: While lock file: /${baseDir}/listing.rdb/LOCK: Resource temporarily unavailable
    ```
    
    In other words, only one Spark App at a time can run normally using RocksDB as the Live UI store.
    
    After this PR, each Spark App uses its own RocksDB directory when `spark.ui.store.path` is specified as the Live UI store.
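
    To make the conflict concrete, here is a rough standalone sketch (not taken from this patch) of RocksDB's single-writer locking using the plain `rocksdbjni` API; the path is hypothetical and the exact error message can vary by platform:

    ```scala
    import org.rocksdb.{Options, RocksDB, RocksDBException}

    RocksDB.loadLibrary()
    val options = new Options().setCreateIfMissing(true)
    val path = "/tmp/shared-listing.rdb" // hypothetical shared store path

    val first = RocksDB.open(options, path) // acquires the LOCK file under `path`
    try {
      // A second open of the same directory (e.g. by another app sharing the path)
      // fails with a RocksDBException about the LOCK file, as in the error above.
      RocksDB.open(options, path)
    } catch {
      case e: RocksDBException => println(s"Second open failed: ${e.getMessage}")
    } finally {
      first.close()
    }
    ```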
    
    2.  The `spark.ui.store.path` directory is not cleaned up when `SparkContext.stop()` is called:
       - The disk space occupied by the `spark.ui.store.path` directory will continue to grow.
       - When submitting a new App and reusing the `spark.ui.store.path` directory, we will see content related to the previous App, which is a bit strange.
    
    After this PR, the `spark.ui.store.path` directory is automatically cleaned up by default when `SparkContext` stops.
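
    The new behavior can be observed directly from an application, as in the rough sketch below (the path and app name are illustrative; the equivalent check is done properly in the new test suite added by this patch):

    ```scala
    import java.io.File

    import org.apache.spark.{SparkConf, SparkContext}

    val storeRoot = "/tmp/spark-ui-store" // hypothetical root directory
    val sc = new SparkContext(new SparkConf()
      .setAppName("ui-store-cleanup-check")
      .setMaster("local[2]")
      .set("spark.ui.store.path", storeRoot))

    sc.parallelize(1 to 100, 4).count()
    // While the app is running, one per-app "spark-ui*" sub-directory exists under the root.
    println(new File(storeRoot).list().mkString(", "))

    sc.stop()
    // After stop(), that per-app sub-directory has been removed and the root is left empty.
    println(new File(storeRoot).list().mkString(", "))
    ```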
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add new UTs
    
    Closes #39226 from LuciferYang/SPARK-41694.
    
    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: YangJie <yangji...@baidu.com>
    Co-authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../org/apache/spark/status/AppStatusStore.scala   | 29 +++++++++--
 .../spark/status/AutoCleanupLiveUIDirSuite.scala   | 56 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 70fcbfd2d51..6db2fa57833 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.status
 
 import java.io.File
+import java.io.IOException
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
@@ -36,7 +38,8 @@ import org.apache.spark.util.kvstore.KVStore
  */
 private[spark] class AppStatusStore(
     val store: KVStore,
-    val listener: Option[AppStatusListener] = None) {
+    val listener: Option[AppStatusListener] = None,
+    val storePath: Option[File] = None) {
 
   def applicationInfo(): v1.ApplicationInfo = {
     try {
@@ -733,6 +736,11 @@ private[spark] class AppStatusStore(
 
   def close(): Unit = {
     store.close()
+    cleanUpStorePath()
+  }
+
+  private def cleanUpStorePath(): Unit = {
+    storePath.foreach(Utils.deleteRecursively)
   }
 
   def constructTaskDataList(taskDataWrapperIter: Iterable[TaskDataWrapper]): Seq[v1.TaskData] = {
@@ -761,7 +769,7 @@ private[spark] class AppStatusStore(
   }
 }
 
-private[spark] object AppStatusStore {
+private[spark] object AppStatusStore extends Logging {
 
   val CURRENT_VERSION = 2L
 
@@ -771,10 +779,23 @@ private[spark] object AppStatusStore {
   def createLiveStore(
       conf: SparkConf,
       appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
-    val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).map(new File(_))
+
+    def createStorePath(rootDir: String): Option[File] = {
+      try {
+        val localDir = Utils.createDirectory(rootDir, "spark-ui")
+        logInfo(s"Created spark ui store directory at $rootDir")
+        Some(localDir)
+      } catch {
+        case e: IOException =>
+          logError(s"Failed to create spark ui store path in $rootDir.", e)
+          None
+      }
+    }
+
+    val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).flatMap(createStorePath)
     val kvStore = KVUtils.createKVStore(storePath, live = true, conf)
     val store = new ElementTrackingStore(kvStore, conf)
     val listener = new AppStatusListener(store, conf, true, appStatusSource)
-    new AppStatusStore(store, listener = Some(listener))
+    new AppStatusStore(store, listener = Some(listener), storePath)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/status/AutoCleanupLiveUIDirSuite.scala b/core/src/test/scala/org/apache/spark/status/AutoCleanupLiveUIDirSuite.scala
new file mode 100644
index 00000000000..f717299a1ed
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/status/AutoCleanupLiveUIDirSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+class AutoCleanupLiveUIDirSuite extends SparkFunSuite {
+
+  test("SPARK-41694: Auto cleanup Spark UI store path") {
+    val baseUIDir = Utils.createTempDir()
+    try {
+      val conf = new SparkConf().setAppName("ui-dir-cleanup").setMaster("local")
+        .set(LIVE_UI_LOCAL_STORE_DIR, baseUIDir.getCanonicalPath)
+      val sc = new SparkContext(conf)
+      sc.parallelize(0 until 100, 10)
+        .map { x => (x % 10) -> x }
+        .reduceByKey {
+          _ + _
+        }
+        .collect()
+      // `baseUIDir` should exist and not be empty before SparkContext stops.
+      assert(baseUIDir.exists())
+      val subDirs = baseUIDir.listFiles()
+      assert(subDirs.nonEmpty)
+      val uiDirs = subDirs.filter(_.getName.startsWith("spark-ui"))
+      assert(uiDirs.length == 1)
+      assert(uiDirs.head.listFiles().nonEmpty)
+      sc.stop()
+      // base dir should still exist
+      assert(baseUIDir.exists())
+      assert(!uiDirs.head.exists())
+      assert(baseUIDir.listFiles().isEmpty)
+    } finally {
+      JavaUtils.deleteRecursively(baseUIDir)
+      assert(!baseUIDir.exists())
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
