This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d4d8a63  [SPARK-36772] FinalizeShuffleMerge fails with an exception due to attempt id not matching
d4d8a63 is described below

commit d4d8a6320fceb00fa79a28e4e92ad454ae3cbb76
Author: Ye Zhou <yez...@linkedin.com>
AuthorDate: Sat Sep 18 15:51:57 2021 +0800

    [SPARK-36772] FinalizeShuffleMerge fails with an exception due to attempt id not matching
    
    ### What changes were proposed in this pull request?
    Remove the appAttemptId from TransportConf, and instead pass it to the BlockStoreClient through SparkEnv.
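
    A minimal sketch of the new flow (condensed from the diff below; `_applicationAttemptId`, `_conf`, and `_env`/`env` are the SparkContext and SparkEnv internals shown there, so this is illustrative rather than new API):

    ```scala
    // Driver side (SparkContext): once the TaskScheduler reports an attempt id,
    // record it in the conf and hand it to the BlockStoreClient.
    _applicationAttemptId.foreach { attemptId =>
      _conf.set(APP_ATTEMPT_ID, attemptId)
      _env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
    }

    // Executor side (CoarseGrainedExecutorBackend): read the attempt id the
    // driver propagated through the conf and set it on the local client.
    env.conf.get(APP_ATTEMPT_ID).foreach { attemptId =>
      env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
    }
    ```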
    
    ### Why are the changes needed?
    Push based shuffle will fail if an attemptId is set in the SparkConf, because the attemptId is not set correctly in the Driver.
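
    Concretely, a simplified sketch of the pre-patch lookup (taken from the TransportConf accessor removed below, shown here in Scala for illustration):

    ```scala
    // Pre-patch: the attempt id was resolved from "spark.app.attempt.id" in
    // TransportConf. The Driver never set this key correctly, so it fell back
    // to the default -1 and no longer matched the attempt id used elsewhere,
    // which made FinalizeShuffleMerge fail with an attempt id mismatch.
    val attemptId = conf.getInt("spark.app.attempt.id", -1)
    ```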
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tested within our Yarn cluster. Without this PR, the Driver fails to finalize the shuffle merge on all the mergers. With the patch, the Driver can successfully finalize the shuffle merge and push based shuffle works correctly.
    Also added a unit test to verify that the attemptId is set in the BlockStoreClient in the Driver.
    
    Closes #34018 from zhouyejoe/SPARK-36772.
    
    Authored-by: Ye Zhou <yez...@linkedin.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
    (cherry picked from commit cabc36b54d7f6633d8b128e511e7049c475b919d)
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/network/util/TransportConf.java   |  7 -----
 .../spark/network/shuffle/BlockStoreClient.java    | 12 +++++++++
 .../network/shuffle/ExternalBlockStoreClient.java  | 30 +++++++++++++++++++---
 .../main/scala/org/apache/spark/SparkContext.scala |  5 +++-
 .../executor/CoarseGrainedExecutorBackend.scala    |  6 ++++-
 .../scala/org/apache/spark/SparkContextSuite.scala | 12 +++++++++
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |  4 ++-
 7 files changed, 63 insertions(+), 13 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index bc507a4..f73e3ce 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -427,11 +427,4 @@ public class TransportConf {
   public int ioExceptionsThresholdDuringMerge() {
     return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
   }
-
-  /**
-   * The application attemptID assigned from Hadoop YARN.
-   */
-  public int appAttemptId() {
-    return conf.getInt("spark.app.attempt.id", -1);
-  }
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
index 6dc5fd5..253fb7a 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -46,6 +46,8 @@ public abstract class BlockStoreClient implements Closeable {
 
   protected volatile TransportClientFactory clientFactory;
   protected String appId;
+  // Store the application attemptId
+  private String appAttemptId;
   protected TransportConf transportConf;
 
   /**
@@ -124,6 +126,16 @@ public abstract class BlockStoreClient implements Closeable {
     assert appId != null : "Called before init()";
   }
 
+  // Set the application attemptId
+  public void setAppAttemptId(String appAttemptId) {
+    this.appAttemptId = appAttemptId;
+  }
+
+  // Get the application attemptId
+  public String getAppAttemptId() {
+    return this.appAttemptId;
+  }
+
   /**
    * Request the local disk directories for executors which are located at the same host with
    * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService).
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index 4c0e9f3..d2df776 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -52,6 +52,10 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
   private final boolean authEnabled;
   private final SecretKeyHolder secretKeyHolder;
   private final long registrationTimeoutMs;
+  // Push based shuffle requires a comparable Id to distinguish the shuffle data among multiple
+  // application attempts. This variable is derived from the String typed appAttemptId. If no
+  // appAttemptId is set, the default comparableAppAttemptId is -1.
+  private int comparableAppAttemptId = -1;
 
   /**
    * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
@@ -84,6 +88,26 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
   }
 
   @Override
+  public void setAppAttemptId(String appAttemptId) {
+    super.setAppAttemptId(appAttemptId);
+    setComparableAppAttemptId(appAttemptId);
+  }
+
+  private void setComparableAppAttemptId(String appAttemptId) {
+    // For now, push based shuffle only supports running in YARN.
+    // Application attemptId in YARN is integer and it can be safely parsed
+    // to integer here. For the application attemptId from other cluster set up
+    // which is not numeric, it needs to generate this comparableAppAttemptId
+    // from the String typed appAttemptId through some other customized logic.
+    try {
+      this.comparableAppAttemptId = Integer.parseInt(appAttemptId);
+    } catch (NumberFormatException e) {
+      logger.warn("Push based shuffle requires comparable application attemptId, " +
+        "but the appAttemptId {} cannot be parsed to Integer", appAttemptId, e);
+    }
+  }
+
+  @Override
   public void fetchBlocks(
       String host,
       int port,
@@ -146,7 +170,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
               assert inputListener instanceof BlockPushingListener :
                 "Expecting a BlockPushingListener, but got " + 
inputListener.getClass();
               TransportClient client = clientFactory.createClient(host, port);
-              new OneForOneBlockPusher(client, appId, transportConf.appAttemptId(), inputBlockId,
+              new OneForOneBlockPusher(client, appId, comparableAppAttemptId, inputBlockId,
                 (BlockPushingListener) inputListener, buffersWithId).start();
             } else {
               logger.info("This clientFactory was closed. Skipping further 
block push retries.");
@@ -178,8 +202,8 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
     try {
       TransportClient client = clientFactory.createClient(host, port);
       ByteBuffer finalizeShuffleMerge =
-        new FinalizeShuffleMerge(appId, transportConf.appAttemptId(), shuffleId,
-          shuffleMergeId).toByteBuffer();
+        new FinalizeShuffleMerge(
+          appId, comparableAppAttemptId, shuffleId, shuffleMergeId).toByteBuffer();
       client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
         @Override
         public void onSuccess(ByteBuffer response) {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d11fa55..f7d8c79 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -583,7 +583,10 @@ class SparkContext(config: SparkConf) extends Logging {
     _applicationId = _taskScheduler.applicationId()
     _applicationAttemptId = _taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
-    _applicationAttemptId.foreach(attemptId => _conf.set(APP_ATTEMPT_ID, attemptId))
+    _applicationAttemptId.foreach { attemptId =>
+      _conf.set(APP_ATTEMPT_ID, attemptId)
+      _env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
+    }
     if (_conf.get(UI_REVERSE_PROXY)) {
       val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") +
         "/proxy/" + _applicationId
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ffcb30d..0f8e6d1 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -466,7 +466,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       driverConf.set(EXECUTOR_ID, arguments.executorId)
       val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
         arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
-
+      // Set the application attemptId in the BlockStoreClient if available.
+      val appAttemptId = env.conf.get(APP_ATTEMPT_ID)
+      appAttemptId.foreach(attemptId =>
+        env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
+      )
       val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
       env.rpcEnv.setupEndpoint("Executor", backend)
       arguments.workerUrl.foreach { url =>
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 93677d3..16330ba 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1317,6 +1317,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       }
     }
   }
+
+  test("SPARK-36772: Store application attemptId in BlockStoreClient for push 
based shuffle") {
+    val conf = new SparkConf().setAppName("testAppAttemptId")
+      .setMaster("pushbasedshuffleclustermanager")
+    conf.set(PUSH_BASED_SHUFFLE_ENABLED.key, "true")
+    conf.set(IS_TESTING.key, "true")
+    conf.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+    sc = new SparkContext(conf)
+    val env = SparkEnv.get
+    assert(env.blockManager.blockStoreClient.getAppAttemptId.equals("1"))
+  }
+
 }
 
 object SparkContextSuite {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 312d1f8..deddaea 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3961,7 +3961,9 @@ private class PushBasedClusterManager extends ExternalClusterManager {
 
   override def createTaskScheduler(
       sc: SparkContext,
-      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true)
+      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true) {
+    override def applicationAttemptId(): Option[String] = Some("1")
+  }
 
   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
     val sc = scheduler.asInstanceOf[TaskSchedulerImpl]
