This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8b0aa59  [SPARK-26288][CORE] add initRegisteredExecutorsDB
8b0aa59 is described below

commit 8b0aa59218c209d39cbba5959302d8668b885cf6
Author: weixiuli <weixi...@jd.com>
AuthorDate: Tue Mar 19 16:16:43 2019 -0500

[SPARK-26288][CORE] add initRegisteredExecutorsDB

## What changes were proposed in this pull request?

Spark on YARN uses a local DB (https://github.com/apache/spark/pull/7943) to record RegisteredExecutors information, which can be reloaded and used again when the ExternalShuffleService is restarted. In standalone mode and in Spark on Kubernetes, however, the RegisteredExecutors information is not recorded, so it is lost whenever the ExternalShuffleService is restarted. This patch persists that information in a local DB for those modes as well.

## How was this patch tested?

New unit tests.

Closes #23393 from weixiuli/SPARK-26288.

Authored-by: weixiuli <weixi...@jd.com>
Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../shuffle/ExternalShuffleBlockHandler.java       |   5 +
 core/pom.xml                                       |   7 ++
 .../spark/deploy/ExternalShuffleService.scala      |  25 +++-
 .../org/apache/spark/deploy/worker/Worker.scala    |   9 ++
 .../org/apache/spark/internal/config/package.scala |   7 ++
 .../deploy/ExternalShuffleServiceDbSuite.scala     | 140 +++++++++++++++++++++
 .../apache/spark/deploy/worker/WorkerSuite.scala   |  51 +++++++-
 docs/spark-standalone.md                           |  11 ++
 8 files changed, 253 insertions(+), 2 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index b25e48a..70dcc8b 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -66,6 +66,11 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
       new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
   }
 
+  @VisibleForTesting
+  public ExternalShuffleBlockResolver getBlockResolver() {
+    return blockManager;
+  }
+
   /** Enables mocking out the StreamManager and BlockManager. */
   @VisibleForTesting
   public ExternalShuffleBlockHandler(
diff --git a/core/pom.xml b/core/pom.xml
index b9f78b2..45bda44 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -372,6 +372,13 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
     <!--
       This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 12ed189..28279fc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy
 
+import java.io.File
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.JavaConverters._
@@ -49,6 +50,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
   private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
 
+  private val registeredExecutorsDB = "registeredExecutors.ldb"
+
   private val transportConf =
     SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
   private val blockHandler = newShuffleBlockHandler(transportConf)
@@ -58,9 +61,29 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
 
   private val shuffleServiceSource = new ExternalShuffleServiceSource
 
+  protected def findRegisteredExecutorsDBFile(dbName: String): File = {
+    val localDirs = sparkConf.getOption("spark.local.dir").map(_.split(",")).getOrElse(Array())
+    if (localDirs.length >= 1) {
+      new File(localDirs.find(new File(_, dbName).exists()).getOrElse(localDirs(0)), dbName)
+    } else {
+      logWarning(s"'spark.local.dir' should be set first when we use db in " +
+        s"ExternalShuffleService. Note that this only affects standalone mode.")
+      null
+    }
+  }
+
+  /** Get the block handler. */
+  def getBlockHandler: ExternalShuffleBlockHandler = {
+    blockHandler
+  }
+
   /** Create a new shuffle block handler. Factored out for subclasses to override. */
   protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
-    new ExternalShuffleBlockHandler(conf, null)
+    if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
+      new ExternalShuffleBlockHandler(conf, findRegisteredExecutorsDBFile(registeredExecutorsDB))
+    } else {
+      new ExternalShuffleBlockHandler(conf, null)
+    }
   }
 
   /** Starts the external shuffle service if the user has configured us to. */
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 52892c3..a0664b3f 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -466,6 +466,15 @@ private[deploy] class Worker(
           }.foreach { dir =>
             logInfo(s"Removing directory: ${dir.getPath}")
             Utils.deleteRecursively(dir)
+
+            // Remove the application's registered-executor entries from the external shuffle
+            // service's DB when spark.shuffle.service.db.enabled=true. If an application is
+            // stopped while the external shuffle service is down, it would otherwise leave a
+            // stale entry in the DB, so remove the entry here as well.
+            if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
+                conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+              shuffleService.applicationRemoved(dir.getName)
+            }
           }
         }(cleanupThreadExecutor)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2d664bb..758f605 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -359,6 +359,13 @@ package object config {
   private[spark] val SHUFFLE_SERVICE_ENABLED =
     ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_DB_ENABLED =
+    ConfigBuilder("spark.shuffle.service.db.enabled")
+      .doc("Whether to use db in ExternalShuffleService. Note that this only affects " +
+        "standalone mode.")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val SHUFFLE_SERVICE_PORT =
     ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)
diff --git a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
new file mode 100644
index 0000000..e33c3f8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import com.google.common.io.CharStreams
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}
+import org.apache.spark.network.shuffle.TestShuffleDataContext
+import org.apache.spark.util.Utils
+
+/**
+ * This suite reads block data after the ExternalShuffleService is restarted,
+ * with spark.shuffle.service.db.enabled = true or false.
+ * Note that recovery is expected to fail when spark.shuffle.service.db.enabled = false.
+ */
+class ExternalShuffleServiceDbSuite extends SparkFunSuite {
+  val sortBlock0 = "Hello!"
+  val sortBlock1 = "World!"
+  val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
+
+  var sparkConf: SparkConf = _
+  var dataContext: TestShuffleDataContext = _
+
+  var securityManager: SecurityManager = _
+  var externalShuffleService: ExternalShuffleService = _
+  var blockHandler: ExternalShuffleBlockHandler = _
+  var blockResolver: ExternalShuffleBlockResolver = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    sparkConf = new SparkConf()
+    sparkConf.set("spark.shuffle.service.enabled", "true")
+    sparkConf.set("spark.local.dir", System.getProperty("java.io.tmpdir"))
+    Utils.loadDefaultSparkProperties(sparkConf, null)
+    securityManager = new SecurityManager(sparkConf)
+
+    dataContext = new TestShuffleDataContext(2, 5)
+    dataContext.create()
+    // Write some sort data.
+    dataContext.insertSortShuffleData(0, 0,
+      Array[Array[Byte]](sortBlock0.getBytes(StandardCharsets.UTF_8),
+        sortBlock1.getBytes(StandardCharsets.UTF_8)))
+    registerExecutor()
+  }
+
+  override def afterAll() {
+    try {
+      dataContext.cleanup()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  def registerExecutor(): Unit = {
+    try {
+      sparkConf.set("spark.shuffle.service.db.enabled", "true")
+      externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+
+      // Start the external shuffle service.
+      externalShuffleService.start()
+      blockHandler = externalShuffleService.getBlockHandler
+      blockResolver = blockHandler.getBlockResolver
+      blockResolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER))
+    } finally {
+      blockHandler.close()
+      // Stop the external shuffle service.
+      externalShuffleService.stop()
+    }
+  }
+
+  // beforeAll ensures the shuffle data was already written and the shuffle service was then
+  // stopped. Here we restart the shuffle service and make sure we can still read the shuffle data.
+  test("Recover shuffle data with spark.shuffle.service.db.enabled=true after " +
+    "shuffle service restart") {
+    try {
+      sparkConf.set("spark.shuffle.service.db.enabled", "true")
+      externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+      // Restart the external shuffle service.
+      externalShuffleService.start()
+      blockHandler = externalShuffleService.getBlockHandler
+      blockResolver = blockHandler.getBlockResolver
+
+      val block0Stream = blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
+      val block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8))
+      block0Stream.close()
+      assert(sortBlock0 == block0)
+      // pass
+    } finally {
+      blockHandler.close()
+      // Stop the external shuffle service.
+      externalShuffleService.stop()
+    }
+  }
+
+  // beforeAll ensures the shuffle data was already written and the shuffle service was then
+  // stopped. Here we restart the shuffle service, but this time we can't read the shuffle data.
+  test("Can't recover shuffle data with spark.shuffle.service.db.enabled=false after" +
+    " shuffle service restart") {
+    try {
+      sparkConf.set("spark.shuffle.service.db.enabled", "false")
+      externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+      // Restart the external shuffle service.
+      externalShuffleService.start()
+      blockHandler = externalShuffleService.getBlockHandler
+      blockResolver = blockHandler.getBlockResolver
+
+      val error = intercept[RuntimeException] {
+        blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
+      }.getMessage
+
+      assert(error.contains("not registered"))
+    } finally {
+      blockHandler.close()
+      // Stop the external shuffle service.
+      externalShuffleService.stop()
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index f6559df..168694c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.deploy.worker
 
+import java.io.{File, IOException}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.function.Supplier
 
+import scala.concurrent.duration._
+
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Answers.RETURNS_SMART_NULLS
 import org.mockito.ArgumentMatchers.any
@@ -27,14 +30,16 @@ import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
-import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
+import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, WorkDirCleanup}
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Worker._
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.util.Utils
 
 class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
 
@@ -245,4 +250,48 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
       ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
     assert(cleanupCalled.get() == value)
   }
+
+  test("WorkDirCleanup cleans app dirs and shuffle metadata when " +
+    "spark.shuffle.service.db.enabled=true") {
+    testWorkDirCleanupAndRemoveMetadataWithConfig(true)
+  }
+
+  test("WorkDirCleanup cleans only app dirs when " +
+    "spark.shuffle.service.db.enabled=false") {
+    testWorkDirCleanupAndRemoveMetadataWithConfig(false)
+  }
+
+  private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean) = {
+    val conf = new SparkConf().set("spark.shuffle.service.db.enabled", dbCleanupEnabled.toString)
+    conf.set("spark.worker.cleanup.appDataTtl", "60")
+    conf.set("spark.shuffle.service.enabled", "true")
+
+    val appId = "app1"
+    val execId = "exec1"
+    val cleanupCalled = new AtomicBoolean(false)
+    when(shuffleService.applicationRemoved(any[String])).thenAnswer(new Answer[Unit] {
+      override def answer(invocations: InvocationOnMock): Unit = {
+        cleanupCalled.set(true)
+      }
+    })
+    val externalShuffleServiceSupplier = new Supplier[ExternalShuffleService] {
+      override def get: ExternalShuffleService = shuffleService
+    }
+    val worker = makeWorker(conf, externalShuffleServiceSupplier)
+    val workDir = Utils.createTempDir(namePrefix = "work")
+    // Initialize the worker's work directory.
+    worker.workDir = workDir
+    // Create the executor's working directory.
+    val executorDir = new File(worker.workDir, appId + "/" + execId)
+
+    if (!executorDir.exists && !executorDir.mkdirs()) {
+      throw new IOException("Failed to create directory " + executorDir)
+    }
+    executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
+    worker.receive(WorkDirCleanup)
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(!executorDir.exists())
+      assert(cleanupCalled.get() == dbCleanupEnabled)
+    }
+  }
 }
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 60b84d3..3400da4 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -240,6 +240,7 @@ SPARK_WORKER_OPTS supports the following system properties:
   <td>
     Enable periodic cleanup of worker / application directories. Note that this only affects standalone
    mode, as YARN works differently. Only the directories of stopped applications are cleaned up.
+    This should be enabled if <code>spark.shuffle.service.db.enabled</code> is "true".
   </td>
 </tr>
 <tr>
@@ -261,6 +262,16 @@ SPARK_WORKER_OPTS supports the following system properties:
   </td>
 </tr>
 <tr>
+  <td><code>spark.shuffle.service.db.enabled</code></td>
+  <td>true</td>
+  <td>
+    Store external shuffle service state on local disk so that when the external shuffle service is restarted, it will
+    automatically reload info on current executors. This only affects standalone mode (YARN always has this behavior
+    enabled). You should also enable <code>spark.worker.cleanup.enabled</code> to ensure that the state
+    eventually gets cleaned up. This config may be removed in the future.
+  </td>
+</tr>
+<tr>
   <td><code>spark.storage.cleanupFilesAfterExecutorExit</code></td>
   <td>true</td>
   <td>
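For context, here is a minimal sketch of how a standalone deployment might enable the behavior this patch adds. The property names come from the patch above; the `spark.local.dir` value is only an illustrative assumption, not something prescribed by the patch.

```scala
import org.apache.spark.SparkConf

object ShuffleDbConfigSketch {
  // Builds a SparkConf that enables DB-backed recovery of executor registrations
  // for the external shuffle service in standalone mode (a sketch, not part of the patch).
  def conf: SparkConf = new SparkConf()
    .set("spark.shuffle.service.enabled", "true")     // run the external shuffle service
    .set("spark.shuffle.service.db.enabled", "true")  // persist executor registrations across restarts
    .set("spark.worker.cleanup.enabled", "true")      // so stale DB entries are eventually removed
    .set("spark.local.dir", "/tmp/spark-local")       // hypothetical path; registeredExecutors.ldb is kept under a local dir
}
```

In a real standalone cluster these properties would typically be set through `spark-defaults.conf` or `SPARK_WORKER_OPTS` on each worker, since the external shuffle service runs inside the Worker process.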