This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a7596e  [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors
9a7596e is described below

commit 9a7596e1dde0f1dd596aa6d3b2efbcb5d1ef70ea
Author: Adam Binford <adam...@gmail.com>
AuthorDate: Fri Mar 25 13:00:17 2022 -0500

    [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors
    
    ### What changes were proposed in this pull request?
    
    Add support for removing shuffle files on released executors via the
    external shuffle service. The shuffle service already supports removing
    shuffle service cached RDD blocks, so I reused that mechanism to remove
    shuffle blocks as well, which avoids requiring any update to the shuffle
    service itself.
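
    Concretely, the blocks handed to the shuffle service for deletion are the
    per-map index and data blocks, as in this sketch of the new
    `getBlocksForShuffle` resolver method added below (signature and block id
    types taken from the diff; condensed here for illustration):

    ```scala
    import org.apache.spark.storage.{BlockId, ShuffleDataBlockId, ShuffleIndexBlockId}

    // NOOP_REDUCE_ID (0) is the placeholder reduce id used in sort-based
    // shuffle file names.
    val NOOP_REDUCE_ID = 0

    def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = Seq(
      ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
      ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
    )
    ```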
    
    To make this change work in a secure Yarn environment, I updated
    permissions on some of the block manager folders and files. Specifically:
    - Block manager sub directories have the group write POSIX permission
      added to them. This gives the shuffle service permission to delete
      files from within these folders.
    - Shuffle files have the world readable POSIX permission added to them.
      This is because when the sub directories are marked group writable,
      they lose the setgid bit that gets set in a secure Yarn environment.
      Without this, the permissions on the files would be `rw-r-----`, and
      since the group running Yarn (and therefore the shuffle service) is no
      longer the group owner of the file, it does not have access to read
      the file. The sub directories still do not have world execute
      permission [...]
    
    Both of these changes are done after creating a file so that umasks
    don't affect the resulting permissions.
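
    For illustration, here is a minimal sketch of the permission changes
    described above. It condenses the logic added to `DiskBlockManager` in
    this patch (the group-writable sub directory creation and
    `createWorldReadableFile`) into standalone helpers; the helper names are
    illustrative only, not the exact method names in the diff.

    ```scala
    import java.io.File
    import java.nio.file.Files
    import java.nio.file.attribute.PosixFilePermission

    // Sketch: mark a block manager sub directory group writable so the
    // shuffle service (a member of the group) can delete files inside it.
    def makeGroupWritable(dir: File): Unit = {
      val path = dir.toPath
      val perms = Files.getPosixFilePermissions(path)
      perms.add(PosixFilePermission.GROUP_WRITE)
      Files.setPosixFilePermissions(path, perms)
    }

    // Sketch: create a shuffle file as world readable, compensating for the
    // setgid bit lost once the parent directory is made group writable.
    def makeWorldReadable(file: File): Unit = {
      val path = file.toPath
      Files.createFile(path)
      val perms = Files.getPosixFilePermissions(path)
      perms.add(PosixFilePermission.OTHERS_READ)
      Files.setPosixFilePermissions(path, perms)
    }
    ```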
    
    ### Why are the changes needed?
    
    External shuffle services are very useful for long-running jobs and
    dynamic allocation. However, currently if an executor is removed (either
    through dynamic deallocation or through some error), the shuffle files
    created by that executor remain on disk until the application finishes.
    This causes local disks to slowly fill up over time, eventually causing
    problems for long-running applications.
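
    As a usage sketch (assuming an external shuffle service is already
    deployed), the new behavior can be enabled from an application like
    this; the property name and its `false` default come from the config
    added in this patch:

    ```scala
    import org.apache.spark.SparkConf

    // Opt in to removing shuffle blocks of deallocated executors through
    // the external shuffle service (new in 3.3.0, disabled by default).
    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.removeShuffle", "true")
    ```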
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test. I'm not sure whether there's a better way to test that
    the files are actually deleted, or whether there are other tests I
    should add.
    
    Closes #35085 from Kimahriman/shuffle-service-remove-shuffle-blocks.
    
    Authored-by: Adam Binford <adam...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../network/shuffle/ExternalBlockStoreClient.java  |   4 +-
 .../sort/io/LocalDiskShuffleMapOutputWriter.java   |   3 +-
 .../scala/org/apache/spark/ContextCleaner.scala    |   4 +-
 .../src/main/scala/org/apache/spark/SparkEnv.scala |   6 +-
 .../org/apache/spark/internal/config/package.scala |  10 ++
 .../spark/shuffle/IndexShuffleBlockResolver.scala  |  18 ++-
 .../spark/shuffle/ShuffleBlockResolver.scala       |   8 ++
 .../spark/storage/BlockManagerMasterEndpoint.scala |  89 +++++++++++----
 .../apache/spark/storage/DiskBlockManager.scala    |  61 +++++++++-
 .../scala/org/apache/spark/storage/DiskStore.scala |  10 ++
 .../shuffle/sort/UnsafeShuffleWriterSuite.java     |   8 ++
 .../apache/spark/ExternalShuffleServiceSuite.scala | 127 ++++++++++++++++++++-
 .../sort/BypassMergeSortShuffleWriterSuite.scala   |  11 ++
 .../sort/IndexShuffleBlockResolverSuite.scala      |   5 +
 .../io/LocalDiskShuffleMapOutputWriterSuite.scala  |   5 +
 .../storage/BlockManagerReplicationSuite.scala     |   3 +-
 .../apache/spark/storage/BlockManagerSuite.scala   |   3 +-
 .../spark/storage/DiskBlockManagerSuite.scala      |  26 ++++-
 docs/configuration.md                              |  11 ++
 .../streaming/ReceivedBlockHandlerSuite.scala      |   3 +-
 20 files changed, 372 insertions(+), 43 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index d2df776..b066d99 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -299,7 +299,7 @@ public class ExternalBlockStoreClient extends 
BlockStoreClient {
           BlockTransferMessage msgObj = 
BlockTransferMessage.Decoder.fromByteBuffer(response);
           numRemovedBlocksFuture.complete(((BlocksRemoved) 
msgObj).numRemovedBlocks);
         } catch (Throwable t) {
-          logger.warn("Error trying to remove RDD blocks " + 
Arrays.toString(blockIds) +
+          logger.warn("Error trying to remove blocks " + 
Arrays.toString(blockIds) +
             " via external shuffle service from executor: " + execId, t);
           numRemovedBlocksFuture.complete(0);
         }
@@ -307,7 +307,7 @@ public class ExternalBlockStoreClient extends 
BlockStoreClient {
 
       @Override
       public void onFailure(Throwable e) {
-        logger.warn("Error trying to remove RDD blocks " + 
Arrays.toString(blockIds) +
+        logger.warn("Error trying to remove blocks " + 
Arrays.toString(blockIds) +
           " via external shuffle service from executor: " + execId, e);
         numRemovedBlocksFuture.complete(0);
       }
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
 
b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
index 6c5025d..efe508d 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -36,7 +36,6 @@ import 
org.apache.spark.shuffle.api.WritableByteChannelWrapper;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage;
-import org.apache.spark.util.Utils;
 
 /**
  * Implementation of {@link ShuffleMapOutputWriter} that replicates the 
functionality of shuffle
@@ -87,7 +86,7 @@ public class LocalDiskShuffleMapOutputWriter implements 
ShuffleMapOutputWriter {
     }
     lastPartitionId = reducePartitionId;
     if (outputTempFile == null) {
-      outputTempFile = Utils.tempFileWith(outputFile);
+      outputTempFile = blockResolver.createTempFile(outputFile);
     }
     if (outputFileChannel != null) {
       currChannelPosition = outputFileChannel.position();
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala 
b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 091b5e1..a6fa28b 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
     try {
       if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
         logDebug("Cleaning shuffle " + shuffleId)
-        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+        // Shuffle must be removed before it's unregistered from the output 
tracker
+        // to find blocks served by the shuffle service on deallocated 
executors
         shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
         listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
         logDebug("Cleaned shuffle " + shuffleId)
       } else {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index d07614a..19467e7 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -343,12 +343,14 @@ object SparkEnv extends Logging {
           isLocal,
           conf,
           listenerBus,
-          if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) {
+          if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
             externalShuffleClient
           } else {
             None
           }, blockManagerInfo,
-          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], isDriver)),
+          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
+          shuffleManager,
+          isDriver)),
       registerOrLookupEndpoint(
         BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
         new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, 
blockManagerInfo)),
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index fa048f5..aa8f63e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -686,6 +686,16 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.service.removeShuffle")
+      .doc("Whether to use the ExternalShuffleService for deleting shuffle 
blocks for " +
+        "deallocated executors when the shuffle is no longer needed. Without 
this enabled, " +
+        "shuffle data on executors that are deallocated will remain on disk 
until the " +
+        "application ends.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val SHUFFLE_SERVICE_FETCH_RDD_ENABLED =
     ConfigBuilder(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
       .doc("Whether to use the ExternalShuffleService for fetching disk 
persisted RDD blocks. " +
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index f1485ec..ba54555 100644
--- 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -84,6 +84,11 @@ private[spark] class IndexShuffleBlockResolver(
     shuffleFiles.map(_.length()).sum
   }
 
+  /** Create a temporary file that will be renamed to the final resulting file 
*/
+  def createTempFile(file: File): File = {
+    blockManager.diskBlockManager.createTempFileWith(file)
+  }
+
   /**
    * Get the shuffle data file.
    *
@@ -234,7 +239,7 @@ private[spark] class IndexShuffleBlockResolver(
         throw new IllegalStateException(s"Unexpected shuffle block transfer 
${blockId} as " +
           s"${blockId.getClass().getSimpleName()}")
     }
-    val fileTmp = Utils.tempFileWith(file)
+    val fileTmp = createTempFile(file)
     val channel = Channels.newChannel(
       serializerManager.wrapStream(blockId,
         new FileOutputStream(fileTmp)))
@@ -335,7 +340,7 @@ private[spark] class IndexShuffleBlockResolver(
       checksums: Array[Long],
       dataTmp: File): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
-    val indexTmp = Utils.tempFileWith(indexFile)
+    val indexTmp = createTempFile(indexFile)
 
     val checksumEnabled = checksums.nonEmpty
     val (checksumFileOpt, checksumTmpOpt) = if (checksumEnabled) {
@@ -343,7 +348,7 @@ private[spark] class IndexShuffleBlockResolver(
         "The size of partition lengths and checksums should be equal")
       val checksumFile =
         getChecksumFile(shuffleId, mapId, 
conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
-      (Some(checksumFile), Some(Utils.tempFileWith(checksumFile)))
+      (Some(checksumFile), Some(createTempFile(checksumFile)))
     } else {
       (None, None)
     }
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] 
= {
+    Seq(
+      ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
+      ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
+    )
+  }
+
   override def stop(): Unit = {}
 }
 
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
index 0f35f8c..c8fde8d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
@@ -42,6 +42,14 @@ trait ShuffleBlockResolver {
   def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): 
ManagedBuffer
 
   /**
+   * Retrieve a list of BlockIds for a given shuffle map. Used to delete 
shuffle files
+   * from the external shuffle service after the associated executor has been 
removed.
+   */
+  def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
+    Seq.empty
+  }
+
+  /**
    * Retrieve the data for the specified merged shuffle block as multiple 
chunks.
    */
   def getMergedBlockData(
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index b96befc..4d8ba9b 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -36,6 +36,7 @@ import 
org.apache.spark.network.shuffle.ExternalBlockStoreClient
 import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, 
RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, 
CoarseGrainedSchedulerBackend}
+import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
 
@@ -52,6 +53,7 @@ class BlockManagerMasterEndpoint(
     externalBlockStoreClient: Option[ExternalBlockStoreClient],
     blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
     mapOutputTracker: MapOutputTrackerMaster,
+    shuffleManager: ShuffleManager,
     isDriver: Boolean)
   extends IsolatedRpcEndpoint with Logging {
 
@@ -104,9 +106,11 @@ class BlockManagerMasterEndpoint(
   private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf, 
isDriver)
 
   logInfo("BlockManagerMasterEndpoint up")
-  // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)
-  //   && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)`
-  private val externalShuffleServiceRddFetchEnabled: Boolean = 
externalBlockStoreClient.isDefined
+
+  private val externalShuffleServiceRemoveShuffleEnabled: Boolean =
+    externalBlockStoreClient.isDefined && 
conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED)
+  private val externalShuffleServiceRddFetchEnabled: Boolean =
+    externalBlockStoreClient.isDefined && 
conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
   private val externalShuffleServicePort: Int = 
StorageUtils.externalShuffleServicePort(conf)
 
   private lazy val driverEndpoint =
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
       }
     }.toSeq
 
-    val removeRddBlockViaExtShuffleServiceFutures = 
externalBlockStoreClient.map { shuffleClient =>
-      blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
-        Future[Int] {
-          val numRemovedBlocks = shuffleClient.removeBlocks(
-            bmId.host,
-            bmId.port,
-            bmId.executorId,
-            blockIds.map(_.toString).toArray)
-          numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, 
TimeUnit.SECONDS)
+    val removeRddBlockViaExtShuffleServiceFutures = if 
(externalShuffleServiceRddFetchEnabled) {
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Int] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, 
TimeUnit.SECONDS)
+          }
         }
-      }
-    }.getOrElse(Seq.empty)
+      }.getOrElse(Seq.empty)
+    } else {
+      Seq.empty
+    }
 
     Future.sequence(removeRddFromExecutorsFutures ++ 
removeRddBlockViaExtShuffleServiceFutures)
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
-    Future.sequence(
-      blockManagerInfo.values.map { bm =>
-        bm.storageEndpoint.ask[Boolean](removeMsg).recover {
-          // use false as default value means no shuffle data were removed
-          handleBlockRemovalFailure("shuffle", shuffleId.toString, 
bm.blockManagerId, false)
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, 
bm.blockManagerId, false)
+      }
+    }.toSeq
+
+    // Find all shuffle blocks on executors that are no longer running
+    val blocksToDeleteByShuffleService =
+      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    if (externalShuffleServiceRemoveShuffleEnabled) {
+      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus 
=>
+        shuffleStatus.withMapStatuses { mapStatuses =>
+          mapStatuses.foreach { mapStatus =>
+            // Check if the executor has been deallocated
+            if 
(!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
+              val blocksToDel =
+                
shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, 
mapStatus.mapId)
+              if (blocksToDel.nonEmpty) {
+                val blocks = 
blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
+                  new mutable.HashSet[BlockId])
+                blocks ++= blocksToDel
+              }
+            }
+          }
         }
-      }.toSeq
-    )
+      }
+    }
+
+    val removeShuffleFromShuffleServicesFutures =
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Boolean] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds,
+              TimeUnit.SECONDS) == blockIds.size
+          }
+        }
+      }.getOrElse(Seq.empty)
+
+    Future.sequence(removeShuffleFromExecutorsFutures ++
+      removeShuffleFromShuffleServicesFutures)
   }
 
   /**
diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index c6a2297..e29f3fc 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{File, IOException}
 import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission
 import java.util.UUID
 
 import scala.collection.mutable.HashMap
@@ -77,6 +78,15 @@ private[spark] class DiskBlockManager(
 
   private val shutdownHook = addShutdownHook()
 
+  // If either of these features are enabled, we must change permissions on 
block manager
+  // directories and files to accommodate the shuffle service deleting files in 
a secure environment.
+  // Parent directories are assumed to be restrictive to prevent unauthorized 
users from accessing
+  // or modifying world readable files.
+  private val permissionChangingRequired = 
conf.get(config.SHUFFLE_SERVICE_ENABLED) && (
+    conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED) ||
+    conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+  )
+
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath().
@@ -94,7 +104,16 @@ private[spark] class DiskBlockManager(
       } else {
         val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
         if (!newDir.exists()) {
-          Files.createDirectory(newDir.toPath)
+          val path = newDir.toPath
+          Files.createDirectory(path)
+          if (permissionChangingRequired) {
+            // SPARK-37618: Create dir as group writable so files within can 
be deleted by the
+            // shuffle service in a secure setup. This will remove the setgid 
bit so files created
+            // within won't be created with the parent folder group.
+            val currentPerms = Files.getPosixFilePermissions(path)
+            currentPerms.add(PosixFilePermission.GROUP_WRITE)
+            Files.setPosixFilePermissions(path, currentPerms)
+          }
         }
         subDirs(dirId)(subDirId) = newDir
         newDir
@@ -166,6 +185,37 @@ private[spark] class DiskBlockManager(
     }
   }
 
+  /**
+   * SPARK-37618: Makes sure that the file is created as world readable. This 
is to get
+   * around the fact that making the block manager sub dirs group writable 
removes
+   * the setgid bit in secure Yarn environments, which prevents the shuffle 
service
+   * from being able to read shuffle files. The outer directories will still 
not be
+   * world executable, so this doesn't allow access to these files except for 
the
+   * running user and shuffle service.
+   */
+  def createWorldReadableFile(file: File): Unit = {
+    val path = file.toPath
+    Files.createFile(path)
+    val currentPerms = Files.getPosixFilePermissions(path)
+    currentPerms.add(PosixFilePermission.OTHERS_READ)
+    Files.setPosixFilePermissions(path, currentPerms)
+  }
+
+  /**
+   * Creates a temporary version of the given file with world readable 
permissions (if required).
+   * Used to create block files that will be renamed to the final version of 
the file.
+   */
+  def createTempFileWith(file: File): File = {
+    val tmpFile = Utils.tempFileWith(file)
+    if (permissionChangingRequired) {
+      // SPARK-37618: we need to make the file world readable because the 
parent will
+      // lose the setgid bit when making it group writable. Without this the 
shuffle
+      // service can't read the shuffle files in a secure setup.
+      createWorldReadableFile(tmpFile)
+    }
+    tmpFile
+  }
+
   /** Produces a unique block id and File suitable for storing local 
intermediate results. */
   def createTempLocalBlock(): (TempLocalBlockId, File) = {
     var blockId = new TempLocalBlockId(UUID.randomUUID())
@@ -181,7 +231,14 @@ private[spark] class DiskBlockManager(
     while (getFile(blockId).exists()) {
       blockId = new TempShuffleBlockId(UUID.randomUUID())
     }
-    (blockId, getFile(blockId))
+    val tmpFile = getFile(blockId)
+    if (permissionChangingRequired) {
+      // SPARK-37618: we need to make the file world readable because the 
parent will
+      // lose the setgid bit when making it group writable. Without this the 
shuffle
+      // service can't read the shuffle files in a secure setup.
+      createWorldReadableFile(tmpFile)
+    }
+    (blockId, tmpFile)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index f0334c5..d45947d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -50,6 +50,9 @@ private[spark] class DiskStore(
   private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS)
   private val blockSizes = new ConcurrentHashMap[BlockId, Long]()
 
+  private val shuffleServiceFetchRddEnabled = 
conf.get(config.SHUFFLE_SERVICE_ENABLED) &&
+    conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+
   def getSize(blockId: BlockId): Long = blockSizes.get(blockId)
 
   /**
@@ -71,6 +74,13 @@ private[spark] class DiskStore(
     logDebug(s"Attempting to put block $blockId")
     val startTimeNs = System.nanoTime()
     val file = diskManager.getFile(blockId)
+
+    // SPARK-37618: If fetching cached RDDs from the shuffle service is 
enabled, we must make
+    // the file world readable, as it will not be owned by the group running 
the shuffle service
+    // in a secure environment. This is due to changing directory permissions 
to allow deletion.
+    if (shuffleServiceFetchRddEnabled) {
+      diskManager.createWorldReadableFile(file)
+    }
     val out = new CountingWritableChannel(openForWrite(file))
     var threwException: Boolean = true
     try {
diff --git 
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index f4e09b7..8a3df5a 100644
--- 
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -127,6 +127,10 @@ public class UnsafeShuffleWriterSuite implements 
ShuffleChecksumTestHelper {
       });
 
     when(shuffleBlockResolver.getDataFile(anyInt(), 
anyLong())).thenReturn(mergedOutputFile);
+    when(shuffleBlockResolver.createTempFile(any(File.class))).thenAnswer(invocationOnMock -> {
+      File file = (File) invocationOnMock.getArguments()[0];
+      return Utils.tempFileWith(file);
+    });
 
     Answer<?> renameTempAnswer = invocationOnMock -> {
       partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
@@ -158,6 +162,10 @@ public class UnsafeShuffleWriterSuite implements 
ShuffleChecksumTestHelper {
       spillFilesCreated.add(file);
       return Tuple2$.MODULE$.apply(blockId, file);
     });
+    when(diskBlockManager.createTempFileWith(any(File.class))).thenAnswer(invocationOnMock -> {
+      File file = (File) invocationOnMock.getArguments()[0];
+      return Utils.tempFileWith(file);
+    });
 
     when(taskContext.taskMetrics()).thenReturn(taskMetrics);
     when(shuffleDep.serializer()).thenReturn(serializer);
diff --git 
a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala 
b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 48c1cc5..dd3d90f3 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -17,6 +17,13 @@
 
 package org.apache.spark
 
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission
+
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.Eventually
 import org.scalatest.matchers.should.Matchers._
@@ -26,9 +33,9 @@ import org.apache.spark.internal.config
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.TransportServer
-import org.apache.spark.network.shuffle.{ExternalBlockHandler, 
ExternalBlockStoreClient}
-import org.apache.spark.storage.{RDDBlockId, StorageLevel}
-import org.apache.spark.util.Utils
+import org.apache.spark.network.shuffle.{ExecutorDiskUtils, 
ExternalBlockHandler, ExternalBlockStoreClient}
+import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId, 
ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel}
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * This suite creates an external shuffle server and routes all shuffle 
fetches through it.
@@ -101,7 +108,9 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with 
BeforeAndAfterAll wi
   }
 
   test("SPARK-25888: using external shuffle service fetching disk persisted 
blocks") {
-    val confWithRddFetchEnabled = 
conf.clone.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
+    val confWithRddFetchEnabled = conf.clone
+      .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+      .set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", 
confWithRddFetchEnabled)
     sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
     sc.env.blockManager.blockStoreClient.getClass should 
equal(classOf[ExternalBlockStoreClient])
@@ -113,13 +122,42 @@ class ExternalShuffleServiceSuite extends ShuffleSuite 
with BeforeAndAfterAll wi
       rdd.count()
 
       val blockId = RDDBlockId(rdd.id, 0)
-      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+      val bms = eventually(timeout(2.seconds), interval(100.milliseconds)) {
         val locations = sc.env.blockManager.master.getLocations(blockId)
         assert(locations.size === 2)
         assert(locations.map(_.port).contains(server.getPort),
           "external shuffle service port should be contained")
+        locations
       }
 
+      val dirManager = sc.env.blockManager.hostLocalDirManager
+          .getOrElse(fail("No host local dir manager"))
+
+      val promises = bms.map { case bmid =>
+          val promise = Promise[File]()
+          dirManager.getHostLocalDirs(bmid.host, bmid.port, 
Seq(bmid.executorId).toArray) {
+            case scala.util.Success(res) => res.foreach { case (eid, dirs) =>
+              val file = new File(ExecutorDiskUtils.getFilePath(dirs,
+                sc.env.blockManager.subDirsPerLocalDir, blockId.name))
+              promise.success(file)
+            }
+            case scala.util.Failure(error) => promise.failure(error)
+          }
+          promise.future
+        }
+      val filesToCheck = promises.map(p => ThreadUtils.awaitResult(p, 
Duration(2, "sec")))
+
+      filesToCheck.foreach(f => {
+        val parentPerms = Files.getPosixFilePermissions(f.getParentFile.toPath)
+        assert(parentPerms.contains(PosixFilePermission.GROUP_WRITE))
+
+        // On most operating systems the default umask will make this test pass
+        // even if the permission isn't changed. To properly test this, run the
+        // test with a umask of 0027
+        val perms = Files.getPosixFilePermissions(f.toPath)
+        assert(perms.contains(PosixFilePermission.OTHERS_READ))
+      })
+
       sc.killExecutors(sc.getExecutorIds())
 
       eventually(timeout(2.seconds), interval(100.milliseconds)) {
@@ -138,4 +176,83 @@ class ExternalShuffleServiceSuite extends ShuffleSuite 
with BeforeAndAfterAll wi
       rpcHandler.applicationRemoved(sc.conf.getAppId, true)
     }
   }
+
+  test("SPARK-37618: external shuffle service removes shuffle blocks from 
deallocated executors") {
+    for (enabled <- Seq(true, false)) {
+      // Use local disk reading to get location of shuffle files on disk
+      val confWithLocalDiskReading = conf.clone
+        .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+        .set(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED, enabled)
+      sc = new SparkContext("local-cluster[1,1,1024]", "test", 
confWithLocalDiskReading)
+      sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+      sc.env.blockManager.blockStoreClient.getClass should 
equal(classOf[ExternalBlockStoreClient])
+      try {
+        val rdd = sc.parallelize(0 until 100, 2)
+          .map { i => (i, 1) }
+          .repartition(1)
+
+        rdd.count()
+
+        val mapOutputs = sc.env.mapOutputTracker.getMapSizesByExecutorId(0, 
0).toSeq
+
+        val dirManager = sc.env.blockManager.hostLocalDirManager
+          .getOrElse(fail("No host local dir manager"))
+
+        val promises = mapOutputs.map { case (bmid, blocks) =>
+          val promise = Promise[Seq[File]]()
+          dirManager.getHostLocalDirs(bmid.host, bmid.port, 
Seq(bmid.executorId).toArray) {
+            case scala.util.Success(res) => res.foreach { case (eid, dirs) =>
+              val files = blocks.flatMap { case (blockId, _, _) =>
+                val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+                Seq(
+                  ShuffleDataBlockId(shuffleBlockId.shuffleId, 
shuffleBlockId.mapId,
+                    shuffleBlockId.reduceId).name,
+                  ShuffleIndexBlockId(shuffleBlockId.shuffleId, 
shuffleBlockId.mapId,
+                    shuffleBlockId.reduceId).name
+                ).map { blockId =>
+                  new File(ExecutorDiskUtils.getFilePath(dirs,
+                    sc.env.blockManager.subDirsPerLocalDir, blockId))
+                }
+              }
+              promise.success(files)
+            }
+            case scala.util.Failure(error) => promise.failure(error)
+          }
+          promise.future
+        }
+        val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, 
Duration(2, "sec")))
+        assert(filesToCheck.length == 4)
+        assert(filesToCheck.forall(_.exists()))
+
+        if (enabled) {
+          filesToCheck.foreach(f => {
+            val parentPerms = 
Files.getPosixFilePermissions(f.getParentFile.toPath)
+            assert(parentPerms.contains(PosixFilePermission.GROUP_WRITE))
+
+            // On most operating systems the default umask will make this test 
pass
+            // even if the permission isn't changed. To properly test this, 
run the
+            // test with a umask of 0027
+            val perms = Files.getPosixFilePermissions(f.toPath)
+            assert(perms.contains(PosixFilePermission.OTHERS_READ))
+          })
+        }
+
+        sc.killExecutors(sc.getExecutorIds())
+        eventually(timeout(2.seconds), interval(100.milliseconds)) {
+          
assert(sc.env.blockManager.master.getExecutorEndpointRef("0").isEmpty)
+        }
+
+        sc.cleaner.foreach(_.doCleanupShuffle(0, true))
+
+        if (enabled) {
+          assert(filesToCheck.forall(!_.exists()))
+        } else {
+          assert(filesToCheck.forall(_.exists()))
+        }
+      } finally {
+        rpcHandler.applicationRemoved(sc.conf.getAppId, true)
+        sc.stop()
+      }
+    }
+  }
 }
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 38ed702..83bd3b0 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -111,6 +111,12 @@ class BypassMergeSortShuffleWriterSuite
           blockId = args(0).asInstanceOf[BlockId])
       }
 
+    when(blockResolver.createTempFile(any(classOf[File])))
+      .thenAnswer { invocationOnMock =>
+        val file = invocationOnMock.getArguments()(0).asInstanceOf[File]
+        Utils.tempFileWith(file)
+      }
+
     when(diskBlockManager.createTempShuffleBlock())
       .thenAnswer { _ =>
         val blockId = new TempShuffleBlockId(UUID.randomUUID)
@@ -266,6 +272,11 @@ class BypassMergeSortShuffleWriterSuite
         temporaryFilesCreated += file
         (blockId, file)
       }
+    when(diskBlockManager.createTempFileWith(any(classOf[File])))
+      .thenAnswer { invocationOnMock =>
+        val file = invocationOnMock.getArguments()(0).asInstanceOf[File]
+        Utils.tempFileWith(file)
+      }
 
     val numPartition = shuffleHandle.dependency.partitioner.numPartitions
     val writer = new BypassMergeSortShuffleWriter[Int, Int](
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index 21704b1..de12f68 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -56,6 +56,11 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite 
with BeforeAndAfterEa
       any[BlockId], any[Option[Array[String]]])).thenAnswer(
       (invocation: InvocationOnMock) => new File(tempDir, 
invocation.getArguments.head.toString))
     when(diskBlockManager.localDirs).thenReturn(Array(tempDir))
+    when(diskBlockManager.createTempFileWith(any(classOf[File])))
+      .thenAnswer { invocationOnMock =>
+        val file = invocationOnMock.getArguments()(0).asInstanceOf[File]
+        Utils.tempFileWith(file)
+      }
     conf.set("spark.app.id", appId)
   }
 
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
index 35d9b4a..6c9ec8b 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
@@ -74,6 +74,11 @@ class LocalDiskShuffleMapOutputWriterSuite extends 
SparkFunSuite with BeforeAndA
       .set("spark.app.id", "example.spark.app")
       .set("spark.shuffle.unsafe.file.output.buffer", "16k")
     when(blockResolver.getDataFile(anyInt, 
anyLong)).thenReturn(mergedOutputFile)
+    when(blockResolver.createTempFile(any(classOf[File])))
+      .thenAnswer { invocationOnMock =>
+        val file = invocationOnMock.getArguments()(0).asInstanceOf[File]
+        Utils.tempFileWith(file)
+      }
     when(blockResolver.writeMetadataFileAndCommit(
       anyInt, anyLong, any(classOf[Array[Long]]), any(classOf[Array[Long]]), 
any(classOf[File])))
       .thenAnswer { invocationOnMock =>
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index fc7b7a4..14e1ee5 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -102,7 +102,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     val blockManagerInfo = new mutable.HashMap[BlockManagerId, 
BlockManagerInfo]()
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, 
isDriver = true)),
+        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, 
sc.env.shuffleManager,
+        isDriver = true)),
       rpcEnv.setupEndpoint("blockmanagerHeartbeat",
       new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, 
blockManagerInfo)), conf, true)
     allStores.clear()
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0f99ea8..45e05b2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -188,7 +188,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
     liveListenerBus = spy(new LiveListenerBus(conf))
     master = spy(new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        liveListenerBus, None, blockManagerInfo, mapOutputTracker, isDriver = 
true)),
+        liveListenerBus, None, blockManagerInfo, mapOutputTracker, 
shuffleManager,
+        isDriver = true)),
       rpcEnv.setupEndpoint("blockmanagerHeartbeat",
       new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, 
blockManagerInfo)), conf, true))
   }
diff --git 
a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index b36eeb7..58fe40f 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{File, FileWriter}
 import java.nio.file.{Files, Paths}
-import java.nio.file.attribute.PosixFilePermissions
+import java.nio.file.attribute.{PosixFilePermission, PosixFilePermissions}
 import java.util.HashMap
 
 import com.fasterxml.jackson.core.`type`.TypeReference
@@ -141,6 +141,30 @@ class DiskBlockManagerSuite extends SparkFunSuite with 
BeforeAndAfterEach with B
     assert(attemptId.equals("1"))
   }
 
+  test("SPARK-37618: Sub dirs are group writable when removing from shuffle 
service enabled") {
+    val conf = testConf.clone
+    conf.set("spark.local.dir", rootDirs)
+    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set("spark.shuffle.service.removeShuffle", "false")
+    val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = 
true, isDriver = false)
+    val blockId = new TestBlockId("test")
+    val newFile = diskBlockManager.getFile(blockId)
+    val parentDir = newFile.getParentFile()
+    assert(parentDir.exists && parentDir.isDirectory)
+    val permission = Files.getPosixFilePermissions(parentDir.toPath)
+    assert(!permission.contains(PosixFilePermission.GROUP_WRITE))
+
+    assert(parentDir.delete())
+
+    conf.set("spark.shuffle.service.removeShuffle", "true")
+    val diskBlockManager2 = new DiskBlockManager(conf, deleteFilesOnStop = 
true, isDriver = false)
+    val newFile2 = diskBlockManager2.getFile(blockId)
+    val parentDir2 = newFile2.getParentFile()
+    assert(parentDir2.exists && parentDir2.isDirectory)
+    val permission2 = Files.getPosixFilePermissions(parentDir2.toPath)
+    assert(permission2.contains(PosixFilePermission.GROUP_WRITE))
+  }
+
   def writeToFile(file: File, numBytes: Int): Unit = {
     val writer = new FileWriter(file, true)
     for (i <- 0 until numBytes) writer.write(i)
diff --git a/docs/configuration.md b/docs/configuration.md
index a2cf233..4fa3779 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -997,6 +997,17 @@ Apart from these, the following properties are also 
available, and may be useful
   <td>2.3.0</td>
 </tr>
 <tr>
+  <td><code>spark.shuffle.service.removeShuffle</code></td>
+  <td>false</td>
+  <td>
+    Whether to use the ExternalShuffleService for deleting shuffle blocks for
+    deallocated executors when the shuffle is no longer needed. Without this 
enabled,
+    shuffle data on executors that are deallocated will remain on disk until 
the
+    application ends.
+  </td>
+  <td>3.3.0</td>
+</tr>
+<tr>
   <td><code>spark.shuffle.maxChunksBeingTransferred</code></td>
   <td>Long.MAX_VALUE</td>
   <td>
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index a3b5b38..dcf82d5 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -93,7 +93,8 @@ abstract class 
BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
     val blockManagerInfo = new mutable.HashMap[BlockManagerId, 
BlockManagerInfo]()
     blockManagerMaster = new 
BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, 
isDriver = true)),
+        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, 
shuffleManager,
+        isDriver = true)),
       rpcEnv.setupEndpoint("blockmanagerHeartbeat",
       new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, 
blockManagerInfo)), conf, true)
 
