Repository: spark
Updated Branches:
  refs/heads/master 09e78c1ea -> 8ef167a5f


[SPARK-24340][CORE] Clean up non-shuffle disk block manager files following 
executor exits on a Standalone cluster

## What changes were proposed in this pull request?

Currently we only clean up the local directories when an application is removed.
However, when executors die and restart repeatedly, many temp files are left
untouched in the local directories, which is undesirable and can gradually use
up disk space.

We can detect executor death in the Worker and clean up the non-shuffle files
(files not ending with ".index" or ".data") in the local directories. We should
not touch the shuffle files, since they are expected to be used by the external
shuffle service.
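
For illustration only, a minimal Scala sketch of the filtering rule (not part of
the patch itself): a file is treated as non-shuffle, and therefore eligible for
deletion, whenever its name is not a shuffle data or shuffle index file.

    // Sketch: mirrors the FilenameFilter added in ExternalShuffleBlockResolver below.
    def isNonShuffleFile(name: String): Boolean =
      !name.endsWith(".index") && !name.endsWith(".data")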

The scope of this PR is limited to implementing the cleanup logic on a
Standalone cluster; we defer to experts familiar with other cluster managers
(YARN/Mesos/K8s) to determine whether it is worth adding similar support.

## How was this patch tested?

Added a new test suite to cover the cleanup logic.

Author: Xingbo Jiang <xingbo.ji...@databricks.com>

Closes #21390 from jiangxb1987/cleanupNonshuffleFiles.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ef167a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ef167a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ef167a5

Branch: refs/heads/master
Commit: 8ef167a5f9ba8a79bb7ca98a9844fe9cfcfea060
Parents: 09e78c1
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
Authored: Fri Jun 1 13:46:05 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Fri Jun 1 13:46:05 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/network/util/JavaUtils.java    |  45 ++--
 .../shuffle/ExternalShuffleBlockHandler.java    |   7 +
 .../shuffle/ExternalShuffleBlockResolver.java   |  43 ++++
 .../shuffle/NonShuffleFilesCleanupSuite.java    | 221 +++++++++++++++++++
 .../network/shuffle/TestShuffleDataContext.java |  15 ++
 .../spark/deploy/ExternalShuffleService.scala   |   5 +
 .../org/apache/spark/deploy/worker/Worker.scala |  17 +-
 .../spark/deploy/worker/WorkerSuite.scala       |  55 ++++-
 docs/spark-standalone.md                        |  12 +
 9 files changed, 400 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8ef167a5/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index afc59ef..b549708 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -17,10 +17,7 @@
 
 package org.apache.spark.network.util;
 
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.charset.StandardCharsets;
@@ -91,11 +88,24 @@ public class JavaUtils {
    * @throws IOException if deletion is unsuccessful
    */
   public static void deleteRecursively(File file) throws IOException {
+    deleteRecursively(file, null);
+  }
+
+  /**
+   * Delete a file or directory and its contents recursively.
+   * Don't follow directories if they are symlinks.
+   *
+   * @param file Input file / dir to be deleted
+   * @param filter A filename filter that ensures only files / dirs whose names match
+   *               the filter are deleted.
+   * @throws IOException if deletion is unsuccessful
+   */
+  public static void deleteRecursively(File file, FilenameFilter filter) 
throws IOException {
     if (file == null) { return; }
 
     // On Unix systems, use operating system command to run faster
     // If that does not work out, fallback to the Java IO way
-    if (SystemUtils.IS_OS_UNIX) {
+    if (SystemUtils.IS_OS_UNIX && filter == null) {
       try {
         deleteRecursivelyUsingUnixNative(file);
         return;
@@ -105,15 +115,17 @@ public class JavaUtils {
       }
     }
 
-    deleteRecursivelyUsingJavaIO(file);
+    deleteRecursivelyUsingJavaIO(file, filter);
   }
 
-  private static void deleteRecursivelyUsingJavaIO(File file) throws 
IOException {
+  private static void deleteRecursivelyUsingJavaIO(
+      File file,
+      FilenameFilter filter) throws IOException {
     if (file.isDirectory() && !isSymlink(file)) {
       IOException savedIOException = null;
-      for (File child : listFilesSafely(file)) {
+      for (File child : listFilesSafely(file, filter)) {
         try {
-          deleteRecursively(child);
+          deleteRecursively(child, filter);
         } catch (IOException e) {
           // In case of multiple exceptions, only last one will be thrown
           savedIOException = e;
@@ -124,10 +136,13 @@ public class JavaUtils {
       }
     }
 
-    boolean deleted = file.delete();
-    // Delete can also fail if the file simply did not exist.
-    if (!deleted && file.exists()) {
-      throw new IOException("Failed to delete: " + file.getAbsolutePath());
+    // Delete file only when it's a normal file or an empty directory.
+    if (file.isFile() || (file.isDirectory() && listFilesSafely(file, 
null).length == 0)) {
+      boolean deleted = file.delete();
+      // Delete can also fail if the file simply did not exist.
+      if (!deleted && file.exists()) {
+        throw new IOException("Failed to delete: " + file.getAbsolutePath());
+      }
     }
   }
 
@@ -157,9 +172,9 @@ public class JavaUtils {
     }
   }
 
-  private static File[] listFilesSafely(File file) throws IOException {
+  private static File[] listFilesSafely(File file, FilenameFilter filter) 
throws IOException {
     if (file.exists()) {
-      File[] files = file.listFiles();
+      File[] files = file.listFiles(filter);
       if (files == null) {
         throw new IOException("Failed to list files for dir: " + file);
       }

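As a rough usage sketch of the new overload above (the directory path is purely
illustrative): passing a FilenameFilter removes only matching entries, and the
fast Unix "rm -rf" path is skipped whenever a filter is supplied, so filtered
deletion always goes through the Java IO implementation.

    import java.io.{File, FilenameFilter}
    import org.apache.spark.network.util.JavaUtils

    // Sketch: delete everything under a local dir except shuffle data/index files.
    val nonShuffleFilter = new FilenameFilter {
      override def accept(dir: File, name: String): Boolean =
        !name.endsWith(".index") && !name.endsWith(".data")
    }
    JavaUtils.deleteRecursively(new File("/tmp/spark-local-dir"), nonShuffleFilter)
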
http://git-wip-us.apache.org/repos/asf/spark/blob/8ef167a5/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
----------------------------------------------------------------------
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index fc7bba4..098fa79 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -139,6 +139,13 @@ public class ExternalShuffleBlockHandler extends 
RpcHandler {
   }
 
   /**
+   * Clean up any non-shuffle files in any local directories associated with a finished executor.
+   */
+  public void executorRemoved(String executorId, String appId) {
+    blockManager.executorRemoved(executorId, appId);
+  }
+
+  /**
    * Register an (application, executor) with the given shuffle info.
    *
    * The "re-" is meant to highlight the intended use of this method -- when 
this service is

http://git-wip-us.apache.org/repos/asf/spark/blob/8ef167a5/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index e639989..58fb17f 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -212,6 +212,26 @@ public class ExternalShuffleBlockResolver {
   }
 
   /**
+   * Removes all the non-shuffle files in any local directories associated 
with the finished
+   * executor.
+   */
+  public void executorRemoved(String executorId, String appId) {
+    logger.info("Clean up non-shuffle files associated with the finished 
executor {}", executorId);
+    AppExecId fullId = new AppExecId(appId, executorId);
+    final ExecutorShuffleInfo executor = executors.get(fullId);
+    if (executor == null) {
+      // Executor not registered, skip clean up of the local directories.
+      logger.info("Executor is not registered (appId={}, execId={})", appId, 
executorId);
+    } else {
+      logger.info("Cleaning up non-shuffle files in executor {}'s {} local 
dirs", fullId,
+              executor.localDirs.length);
+
+      // Execute the actual deletion in a different thread, as it may take 
some time.
+      directoryCleaner.execute(() -> 
deleteNonShuffleFiles(executor.localDirs));
+    }
+  }
+
+  /**
    * Synchronously deletes each directory one at a time.
    * Should be executed in its own thread, as this may take a long time.
    */
@@ -227,6 +247,29 @@ public class ExternalShuffleBlockResolver {
   }
 
   /**
+   * Synchronously deletes non-shuffle files in each directory recursively.
+   * Should be executed in its own thread, as this may take a long time.
+   */
+  private void deleteNonShuffleFiles(String[] dirs) {
+    FilenameFilter filter = new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        // Don't delete shuffle data or shuffle index files.
+        return !name.endsWith(".index") && !name.endsWith(".data");
+      }
+    };
+
+    for (String localDir : dirs) {
+      try {
+        JavaUtils.deleteRecursively(new File(localDir), filter);
+        logger.debug("Successfully cleaned up non-shuffle files in directory: 
{}", localDir);
+      } catch (Exception e) {
+        logger.error("Failed to delete non-shuffle files in directory: " + 
localDir, e);
+      }
+    }
+  }
+
+  /**
    * Sort-based shuffle data uses an index called 
"shuffle_ShuffleId_MapId_0.index" into a data file
    * called "shuffle_ShuffleId_MapId_0.data". This logic is from 
IndexShuffleBlockResolver,
    * and the block id format is from ShuffleDataBlockId and 
ShuffleIndexBlockId.

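A note on the design above: the actual deletion is handed off to the resolver's
directoryCleaner executor, so the RPC handler thread is not blocked while the
local dirs are walked. A minimal sketch of driving it synchronously (this
mirrors the test setup in the new suite below, which passes a same-thread
executor):

    import com.google.common.util.concurrent.MoreExecutors
    import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver
    import org.apache.spark.network.util.{MapConfigProvider, TransportConf}

    // Sketch: with a same-thread executor the cleanup runs synchronously,
    // which makes assertions about the local dirs deterministic.
    val conf = new TransportConf("shuffle", MapConfigProvider.EMPTY)
    val resolver = new ExternalShuffleBlockResolver(conf, null, MoreExecutors.sameThreadExecutor())
    resolver.executorRemoved("exec0", "app")  // no-op if the executor was never registered
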
http://git-wip-us.apache.org/repos/asf/spark/blob/8ef167a5/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/NonShuffleFilesCleanupSuite.java
----------------------------------------------------------------------
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/NonShuffleFilesCleanupSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/NonShuffleFilesCleanupSuite.java
new file mode 100644
index 0000000..d22f3ac
--- /dev/null
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/NonShuffleFilesCleanupSuite.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.spark.network.util.MapConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
+public class NonShuffleFilesCleanupSuite {
+
+  // Same-thread Executor used to ensure cleanup happens synchronously in test 
thread.
+  private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+  private TransportConf conf = new TransportConf("shuffle", 
MapConfigProvider.EMPTY);
+  private static final String SORT_MANAGER = 
"org.apache.spark.shuffle.sort.SortShuffleManager";
+
+  @Test
+  public void cleanupOnRemovedExecutorWithShuffleFiles() throws IOException {
+    cleanupOnRemovedExecutor(true);
+  }
+
+  @Test
+  public void cleanupOnRemovedExecutorWithoutShuffleFiles() throws IOException 
{
+    cleanupOnRemovedExecutor(false);
+  }
+
+  private void cleanupOnRemovedExecutor(boolean withShuffleFiles) throws 
IOException {
+    TestShuffleDataContext dataContext = initDataContext(withShuffleFiles);
+
+    ExternalShuffleBlockResolver resolver =
+      new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
+    resolver.registerExecutor("app", "exec0", 
dataContext.createExecutorInfo(SORT_MANAGER));
+    resolver.executorRemoved("exec0", "app");
+
+    assertCleanedUp(dataContext);
+  }
+
+  @Test
+  public void cleanupUsesExecutorWithShuffleFiles() throws IOException {
+    cleanupUsesExecutor(true);
+  }
+
+  @Test
+  public void cleanupUsesExecutorWithoutShuffleFiles() throws IOException {
+    cleanupUsesExecutor(false);
+  }
+
+  private void cleanupUsesExecutor(boolean withShuffleFiles) throws 
IOException {
+    TestShuffleDataContext dataContext = initDataContext(withShuffleFiles);
+
+    AtomicBoolean cleanupCalled = new AtomicBoolean(false);
+
+    // Executor which does nothing to ensure we're actually using it.
+    Executor noThreadExecutor = runnable -> cleanupCalled.set(true);
+
+    ExternalShuffleBlockResolver manager =
+      new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
+
+    manager.registerExecutor("app", "exec0", 
dataContext.createExecutorInfo(SORT_MANAGER));
+    manager.executorRemoved("exec0", "app");
+
+    assertTrue(cleanupCalled.get());
+    assertStillThere(dataContext);
+  }
+
+  @Test
+  public void cleanupOnlyRemovedExecutorWithShuffleFiles() throws IOException {
+    cleanupOnlyRemovedExecutor(true);
+  }
+
+  @Test
+  public void cleanupOnlyRemovedExecutorWithoutShuffleFiles() throws 
IOException {
+    cleanupOnlyRemovedExecutor(false);
+  }
+
+  private void cleanupOnlyRemovedExecutor(boolean withShuffleFiles) throws 
IOException {
+    TestShuffleDataContext dataContext0 = initDataContext(withShuffleFiles);
+    TestShuffleDataContext dataContext1 = initDataContext(withShuffleFiles);
+
+    ExternalShuffleBlockResolver resolver =
+      new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
+    resolver.registerExecutor("app", "exec0", 
dataContext0.createExecutorInfo(SORT_MANAGER));
+    resolver.registerExecutor("app", "exec1", 
dataContext1.createExecutorInfo(SORT_MANAGER));
+
+
+    resolver.executorRemoved("exec-nonexistent", "app");
+    assertStillThere(dataContext0);
+    assertStillThere(dataContext1);
+
+    resolver.executorRemoved("exec0", "app");
+    assertCleanedUp(dataContext0);
+    assertStillThere(dataContext1);
+
+    resolver.executorRemoved("exec1", "app");
+    assertCleanedUp(dataContext0);
+    assertCleanedUp(dataContext1);
+
+    // Make sure it's not an error to cleanup multiple times
+    resolver.executorRemoved("exec1", "app");
+    assertCleanedUp(dataContext0);
+    assertCleanedUp(dataContext1);
+  }
+
+  @Test
+  public void cleanupOnlyRegisteredExecutorWithShuffleFiles() throws 
IOException {
+    cleanupOnlyRegisteredExecutor(true);
+  }
+
+  @Test
+  public void cleanupOnlyRegisteredExecutorWithoutShuffleFiles() throws 
IOException {
+    cleanupOnlyRegisteredExecutor(false);
+  }
+
+  private void cleanupOnlyRegisteredExecutor(boolean withShuffleFiles) throws 
IOException {
+    TestShuffleDataContext dataContext = initDataContext(withShuffleFiles);
+
+    ExternalShuffleBlockResolver resolver =
+      new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
+    resolver.registerExecutor("app", "exec0", 
dataContext.createExecutorInfo(SORT_MANAGER));
+
+    resolver.executorRemoved("exec1", "app");
+    assertStillThere(dataContext);
+
+    resolver.executorRemoved("exec0", "app");
+    assertCleanedUp(dataContext);
+  }
+
+  private static void assertStillThere(TestShuffleDataContext dataContext) {
+    for (String localDir : dataContext.localDirs) {
+      assertTrue(localDir + " was cleaned up prematurely", new 
File(localDir).exists());
+    }
+  }
+
+  private static FilenameFilter filter = new FilenameFilter() {
+    @Override
+    public boolean accept(File dir, String name) {
+      // Don't delete shuffle data or shuffle index files.
+      return !name.endsWith(".index") && !name.endsWith(".data");
+    }
+  };
+
+  private static boolean assertOnlyShuffleDataInDir(File[] dirs) {
+    for (File dir : dirs) {
+      assertTrue(dir.getName() + " wasn't cleaned up", !dir.exists() ||
+        dir.listFiles(filter).length == 0 || 
assertOnlyShuffleDataInDir(dir.listFiles()));
+    }
+    return true;
+  }
+
+  private static void assertCleanedUp(TestShuffleDataContext dataContext) {
+    for (String localDir : dataContext.localDirs) {
+      File[] dirs = new File[] {new File(localDir)};
+      assertOnlyShuffleDataInDir(dirs);
+    }
+  }
+
+  private static TestShuffleDataContext initDataContext(boolean 
withShuffleFiles)
+      throws IOException {
+    if (withShuffleFiles) {
+      return initDataContextWithShuffleFiles();
+    } else {
+      return initDataContextWithoutShuffleFiles();
+    }
+  }
+
+  private static TestShuffleDataContext initDataContextWithShuffleFiles() 
throws IOException {
+    TestShuffleDataContext dataContext = createDataContext();
+    createShuffleFiles(dataContext);
+    createNonShuffleFiles(dataContext);
+    return dataContext;
+  }
+
+  private static TestShuffleDataContext initDataContextWithoutShuffleFiles() 
throws IOException {
+    TestShuffleDataContext dataContext = createDataContext();
+    createNonShuffleFiles(dataContext);
+    return dataContext;
+  }
+
+  private static TestShuffleDataContext createDataContext() {
+    TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
+    dataContext.create();
+    return dataContext;
+  }
+
+  private static void createShuffleFiles(TestShuffleDataContext dataContext) 
throws IOException {
+    Random rand = new Random(123);
+    dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), 
new byte[][] {
+        "ABC".getBytes(StandardCharsets.UTF_8),
+        "DEF".getBytes(StandardCharsets.UTF_8)});
+  }
+
+  private static void createNonShuffleFiles(TestShuffleDataContext 
dataContext) throws IOException {
+    // Create spill file(s)
+    dataContext.insertSpillData();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8ef167a5/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
----------------------------------------------------------------------
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 81e0194..6989c3b 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.UUID;
 
 import com.google.common.io.Closeables;
 import com.google.common.io.Files;
@@ -94,6 +95,20 @@ public class TestShuffleDataContext {
     }
   }
 
+  /** Creates spill file(s) within the local dirs. */
+  public void insertSpillData() throws IOException {
+    String filename = "temp_local_" + UUID.randomUUID();
+    OutputStream dataStream = null;
+
+    try {
+      dataStream = new FileOutputStream(
+        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, 
filename));
+      dataStream.write(42);
+    } finally {
+      Closeables.close(dataStream, false);
+    }
+  }
+
   /**
    * Creates an ExecutorShuffleInfo object based on the given shuffle manager 
which targets this
    * context's directories.

http://git-wip-us.apache.org/repos/asf/spark/blob/8ef167a5/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala 
b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index f975fa5..b59a4fe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -94,6 +94,11 @@ class ExternalShuffleService(sparkConf: SparkConf, 
securityManager: SecurityMana
     blockHandler.applicationRemoved(appId, true /* cleanupLocalDirs */)
   }
 
+  /** Clean up all the non-shuffle files associated with an executor that has 
exited. */
+  def executorRemoved(executorId: String, appId: String): Unit = {
+    blockHandler.executorRemoved(executorId, appId)
+  }
+
   def stop() {
     if (server != null) {
       server.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/8ef167a5/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 563b849..ee1ca0b 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -23,6 +23,7 @@ import java.text.SimpleDateFormat
 import java.util.{Date, Locale, UUID}
 import java.util.concurrent._
 import java.util.concurrent.{Future => JFuture, ScheduledFuture => 
JScheduledFuture}
+import java.util.function.Supplier
 
 import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
 import scala.concurrent.ExecutionContext
@@ -49,7 +50,8 @@ private[deploy] class Worker(
     endpointName: String,
     workDirPath: String = null,
     val conf: SparkConf,
-    val securityMgr: SecurityManager)
+    val securityMgr: SecurityManager,
+    externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
   extends ThreadSafeRpcEndpoint with Logging {
 
   private val host = rpcEnv.address.host
@@ -97,6 +99,10 @@ private[deploy] class Worker(
   private val APP_DATA_RETENTION_SECONDS =
     conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
 
+  // Whether or not to clean up the non-shuffle files on executor exit.
+  private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
+    conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true)
+
   private val testing: Boolean = sys.props.contains("spark.testing")
   private var master: Option[RpcEndpointRef] = None
 
@@ -142,7 +148,11 @@ private[deploy] class Worker(
     WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
 
   // The shuffle service is not actually started unless configured.
-  private val shuffleService = new ExternalShuffleService(conf, securityMgr)
+  private val shuffleService = if (externalShuffleServiceSupplier != null) {
+    externalShuffleServiceSupplier.get()
+  } else {
+    new ExternalShuffleService(conf, securityMgr)
+  }
 
   private val publicAddress = {
     val envVar = conf.getenv("SPARK_PUBLIC_DNS")
@@ -732,6 +742,9 @@ private[deploy] class Worker(
           trimFinishedExecutorsIfNecessary()
           coresUsed -= executor.cores
           memoryUsed -= executor.memory
+          if (CLEANUP_NON_SHUFFLE_FILES_ENABLED) {
+            
shuffleService.executorRemoved(executorStateChanged.execId.toString, appId)
+          }
         case None =>
           logInfo("Unknown Executor " + fullId + " finished with state " + 
state +
             message.map(" message " + _).getOrElse("") +

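A side note on the Worker change above: the ExternalShuffleService is now
injected through a java.util.function.Supplier, so a test can hand the Worker a
stubbed service without starting a real one (WorkerSuite below does this with a
Mockito mock). A minimal sketch of such a supplier:

    import java.util.function.Supplier
    import org.apache.spark.deploy.ExternalShuffleService

    // Sketch: wrap a pre-built (possibly stubbed) service in a Supplier
    // so it can be passed to the Worker constructor.
    def asSupplier(service: ExternalShuffleService): Supplier[ExternalShuffleService] =
      new Supplier[ExternalShuffleService] {
        override def get(): ExternalShuffleService = service
      }
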
http://git-wip-us.apache.org/repos/asf/spark/blob/8ef167a5/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index ce212a7..e3fe2b6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -17,10 +17,19 @@
 
 package org.apache.spark.deploy.worker
 
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.function.Supplier
+
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
 import org.scalatest.{BeforeAndAfter, Matchers}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.{Command, ExecutorState}
+import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
 import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, 
ExecutorStateChanged}
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
@@ -29,6 +38,8 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
 
   import org.apache.spark.deploy.DeployTestUtils._
 
+  @Mock(answer = RETURNS_SMART_NULLS) private var shuffleService: 
ExternalShuffleService = _
+
   def cmd(javaOpts: String*): Command = {
     Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts : _*))
   }
@@ -36,15 +47,21 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
 
   private var _worker: Worker = _
 
-  private def makeWorker(conf: SparkConf): Worker = {
+  private def makeWorker(
+      conf: SparkConf,
+      shuffleServiceSupplier: Supplier[ExternalShuffleService] = null): Worker 
= {
     assert(_worker === null, "Some Worker's RpcEnv is leaked in tests")
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
     _worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, 
Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, securityMgr)
+      "Worker", "/tmp", conf, securityMgr, shuffleServiceSupplier)
     _worker
   }
 
+  before {
+    MockitoAnnotations.initMocks(this)
+  }
+
   after {
     if (_worker != null) {
       _worker.rpcEnv.shutdown()
@@ -194,4 +211,36 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
       assert(worker.finishedDrivers.size === expectedValue)
     }
   }
+
+  test("cleanup non-shuffle files after executor exits when config " +
+      "spark.storage.cleanupFilesAfterExecutorExit=true") {
+    testCleanupFilesWithConfig(true)
+  }
+
+  test("don't cleanup non-shuffle files after executor exits when config " +
+      "spark.storage.cleanupFilesAfterExecutorExit=false") {
+    testCleanupFilesWithConfig(false)
+  }
+
+  private def testCleanupFilesWithConfig(value: Boolean) = {
+    val conf = new 
SparkConf().set("spark.storage.cleanupFilesAfterExecutorExit", value.toString)
+
+    val cleanupCalled = new AtomicBoolean(false)
+    when(shuffleService.executorRemoved(any[String], 
any[String])).thenAnswer(new Answer[Unit] {
+      override def answer(invocations: InvocationOnMock): Unit = {
+        cleanupCalled.set(true)
+      }
+    })
+    val externalShuffleServiceSupplier = new Supplier[ExternalShuffleService] {
+      override def get: ExternalShuffleService = shuffleService
+    }
+    val worker = makeWorker(conf, externalShuffleServiceSupplier)
+    // register some executor runners with the worker
+    for (i <- 0 until 10) {
+      worker.executors += s"app1/$i" -> createExecutorRunner(i)
+    }
+    worker.handleExecutorStateChanged(
+      ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
+    assert(cleanupCalled.get() == value)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ef167a5/docs/spark-standalone.md
----------------------------------------------------------------------
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index f06e72a..14d742d 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -255,6 +255,18 @@ SPARK_WORKER_OPTS supports the following system properties:
   </td>
 </tr>
 <tr>
+  <td><code>spark.storage.cleanupFilesAfterExecutorExit</code></td>
+  <td>true</td>
+  <td>
+    Enable cleanup of non-shuffle files (such as temporary shuffle blocks, cached
+    RDD/broadcast blocks, spill files, etc.) in the worker's local directories following
+    executor exits. Note that this doesn't overlap with `spark.worker.cleanup.enabled`,
+    as this enables cleanup of non-shuffle files in the local directories of a dead
+    executor, while `spark.worker.cleanup.enabled` enables cleanup of all
+    files/subdirectories of a stopped and timed-out application.
+    This only affects Standalone mode; support for other cluster managers can be added
+    in the future.
+  </td>
+</tr>
+<tr>
   <td><code>spark.worker.ui.compressedLogFileLengthCacheSize</code></td>
   <td>100</td>
   <td>

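For reference, the new flag is a worker-side property, so on a standalone
cluster it would typically be set through SPARK_WORKER_OPTS in
conf/spark-env.sh, e.g. -Dspark.storage.cleanupFilesAfterExecutorExit=false to
opt out. A minimal Scala sketch of the equivalent SparkConf setting (this is
how the new WorkerSuite tests toggle it; the default is true):

    import org.apache.spark.SparkConf

    // Sketch: disable the post-exit cleanup of non-shuffle files on a worker.
    val workerConf = new SparkConf()
      .set("spark.storage.cleanupFilesAfterExecutorExit", "false")
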

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
