This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 45ad044042f [SPARK-43179][FOLLOW-UP] Use the secret ByteBuffer instead of the String 45ad044042f is described below commit 45ad044042f7f376c4c0234807a62179b680edae Author: Chandni Singh <chsi...@linkedin.com> AuthorDate: Sun Jun 11 07:59:35 2023 -0500 [SPARK-43179][FOLLOW-UP] Use the secret ByteBuffer instead of the String ### What changes were proposed in this pull request? A bug was introduced by this change: https://github.com/apache/spark/pull/40843. To get the value that is persisted in the db, we used to call `mapper.writeValueAsString(ByteBuffer)`. We changed it to `mapper.writeValueAsString(String)`. However, loading from the db still uses `ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);`, so the shuffle service is unable to recover the apps and raises exceptions like: ``` ERROR org.apache.spark.network.server.TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 5764589675121231159 java.lang.RuntimeException: javax.security.sasl.SaslException: DIGEST-MD5: digest response format violation. Mismatched response. at org.sparkproject.guava.base.Throwables.propagate(Throwables.java:160) at org.apache.spark.network.sasl.SparkSaslServer.response(SparkSaslServer.java:121) at org.apache.spark.network.sasl.SaslRpcHandler.doAuthChallenge(Sas [...] ``` ### Why are the changes needed? It fixes the bug that was introduced with SPARK-43179. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The existing UTs in the `YarnShuffleServiceSuite` were using an empty password, which masked the issue. Changed them to use a non-empty password. Closes #41502 from otterc/SPARK-43179-followup. 
Authored-by: Chandni Singh <chsi...@linkedin.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../spark/network/yarn/YarnShuffleService.java | 4 +++- .../network/yarn/YarnShuffleServiceSuite.scala | 25 +++++++++++++--------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 578c1a19c40..b34ebf6e29b 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -440,7 +440,9 @@ public class YarnShuffleService extends AuxiliaryService { if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) { AppId fullId = new AppId(appId); byte[] key = dbAppKey(fullId); - byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8); + ByteBuffer dbVal = metaInfo != null ? 
+ JavaUtils.stringToBytes(shuffleSecret) : appServiceData; + byte[] value = mapper.writeValueAsString(dbVal).getBytes(StandardCharsets.UTF_8); db.put(key, value); } secretManager.registerApp(appId, shuffleSecret); diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 3e78262a765..552cc98311e 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -71,6 +71,8 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { private[yarn] val SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithNoAttemptID = "org.apache.spark.shuffle.sort.SortShuffleManager:{\"mergeDir\": \"merge_manager\"}" private val DUMMY_BLOCK_DATA = "dummyBlockData".getBytes(StandardCharsets.UTF_8) + private val DUMMY_PASSWORD = "dummyPassword" + private val EMPTY_PASSWORD = "" private var recoveryLocalDir: File = _ protected var tempDir: File = _ @@ -191,7 +193,8 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { val app3Data = makeAppInfo("user", app3Id) s1.initializeApplication(app3Data) val app4Id = ApplicationId.newInstance(0, 4) - val app4Data = makeAppInfo("user", app4Id) + val app4Data = makeAppInfo("user", app4Id, metadataStorageDisabled = false, + authEnabled = true, DUMMY_PASSWORD) s1.initializeApplication(app4Data) val execStateFile = s1.registeredExecutorFile @@ -1038,15 +1041,15 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { private def makeAppInfo(user: String, appId: ApplicationId, metadataStorageDisabled: Boolean = false, - authEnabled: Boolean = true): ApplicationInitializationContext = { + authEnabled: Boolean = true, + password: String = EMPTY_PASSWORD): ApplicationInitializationContext = { if 
(!metadataStorageDisabled) { - val secret = ByteBuffer.wrap(new Array[Byte](0)) - new ApplicationInitializationContext(user, appId, secret) + new ApplicationInitializationContext(user, appId, JavaUtils.stringToBytes(password)) } else { val payload = new mutable.HashMap[String, Object]() payload.put(YarnShuffleService.SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED, java.lang.Boolean.TRUE) if (authEnabled) { - payload.put(YarnShuffleService.SECRET_KEY, "") + payload.put(YarnShuffleService.SECRET_KEY, password) } val mapper = new ObjectMapper() mapper.registerModule(DefaultScalaModule) @@ -1133,13 +1136,15 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true) s1 = createYarnShuffleService() val app1Id = ApplicationId.newInstance(1681252509, 1) - val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true) + val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true, + authEnabled = true, EMPTY_PASSWORD) s1.initializeApplication(app1Data) val app2Id = ApplicationId.newInstance(1681252509, 2) - val app2Data = makeAppInfo("user", app2Id) + val app2Data = makeAppInfo("user", app2Id, metadataStorageDisabled = false, + authEnabled = true, DUMMY_PASSWORD) s1.initializeApplication(app2Data) - assert(s1.secretManager.getSecretKey(app1Id.toString()) == "") - assert(s1.secretManager.getSecretKey(app2Id.toString()) == "") + assert(s1.secretManager.getSecretKey(app1Id.toString()) == EMPTY_PASSWORD) + assert(s1.secretManager.getSecretKey(app2Id.toString()) == DUMMY_PASSWORD) val execShuffleInfo1 = new ExecutorShuffleInfo( @@ -1191,7 +1196,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { s2 = createYarnShuffleService() // Since secret of app1 is not saved in the db, it isn't recovered assert(s2.secretManager.getSecretKey(app1Id.toString()) == null) - assert(s2.secretManager.getSecretKey(app2Id.toString()) == "") + 
assert(s2.secretManager.getSecretKey(app2Id.toString()) == DUMMY_PASSWORD) val resolver2 = ShuffleTestAccessor.getBlockResolver(s2.blockHandler) val mergeManager2 = s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org