This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 953cc1c  [SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped
953cc1c is described below

commit 953cc1c65ac9cc180dc0ca83ced8771b554bb291
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Thu Jan 28 13:06:42 2021 -0800

    [SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped
    
    This PR aims to prevent `HeartbeatReceiver` from asking `Executor` to re-register its block manager when the SparkContext is already stopped.
    
    Currently, `HeartbeatReceiver` blindly requests re-registration in response to every new heartbeat message.
    However, when the SparkContext is already stopped, there is no need to re-register the block manager.
    Re-registration produces unnecessary executor logs and delays job termination.
    
    No.
    
    Pass the CIs with the newly added test case.
    
    Closes #31373 from dongjoon-hyun/SPARK-34273.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit bc41c5a0e598e6b697ed61c33e1bea629dabfc57)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/HeartbeatReceiver.scala     |  8 +++++---
 .../org/apache/spark/HeartbeatReceiverSuite.scala      | 18 ++++++++++++++++++
 2 files changed, 23 insertions(+), 3 deletions(-)

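For reference, here is a small, self-contained sketch of the decision this patch changes. `ReregisterGateSketch`, `contextStopped`, and `heartbeatProcessed` are illustrative stand-ins rather than Spark APIs; the real logic lives in the `HeartbeatReceiver` handler shown in the diff below.

    // Illustrative sketch of the re-registration gate; names here are
    // hypothetical stand-ins, not Spark APIs.
    object ReregisterGateSketch {
      // Old behavior: re-register whenever the scheduler did not recognize the heartbeat.
      def before(heartbeatProcessed: Boolean): Boolean =
        !heartbeatProcessed

      // New behavior: additionally require that the SparkContext is still alive.
      def after(contextStopped: Boolean, heartbeatProcessed: Boolean): Boolean = {
        var reregisterBlockManager = !contextStopped   // new guard added by this patch
        reregisterBlockManager &= !heartbeatProcessed  // pre-existing "unknown executor" condition
        reregisterBlockManager
      }

      def main(args: Array[String]): Unit = {
        // An unrecognized heartbeat used to trigger re-registration even during shutdown...
        assert(before(heartbeatProcessed = false))
        // ...but with the SparkContext stopped it no longer does.
        assert(!after(contextStopped = true, heartbeatProcessed = false))
      }
    }
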
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index bcbc8df..980411d 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -120,6 +120,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
 
     // Messages received from executors
     case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
+      var reregisterBlockManager = !sc.isStopped
       if (scheduler != null) {
         if (executorLastSeen.contains(executorId)) {
           executorLastSeen(executorId) = clock.getTimeMillis()
@@ -127,7 +128,8 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
             override def run(): Unit = Utils.tryLogNonFatalError {
               val unknownExecutor = !scheduler.executorHeartbeatReceived(
                 executorId, accumUpdates, blockManagerId)
-              val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+              reregisterBlockManager &= unknownExecutor
+              val response = HeartbeatResponse(reregisterBlockManager)
               context.reply(response)
             }
           })
@@ -137,14 +139,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
           // not log warning here. Otherwise there may be a lot of noise especially if
           // we explicitly remove executors (SPARK-4134).
           logDebug(s"Received heartbeat from unknown executor $executorId")
-          context.reply(HeartbeatResponse(reregisterBlockManager = true))
+          context.reply(HeartbeatResponse(reregisterBlockManager))
         }
       } else {
         // Because Executor will sleep several seconds before sending the first "Heartbeat", this
         // case rarely happens. However, if it really happens, log it and ask the executor to
         // register itself again.
         logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
-        context.reply(HeartbeatResponse(reregisterBlockManager = true))
+        context.reply(HeartbeatResponse(reregisterBlockManager))
       }
   }
 
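For context: when the driver replies with `reregisterBlockManager = true`, the executor (in `Executor.reportHeartBeat`) re-registers its `BlockManager` with the driver; during shutdown that is exactly the wasted work and log noise the commit message mentions. Below is a rough, self-contained model of that reaction; the types and names are stand-ins, not Spark's.

    // Rough model of the executor-side reaction to a heartbeat response.
    // HeartbeatResponseModel and println stand in for Spark's HeartbeatResponse,
    // executor logging, and BlockManager.reregister(); illustrative only.
    final case class HeartbeatResponseModel(reregisterBlockManager: Boolean)

    object ExecutorReactionSketch {
      def onHeartbeatResponse(response: HeartbeatResponseModel): Unit = {
        if (response.reregisterBlockManager) {
          // In Spark this means an extra RPC to the driver plus re-reporting block
          // status, and an executor log line; pointless if the driver is stopping.
          println("told to re-register block manager on heartbeat")
        }
      }

      def main(args: Array[String]): Unit = {
        // With this patch, a heartbeat that races with SparkContext.stop() gets
        // reregisterBlockManager = false, so the executor stays quiet.
        onHeartbeatResponse(HeartbeatResponseModel(reregisterBlockManager = false))
      }
    }
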
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index b705556..8c26b1d 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -207,6 +207,24 @@ class HeartbeatReceiverSuite
     assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2))
   }
 
+  test("SPARK-34273: Do not reregister BlockManager when SparkContext is stopped") {
+    val blockManagerId = BlockManagerId(executorId1, "localhost", 12345)
+
+    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
+    val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
+      Heartbeat(executorId1, Array.empty, blockManagerId))
+    assert(response.reregisterBlockManager)
+
+    try {
+      sc.stopped.set(true)
+      val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
+        Heartbeat(executorId1, Array.empty, blockManagerId))
+      assert(!response.reregisterBlockManager)
+    } finally {
+      sc.stopped.set(false)
+    }
+  }
+
   /** Manually send a heartbeat and return the response. */
   private def triggerHeartbeat(
       executorId: String,

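A note on the new test: it flips `sc.stopped` (the `AtomicBoolean` stop flag on `SparkContext`) directly instead of calling `sc.stop()`, so the shared context remains usable for the rest of the suite, and the `finally` block restores the flag. The same pattern in isolation, with no Spark types involved:

    import java.util.concurrent.atomic.AtomicBoolean

    // Stand-alone illustration of the toggle-and-restore pattern the test uses;
    // "stopped" here is a plain flag, not SparkContext's field.
    object ToggleStoppedFlagSketch {
      private val stopped = new AtomicBoolean(false)

      def main(args: Array[String]): Unit = {
        try {
          stopped.set(true)
          assert(stopped.get)    // run assertions that depend on the stopped state here
        } finally {
          stopped.set(false)     // always restore shared state for later tests
        }
        assert(!stopped.get)
      }
    }
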
