This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d4d8a63  [SPARK-36772] FinalizeShuffleMerge fails with an exception due to attempt id not matching
d4d8a63 is described below

commit d4d8a6320fceb00fa79a28e4e92ad454ae3cbb76
Author: Ye Zhou <yez...@linkedin.com>
AuthorDate: Sat Sep 18 15:51:57 2021 +0800

    [SPARK-36772] FinalizeShuffleMerge fails with an exception due to attempt id not matching

    ### What changes were proposed in this pull request?
    Remove the appAttemptId from TransportConf and propagate it through SparkEnv instead.

    ### Why are the changes needed?
    Push-based shuffle fails whenever an attemptId is set in the SparkConf, because the attemptId is not set correctly in the driver.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Tested in our YARN cluster. Without this PR, the driver fails to finalize the shuffle merge on all the mergers; with the patch, the driver finalizes the shuffle merge successfully and push-based shuffle works. A unit test also verifies that the attemptId is set in the driver's BlockStoreClient.

    Closes #34018 from zhouyejoe/SPARK-36772.

    Authored-by: Ye Zhou <yez...@linkedin.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
    (cherry picked from commit cabc36b54d7f6633d8b128e511e7049c475b919d)
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/network/util/TransportConf.java   |  7 -----
 .../spark/network/shuffle/BlockStoreClient.java    | 12 +++++++++
 .../network/shuffle/ExternalBlockStoreClient.java  | 30 +++++++++++++++++++---
 .../main/scala/org/apache/spark/SparkContext.scala |  5 +++-
 .../executor/CoarseGrainedExecutorBackend.scala    |  6 ++++-
 .../scala/org/apache/spark/SparkContextSuite.scala | 12 +++++++++
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |  4 ++-
 7 files changed, 63 insertions(+), 13 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index bc507a4..f73e3ce 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -427,11 +427,4 @@ public class TransportConf {
   public int ioExceptionsThresholdDuringMerge() {
     return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
   }
-
-  /**
-   * The application attemptID assigned from Hadoop YARN.
-   */
-  public int appAttemptId() {
-    return conf.getInt("spark.app.attempt.id", -1);
-  }
 }
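The removal above is the heart of the fix: `TransportConf.appAttemptId()` pulled `spark.app.attempt.id` out of a config snapshot with a `-1` default, and on the driver that snapshot is taken before the scheduler has reported an attempt id (our reading of the bug report; the patch itself only shows the removal). The minimal sketch below contrasts that pull model with the push model the rest of the diff adopts; all names are hypothetical except `spark.app.attempt.id` and the `-1` default, which come from the diff:

```scala
// Sketch of the failure mode, under the assumption stated above.
object PullVsPush {
  def main(args: Array[String]): Unit = {
    // The driver-side conf snapshot is taken before the attempt id exists.
    val snapshot = Map.empty[String, String]
    val liveConf = snapshot + ("spark.app.attempt.id" -> "1") // id arrives later
    // Pull model (removed above): read from the stale snapshot, default -1.
    val pulled = snapshot.getOrElse("spark.app.attempt.id", "-1")
    // Push model (added below): hand the id to the client once it is known.
    var clientAttemptId: Option[String] = None
    liveConf.get("spark.app.attempt.id").foreach(id => clientAttemptId = Some(id))
    println(s"pulled=$pulled, pushed=${clientAttemptId.getOrElse("unset")}") // pulled=-1, pushed=1
  }
}
```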
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
index 6dc5fd5..253fb7a 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -46,6 +46,8 @@ public abstract class BlockStoreClient implements Closeable {

   protected volatile TransportClientFactory clientFactory;
   protected String appId;
+  // Store the application attemptId
+  private String appAttemptId;
   protected TransportConf transportConf;

   /**
@@ -124,6 +126,16 @@ public abstract class BlockStoreClient implements Closeable {
     assert appId != null : "Called before init()";
   }

+  // Set the application attemptId
+  public void setAppAttemptId(String appAttemptId) {
+    this.appAttemptId = appAttemptId;
+  }
+
+  // Get the application attemptId
+  public String getAppAttemptId() {
+    return this.appAttemptId;
+  }
+
   /**
    * Request the local disk directories for executors which are located at the same host with
    * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService).
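The accessors land on the abstract base class because, per the javadoc above, the client may be either an ExternalBlockStoreClient or a NettyBlockTransferService, and the caller should not have to know which. A condensed Scala restatement of that shape, with hypothetical stand-in names (the real types are in the diffs):

```scala
object ClientHierarchySketch {
  // Base stores the raw String attempt id, as BlockStoreClient does above.
  abstract class BlockStoreClientSketch {
    private var appAttemptId: String = _
    def setAppAttemptId(id: String): Unit = appAttemptId = id
    def getAppAttemptId: String = appAttemptId
  }
  class NettyLikeClient extends BlockStoreClientSketch
  class ExternalLikeClient extends BlockStoreClientSketch

  def main(args: Array[String]): Unit = {
    // Callers set the id polymorphically, whatever the concrete client is.
    val clients = Seq(new NettyLikeClient, new ExternalLikeClient)
    clients.foreach(_.setAppAttemptId("1"))
    assert(clients.forall(_.getAppAttemptId == "1"))
  }
}
```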
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index 4c0e9f3..d2df776 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -52,6 +52,10 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
   private final boolean authEnabled;
   private final SecretKeyHolder secretKeyHolder;
   private final long registrationTimeoutMs;
+  // Push-based shuffle requires a comparable id to distinguish the shuffle data among multiple
+  // application attempts. This variable is derived from the String-typed appAttemptId. If no
+  // appAttemptId is set, the default comparableAppAttemptId is -1.
+  private int comparableAppAttemptId = -1;

   /**
    * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
@@ -84,6 +88,26 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
   }

   @Override
+  public void setAppAttemptId(String appAttemptId) {
+    super.setAppAttemptId(appAttemptId);
+    setComparableAppAttemptId(appAttemptId);
+  }
+
+  private void setComparableAppAttemptId(String appAttemptId) {
+    // For now, push-based shuffle only supports running on YARN. The application
+    // attemptId in YARN is an integer, so it can safely be parsed to an integer here.
+    // Cluster managers whose application attemptId is not numeric will need some
+    // other customized logic to derive this comparableAppAttemptId from the
+    // String-typed appAttemptId.
+    try {
+      this.comparableAppAttemptId = Integer.parseInt(appAttemptId);
+    } catch (NumberFormatException e) {
+      logger.warn("Push based shuffle requires comparable application attemptId, " +
+        "but the appAttemptId {} cannot be parsed to Integer", appAttemptId, e);
+    }
+  }
+
+  @Override
   public void fetchBlocks(
       String host,
       int port,
@@ -146,7 +170,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
           assert inputListener instanceof BlockPushingListener :
             "Expecting a BlockPushingListener, but got " + inputListener.getClass();
           TransportClient client = clientFactory.createClient(host, port);
-          new OneForOneBlockPusher(client, appId, transportConf.appAttemptId(), inputBlockId,
+          new OneForOneBlockPusher(client, appId, comparableAppAttemptId, inputBlockId,
             (BlockPushingListener) inputListener, buffersWithId).start();
         } else {
           logger.info("This clientFactory was closed. Skipping further block push retries.");
@@ -178,8 +202,8 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
     try {
       TransportClient client = clientFactory.createClient(host, port);
       ByteBuffer finalizeShuffleMerge =
-        new FinalizeShuffleMerge(appId, transportConf.appAttemptId(), shuffleId,
-          shuffleMergeId).toByteBuffer();
+        new FinalizeShuffleMerge(
+          appId, comparableAppAttemptId, shuffleId, shuffleMergeId).toByteBuffer();
       client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
         @Override
         public void onSuccess(ByteBuffer response) {
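The comment in setComparableAppAttemptId leaves the non-YARN case open. One hypothetical derivation, not part of this patch: if a cluster manager issued ids like "attempt-0003", extracting the trailing digits keeps later attempts mapping to larger integers, and falls back to -1 just as the YARN path does on a parse failure:

```scala
object ComparableAttemptIdSketch {
  // Hypothetical helper (not from the patch): derive an int attempt id from a
  // non-numeric String id by taking its trailing digits; empty or oversized
  // digit runs fall back to -1, matching the patch's NumberFormatException default.
  def toComparableAttemptId(appAttemptId: String): Int = {
    val digits = appAttemptId.reverse.takeWhile(_.isDigit).reverse
    scala.util.Try(digits.toInt).getOrElse(-1)
  }

  def main(args: Array[String]): Unit = {
    assert(toComparableAttemptId("3") == 3)            // YARN-style numeric id
    assert(toComparableAttemptId("attempt-0003") == 3) // hypothetical suffix form
    assert(toComparableAttemptId("deadbeef") == -1)    // no digits at all
  }
}
```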
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d11fa55..f7d8c79 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -583,7 +583,10 @@ class SparkContext(config: SparkConf) extends Logging {
     _applicationId = _taskScheduler.applicationId()
     _applicationAttemptId = _taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
-    _applicationAttemptId.foreach(attemptId => _conf.set(APP_ATTEMPT_ID, attemptId))
+    _applicationAttemptId.foreach { attemptId =>
+      _conf.set(APP_ATTEMPT_ID, attemptId)
+      _env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
+    }
     if (_conf.get(UI_REVERSE_PROXY)) {
       val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") +
         "/proxy/" + _applicationId
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ffcb30d..0f8e6d1 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -466,7 +466,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
      driverConf.set(EXECUTOR_ID, arguments.executorId)
      val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
        arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
-
+      // Set the application attemptId in the BlockStoreClient if available.
+      val appAttemptId = env.conf.get(APP_ATTEMPT_ID)
+      appAttemptId.foreach(attemptId =>
+        env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
+      )
      val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
      env.rpcEnv.setupEndpoint("Executor", backend)
      arguments.workerUrl.foreach { url =>
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 93677d3..16330ba 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1317,6 +1317,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
       }
     }
   }
+
+  test("SPARK-36772: Store application attemptId in BlockStoreClient for push based shuffle") {
+    val conf = new SparkConf().setAppName("testAppAttemptId")
+      .setMaster("pushbasedshuffleclustermanager")
+    conf.set(PUSH_BASED_SHUFFLE_ENABLED.key, "true")
+    conf.set(IS_TESTING.key, "true")
+    conf.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+    sc = new SparkContext(conf)
+    val env = SparkEnv.get
+    assert(env.blockManager.blockStoreClient.getAppAttemptId.equals("1"))
+  }
+
 }

 object SparkContextSuite {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 312d1f8..deddaea 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3961,7 +3961,9 @@ private class PushBasedClusterManager extends ExternalClusterManager {

   override def createTaskScheduler(
       sc: SparkContext,
-      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true)
+      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true) {
+    override def applicationAttemptId(): Option[String] = Some("1")
+  }

   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
     val sc = scheduler.asInstanceOf[TaskSchedulerImpl]
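Taken together, the pieces above can be exercised outside the test suite. The sketch below mirrors the new SparkContextSuite test; it assumes the test-only `pushbasedshuffleclustermanager` master (registered by the PushBasedClusterManager shown in the DAGSchedulerSuite change) is available on the classpath, and it spells out the config keys that the test references through `ConfigEntry` constants — that key mapping is ours, not stated in the diff:

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

object AttemptIdEndToEnd {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("attempt-id-check")
      .setMaster("pushbasedshuffleclustermanager") // test-only cluster manager above
      .set("spark.shuffle.push.enabled", "true")    // PUSH_BASED_SHUFFLE_ENABLED
      .set("spark.testing", "true")                 // IS_TESTING
      .set("spark.shuffle.service.enabled", "true") // SHUFFLE_SERVICE_ENABLED
    val sc = new SparkContext(conf)
    try {
      // SparkContext has copied the scheduler's Some("1") into the client.
      assert(SparkEnv.get.blockManager.blockStoreClient.getAppAttemptId == "1")
    } finally {
      sc.stop()
    }
  }
}
```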