This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 45ad044042f [SPARK-43179][FOLLOW-UP] Use the secret ByteBuffer instead 
of the String
45ad044042f is described below

commit 45ad044042f7f376c4c0234807a62179b680edae
Author: Chandni Singh <chsi...@linkedin.com>
AuthorDate: Sun Jun 11 07:59:35 2023 -0500

    [SPARK-43179][FOLLOW-UP] Use the secret ByteBuffer instead of the String
    
    ### What changes were proposed in this pull request?
    This change introduced a bug: 
https://github.com/apache/spark/pull/40843. To get the value that is persisted 
in db, we used to use `mapper.writeValueAsString(ByteBuffer)`. We changed it to 
`mapper.writeValueAsString(String)`. However, when we load from the db, it 
still uses
    `ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);` 
causing exceptions when the shuffle service is unable to recover the apps:
    ```
    ERROR org.apache.spark.network.server.TransportRequestHandler: Error while 
invoking RpcHandler#receive() on RPC id 5764589675121231159 
java.lang.RuntimeException: javax.security.sasl.SaslException: DIGEST-MD5: 
digest response format violation. Mismatched response. at 
org.sparkproject.guava.base.Throwables.propagate(Throwables.java:160) at 
org.apache.spark.network.sasl.SparkSaslServer.response(SparkSaslServer.java:121)
 at org.apache.spark.network.sasl.SaslRpcHandler.doAuthChallenge(Sas [...]
    ```
    
    ### Why are the changes needed?
    It fixes the bug that was introduced by SPARK-43179.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    The existing UTs in the `YarnShuffleServiceSuite` were using an empty 
password, which masked the issue. Changed them to use a non-empty password.
    
    Closes #41502 from otterc/SPARK-43179-followup.
    
    Authored-by: Chandni Singh <chsi...@linkedin.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../spark/network/yarn/YarnShuffleService.java     |  4 +++-
 .../network/yarn/YarnShuffleServiceSuite.scala     | 25 +++++++++++++---------
 2 files changed, 18 insertions(+), 11 deletions(-)

diff --git 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 578c1a19c40..b34ebf6e29b 100644
--- 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -440,7 +440,9 @@ public class YarnShuffleService extends AuxiliaryService {
         if (db != null && 
AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
           AppId fullId = new AppId(appId);
           byte[] key = dbAppKey(fullId);
-          byte[] value = 
mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
+          ByteBuffer dbVal = metaInfo != null ?
+              JavaUtils.stringToBytes(shuffleSecret) : appServiceData;
+          byte[] value = 
mapper.writeValueAsString(dbVal).getBytes(StandardCharsets.UTF_8);
           db.put(key, value);
         }
         secretManager.registerApp(appId, shuffleSecret);
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 3e78262a765..552cc98311e 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -71,6 +71,8 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite 
with Matchers {
   private[yarn] val SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithNoAttemptID =
     "org.apache.spark.shuffle.sort.SortShuffleManager:{\"mergeDir\": 
\"merge_manager\"}"
   private val DUMMY_BLOCK_DATA = 
"dummyBlockData".getBytes(StandardCharsets.UTF_8)
+  private val DUMMY_PASSWORD = "dummyPassword"
+  private val EMPTY_PASSWORD = ""
 
   private var recoveryLocalDir: File = _
   protected var tempDir: File = _
@@ -191,7 +193,8 @@ abstract class YarnShuffleServiceSuite extends 
SparkFunSuite with Matchers {
     val app3Data = makeAppInfo("user", app3Id)
     s1.initializeApplication(app3Data)
     val app4Id = ApplicationId.newInstance(0, 4)
-    val app4Data = makeAppInfo("user", app4Id)
+    val app4Data = makeAppInfo("user", app4Id, metadataStorageDisabled = false,
+        authEnabled = true, DUMMY_PASSWORD)
     s1.initializeApplication(app4Data)
 
     val execStateFile = s1.registeredExecutorFile
@@ -1038,15 +1041,15 @@ abstract class YarnShuffleServiceSuite extends 
SparkFunSuite with Matchers {
 
   private def makeAppInfo(user: String, appId: ApplicationId,
       metadataStorageDisabled: Boolean = false,
-      authEnabled: Boolean = true): ApplicationInitializationContext = {
+      authEnabled: Boolean = true,
+      password: String = EMPTY_PASSWORD): ApplicationInitializationContext = {
     if (!metadataStorageDisabled) {
-      val secret = ByteBuffer.wrap(new Array[Byte](0))
-      new ApplicationInitializationContext(user, appId, secret)
+      new ApplicationInitializationContext(user, appId, 
JavaUtils.stringToBytes(password))
     } else {
       val payload = new mutable.HashMap[String, Object]()
       payload.put(YarnShuffleService.SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED, 
java.lang.Boolean.TRUE)
       if (authEnabled) {
-        payload.put(YarnShuffleService.SECRET_KEY, "")
+        payload.put(YarnShuffleService.SECRET_KEY, password)
       }
       val mapper = new ObjectMapper()
       mapper.registerModule(DefaultScalaModule)
@@ -1133,13 +1136,15 @@ abstract class YarnShuffleServiceSuite extends 
SparkFunSuite with Matchers {
     yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
     s1 = createYarnShuffleService()
     val app1Id = ApplicationId.newInstance(1681252509, 1)
-    val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true)
+    val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true,
+        authEnabled = true, EMPTY_PASSWORD)
     s1.initializeApplication(app1Data)
     val app2Id = ApplicationId.newInstance(1681252509, 2)
-    val app2Data = makeAppInfo("user", app2Id)
+    val app2Data = makeAppInfo("user", app2Id, metadataStorageDisabled = false,
+        authEnabled = true, DUMMY_PASSWORD)
     s1.initializeApplication(app2Data)
-    assert(s1.secretManager.getSecretKey(app1Id.toString()) == "")
-    assert(s1.secretManager.getSecretKey(app2Id.toString()) == "")
+    assert(s1.secretManager.getSecretKey(app1Id.toString()) == EMPTY_PASSWORD)
+    assert(s1.secretManager.getSecretKey(app2Id.toString()) == DUMMY_PASSWORD)
 
     val execShuffleInfo1 =
       new ExecutorShuffleInfo(
@@ -1191,7 +1196,7 @@ abstract class YarnShuffleServiceSuite extends 
SparkFunSuite with Matchers {
     s2 = createYarnShuffleService()
     // Since secret of app1 is not saved in the db, it isn't recovered
     assert(s2.secretManager.getSecretKey(app1Id.toString()) == null)
-    assert(s2.secretManager.getSecretKey(app2Id.toString()) == "")
+    assert(s2.secretManager.getSecretKey(app2Id.toString()) == DUMMY_PASSWORD)
 
     val resolver2 = ShuffleTestAccessor.getBlockResolver(s2.blockHandler)
     val mergeManager2 = 
s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to