[ 
https://issues.apache.org/jira/browse/SPARK-46661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Nauwynck updated SPARK-46661:
------------------------------------
    Description: 
When using dynamic allocation, the parameter
"spark.dynamicAllocation.executorIdleTimeout" applies to every executor,
regardless of whether it is the last one running or just one of many idle ones.

However, it might be interesting to keep the last remaining executor alive
longer, so that any incoming task is processed immediately instead of waiting
for a complete restart of executors, which may take 30 seconds or more.

This scenario is particularly frequent with Spark Streaming, when polling for
micro-batches. Keeping one executor alive helps respond faster, while still
allowing dynamic allocation of 2, 3, ..., N executors.

In practice, this might change only the following source code lines.

In 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/package.scala#L647-L653|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/package.scala#L647-L653]
add:
{code:scala}
  private[spark] val DYN_ALLOCATION_LAST_EXECUTOR_IDLE_TIMEOUT =
    ConfigBuilder("spark.dynamicAllocation.lastExecutorIdleTimeout")
      .version("3.6.0")
      .timeConf(TimeUnit.SECONDS)
      .checkValue(_ >= 0L, "Last executor idle timeout must be >= 0 " +
        "(and preferably >= spark.dynamicAllocation.executorIdleTimeout)")
      .createWithDefault(60)
{code}
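
For illustration, a streaming job could then be configured like this (a usage
sketch; spark.dynamicAllocation.lastExecutorIdleTimeout is the new property
proposed above):
{code:scala}
import org.apache.spark.SparkConf

// Usage sketch: regular executors expire after 60s of idling,
// but the last remaining one is kept alive for 10 minutes.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .set("spark.dynamicAllocation.lastExecutorIdleTimeout", "600s")
{code}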



In
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L46|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L46]

{code:scala}
  private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(
    conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT))
{code}
add the new field alongside it:
{code:scala}
  private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(
    conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT))
  private val lastIdleTimeoutNs = TimeUnit.SECONDS.toNanos(
    conf.get(DYN_ALLOCATION_LAST_EXECUTOR_IDLE_TIMEOUT))
{code}
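
Optionally, since the two timeouts are read independently, a small sanity check
could warn about inconsistent values. This is a hypothetical addition, not part
of the minimal change above; it assumes ExecutorMonitor's existing Logging
mixin:
{code:scala}
  // Hypothetical sanity check: warn if the last-executor timeout is shorter
  // than the regular idle timeout, since the last remaining executor would
  // then be removed more aggressively than the others.
  if (lastIdleTimeoutNs < idleTimeoutNs) {
    logWarning("spark.dynamicAllocation.lastExecutorIdleTimeout is shorter " +
      "than spark.dynamicAllocation.executorIdleTimeout; the last remaining " +
      "executor will time out sooner than the others.")
  }
{code}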


and replace (inserting an if-condition) in
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L573|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L573]

{code:scala}
    def updateTimeout(): Unit = {
...
      val timeout = Seq(_cacheTimeout, _shuffleTimeout, idleTimeoutNs).max
...
{code}

so that it becomes something like

{code:scala}
    def updateTimeout(): Unit = {
...
      val isOnlyOneLastExecutorRemaining = ....
      val currIdleTimeoutNs =
        if (isOnlyOneLastExecutorRemaining) lastIdleTimeoutNs else idleTimeoutNs
      val timeout = Seq(_cacheTimeout, _shuffleTimeout, currIdleTimeoutNs).max
...
{code}
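
As a sketch of how the flag could be computed: updateTimeout lives in
ExecutorMonitor's inner Tracker class, which can see the enclosing monitor's
executors map, so a minimal (hypothetical) implementation might be:
{code:scala}
  // Hypothetical sketch: treat the executor as "last remaining" when the
  // monitor currently tracks exactly one executor. A real patch might also
  // need to exclude executors already pending removal or decommissioning.
  val isOnlyOneLastExecutorRemaining = executors.size() == 1
{code}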


> Add customizable property spark.dynamicAllocation.lastExecutorIdleTimeout for 
> last remaining executor, defaulting to 
> spark.dynamicAllocation.executorIdleTimeout
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-46661
>                 URL: https://issues.apache.org/jira/browse/SPARK-46661
>             Project: Spark
>          Issue Type: Wish
>          Components: Spark Core
>    Affects Versions: 3.5.0, 4.0.0
>            Reporter: Arnaud Nauwynck
>            Priority: Minor
>



