This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new fb5cf5f90c6 [SPARK-41694][CORE] Isolate RocksDB path for Live UI and automatically cleanup when `SparkContext.stop()` fb5cf5f90c6 is described below commit fb5cf5f90c6fe0860c811c0f7e06b9d8255d1772 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Wed Jan 4 20:32:43 2023 -0800 [SPARK-41694][CORE] Isolate RocksDB path for Live UI and automatically cleanup when `SparkContext.stop()` ### What changes were proposed in this pull request? This pr brings two fixes: - Add sub-dir with `spark-ui` prefix under `spark.ui.store.path` for each Spark App to ensure that multiple Spark Apps can run normally using the same Spark Client with the same `spark.ui.store.path` configuration - Automatically cleanup Live UI data when `SparkContext.stop()` ### Why are the changes needed? There are 2 issues before this pr: 1. Multiple Spark Apps can't run normally using the same Spark Client with the same `spark.ui.store.path` configuration; the following exceptions will occur: ``` org.rocksdb.RocksDBException: While lock file: /${baseDir}/listing.rdb/LOCK: Resource temporarily unavailable ``` At the same time, only one Spark App can run normally using RocksDB as the Live UI store. After this pr, each Spark App uses an independent RocksDB directory when `spark.ui.store.path` is specified as Live UI store. 2. The `spark.ui.store.path` directory is not cleaned up when `SparkContext.stop()`: - The disk space occupied by the `spark.ui.store.path` directory will continue to grow. - When submitting a new App and reusing the `spark.ui.store.path` directory, we will see the content related to the previous App, which is a bit strange After this pr, the `spark.ui.store.path` directory is automatically cleaned by default when `SparkContext` stops. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add new UTs Closes #39226 from LuciferYang/SPARK-41694. 
Lead-authored-by: yangjie01 <yangji...@baidu.com> Co-authored-by: YangJie <yangji...@baidu.com> Co-authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../org/apache/spark/status/AppStatusStore.scala | 29 +++++++++-- .../spark/status/AutoCleanupLiveUIDirSuite.scala | 56 ++++++++++++++++++++++ 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 70fcbfd2d51..6db2fa57833 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -18,12 +18,14 @@ package org.apache.spark.status import java.io.File +import java.io.IOException import java.util.{List => JList} import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext} +import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR import org.apache.spark.status.api.v1 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID @@ -36,7 +38,8 @@ import org.apache.spark.util.kvstore.KVStore */ private[spark] class AppStatusStore( val store: KVStore, - val listener: Option[AppStatusListener] = None) { + val listener: Option[AppStatusListener] = None, + val storePath: Option[File] = None) { def applicationInfo(): v1.ApplicationInfo = { try { @@ -733,6 +736,11 @@ private[spark] class AppStatusStore( def close(): Unit = { store.close() + cleanUpStorePath() + } + + private def cleanUpStorePath(): Unit = { + storePath.foreach(Utils.deleteRecursively) } def constructTaskDataList(taskDataWrapperIter: Iterable[TaskDataWrapper]): Seq[v1.TaskData] = { @@ -761,7 +769,7 @@ private[spark] class AppStatusStore( } } -private[spark] object AppStatusStore { +private[spark] object AppStatusStore 
extends Logging { val CURRENT_VERSION = 2L @@ -771,10 +779,23 @@ private[spark] object AppStatusStore { def createLiveStore( conf: SparkConf, appStatusSource: Option[AppStatusSource] = None): AppStatusStore = { - val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).map(new File(_)) + + def createStorePath(rootDir: String): Option[File] = { + try { + val localDir = Utils.createDirectory(rootDir, "spark-ui") + logInfo(s"Created spark ui store directory at $rootDir") + Some(localDir) + } catch { + case e: IOException => + logError(s"Failed to create spark ui store path in $rootDir.", e) + None + } + } + + val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).flatMap(createStorePath) val kvStore = KVUtils.createKVStore(storePath, live = true, conf) val store = new ElementTrackingStore(kvStore, conf) val listener = new AppStatusListener(store, conf, true, appStatusSource) - new AppStatusStore(store, listener = Some(listener)) + new AppStatusStore(store, listener = Some(listener), storePath) } } diff --git a/core/src/test/scala/org/apache/spark/status/AutoCleanupLiveUIDirSuite.scala b/core/src/test/scala/org/apache/spark/status/AutoCleanupLiveUIDirSuite.scala new file mode 100644 index 00000000000..f717299a1ed --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/AutoCleanupLiveUIDirSuite.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.Utils + +class AutoCleanupLiveUIDirSuite extends SparkFunSuite { + + test("SPARK-41694: Auto cleanup Spark UI store path") { + val baseUIDir = Utils.createTempDir() + try { + val conf = new SparkConf().setAppName("ui-dir-cleanup").setMaster("local") + .set(LIVE_UI_LOCAL_STORE_DIR, baseUIDir.getCanonicalPath) + val sc = new SparkContext(conf) + sc.parallelize(0 until 100, 10) + .map { x => (x % 10) -> x } + .reduceByKey { + _ + _ + } + .collect() + // `baseUIDir` should exists and not emtpy before SparkContext stop. + assert(baseUIDir.exists()) + val subDirs = baseUIDir.listFiles() + assert(subDirs.nonEmpty) + val uiDirs = subDirs.filter(_.getName.startsWith("spark-ui")) + assert(uiDirs.length == 1) + assert(uiDirs.head.listFiles().nonEmpty) + sc.stop() + // base dir should exists + assert(baseUIDir.exists()) + assert(!uiDirs.head.exists()) + assert(baseUIDir.listFiles().isEmpty) + } finally { + JavaUtils.deleteRecursively(baseUIDir) + assert(!baseUIDir.exists()) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org