This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new 953cc1c [SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped 953cc1c is described below commit 953cc1c65ac9cc180dc0ca83ced8771b554bb291 Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Thu Jan 28 13:06:42 2021 -0800 [SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped This PR aims to prevent `HeartbeatReceiver` from asking `Executor` to re-register its block manager when the SparkContext is already stopped. Currently, `HeartbeatReceiver` asks for re-registration in response to a heartbeat without checking whether the SparkContext has been stopped. However, once the SparkContext is stopped, there is no need to re-register a new block manager. Re-registration causes unnecessary executor log messages and delays job termination. Does this PR introduce any user-facing change? No. How was this patch tested? Pass the CIs with the newly added test case. Closes #31373 from dongjoon-hyun/SPARK-34273. Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit bc41c5a0e598e6b697ed61c33e1bea629dabfc57) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../scala/org/apache/spark/HeartbeatReceiver.scala | 8 +++++--- .../org/apache/spark/HeartbeatReceiverSuite.scala | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index bcbc8df..980411d 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -120,6 +120,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // Messages received from executors case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) => + var reregisterBlockManager = !sc.isStopped if (scheduler != null) { if (executorLastSeen.contains(executorId)) { executorLastSeen(executorId) = clock.getTimeMillis() @@ -127,7 +128,8 @@ private[spark] 
class HeartbeatReceiver(sc: SparkContext, clock: Clock) override def run(): Unit = Utils.tryLogNonFatalError { val unknownExecutor = !scheduler.executorHeartbeatReceived( executorId, accumUpdates, blockManagerId) - val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) + reregisterBlockManager &= unknownExecutor + val response = HeartbeatResponse(reregisterBlockManager) context.reply(response) } }) @@ -137,14 +139,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // not log warning here. Otherwise there may be a lot of noise especially if // we explicitly remove executors (SPARK-4134). logDebug(s"Received heartbeat from unknown executor $executorId") - context.reply(HeartbeatResponse(reregisterBlockManager = true)) + context.reply(HeartbeatResponse(reregisterBlockManager)) } } else { // Because Executor will sleep several seconds before sending the first "Heartbeat", this // case rarely happens. However, if it really happens, log it and ask the executor to // register itself again. 
logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet") - context.reply(HeartbeatResponse(reregisterBlockManager = true)) + context.reply(HeartbeatResponse(reregisterBlockManager)) } } diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index b705556..8c26b1d 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -207,6 +207,24 @@ class HeartbeatReceiverSuite assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2)) } + test("SPARK-34273: Do not reregister BlockManager when SparkContext is stopped") { + val blockManagerId = BlockManagerId(executorId1, "localhost", 12345) + + heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) + val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( + Heartbeat(executorId1, Array.empty, blockManagerId)) + assert(response.reregisterBlockManager) + + try { + sc.stopped.set(true) + val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( + Heartbeat(executorId1, Array.empty, blockManagerId)) + assert(!response.reregisterBlockManager) + } finally { + sc.stopped.set(false) + } + } + /** Manually send a heartbeat and return the response. */ private def triggerHeartbeat( executorId: String, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org