[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r360777925

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))

Review comment:
What about the other `SchedulerBackend` implementations? We only send the `RemoveExecutor` event for `CoarseGrainedSchedulerBackend`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
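For readers following the hunk above, the timeout check it modifies can be sketched in isolation. This is a minimal sketch, not Spark's code: `HeartbeatExpirySketch` and `expired` are hypothetical names, and the map of last-seen timestamps is assumed to stand in for the receiver's internal state. It only reuses the comparison visible in the diff (`now - lastSeenMs > executorTimeoutMs`).

```scala
// Hypothetical, simplified model of the timeout check; not Spark's actual class.
object HeartbeatExpirySketch {
  /** Returns the ids of executors whose last heartbeat is older than timeoutMs. */
  def expired(lastSeen: Map[String, Long], nowMs: Long, timeoutMs: Long): Set[String] =
    lastSeen.collect {
      // Same strict comparison as in the diff: exactly timeoutMs is not yet expired.
      case (executorId, lastSeenMs) if nowMs - lastSeenMs > timeoutMs => executorId
    }.toSet
}
```

Every id returned by `expired` would then be handed to whatever removal path the backend needs, which is the point under review in this thread.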
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r360778246

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))
-            // Asynchronously kill the executor to avoid blocking the current thread
+          // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               // Note: we want to get an executor back after expiring this one,
               // so do not simply call `sc.killExecutor` here (SPARK-8119)
               sc.killAndReplaceExecutor(executorId)
+              // In case of the executors which are not gracefully shut down, we should remove

Review comment:
> In case of the executors which are not gracefully shut down

Do you mean `CoarseGrainedSchedulerBackend` is the only one that may shut down executors non-gracefully? And if it shuts down gracefully, is it OK to have 2 `RemoveExecutor` events?
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r360778246

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))
-            // Asynchronously kill the executor to avoid blocking the current thread
+          // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               // Note: we want to get an executor back after expiring this one,
               // so do not simply call `sc.killExecutor` here (SPARK-8119)
               sc.killAndReplaceExecutor(executorId)
+              // In case of the executors which are not gracefully shut down, we should remove

Review comment:
> In case of the executors which are not gracefully shut down

Do you mean `CoarseGrainedSchedulerBackend` is the only one that may shut down executors gracefully? And if it shuts down gracefully, is it OK to have 2 `RemoveExecutor` events?
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r360787039

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))
-            // Asynchronously kill the executor to avoid blocking the current thread
+          // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               // Note: we want to get an executor back after expiring this one,
               // so do not simply call `sc.killExecutor` here (SPARK-8119)
               sc.killAndReplaceExecutor(executorId)
+              // In case of the executors which are not gracefully shut down, we should remove

Review comment:
then why do we only match `CoarseGrainedSchedulerBackend` here?
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r360791173

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))
-            // Asynchronously kill the executor to avoid blocking the current thread
+          // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               // Note: we want to get an executor back after expiring this one,
               // so do not simply call `sc.killExecutor` here (SPARK-8119)
               sc.killAndReplaceExecutor(executorId)
+              // In case of the executors which are not gracefully shut down, we should remove

Review comment:
`SchedulerBackend` is private, so we don't need to worry about plugins. Let me ask the question again: is `CoarseGrainedSchedulerBackend` the only one that may shut down executors non-gracefully? According to what you said, it covers all the scheduler backends, so the answer is yes?
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r360796532

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))
-            // Asynchronously kill the executor to avoid blocking the current thread
+          // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               // Note: we want to get an executor back after expiring this one,
               // so do not simply call `sc.killExecutor` here (SPARK-8119)
               sc.killAndReplaceExecutor(executorId)
+              // In case of the executors which are not gracefully shut down, we should remove

Review comment:
do you mean only `CoarseGrainedSchedulerBackend` can't handle non-graceful executor shutdown well?
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r360817459

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))
-            // Asynchronously kill the executor to avoid blocking the current thread
+          // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               // Note: we want to get an executor back after expiring this one,
               // so do not simply call `sc.killExecutor` here (SPARK-8119)
               sc.killAndReplaceExecutor(executorId)
+              // In case of the executors which are not gracefully shut down, we should remove

Review comment:
"only" means we only need to match `CoarseGrainedSchedulerBackend` here, nothing else.
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r360817535

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))

Review comment:
what if we don't remove it?
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r360857961

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))

Review comment:
shall we still do this for non-`CoarseGrainedSchedulerBackend`?
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r361269391

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,25 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))
-            // Asynchronously kill the executor to avoid blocking the current thread
+          // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               // Note: we want to get an executor back after expiring this one,
               // so do not simply call `sc.killExecutor` here (SPARK-8119)
               sc.killAndReplaceExecutor(executorId)
+              // SPARK-27348: in case of the executors which are not gracefully shut down,
+              // we should remove lost executors from CoarseGrainedSchedulerBackend manually
+              // here to guarantee two things:
+              // 1) call scheduler.executorLost() underlying to fail any tasks assigned to

Review comment:
Will we call `scheduler.executorLost` for `LocalSchedulerBackend`?
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r361269466

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,25 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))
-            // Asynchronously kill the executor to avoid blocking the current thread
+          // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               // Note: we want to get an executor back after expiring this one,
               // so do not simply call `sc.killExecutor` here (SPARK-8119)
               sc.killAndReplaceExecutor(executorId)
+              // SPARK-27348: in case of the executors which are not gracefully shut down,
+              // we should remove lost executors from CoarseGrainedSchedulerBackend manually
+              // here to guarantee two things:
+              // 1) call scheduler.executorLost() underlying to fail any tasks assigned to

Review comment:
nvm, `LocalSchedulerBackend` only has a single executor.
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r361269587

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,25 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))
-            // Asynchronously kill the executor to avoid blocking the current thread
+          // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               // Note: we want to get an executor back after expiring this one,
               // so do not simply call `sc.killExecutor` here (SPARK-8119)
               sc.killAndReplaceExecutor(executorId)
+              // SPARK-27348: in case of the executors which are not gracefully shut down,
+              // we should remove lost executors from CoarseGrainedSchedulerBackend manually
+              // here to guarantee two things:
+              // 1) call scheduler.executorLost() underlying to fail any tasks assigned to
+              //    those executors to avoid app hang
+              // 2) always remove executor information from CoarseGrainedSchedulerBackend for
+              //    a lost executor
+              sc.schedulerBackend match {
+                case backend: CoarseGrainedSchedulerBackend =>
+                  backend.driverEndpoint.send(RemoveExecutor(executorId,
+                    SlaveLost(s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))
+                case _ =>

Review comment:
We should match `LocalSchedulerBackend` explicitly and do nothing there, just leaving a comment saying that `LocalSchedulerBackend` only has a single executor.
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r361269779

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,25 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +

Review comment:
Does it hurt to call `scheduler.executorLost` twice? Since we assume killing executors may take a while, it looks reasonable to call `scheduler.executorLost` and re-submit tasks earlier.
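Whether a second `scheduler.executorLost` call is harmless depends on the handler being idempotent. The toy model below is a sketch of that property only and says nothing about how Spark's real scheduler behaves. `ToyScheduler`, `launch` and `resubmitted` are hypothetical names invented for the illustration.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a task scheduler; names are illustrative only.
class ToyScheduler {
  private val runningTasks = mutable.Map.empty[String, Set[Long]]
  val resubmitted = mutable.ArrayBuffer.empty[Long]

  def launch(executorId: String, taskId: Long): Unit =
    runningTasks(executorId) = runningTasks.getOrElse(executorId, Set.empty[Long]) + taskId

  /** Resubmits the executor's tasks once; returns false if there was nothing left to do. */
  def executorLost(executorId: String): Boolean =
    runningTasks.remove(executorId) match {
      case Some(tasks) =>
        resubmitted ++= tasks.toSeq.sorted
        true
      case None =>
        false // already handled, so a repeated notification is a no-op
    }
}
```

Under this assumption, calling `executorLost` early (before the kill completes) and again when the removal event arrives resubmits each task only once.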
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][CORE] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r361571352

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +201,25 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +

Review comment:
makes sense!
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][CORE] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r361574984

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +202,27 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))
-            // Asynchronously kill the executor to avoid blocking the current thread
+          // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               // Note: we want to get an executor back after expiring this one,
               // so do not simply call `sc.killExecutor` here (SPARK-8119)
               sc.killAndReplaceExecutor(executorId)
+              // SPARK-27348: in case of the executors which are not gracefully shut down,
+              // we should remove lost executors from CoarseGrainedSchedulerBackend manually
+              // here to guarantee two things:
+              // 1) call scheduler.executorLost() underlying to fail any tasks assigned to
+              //    those executors to avoid app hang
+              // 2) always remove executor information from CoarseGrainedSchedulerBackend for
+              //    a lost executor
+              sc.schedulerBackend match {
+                case backend: CoarseGrainedSchedulerBackend =>
+                  backend.driverEndpoint.send(RemoveExecutor(executorId,
+                    SlaveLost(s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))
+                case _: LocalSchedulerBackend =>
+                  // LocalSchedulerBackend is used locally and only has one single executor
+                case _ =>

Review comment:
let's throw an exception here.
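The shape being asked for (an explicit no-op case for the local backend and a failing default case) can be sketched outside Spark as follows. All class names here are hypothetical stubs, since the real backends are private to Spark, and `UnsupportedOperationException` is an assumed choice; the review only says "throw an exception".

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's private backend classes, for illustration only.
trait SchedulerBackendStub
final class CoarseGrainedBackendStub extends SchedulerBackendStub {
  val removed = mutable.ArrayBuffer.empty[(String, String)]
  def removeExecutor(executorId: String, reason: String): Unit =
    removed += ((executorId, reason))
}
final class LocalBackendStub extends SchedulerBackendStub
final class UnknownBackendStub extends SchedulerBackendStub

object RemoveLostExecutorSketch {
  def onHeartbeatTimeout(backend: SchedulerBackendStub, executorId: String, elapsedMs: Long): Unit =
    backend match {
      case b: CoarseGrainedBackendStub =>
        b.removeExecutor(executorId, s"Executor heartbeat timed out after $elapsedMs ms")
      case _: LocalBackendStub =>
        // Nothing to remove: the local backend only has a single executor.
      case other =>
        // Assumed exception type; fails loudly if a new backend is added without handling.
        throw new UnsupportedOperationException(
          s"Unknown scheduler backend: ${other.getClass.getName}")
    }
}
```

Failing in the default case turns a silently ignored backend into an immediate error, which is presumably the motivation for the request.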
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][CORE] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r361592123

## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ##

@@ -199,14 +202,28 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))
-            // Asynchronously kill the executor to avoid blocking the current thread
+          // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               // Note: we want to get an executor back after expiring this one,
               // so do not simply call `sc.killExecutor` here (SPARK-8119)
               sc.killAndReplaceExecutor(executorId)
+              // SPARK-27348: in case of the executors which are not gracefully shut down,
+              // we should remove lost executors from CoarseGrainedSchedulerBackend manually
+              // here to guarantee two things:
+              // 1) explicitly remove executor information from CoarseGrainedSchedulerBackend for
+              //    a lost executor instead of waiting for disconnect message
+              // 2) call scheduler.executorLost() underlying to fail any tasks assigned to
+              //    those executors to avoid app hang
+              sc.schedulerBackend match {
+                case backend: CoarseGrainedSchedulerBackend =>
+                  backend.driverEndpoint.send(RemoveExecutor(executorId,
+                    SlaveLost(s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))
+                case _: LocalSchedulerBackend =>
+                  // LocalSchedulerBackend is used locally and only has one single executor

Review comment:
The comment is in the wrong place.