This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 1431df01dcb [SPARK-43398][CORE] Executor timeout should be max of idle shuffle and rdd timeout 1431df01dcb is described below commit 1431df01dcb10fa85ce2ca6cd065e4be0af85585 Author: Warren Zhu <warren.zh...@gmail.com> AuthorDate: Mon Jun 12 00:40:13 2023 -0700 [SPARK-43398][CORE] Executor timeout should be max of idle shuffle and rdd timeout ### What changes were proposed in this pull request? Executor timeout should be max of idle, shuffle and rdd timeout ### Why are the changes needed? Wrong timeout value when combining idle, shuffle and rdd timeout ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added test in `ExecutorMonitorSuite` Closes #41082 from warrenzhu25/max-timeout. Authored-by: Warren Zhu <warren.zh...@gmail.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit 7107742a381cde2e6de9425e3e436282a8c0d27c) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../spark/scheduler/dynalloc/ExecutorMonitor.scala | 15 +++++------- .../scheduler/dynalloc/ExecutorMonitorSuite.scala | 28 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index fc9248de7ee..34878b8e561 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -563,17 +563,14 @@ private[spark] class ExecutorMonitor( def updateTimeout(): Unit = { val oldDeadline = timeoutAt val newDeadline = if (idleStart >= 0) { - val timeout = if (cachedBlocks.nonEmpty || (shuffleIds != null && shuffleIds.nonEmpty)) { - val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutNs else Long.MaxValue - val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) { - shuffleTimeoutNs - } else { - Long.MaxValue - } - math.min(_cacheTimeout, _shuffleTimeout) + val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutNs else 0 + val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) { + shuffleTimeoutNs } else { - idleTimeoutNs + 0 } + // timeout should be max of idleTimeout, storageTimeout and shuffleTimeout + val timeout = Seq(_cacheTimeout, _shuffleTimeout, idleTimeoutNs).max val deadline = idleStart + timeout if (deadline >= 0) deadline else Long.MaxValue } else { diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala index da8c97e54d1..48382550090 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala @@ -476,6 +476,34 @@ class ExecutorMonitorSuite extends SparkFunSuite { assert(monitor.executorCount == 0 ) } + for (isShuffleTrackingEnabled <- Seq(true, false)) { + test(s"SPARK-43398: executor timeout should be max of shuffle and rdd timeout with" + + s" shuffleTrackingEnabled as $isShuffleTrackingEnabled") { + conf + .set(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT.key, "240s") + .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, isShuffleTrackingEnabled) + .set(SHUFFLE_SERVICE_ENABLED, false) + monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource()) + + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) + knownExecs += "1" + val stage1 = stageInfo(1, shuffleId = 0) + monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1))) + monitor.onBlockUpdated(rddUpdate(1, 0, "1")) + val t1 = taskInfo("1", 1) + monitor.onTaskStart(SparkListenerTaskStart(1, 1, t1)) + monitor.onTaskEnd(SparkListenerTaskEnd(1, 1, "foo", Success, t1, new ExecutorMetrics, null)) + monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded)) + + if (isShuffleTrackingEnabled) { + assert(monitor.timedOutExecutors(storageDeadline).isEmpty) + assert(monitor.timedOutExecutors(shuffleDeadline) == Seq("1")) + } else { + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + assert(monitor.timedOutExecutors(storageDeadline) == Seq("1")) + } + } + } private def idleDeadline: Long = clock.nanoTime() + idleTimeoutNs + 1 private def storageDeadline: Long = clock.nanoTime() + storageTimeoutNs + 1 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org