Arnaud Nauwynck created SPARK-46661:
---------------------------------------

             Summary: Add customizable property 
spark.dynamicAllocation.lastExecutorIdleTimeout for last remaining executor, 
defaulting to spark.dynamicAllocation.executorIdleTimeout
                 Key: SPARK-46661
                 URL: https://issues.apache.org/jira/browse/SPARK-46661
             Project: Spark
          Issue Type: Wish
          Components: Spark Core
    Affects Versions: 3.5.0, 4.0.0
            Reporter: Arnaud Nauwynck


When using dynamic allocation, the parameter 
"spark.dynamicAllocation.executorIdleTimeout" applies to every executor, 
regardless of whether it is the last one still running or just one of several 
idle ones.

However, it may be worth keeping the last remaining executor alive longer, so 
that any incoming task is processed immediately instead of waiting for a full 
restart of executors, which can take 30 seconds or more.

This scenario is particularly frequent with Spark Streaming, when polling for 
micro-batches. Keeping one executor alive helps respond faster, while still 
allowing dynamic allocation of the 2nd..Nth executors.
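
For illustration, a streaming job could then be configured like this (the 
lastExecutorIdleTimeout property is only the one proposed here, it does not 
exist in Spark today; the values are just examples):
{code:java}
import org.apache.spark.sql.SparkSession

// Illustration only: "spark.dynamicAllocation.lastExecutorIdleTimeout" is the
// property proposed in this ticket; it does not exist in Spark today.
val spark = SparkSession.builder()
  .appName("micro-batch-poller")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "0")
  .config("spark.dynamicAllocation.maxExecutors", "10")
  // existing behaviour: idle executors are released after 60s
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  // proposed behaviour: keep the very last executor around for 10 minutes
  .config("spark.dynamicAllocation.lastExecutorIdleTimeout", "600s")
  .getOrCreate()
{code}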

In practice, this might only require changing the following source code lines.

In 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/package.scala#L647-L653|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/package.scala#L647-L653]
to add
{code:java}
  private[spark] val DYN_ALLOCATION_LAST_EXECUTOR_IDLE_TIMEOUT =
    ConfigBuilder("spark.dynamicAllocation.lastExecutorIdleTimeout")
      .version("3.6.0")
      .timeConf(TimeUnit.SECONDS)
      .checkValue(_ >= 0L, "Last executor idle timeout must be >= 0 " +
        "(and preferably >= spark.dynamicAllocation.executorIdleTimeout)")
      .createWithDefault(60)
{code}
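
Alternatively, if the default should literally be 
spark.dynamicAllocation.executorIdleTimeout (as stated in the summary), the new 
entry could fall back to the existing config instead of a hard-coded value. A 
rough sketch, following the fallbackConf pattern already used elsewhere in 
package.scala:
{code:java}
  // Sketch only: when lastExecutorIdleTimeout is not set, inherit the value of
  // spark.dynamicAllocation.executorIdleTimeout rather than defaulting to 60s.
  private[spark] val DYN_ALLOCATION_LAST_EXECUTOR_IDLE_TIMEOUT =
    ConfigBuilder("spark.dynamicAllocation.lastExecutorIdleTimeout")
      .version("3.6.0")
      .fallbackConf(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
{code}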



In 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L46|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L46]

{code:java}
  private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(
    conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT))
{code}
to add
{code:java}
  private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(
    conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT))
  private val lastIdleTimeoutNs = TimeUnit.SECONDS.toNanos(
    conf.get(DYN_ALLOCATION_LAST_EXECUTOR_IDLE_TIMEOUT))
{code}


and replace (inserting an if-condition) in
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L573|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L573]

{code:java}
    def updateTimeout(): Unit = {
...
             val timeout = Seq(_cacheTimeout, _shuffleTimeout, idleTimeoutNs).max
...
{code}

to be something like

{code:java}
    def updateTimeout(): Unit = {
...
          val isOnlyOneLastExecutorRemaining = ....
          val currIdleTimeoutNs =
            if (isOnlyOneLastExecutorRemaining) lastIdleTimeoutNs else idleTimeoutNs
          val timeout = Seq(_cacheTimeout, _shuffleTimeout, currIdleTimeoutNs).max
...
{code}
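
The condition above is intentionally left as "....". One hypothetical way to 
compute it, assuming the map of executors tracked by ExecutorMonitor is usable 
at that point, could be:
{code:java}
          // Hypothetical sketch: treat this as the last executor when the monitor
          // currently tracks exactly one executor; the exact check (e.g. excluding
          // executors already pending removal) would need refining in an actual PR.
          val isOnlyOneLastExecutorRemaining = executors.size() == 1
          val currIdleTimeoutNs =
            if (isOnlyOneLastExecutorRemaining) lastIdleTimeoutNs else idleTimeoutNs
          val timeout = Seq(_cacheTimeout, _shuffleTimeout, currIdleTimeoutNs).max
{code}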



