This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 958a7d594d4 [SPARK-43179][SHUFFLE] Allowing apps to control whether their metadata gets saved in the db by the External Shuffle Service
958a7d594d4 is described below

commit 958a7d594d44900b82a70658c105cc7142fff92e
Author: Chandni Singh <singh.chan...@gmail.com>
AuthorDate: Fri Apr 21 12:21:35 2023 -0500

    [SPARK-43179][SHUFFLE] Allowing apps to control whether their metadata gets saved in the db by the External Shuffle Service
    
    ### What changes were proposed in this pull request?
    This change allows applications to control whether their metadata gets saved in the db. For applications with higher security requirements, storing the application secret in the db without any encryption is a potential security risk. While filesystem ACLs can help protect access to the db, this level of security is not sufficient for some use cases. Such applications can choose not to save their metadata in the db. As a result, these applications may experience more failures in the eve [...]
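
    Mechanically (see ExecutorRunnable.configureServiceData in the diff below), when recovery is disabled the executor launch context hands the shuffle service a small JSON payload instead of the raw secret bytes. A minimal sketch of the payload's shape, assuming Jackson with its Scala module as used in the diff (values illustrative):

        import com.fasterxml.jackson.databind.ObjectMapper
        import com.fasterxml.jackson.module.scala.DefaultScalaModule

        // Sketch of the service data sent when recovery is disabled.
        val payload = Map(
          "spark.yarn.shuffle.server.recovery.disabled" -> true,
          "secret" -> "<app shuffle secret>")
        val mapper = new ObjectMapper()
        mapper.registerModule(DefaultScalaModule)
        mapper.writeValueAsString(payload)
        // => {"spark.yarn.shuffle.server.recovery.disabled":true,"secret":"<app shuffle secret>"}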
    
    ### Why are the changes needed?
    These modifications are necessary to reduce the likelihood of security 
threats for applications with elevated security requirements.
    
    ### Does this PR introduce _any_ user-facing change?
    No. Added a configuration `spark.yarn.shuffle.server.recovery.disabled`, which is `false` by default. When set to `true`, the metadata of the application will not be saved in the db.
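
    For illustration, a minimal sketch of how an application might opt out, assuming a typical YARN deployment with the External Shuffle Service enabled (the SparkConf-based setup is illustrative; any configuration mechanism works):

        import org.apache.spark.SparkConf

        // Hypothetical opt-out: ask the External Shuffle Service not to persist
        // this app's metadata (including its secret) in the recovery db. The
        // trade-off: this app's shuffle state is not recovered if the shuffle
        // service restarts.
        val conf = new SparkConf()
          .set("spark.shuffle.service.enabled", "true")
          .set("spark.yarn.shuffle.server.recovery.disabled", "true")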
    
    ### How was this patch tested?
    Added UTs and also verified with test applications in our test environment.
    
    Closes #40843 from otterc/SPARK-43179.
    
    Authored-by: Chandni Singh <singh.chan...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../network/shuffle/AppsWithRecoveryDisabled.java  |  66 +++++++++
 .../shuffle/ExternalShuffleBlockResolver.java      |   4 +-
 .../network/shuffle/RemoteBlockPushResolver.java   |   7 +-
 .../spark/network/yarn/YarnShuffleService.java     |  38 +++++-
 .../org/apache/spark/internal/config/package.scala |   9 ++
 docs/running-on-yarn.md                            |  12 +-
 docs/security.md                                   |  14 ++
 .../spark/deploy/yarn/ExecutorRunnable.scala       |  46 +++++--
 .../spark/deploy/yarn/ExecutorRunnableSuite.scala  | 106 +++++++++++++++
 .../network/yarn/YarnShuffleServiceSuite.scala     | 147 ++++++++++++++++++++-
 10 files changed, 425 insertions(+), 24 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/AppsWithRecoveryDisabled.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/AppsWithRecoveryDisabled.java
new file mode 100644
index 00000000000..6a029a1083a
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/AppsWithRecoveryDisabled.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Stores the applications which have recovery disabled.
+ */
+public final class AppsWithRecoveryDisabled {
+
+  private static final AppsWithRecoveryDisabled INSTANCE = new AppsWithRecoveryDisabled();
+
+  private final Set<String> appsWithRecoveryDisabled = Collections.newSetFromMap(
+      new ConcurrentHashMap<>());
+
+  private AppsWithRecoveryDisabled() {
+  }
+
+  /**
+   * Add an application for which recovery is disabled.
+   * @param appId application id
+   */
+  public static void disableRecoveryOfApp(String appId) {
+    Preconditions.checkNotNull(appId);
+    INSTANCE.appsWithRecoveryDisabled.add(appId);
+  }
+
+  /**
+   * Returns whether an application is enabled for recovery or not.
+   * @param appId application id
+   * @return true if the application is enabled for recovery; false otherwise.
+   */
+  public static boolean isRecoveryEnabledForApp(String appId) {
+    Preconditions.checkNotNull(appId);
+    return !INSTANCE.appsWithRecoveryDisabled.contains(appId);
+  }
+
+  /**
+   * Removes the application from the store.
+   * @param appId application id
+   */
+  public static void removeApp(String appId) {
+    Preconditions.checkNotNull(appId);
+    INSTANCE.appsWithRecoveryDisabled.remove(appId);
+  }
+}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 6bca19a2511..ea341b73916 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -151,7 +151,7 @@ public class ExternalShuffleBlockResolver {
     AppExecId fullId = new AppExecId(appId, execId);
     logger.info("Registered executor {} with {}", fullId, executorInfo);
     try {
-      if (db != null) {
+      if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
         byte[] key = dbAppExecKey(fullId);
         byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8);
         db.put(key, value);
@@ -224,7 +224,7 @@ public class ExternalShuffleBlockResolver {
       // Only touch executors associated with the appId that was removed.
       if (appId.equals(fullId.appId)) {
         it.remove();
-        if (db != null) {
+        if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(fullId.appId)) {
           try {
             db.delete(dbAppExecKey(fullId));
           } catch (IOException e) {
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index df2d1fa12d1..7f0862fcef4 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -494,7 +494,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   @VisibleForTesting
   void removeAppAttemptPathInfoFromDB(String appId, int attemptId) {
     AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
-    if (db != null) {
+    if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
       try {
         byte[] key = getDbAppAttemptPathsKey(appAttemptId);
         db.delete(key);
@@ -967,7 +967,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   * Write the application attempt's local path information to the DB
    */
   private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) {
-    if (db != null) {
+    if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
       AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
       try {
         byte[] key = getDbAppAttemptPathsKey(appAttemptId);
@@ -985,7 +985,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
    */
   private void writeAppAttemptShuffleMergeInfoToDB(
       AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
-    if (db != null) {
+    if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(
+        appAttemptShuffleMergeId.appId)) {
       // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
       try{
         byte[] dbKey = getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId);
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 1fa0eebb7f8..578c1a19c40 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -28,6 +28,7 @@ import java.util.Objects;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -59,6 +60,7 @@ import org.apache.spark.network.crypto.AuthServerBootstrap;
 import org.apache.spark.network.sasl.ShuffleSecretManager;
 import org.apache.spark.network.server.TransportServer;
 import org.apache.spark.network.server.TransportServerBootstrap;
+import org.apache.spark.network.shuffle.AppsWithRecoveryDisabled;
 import org.apache.spark.network.shuffle.ExternalBlockHandler;
 import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.TransportConf;
@@ -136,6 +138,12 @@ public class YarnShuffleService extends AuxiliaryService {
 
   private static final boolean DEFAULT_STOP_ON_FAILURE = false;
 
+  @VisibleForTesting
+  static final String SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED =
+      "spark.yarn.shuffle.server.recovery.disabled";
+  @VisibleForTesting
+  static final String SECRET_KEY = "secret";
+
   // just for testing when you want to find an open port
   @VisibleForTesting
   static int boundPort = -1;
@@ -407,10 +415,30 @@ public class YarnShuffleService extends AuxiliaryService {
   public void initializeApplication(ApplicationInitializationContext context) {
     String appId = context.getApplicationId().toString();
     try {
-      ByteBuffer shuffleSecret = context.getApplicationDataForService();
+      ByteBuffer appServiceData = context.getApplicationDataForService();
+      String payload = JavaUtils.bytesToString(appServiceData);
+      String shuffleSecret;
+      Map<String, Object> metaInfo;
+      try {
+        metaInfo = mapper.readValue(payload,
+            new TypeReference<Map<String, Object>>() {});
+        Object metadataStorageVal = metaInfo.get(SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED);
+        if (metadataStorageVal != null && (Boolean) metadataStorageVal) {
+          AppsWithRecoveryDisabled.disableRecoveryOfApp(appId);
+          logger.info("Disabling metadata persistence for application {}", appId);
+        }
+      } catch (IOException ioe) {
+        logger.warn("Unable to parse application data for service: " + payload);
+        metaInfo = null;
+      }
       if (isAuthenticationEnabled()) {
-        AppId fullId = new AppId(appId);
-        if (db != null) {
+        if (metaInfo != null) {
+          shuffleSecret = (String) metaInfo.get(SECRET_KEY);
+        } else {
+          shuffleSecret = payload;
+        }
+        if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
+          AppId fullId = new AppId(appId);
           byte[] key = dbAppKey(fullId);
           byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
           db.put(key, value);
@@ -428,7 +456,7 @@ public class YarnShuffleService extends AuxiliaryService {
     try {
       if (isAuthenticationEnabled()) {
         AppId fullId = new AppId(appId);
-        if (db != null) {
+        if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
           try {
             db.delete(dbAppKey(fullId));
           } catch (IOException e) {
@@ -440,6 +468,8 @@ public class YarnShuffleService extends AuxiliaryService {
       blockHandler.applicationRemoved(appId, false /* clean up local dirs */);
     } catch (Exception e) {
       logger.error("Exception when stopping application {}", appId, e);
+    } finally {
+      AppsWithRecoveryDisabled.removeApp(appId);
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 11febabd1bd..125025d6a2f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2507,4 +2507,13 @@ package object config {
       .version("3.5.0")
       .intConf
       .createWithDefault(Int.MaxValue)
+
+  private[spark] val SHUFFLE_SERVER_RECOVERY_DISABLED =
+    ConfigBuilder("spark.yarn.shuffle.server.recovery.disabled")
+      .internal()
+      .doc("Set to true for applications that prefer to disable recovery when the External " +
+        "Shuffle Service restarts. This configuration only takes effect on YARN.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 19649ffb380..8c024901352 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -650,7 +650,7 @@ To use a custom metrics.properties for the application master and executors, upd
   <td><code>spark.yarn.report.loggingFrequency</code></td>
   <td><code>30</code></td>
   <td>
-    Maximum number of application reports processed until the next application status 
+    Maximum number of application reports processed until the next application status
     is logged. If there is a change of state, the application status will be logged regardless
     of the number of application reports processed.
   </td>
@@ -683,6 +683,16 @@ To use a custom metrics.properties for the application master and executors, upd
   </td>
   <td>3.0.0</td>
 </tr>
+<tr>
+  <td><code>spark.yarn.shuffle.server.recovery.disabled</code></td>
+  <td>false</td>
+  <td>
+    Set to true for applications that have higher security requirements and prefer that their
+    secret is not saved in the db. The shuffle data of such applications will not be recovered after
+    the External Shuffle Service restarts.
+  </td>
+  <td>3.5.0</td>
+</tr>
 </table>
 
 #### Available patterns for SHS custom executor log URL
diff --git a/docs/security.md b/docs/security.md
index 7201ea51859..c856b94388f 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -60,6 +60,14 @@ distributing the shared secret. Each application will use a unique shared secret
 the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of
 secrets to be secure.
 
+<table class="table table-striped">
+<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
+<tr>
+  <td><code>spark.yarn.shuffle.server.recovery.disabled</code></td>
+  <td>false</td>
+  <td>
+    Set to true for applications that have higher security requirements and prefer that their
+    secret is not saved in the db. The shuffle data of such applications will not be recovered after
+    the External Shuffle Service restarts.
+  </td>
+  <td>3.5.0</td>
+</tr>
+</table>
+
 ### Kubernetes
 
 On Kubernetes, Spark will also automatically generate an authentication secret unique to each
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 1f3121ed224..e3fcf5472f5 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -21,8 +21,11 @@ import java.nio.ByteBuffer
 import java.util.Collections
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.{HashMap, ListBuffer}
 
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
@@ -105,17 +108,7 @@ private[yarn] class ExecutorRunnable(
     // started on the NodeManager and, if authentication is enabled, provide it with our secret
     // key for fetching shuffle files later
     if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
-      val secretString = securityMgr.getSecretKey()
-      val secretBytes =
-        if (secretString != null) {
-          // This conversion must match how the YarnShuffleService decodes our secret
-          JavaUtils.stringToBytes(secretString)
-        } else {
-          // Authentication is not enabled, so just provide dummy metadata
-          ByteBuffer.allocate(0)
-        }
-      val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME)
-      ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes))
+      configureServiceData(ctx)
     }
 
     // Send the start request to the ContainerManager
@@ -128,6 +121,33 @@ private[yarn] class ExecutorRunnable(
     }
   }
 
+  private[yarn] def configureServiceData(ctx: ContainerLaunchContext): Unit = {
+    val secretString = securityMgr.getSecretKey()
+
+    val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME)
+    if (!sparkConf.get(SHUFFLE_SERVER_RECOVERY_DISABLED)) {
+      val secretBytes =
+        if (secretString != null) {
+          // This conversion must match how the YarnShuffleService decodes our secret
+          JavaUtils.stringToBytes(secretString)
+        } else {
+          // Authentication is not enabled, so just provide dummy metadata
+          ByteBuffer.allocate(0)
+        }
+      ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes))
+    } else {
+      val payload = new mutable.HashMap[String, Object]()
+      payload.put(SHUFFLE_SERVER_RECOVERY_DISABLED.key, java.lang.Boolean.TRUE)
+      if (secretString != null) {
+        payload.put(ExecutorRunnable.SECRET_KEY, secretString)
+      }
+      val mapper = new ObjectMapper()
+      mapper.registerModule(DefaultScalaModule)
+      val jsonString = mapper.writeValueAsString(payload)
+      ctx.setServiceData(Collections.singletonMap(serviceName, JavaUtils.stringToBytes(jsonString)))
+    }
+  }
+
   private def prepareCommand(): List[String] = {
     // Extra options for the JVM
     val javaOpts = ListBuffer[String]()
@@ -202,3 +222,7 @@ private[yarn] class ExecutorRunnable(
     env
   }
 }
+
+private[yarn] object ExecutorRunnable {
+  private[yarn] val SECRET_KEY = "secret"
+}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala
new file mode 100644
index 00000000000..1ef3c9c410a
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, LocalResource}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.Records
+import org.mockito.Mockito.{mock, when}
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+
+class ExecutorRunnableSuite extends SparkFunSuite {
+
+  private def createExecutorRunnable(
+      sparkConf: SparkConf = new SparkConf(),
+      securityManager: SecurityManager = mock(classOf[SecurityManager])): ExecutorRunnable = {
+    new ExecutorRunnable(
+      None,
+      new YarnConfiguration(),
+      sparkConf,
+      "yarn",
+      "exec-1",
+      "localhost",
+      1,
+      1,
+      "application_123_1",
+      securityManager,
+      Map.empty[String, LocalResource],
+      0)
+  }
+
+  for (shuffleServerRecoveryDisabled <- Seq(true, false)) {
+    test("validate service data when shuffleServerRecoveryDisabled is " +
+      shuffleServerRecoveryDisabled) {
+      val sparkConf = new SparkConf()
+      sparkConf.set(SHUFFLE_SERVER_RECOVERY_DISABLED, shuffleServerRecoveryDisabled)
+      val securityManager = mock(classOf[SecurityManager])
+      when(securityManager.getSecretKey()).thenReturn("secret")
+      val execRunnable = createExecutorRunnable(sparkConf, securityManager)
+      val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+        .asInstanceOf[ContainerLaunchContext]
+      execRunnable.configureServiceData(ctx)
+      val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME)
+      val serviceData = ctx.getServiceData.get(serviceName)
+      assert(serviceData != null)
+      val payload: String = JavaUtils.bytesToString(serviceData)
+      var metaInfo: java.util.Map[String, AnyRef] = null
+      val secret = try {
+        val mapper = new ObjectMapper
+        metaInfo = mapper.readValue(payload,
+          new TypeReference[java.util.Map[String, AnyRef]]() {})
+        metaInfo.get(ExecutorRunnable.SECRET_KEY).asInstanceOf[String]
+      } catch {
+        case _: IOException =>
+          payload
+      }
+      assert(secret equals "secret")
+      if (shuffleServerRecoveryDisabled) {
+        assert(metaInfo != null)
+        val metadataStorageVal: Any = metaInfo.get(SHUFFLE_SERVER_RECOVERY_DISABLED.key)
+        assert(metadataStorageVal != null && metadataStorageVal.asInstanceOf[Boolean])
+      }
+    }
+  }
+
+  test("if shuffle server recovery is disabled and authentication is disabled, then" +
+    " service data should not contain secret") {
+    val sparkConf = new SparkConf()
+    sparkConf.set(SHUFFLE_SERVER_RECOVERY_DISABLED, true)
+    val execRunnable = createExecutorRunnable(sparkConf)
+    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+      .asInstanceOf[ContainerLaunchContext]
+    execRunnable.configureServiceData(ctx)
+    val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME)
+    val serviceData = ctx.getServiceData.get(serviceName)
+    assert(serviceData != null)
+    val payload: String = JavaUtils.bytesToString(serviceData)
+    val mapper = new ObjectMapper
+    val metaInfo = mapper.readValue(payload,
+        new TypeReference[java.util.Map[String, AnyRef]]() {})
+    assert(!metaInfo.containsKey(ExecutorRunnable.SECRET_KEY))
+    val metadataStorageVal: Any = metaInfo.get(SHUFFLE_SERVER_RECOVERY_DISABLED.key)
+    assert(metadataStorageVal != null && metadataStorageVal.asInstanceOf[Boolean])
+  }
+}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 075a21c399e..3e78262a765 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -25,9 +25,12 @@ import java.util.EnumSet
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.concurrent.duration._
 
 import com.codahale.metrics.MetricSet
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.metrics2.impl.MetricsSystemImpl
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem
@@ -49,6 +52,7 @@ import org.apache.spark.network.shuffle.{Constants, MergedShuffleFileManager, No
 import org.apache.spark.network.shuffle.RemoteBlockPushResolver._
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.network.shuffledb.DBBackend
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.network.yarn.util.HadoopConfigProvider
 import org.apache.spark.tags.ExtendedLevelDBTest
@@ -1032,9 +1036,23 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     s2.stop()
   }
 
-  private def makeAppInfo(user: String, appId: ApplicationId): ApplicationInitializationContext = {
-    val secret = ByteBuffer.wrap(new Array[Byte](0))
-    new ApplicationInitializationContext(user, appId, secret)
+  private def makeAppInfo(user: String, appId: ApplicationId,
+      metadataStorageDisabled: Boolean = false,
+      authEnabled: Boolean = true): ApplicationInitializationContext = {
+    if (!metadataStorageDisabled) {
+      val secret = ByteBuffer.wrap(new Array[Byte](0))
+      new ApplicationInitializationContext(user, appId, secret)
+    } else {
+      val payload = new mutable.HashMap[String, Object]()
+      payload.put(YarnShuffleService.SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED, java.lang.Boolean.TRUE)
+      if (authEnabled) {
+        payload.put(YarnShuffleService.SECRET_KEY, "")
+      }
+      val mapper = new ObjectMapper()
+      mapper.registerModule(DefaultScalaModule)
+      val jsonString = mapper.writeValueAsString(payload)
+      new ApplicationInitializationContext(user, appId, JavaUtils.stringToBytes(jsonString))
+    }
   }
 
   test("recovery db should not be created if NM recovery is not enabled") {
@@ -1109,6 +1127,129 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf, null)
     assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager])
   }
+
+  test("secret of applications should not be stored in db if they want to be excluded") {
+    // set auth to true to test the secrets recovery
+    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
+    s1 = createYarnShuffleService()
+    val app1Id = ApplicationId.newInstance(1681252509, 1)
+    val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true)
+    s1.initializeApplication(app1Data)
+    val app2Id = ApplicationId.newInstance(1681252509, 2)
+    val app2Data = makeAppInfo("user", app2Id)
+    s1.initializeApplication(app2Data)
+    assert(s1.secretManager.getSecretKey(app1Id.toString()) == "")
+    assert(s1.secretManager.getSecretKey(app2Id.toString()) == "")
+
+    val execShuffleInfo1 =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath), 3,
+        SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val execShuffleInfo2 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, "bippy/bippy").getAbsolutePath),
+        3, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+
+    val blockHandler = s1.blockHandler
+    val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
+    blockResolver.registerExecutor(app1Id.toString, "exec-1", execShuffleInfo1)
+    blockResolver.registerExecutor(app2Id.toString, "exec-2", execShuffleInfo2)
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
+      be(Some(execShuffleInfo1))
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should
+      be(Some(execShuffleInfo2))
+
+    val mergeManager = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    mergeManager.registerExecutor(app1Id.toString, execShuffleInfo1)
+    mergeManager.registerExecutor(app2Id.toString, execShuffleInfo2)
+    val localDirsApp1 = Array(new File(tempDir, "foo/merge_manager_1").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+    val localDirsApp2 = Array(new File(tempDir, "bippy/merge_manager_1").getAbsolutePath)
+    val appPathsInfo1 = new AppPathsInfo(localDirsApp1, 3)
+    val appPathsInfo2 = new AppPathsInfo(localDirsApp2, 3)
+
+    ShuffleTestAccessor.getAppPathsInfo(app1Id.toString, mergeManager) should
+      be(Some(appPathsInfo1))
+    ShuffleTestAccessor.getAppPathsInfo(app2Id.toString, mergeManager) should
+      be(Some(appPathsInfo2))
+
+    val partitionIdApp1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 1, 1)
+    val partitionIdApp2 = new AppAttemptShuffleMergeId(app2Id.toString, 1, 2, 1)
+    prepareAppShufflePartition(mergeManager, partitionIdApp1, 1, "3")
+    prepareAppShufflePartition(mergeManager, partitionIdApp2, 2, "4")
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager, partitionIdApp1)
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager, partitionIdApp2)
+
+    val execStateFile = s1.registeredExecutorFile
+    assert(execStateFile.exists(), s"$execStateFile did not exist")
+    val mergeMgrFile = s1.mergeManagerFile
+    assert(mergeMgrFile.exists(), s"$mergeMgrFile did not exist")
+
+    // shuffle service goes down
+    s1.stop()
+    // Yarn Shuffle service comes back up without custom mergeManager
+    s2 = createYarnShuffleService()
+    // Since secret of app1 is not saved in the db, it isn't recovered
+    assert(s2.secretManager.getSecretKey(app1Id.toString()) == null)
+    assert(s2.secretManager.getSecretKey(app2Id.toString()) == "")
+
+    val resolver2 = ShuffleTestAccessor.getBlockResolver(s2.blockHandler)
+    val mergeManager2 = s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+
+    // App1 executor information should not have been saved in the db.
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be(None)
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be(
+      Some(execShuffleInfo2))
+    // App1 should not have any merge related metadata stored in the db.
+    ShuffleTestAccessor
+      .getAppPathsInfo(app1Id.toString, mergeManager2) should be(None)
+    ShuffleTestAccessor.getAppPathsInfo(app2Id.toString, mergeManager2) should be(
+      Some(appPathsInfo2))
+
+    // Even though App1-partition1 was finalized before the restart, merge manager will recreate
+    // the partition since it didn't have any metadata saved for that app.
+    mergeManager2.registerExecutor(app1Id.toString, execShuffleInfo1)
+    prepareAppShufflePartition(mergeManager2, partitionIdApp1, 1, "3")
+    val dataFileApp1 =
+      ShuffleTestAccessor.getMergedShuffleDataFile(mergeManager2, partitionIdApp1, 1)
+    dataFileApp1.length() should be((4 * 5 + 1) * DUMMY_BLOCK_DATA.length)
+    // Since app2-partition2's metadata was saved, it cannot be re-opened.
+    val error = intercept[BlockPushNonFatalFailure] {
+      ShuffleTestAccessor.getOrCreateAppShufflePartitionInfo(
+        mergeManager2, partitionIdApp2, 2, "3")
+    }
+    assert(error.getMessage.contains("is finalized"))
+
+    s2.stopApplication(new ApplicationTerminationContext(app1Id))
+    s2.stopApplication(new ApplicationTerminationContext(app2Id))
+    s2.stop()
+  }
+
+  test("executor info of apps should not be stored in db if they want to be excluded. " +
+    "Authentication is turned off") {
+    s1 = createYarnShuffleService()
+    val app1Id = ApplicationId.newInstance(1681252509, 1)
+    val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true, authEnabled = false)
+    s1.initializeApplication(app1Data)
+    val execShuffleInfo1 =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath), 3, SORT_MANAGER)
+    val blockHandler = s1.blockHandler
+    val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
+    blockResolver.registerExecutor(app1Id.toString, "exec-1", execShuffleInfo1)
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
+      be(Some(execShuffleInfo1))
+    // shuffle service goes down
+    s1.stop()
+    // Yarn Shuffle service comes back up without custom mergeManager
+    s2 = createYarnShuffleService()
+    val resolver2 = ShuffleTestAccessor.getBlockResolver(s2.blockHandler)
+    // App1 executor information should not have been saved in the db.
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be(None)
+    s2.stopApplication(new ApplicationTerminationContext(app1Id))
+    s2.stop()
+  }
 }
 
 @ExtendedLevelDBTest

