This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 958a7d594d4 [SPARK-43179][SHUFFLE] Allowing apps to control whether their metadata gets saved in the db by the External Shuffle Service 958a7d594d4 is described below commit 958a7d594d44900b82a70658c105cc7142fff92e Author: Chandni Singh <singh.chan...@gmail.com> AuthorDate: Fri Apr 21 12:21:35 2023 -0500 [SPARK-43179][SHUFFLE] Allowing apps to control whether their metadata gets saved in the db by the External Shuffle Service ### What changes were proposed in this pull request? This change allows applications to control whether their metadata gets saved in the db. For applications with higher security requirements, storing the application secret in the db without any encryption is a potential security risk. While filesystem ACLs can help protect access to the db, this level of security is not sufficient for some use cases. Such applications can choose not to save their metadata in the db. As a result, these applications may experience more failures in the eve [...] ### Why are the changes needed? These modifications are necessary to reduce the likelihood of security threats for applications with elevated security requirements. ### Does this PR introduce _any_ user-facing change? No. Added a configuration, `spark.yarn.shuffle.server.recovery.disabled`, which defaults to `false`. When it is set to `true`, the metadata of the application will not be saved in the db. ### How was this patch tested? Added unit tests (UTs) and also verified with test applications in our test environment. Closes #40843 from otterc/SPARK-43179. 
Authored-by: Chandni Singh <singh.chan...@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../network/shuffle/AppsWithRecoveryDisabled.java | 66 +++++++++ .../shuffle/ExternalShuffleBlockResolver.java | 4 +- .../network/shuffle/RemoteBlockPushResolver.java | 7 +- .../spark/network/yarn/YarnShuffleService.java | 38 +++++- .../org/apache/spark/internal/config/package.scala | 9 ++ docs/running-on-yarn.md | 12 +- docs/security.md | 14 ++ .../spark/deploy/yarn/ExecutorRunnable.scala | 46 +++++-- .../spark/deploy/yarn/ExecutorRunnableSuite.scala | 106 +++++++++++++++ .../network/yarn/YarnShuffleServiceSuite.scala | 147 ++++++++++++++++++++- 10 files changed, 425 insertions(+), 24 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/AppsWithRecoveryDisabled.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/AppsWithRecoveryDisabled.java new file mode 100644 index 00000000000..6a029a1083a --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/AppsWithRecoveryDisabled.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.shuffle; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.base.Preconditions; + +/** + * Stores the applications which have recovery disabled. + */ +public final class AppsWithRecoveryDisabled { + + private static final AppsWithRecoveryDisabled INSTANCE = new AppsWithRecoveryDisabled(); + + private final Set<String> appsWithRecoveryDisabled = Collections.newSetFromMap( + new ConcurrentHashMap<>()); + + private AppsWithRecoveryDisabled() { + } + + /** + * Add an application for which recovery is disabled. + * @param appId application id + */ + public static void disableRecoveryOfApp(String appId) { + Preconditions.checkNotNull(appId); + INSTANCE.appsWithRecoveryDisabled.add(appId); + } + + /** + * Returns whether an application is enabled for recovery or not. + * @param appId application id + * @return true if the application is enabled for recovery; false otherwise. + */ + public static boolean isRecoveryEnabledForApp(String appId) { + Preconditions.checkNotNull(appId); + return !INSTANCE.appsWithRecoveryDisabled.contains(appId); + } + + /** + * Removes the application from the store. 
+ * @param appId application id + */ + public static void removeApp(String appId) { + Preconditions.checkNotNull(appId); + INSTANCE.appsWithRecoveryDisabled.remove(appId); + } +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 6bca19a2511..ea341b73916 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -151,7 +151,7 @@ public class ExternalShuffleBlockResolver { AppExecId fullId = new AppExecId(appId, execId); logger.info("Registered executor {} with {}", fullId, executorInfo); try { - if (db != null) { + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) { byte[] key = dbAppExecKey(fullId); byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8); db.put(key, value); @@ -224,7 +224,7 @@ public class ExternalShuffleBlockResolver { // Only touch executors associated with the appId that was removed. 
if (appId.equals(fullId.appId)) { it.remove(); - if (db != null) { + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(fullId.appId)) { try { db.delete(dbAppExecKey(fullId)); } catch (IOException e) { diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index df2d1fa12d1..7f0862fcef4 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -494,7 +494,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @VisibleForTesting void removeAppAttemptPathInfoFromDB(String appId, int attemptId) { AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId); - if (db != null) { + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) { try { byte[] key = getDbAppAttemptPathsKey(appAttemptId); db.delete(key); @@ -967,7 +967,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { * Write the application attempt's local path information to the DB */ private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) { - if (db != null) { + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) { AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId); try { byte[] key = getDbAppAttemptPathsKey(appAttemptId); @@ -985,7 +985,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { */ private void writeAppAttemptShuffleMergeInfoToDB( AppAttemptShuffleMergeId appAttemptShuffleMergeId) { - if (db != null) { + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp( + appAttemptShuffleMergeId.appId)) { // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles try{ byte[] dbKey = 
getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 1fa0eebb7f8..578c1a19c40 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -28,6 +28,7 @@ import java.util.Objects; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -59,6 +60,7 @@ import org.apache.spark.network.crypto.AuthServerBootstrap; import org.apache.spark.network.sasl.ShuffleSecretManager; import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.TransportServerBootstrap; +import org.apache.spark.network.shuffle.AppsWithRecoveryDisabled; import org.apache.spark.network.shuffle.ExternalBlockHandler; import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.TransportConf; @@ -136,6 +138,12 @@ public class YarnShuffleService extends AuxiliaryService { private static final boolean DEFAULT_STOP_ON_FAILURE = false; + @VisibleForTesting + static final String SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED = + "spark.yarn.shuffle.server.recovery.disabled"; + @VisibleForTesting + static final String SECRET_KEY = "secret"; + // just for testing when you want to find an open port @VisibleForTesting static int boundPort = -1; @@ -407,10 +415,30 @@ public class YarnShuffleService extends AuxiliaryService { public void initializeApplication(ApplicationInitializationContext context) { String appId = context.getApplicationId().toString(); try { - 
ByteBuffer shuffleSecret = context.getApplicationDataForService(); + ByteBuffer appServiceData = context.getApplicationDataForService(); + String payload = JavaUtils.bytesToString(appServiceData); + String shuffleSecret; + Map<String, Object> metaInfo; + try { + metaInfo = mapper.readValue(payload, + new TypeReference<Map<String, Object>>() {}); + Object metadataStorageVal = metaInfo.get(SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED); + if (metadataStorageVal != null && (Boolean) metadataStorageVal) { + AppsWithRecoveryDisabled.disableRecoveryOfApp(appId); + logger.info("Disabling metadata persistence for application {}", appId); + } + } catch (IOException ioe) { + logger.warn("Unable to parse application data for service: " + payload); + metaInfo = null; + } if (isAuthenticationEnabled()) { - AppId fullId = new AppId(appId); - if (db != null) { + if (metaInfo != null) { + shuffleSecret = (String) metaInfo.get(SECRET_KEY); + } else { + shuffleSecret = payload; + } + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) { + AppId fullId = new AppId(appId); byte[] key = dbAppKey(fullId); byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8); db.put(key, value); @@ -428,7 +456,7 @@ public class YarnShuffleService extends AuxiliaryService { try { if (isAuthenticationEnabled()) { AppId fullId = new AppId(appId); - if (db != null) { + if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) { try { db.delete(dbAppKey(fullId)); } catch (IOException e) { @@ -440,6 +468,8 @@ public class YarnShuffleService extends AuxiliaryService { blockHandler.applicationRemoved(appId, false /* clean up local dirs */); } catch (Exception e) { logger.error("Exception when stopping application {}", appId, e); + } finally { + AppsWithRecoveryDisabled.removeApp(appId); } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 
11febabd1bd..125025d6a2f 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2507,4 +2507,13 @@ package object config { .version("3.5.0") .intConf .createWithDefault(Int.MaxValue) + + private[spark] val SHUFFLE_SERVER_RECOVERY_DISABLED = + ConfigBuilder("spark.yarn.shuffle.server.recovery.disabled") + .internal() + .doc("Set to true for applications that prefer to disable recovery when the External " + + "Shuffle Service restarts. This configuration only takes effect on YARN.") + .version("3.5.0") + .booleanConf + .createWithDefault(false) } diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 19649ffb380..8c024901352 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -650,7 +650,7 @@ To use a custom metrics.properties for the application master and executors, upd <td><code>spark.yarn.report.loggingFrequency</code></td> <td><code>30</code></td> <td> - Maximum number of application reports processed until the next application status + Maximum number of application reports processed until the next application status is logged. If there is a change of state, the application status will be logged regardless of the number of application reports processed. </td> @@ -683,6 +683,16 @@ To use a custom metrics.properties for the application master and executors, upd </td> <td>3.0.0</td> </tr> +<tr> + <td><code>spark.yarn.shuffle.server.recovery.disabled</code></td> + <td>false</td> + <td> + Set to true for applications that have higher security requirements and prefer that their + secret is not saved in the db. The shuffle data of such applications wll not be recovered after + the External Shuffle Service restarts. 
+ </td> + <td>3.5.0</td> +</tr> </table> #### Available patterns for SHS custom executor log URL diff --git a/docs/security.md b/docs/security.md index 7201ea51859..c856b94388f 100644 --- a/docs/security.md +++ b/docs/security.md @@ -60,6 +60,20 @@ distributing the shared secret. Each application will use a unique shared secret the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of secrets to be secure. +<table class="table table-striped"> +<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead> +<tr> + <td><code>spark.yarn.shuffle.server.recovery.disabled</code></td> + <td>false</td> + <td> + Set to true for applications that have higher security requirements and prefer that their + secret is not saved in the db. The shuffle data of such applications wll not be recovered after + the External Shuffle Service restarts. + </td> + <td>3.5.0</td> +</tr> +</table> + ### Kubernetes On Kubernetes, Spark will also automatically generate an authentication secret unique to each diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 1f3121ed224..e3fcf5472f5 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -21,8 +21,11 @@ import java.nio.ByteBuffer import java.util.Collections import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.{HashMap, ListBuffer} +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.hadoop.fs.Path import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation @@ -105,17 +108,7 @@ private[yarn] class 
ExecutorRunnable( // started on the NodeManager and, if authentication is enabled, provide it with our secret // key for fetching shuffle files later if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) { - val secretString = securityMgr.getSecretKey() - val secretBytes = - if (secretString != null) { - // This conversion must match how the YarnShuffleService decodes our secret - JavaUtils.stringToBytes(secretString) - } else { - // Authentication is not enabled, so just provide dummy metadata - ByteBuffer.allocate(0) - } - val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME) - ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes)) + configureServiceData(ctx) } // Send the start request to the ContainerManager @@ -128,6 +121,33 @@ private[yarn] class ExecutorRunnable( } } + private[yarn] def configureServiceData(ctx: ContainerLaunchContext): Unit = { + val secretString = securityMgr.getSecretKey() + + val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME) + if (!sparkConf.get(SHUFFLE_SERVER_RECOVERY_DISABLED)) { + val secretBytes = + if (secretString != null) { + // This conversion must match how the YarnShuffleService decodes our secret + JavaUtils.stringToBytes(secretString) + } else { + // Authentication is not enabled, so just provide dummy metadata + ByteBuffer.allocate(0) + } + ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes)) + } else { + val payload = new mutable.HashMap[String, Object]() + payload.put(SHUFFLE_SERVER_RECOVERY_DISABLED.key, java.lang.Boolean.TRUE) + if (secretString != null) { + payload.put(ExecutorRunnable.SECRET_KEY, secretString) + } + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + val jsonString = mapper.writeValueAsString(payload) + ctx.setServiceData(Collections.singletonMap(serviceName, JavaUtils.stringToBytes(jsonString))) + } + } + private def prepareCommand(): List[String] = { // Extra options for the JVM val javaOpts = ListBuffer[String]() @@ -202,3 +222,7 @@ private[yarn] 
class ExecutorRunnable( env } } + +private[yarn] object ExecutorRunnable { + private[yarn] val SECRET_KEY = "secret" +} diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala new file mode 100644 index 00000000000..1ef3c9c410a --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import java.io.IOException + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, LocalResource} +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.Records +import org.mockito.Mockito.{mock, when} + +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils + +class ExecutorRunnableSuite extends SparkFunSuite { + + private def createExecutorRunnable( + sparkConf: SparkConf = new SparkConf(), + securityManager: SecurityManager = mock(classOf[SecurityManager])): ExecutorRunnable = { + new ExecutorRunnable( + None, + new YarnConfiguration(), + sparkConf, + "yarn", + "exec-1", + "localhost", + 1, + 1, + "application_123_1", + securityManager, + Map.empty[String, LocalResource], + 0) + } + + for (shuffleServerRecoveryDisabled <- Seq(true, false)) { + test("validate service data when $shuffleServerRecoveryDisabled is " + + shuffleServerRecoveryDisabled) { + val sparkConf = new SparkConf() + sparkConf.set(SHUFFLE_SERVER_RECOVERY_DISABLED, shuffleServerRecoveryDisabled) + val securityManager = mock(classOf[SecurityManager]) + when(securityManager.getSecretKey()).thenReturn("secret") + val execRunnable = createExecutorRunnable(sparkConf, securityManager) + val ctx = Records.newRecord(classOf[ContainerLaunchContext]) + .asInstanceOf[ContainerLaunchContext] + execRunnable.configureServiceData(ctx) + val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME) + val serviceData = ctx.getServiceData.get(serviceName) + assert(serviceData != null) + val payload: String = JavaUtils.bytesToString(serviceData) + var metaInfo: java.util.Map[String, AnyRef] = null + val secret = try { + val mapper = new ObjectMapper + metaInfo = mapper.readValue(payload, + new 
TypeReference[java.util.Map[String, AnyRef]]() {}) + metaInfo.get(ExecutorRunnable.SECRET_KEY).asInstanceOf[String] + } catch { + case _: IOException => + payload + } + assert(secret equals "secret") + if (shuffleServerRecoveryDisabled) { + assert(metaInfo != null) + val metadataStorageVal: Any = metaInfo.get(SHUFFLE_SERVER_RECOVERY_DISABLED.key) + assert(metadataStorageVal != null && metadataStorageVal.asInstanceOf[Boolean]) + } + } + } + + test("if shuffle server recovery is disabled and authentication is disabled, then" + + " service data should not contain secret") { + val sparkConf = new SparkConf() + sparkConf.set(SHUFFLE_SERVER_RECOVERY_DISABLED, true) + val execRunnable = createExecutorRunnable(sparkConf) + val ctx = Records.newRecord(classOf[ContainerLaunchContext]) + .asInstanceOf[ContainerLaunchContext] + execRunnable.configureServiceData(ctx) + val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME) + val serviceData = ctx.getServiceData.get(serviceName) + assert(serviceData != null) + val payload: String = JavaUtils.bytesToString(serviceData) + val mapper = new ObjectMapper + val metaInfo = mapper.readValue(payload, + new TypeReference[java.util.Map[String, AnyRef]]() {}) + assert(!metaInfo.containsKey(ExecutorRunnable.SECRET_KEY)) + val metadataStorageVal: Any = metaInfo.get(SHUFFLE_SERVER_RECOVERY_DISABLED.key) + assert(metadataStorageVal != null && metadataStorageVal.asInstanceOf[Boolean]) + } +} diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 075a21c399e..3e78262a765 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -25,9 +25,12 @@ import java.util.EnumSet import scala.annotation.tailrec import 
scala.collection.JavaConverters._ +import scala.collection.mutable import scala.concurrent.duration._ import com.codahale.metrics.MetricSet +import com.fasterxml.jackson.databind.ObjectMapper +import com. fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.hadoop.fs.Path import org.apache.hadoop.metrics2.impl.MetricsSystemImpl import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem @@ -49,6 +52,7 @@ import org.apache.spark.network.shuffle.{Constants, MergedShuffleFileManager, No import org.apache.spark.network.shuffle.RemoteBlockPushResolver._ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo import org.apache.spark.network.shuffledb.DBBackend +import org.apache.spark.network.util.JavaUtils import org.apache.spark.network.util.TransportConf import org.apache.spark.network.yarn.util.HadoopConfigProvider import org.apache.spark.tags.ExtendedLevelDBTest @@ -1032,9 +1036,23 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { s2.stop() } - private def makeAppInfo(user: String, appId: ApplicationId): ApplicationInitializationContext = { - val secret = ByteBuffer.wrap(new Array[Byte](0)) - new ApplicationInitializationContext(user, appId, secret) + private def makeAppInfo(user: String, appId: ApplicationId, + metadataStorageDisabled: Boolean = false, + authEnabled: Boolean = true): ApplicationInitializationContext = { + if (!metadataStorageDisabled) { + val secret = ByteBuffer.wrap(new Array[Byte](0)) + new ApplicationInitializationContext(user, appId, secret) + } else { + val payload = new mutable.HashMap[String, Object]() + payload.put(YarnShuffleService.SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED, java.lang.Boolean.TRUE) + if (authEnabled) { + payload.put(YarnShuffleService.SECRET_KEY, "") + } + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + val jsonString = mapper.writeValueAsString(payload) + new ApplicationInitializationContext(user, appId, 
JavaUtils.stringToBytes(jsonString)) + } } test("recovery db should not be created if NM recovery is not enabled") { @@ -1109,6 +1127,129 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf, null) assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager]) } + + test("secret of applications should not be stored in db if they want to be excluded") { + // set auth to true to test the secrets recovery + yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true) + s1 = createYarnShuffleService() + val app1Id = ApplicationId.newInstance(1681252509, 1) + val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true) + s1.initializeApplication(app1Data) + val app2Id = ApplicationId.newInstance(1681252509, 2) + val app2Data = makeAppInfo("user", app2Id) + s1.initializeApplication(app2Data) + assert(s1.secretManager.getSecretKey(app1Id.toString()) == "") + assert(s1.secretManager.getSecretKey(app2Id.toString()) == "") + + val execShuffleInfo1 = + new ExecutorShuffleInfo( + Array(new File(tempDir, "foo/foo").getAbsolutePath, + new File(tempDir, "bar/bar").getAbsolutePath), 3, + SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1) + val execShuffleInfo2 = + new ExecutorShuffleInfo(Array(new File(tempDir, "bippy/bippy").getAbsolutePath), + 3, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1) + + val blockHandler = s1.blockHandler + val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler) + blockResolver.registerExecutor(app1Id.toString, "exec-1", execShuffleInfo1) + blockResolver.registerExecutor(app2Id.toString, "exec-2", execShuffleInfo2) + ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should + be(Some(execShuffleInfo1)) + ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should + be(Some(execShuffleInfo2)) + + val mergeManager = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver] + 
mergeManager.registerExecutor(app1Id.toString, execShuffleInfo1) + mergeManager.registerExecutor(app2Id.toString, execShuffleInfo2) + val localDirsApp1 = Array(new File(tempDir, "foo/merge_manager_1").getAbsolutePath, + new File(tempDir, "bar/merge_manager_1").getAbsolutePath) + val localDirsApp2 = Array(new File(tempDir, "bippy/merge_manager_1").getAbsolutePath) + val appPathsInfo1 = new AppPathsInfo(localDirsApp1, 3) + val appPathsInfo2 = new AppPathsInfo(localDirsApp2, 3) + + ShuffleTestAccessor.getAppPathsInfo(app1Id.toString, mergeManager) should + be(Some(appPathsInfo1)) + ShuffleTestAccessor.getAppPathsInfo(app2Id.toString, mergeManager) should + be(Some(appPathsInfo2)) + + val partitionIdApp1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 1, 1) + val partitionIdApp2 = new AppAttemptShuffleMergeId(app2Id.toString, 1, 2, 1) + prepareAppShufflePartition(mergeManager, partitionIdApp1, 1, "3") + prepareAppShufflePartition(mergeManager, partitionIdApp2, 2, "4") + ShuffleTestAccessor.finalizeShuffleMerge(mergeManager, partitionIdApp1) + ShuffleTestAccessor.finalizeShuffleMerge(mergeManager, partitionIdApp2) + + val execStateFile = s1.registeredExecutorFile + assert(execStateFile.exists(), s"$execStateFile did not exist") + val mergeMgrFile = s1.mergeManagerFile + assert(mergeMgrFile.exists(), s"$mergeMgrFile did not exist") + + // shuffle service goes down + s1.stop() + // Yarn Shuffle service comes back up without custom mergeManager + s2 = createYarnShuffleService() + // Since secret of app1 is not saved in the db, it isn't recovered + assert(s2.secretManager.getSecretKey(app1Id.toString()) == null) + assert(s2.secretManager.getSecretKey(app2Id.toString()) == "") + + val resolver2 = ShuffleTestAccessor.getBlockResolver(s2.blockHandler) + val mergeManager2 = s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver] + + // App1 executor information should not have been saved in the db. 
+ ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be(None) + ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be( + Some(execShuffleInfo2)) + // App1 should not have any merge related metadata stored in the db. + ShuffleTestAccessor + .getAppPathsInfo(app1Id.toString, mergeManager2) should be(None) + ShuffleTestAccessor.getAppPathsInfo(app2Id.toString, mergeManager2) should be( + Some(appPathsInfo2)) + + // Even though App1-partition1 was finalized before the restart, merge manager will recreate + // the partition since it didn't have any metadata saved for that app. + mergeManager2.registerExecutor(app1Id.toString, execShuffleInfo1) + prepareAppShufflePartition(mergeManager2, partitionIdApp1, 1, "3") + val dataFileApp1 = + ShuffleTestAccessor.getMergedShuffleDataFile(mergeManager2, partitionIdApp1, 1) + dataFileApp1.length() should be((4 * 5 + 1) * DUMMY_BLOCK_DATA.length) + // Since app2-partition2 was metadata was saved, it cannot be re-opened. + val error = intercept[BlockPushNonFatalFailure] { + ShuffleTestAccessor.getOrCreateAppShufflePartitionInfo( + mergeManager2, partitionIdApp2, 2, "3") + } + assert(error.getMessage.contains("is finalized")) + + s2.stopApplication(new ApplicationTerminationContext(app1Id)) + s2.stopApplication(new ApplicationTerminationContext(app2Id)) + s2.stop() + } + + test("executor info of apps should not be stored in db if they want to be excluded. 
" + + "Authentication is turned off") { + s1 = createYarnShuffleService() + val app1Id = ApplicationId.newInstance(1681252509, 1) + val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true, authEnabled = false) + s1.initializeApplication(app1Data) + val execShuffleInfo1 = + new ExecutorShuffleInfo( + Array(new File(tempDir, "foo/foo").getAbsolutePath, + new File(tempDir, "bar/bar").getAbsolutePath), 3, SORT_MANAGER) + val blockHandler = s1.blockHandler + val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler) + blockResolver.registerExecutor(app1Id.toString, "exec-1", execShuffleInfo1) + ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should + be(Some(execShuffleInfo1)) + // shuffle service goes down + s1.stop() + // Yarn Shuffle service comes back up without custom mergeManager + s2 = createYarnShuffleService() + val resolver2 = ShuffleTestAccessor.getBlockResolver(s2.blockHandler) + // App1 executor information should not have been saved in the db. + ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be(None) + s2.stopApplication(new ApplicationTerminationContext(app1Id)) + s2.stop() + } } @ExtendedLevelDBTest --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org