This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b0aa59  [SPARK-26288][CORE] add initRegisteredExecutorsDB
8b0aa59 is described below

commit 8b0aa59218c209d39cbba5959302d8668b885cf6
Author: weixiuli <weixi...@jd.com>
AuthorDate: Tue Mar 19 16:16:43 2019 -0500

    [SPARK-26288][CORE] add initRegisteredExecutorsDB
    
    ## What changes were proposed in this pull request?
    
    Spark on YARN uses a local DB (introduced in https://github.com/apache/spark/pull/7943) to
    record RegisteredExecutors information, which can be reloaded and used again when the
    ExternalShuffleService is restarted.

    In standalone mode and Spark on Kubernetes, however, the RegisteredExecutors information is
    not recorded, so it is lost whenever the ExternalShuffleService is restarted.

    To solve this, this change lets the standalone ExternalShuffleService persist executor
    registrations in the same kind of DB, controlled by the new spark.shuffle.service.db.enabled
    config.
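    For illustration only (not part of this patch): a minimal sketch of how the new behavior can
    be exercised in standalone mode, assuming the config key added here
    (spark.shuffle.service.db.enabled) and the existing ExternalShuffleService API; paths and
    values are illustrative.

        import org.apache.spark.{SecurityManager, SparkConf}
        import org.apache.spark.deploy.ExternalShuffleService

        val conf = new SparkConf()
          .set("spark.shuffle.service.enabled", "true")
          .set("spark.shuffle.service.db.enabled", "true") // persist executor registrations
          .set("spark.local.dir", "/tmp/spark-local")      // registeredExecutors.ldb lives here

        // Executor registrations are written to a LevelDB file under spark.local.dir, so a
        // restarted service can reload them instead of losing every registered executor.
        val service = new ExternalShuffleService(conf, new SecurityManager(conf))
        service.start()
        // ... after a service restart, previously registered executors are still known ...
        service.stop()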
    
    ## How was this patch tested?
    New unit tests.
    
    Closes #23393 from weixiuli/SPARK-26288.
    
    Authored-by: weixiuli <weixi...@jd.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../shuffle/ExternalShuffleBlockHandler.java       |   5 +
 core/pom.xml                                       |   7 ++
 .../spark/deploy/ExternalShuffleService.scala      |  25 +++-
 .../org/apache/spark/deploy/worker/Worker.scala    |   9 ++
 .../org/apache/spark/internal/config/package.scala |   7 ++
 .../deploy/ExternalShuffleServiceDbSuite.scala     | 140 +++++++++++++++++++++
 .../apache/spark/deploy/worker/WorkerSuite.scala   |  51 +++++++-
 docs/spark-standalone.md                           |  11 ++
 8 files changed, 253 insertions(+), 2 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index b25e48a..70dcc8b 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -66,6 +66,11 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
       new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
   }
 
+  @VisibleForTesting
+  public ExternalShuffleBlockResolver getBlockResolver() {
+    return blockManager;
+  }
+
   /** Enables mocking out the StreamManager and BlockManager. */
   @VisibleForTesting
   public ExternalShuffleBlockHandler(
diff --git a/core/pom.xml b/core/pom.xml
index b9f78b2..45bda44 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -372,6 +372,13 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
 
     <!--
       This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 12ed189..28279fc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy
 
+import java.io.File
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.JavaConverters._
@@ -49,6 +50,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
   private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
 
+  private val registeredExecutorsDB = "registeredExecutors.ldb"
+
   private val transportConf =
     SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
   private val blockHandler = newShuffleBlockHandler(transportConf)
@@ -58,9 +61,29 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
 
   private val shuffleServiceSource = new ExternalShuffleServiceSource
 
+  protected def findRegisteredExecutorsDBFile(dbName: String): File = {
+    val localDirs = sparkConf.getOption("spark.local.dir").map(_.split(",")).getOrElse(Array())
+    if (localDirs.length >= 1) {
+      new File(localDirs.find(new File(_, dbName).exists()).getOrElse(localDirs(0)), dbName)
+    } else {
+      logWarning(s"'spark.local.dir' should be set first when we use db in " +
+        s"ExternalShuffleService. Note that this only affects standalone mode.")
+      null
+    }
+  }
+
+  /** Get the block handler. */
+  def getBlockHandler: ExternalShuffleBlockHandler = {
+    blockHandler
+  }
+
   /** Create a new shuffle block handler. Factored out for subclasses to override. */
   protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
-    new ExternalShuffleBlockHandler(conf, null)
+    if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
+      new ExternalShuffleBlockHandler(conf, findRegisteredExecutorsDBFile(registeredExecutorsDB))
+    } else {
+      new ExternalShuffleBlockHandler(conf, null)
+    }
   }
 
   /** Starts the external shuffle service if the user has configured us to. */
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 52892c3..a0664b3f 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -466,6 +466,15 @@ private[deploy] class Worker(
           }.foreach { dir =>
             logInfo(s"Removing directory: ${dir.getPath}")
             Utils.deleteRecursively(dir)
+
+            // When spark.shuffle.service.db.enabled=true, also remove this application's
+            // registeredExecutors entry from the external shuffle service's DB. Otherwise,
+            // if an application is stopped while the external shuffle service is down,
+            // a stale entry would be left behind in the DB and never removed.
+            if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
+                conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+              shuffleService.applicationRemoved(dir.getName)
+            }
           }
         }(cleanupThreadExecutor)
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2d664bb..758f605 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -359,6 +359,13 @@ package object config {
   private[spark] val SHUFFLE_SERVICE_ENABLED =
     ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_DB_ENABLED =
+    ConfigBuilder("spark.shuffle.service.db.enabled")
+      .doc("Whether to use db in ExternalShuffleService. Note that this only 
affects " +
+        "standalone mode.")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val SHUFFLE_SERVICE_PORT =
     ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
new file mode 100644
index 0000000..e33c3f8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import com.google.common.io.CharStreams
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}
+import org.apache.spark.network.shuffle.TestShuffleDataContext
+import org.apache.spark.util.Utils
+
+/**
+ * This suite gets BlockData when the ExternalShuffleService is restarted
+ * with spark.shuffle.service.db.enabled = true or false.
+ * Note that failures in this suite may arise when spark.shuffle.service.db.enabled = false.
+ */
+class ExternalShuffleServiceDbSuite extends SparkFunSuite {
+  val sortBlock0 = "Hello!"
+  val sortBlock1 = "World!"
+  val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
+
+  var sparkConf: SparkConf = _
+  var dataContext: TestShuffleDataContext = _
+
+  var securityManager: SecurityManager = _
+  var externalShuffleService: ExternalShuffleService = _
+  var blockHandler: ExternalShuffleBlockHandler = _
+  var blockResolver: ExternalShuffleBlockResolver = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    sparkConf = new SparkConf()
+    sparkConf.set("spark.shuffle.service.enabled", "true")
+    sparkConf.set("spark.local.dir", System.getProperty("java.io.tmpdir"))
+    Utils.loadDefaultSparkProperties(sparkConf, null)
+    securityManager = new SecurityManager(sparkConf)
+
+    dataContext = new TestShuffleDataContext(2, 5)
+    dataContext.create()
+    // Write some sort data.
+    dataContext.insertSortShuffleData(0, 0,
+      Array[Array[Byte]](sortBlock0.getBytes(StandardCharsets.UTF_8),
+        sortBlock1.getBytes(StandardCharsets.UTF_8)))
+    registerExecutor()
+  }
+
+  override def afterAll() {
+    try {
+      dataContext.cleanup()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  def registerExecutor(): Unit = {
+    try {
+      sparkConf.set("spark.shuffle.service.db.enabled", "true")
+      externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+
+      // external Shuffle Service start
+      externalShuffleService.start()
+      blockHandler = externalShuffleService.getBlockHandler
+      blockResolver = blockHandler.getBlockResolver
+      blockResolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER))
+    } finally {
+      blockHandler.close()
+      // external Shuffle Service stop
+      externalShuffleService.stop()
+    }
+  }
+
+  // The beforeAll ensures the shuffle data was already written, and then
+  // the shuffle service was stopped. Here we restart the shuffle service
+  // and make sure we can read the shuffle data.
+  test("Recover shuffle data with spark.shuffle.service.db.enabled=true after 
" +
+    "shuffle service restart") {
+    try {
+      sparkConf.set("spark.shuffle.service.db.enabled", "true")
+      externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+      // externalShuffleService restart
+      externalShuffleService.start()
+      blockHandler = externalShuffleService.getBlockHandler
+      blockResolver = blockHandler.getBlockResolver
+
+      val block0Stream = blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
+      val block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8))
+      block0Stream.close()
+      assert(sortBlock0 == block0)
+      // pass
+    } finally {
+      blockHandler.close()
+      // externalShuffleService stop
+      externalShuffleService.stop()
+    }
+
+  }
+
+  // The beforeAll ensures the shuffle data was already written, and then
+  // the shuffle service was stopped. Here we restart the shuffle service,
+  // but we can't read the shuffle data.
+  test("Can't recover shuffle data with spark.shuffle.service.db.enabled=false 
after" +
+    " shuffle service restart") {
+    try {
+      sparkConf.set("spark.shuffle.service.db.enabled", "false")
+      externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+      // externalShuffleService restart
+      externalShuffleService.start()
+      blockHandler = externalShuffleService.getBlockHandler
+      blockResolver = blockHandler.getBlockResolver
+
+      val error = intercept[RuntimeException] {
+        blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
+      }.getMessage
+
+      assert(error.contains("not registered"))
+    } finally {
+      blockHandler.close()
+      // externalShuffleService stop
+      externalShuffleService.stop()
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index f6559df..168694c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.deploy.worker
 
+import java.io.{File, IOException}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.function.Supplier
 
+import scala.concurrent.duration._
+
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Answers.RETURNS_SMART_NULLS
 import org.mockito.ArgumentMatchers.any
@@ -27,14 +30,16 @@ import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
-import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
+import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, WorkDirCleanup}
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Worker._
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.util.Utils
 
 class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
 
@@ -245,4 +250,48 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
       ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
     assert(cleanupCalled.get() == value)
   }
+
+  test("WorkDirCleanup cleans app dirs and shuffle metadata when " +
+    "spark.shuffle.service.db.enabled=true") {
+    testWorkDirCleanupAndRemoveMetadataWithConfig(true)
+  }
+
+  test("WorkdDirCleanup cleans only app dirs when" +
+    "spark.shuffle.service.db.enabled=false") {
+    testWorkDirCleanupAndRemoveMetadataWithConfig(false)
+  }
+
+  private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean) = {
+    val conf = new SparkConf().set("spark.shuffle.service.db.enabled", dbCleanupEnabled.toString)
+    conf.set("spark.worker.cleanup.appDataTtl", "60")
+    conf.set("spark.shuffle.service.enabled", "true")
+
+    val appId = "app1"
+    val execId = "exec1"
+    val cleanupCalled = new AtomicBoolean(false)
+    when(shuffleService.applicationRemoved(any[String])).thenAnswer(new Answer[Unit] {
+      override def answer(invocations: InvocationOnMock): Unit = {
+        cleanupCalled.set(true)
+      }
+    })
+    val externalShuffleServiceSupplier = new Supplier[ExternalShuffleService] {
+      override def get: ExternalShuffleService = shuffleService
+    }
+    val worker = makeWorker(conf, externalShuffleServiceSupplier)
+    val workDir = Utils.createTempDir(namePrefix = "work")
+    // initialize workers
+    worker.workDir = workDir
+    // Create the executor's working directory
+    val executorDir = new File(worker.workDir, appId + "/" + execId)
+
+    if (!executorDir.exists && !executorDir.mkdirs()) {
+      throw new IOException("Failed to create directory " + executorDir)
+    }
+    executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
+    worker.receive(WorkDirCleanup)
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(!executorDir.exists())
+      assert(cleanupCalled.get() == dbCleanupEnabled)
+    }
+  }
 }
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 60b84d3..3400da4 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -240,6 +240,7 @@ SPARK_WORKER_OPTS supports the following system properties:
   <td>
     Enable periodic cleanup of worker / application directories.  Note that this only affects standalone
     mode, as YARN works differently. Only the directories of stopped applications are cleaned up.
+    This should be enabled if <code>spark.shuffle.service.db.enabled</code> is "true".
   </td>
 </tr>
 <tr>
@@ -261,6 +262,16 @@ SPARK_WORKER_OPTS supports the following system properties:
   </td>
 </tr>
 <tr>
+  <td><code>spark.shuffle.service.db.enabled</code></td>
+  <td>true</td>
+  <td>
+    Store External Shuffle service state on local disk so that when the external shuffle service is restarted, it will
+    automatically reload info on current executors.  This only affects standalone mode (yarn always has this behavior
+    enabled).  You should also enable <code>spark.worker.cleanup.enabled</code>, to ensure that the state
+    eventually gets cleaned up.  This config may be removed in the future.
+  </td>
+</tr>
+<tr>
   <td><code>spark.storage.cleanupFilesAfterExecutorExit</code></td>
   <td>true</td>
   <td>
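
For illustration only (not part of the commit): the standalone docs entry above recommends
pairing the new DB setting with worker cleanup. A rough Scala sketch of the worker-side
configuration, using the config names from this change; the values shown are examples only.

    import org.apache.spark.SparkConf

    // Standalone worker settings sketch: keep the shuffle-state DB enabled and let the
    // worker's periodic cleanup remove stale app directories (and their DB entries).
    val workerConf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.shuffle.service.db.enabled", "true")  // default is true
      .set("spark.worker.cleanup.enabled", "true")      // recommended alongside the DB
      .set("spark.worker.cleanup.appDataTtl", "604800") // TTL in seconds (example: 7 days)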


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
