[ 
https://issues.apache.org/jira/browse/SPARK-46659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Nauwynck updated SPARK-46659:
------------------------------------
    Description: 
When using dynamicAllocation (but not spark.decommission.enabled=true) with a 
micro-batch workload, very small tasks arrive at regular intervals and are 
processed extremely quickly.
The flow of events being processed may consume less than 1% of the CPU of 
the cluster.
Yet the number of executors stays at a high level 
(spark.dynamicAllocation.maxExecutors), even though they are all idle 99% of 
the time.

Unfortunately, in the current code, tasks are assigned to randomly chosen 
executors, so a constant flow of very small tasks artificially keeps all 
executors in an "active" status: 
every executor receives a task from time to time, so strictly speaking, no 
executor is ever considered idle for longer than 
"spark.dynamicAllocation.executorIdleTimeout". 

Therefore, executors are never marked as candidates for decommissioning, and 
they keep receiving tasks forever, even though those tasks could easily be 
assigned to any other executor (chosen non-randomly).
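The effect can be illustrated with a toy simulation (this is not Spark code; executor count, round count, and the "task lands on the first offer" rule are simplifying assumptions). With shuffled offers, a trickle of single tasks touches nearly every executor, so none of them accumulates an idle period; with unshuffled offers, the same trickle only ever touches the first executor:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class OfferShuffleDemo {
    // Simulates `rounds` scheduling rounds, each offering all executors
    // for a single tiny task. Returns how many distinct executors ever
    // received a task (i.e. were kept out of the "idle" state).
    static int executorsTouched(boolean shuffleOffers, int numExecutors, int rounds) {
        Random rnd = new Random(42);              // fixed seed for reproducibility
        boolean[] touched = new boolean[numExecutors];
        for (int r = 0; r < rounds; r++) {
            List<Integer> offers = new ArrayList<>();
            for (int i = 0; i < numExecutors; i++) {
                offers.add(i);
            }
            if (shuffleOffers) {
                Collections.shuffle(offers, rnd); // what TaskSchedulerImpl does today
            }
            touched[offers.get(0)] = true;        // the single task lands on the first offer
        }
        int n = 0;
        for (boolean t : touched) {
            if (t) n++;
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println("shuffled:   " + executorsTouched(true, 8, 100));
        System.out.println("unshuffled: " + executorsTouched(false, 8, 100)); // always 1
    }
}
```

With shuffling, almost all 8 executors are touched across 100 rounds, so none of them stays idle past executorIdleTimeout; without it, 7 of the 8 would become eligible for release.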


The proposal is therefore to add a new configuration property that suppresses 
the random shuffling of the worker offers considered for task assignment.

See this code:
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L773|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L773]


{code:scala}
  /**
   * Shuffle offers around to avoid always placing tasks on the same workers.
   * Exposed to allow overriding in tests, so it can be deterministic.
   */
  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
    Random.shuffle(offers)
  }
{code}

It could be replaced simply by:

{code:scala}
    val SKIP_RANDOMIZE_WORKER_OFFERS =
      ConfigBuilder("spark.task.skipRandomizeWorkerOffers")
        .version("3.6.0")
        .booleanConf
        .createWithDefault(false)

..

    val skipRandomizeWorkerOffers = conf.get(SKIP_RANDOMIZE_WORKER_OFFERS)

..

  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
    if (skipRandomizeWorkerOffers) {
      offers
    } else {
      Random.shuffle(offers)
    }
  }
{code}
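If such a property existed, enabling it would be a one-line change alongside the dynamic allocation settings already discussed, e.g. in spark-defaults.conf. Note that spark.task.skipRandomizeWorkerOffers is the property proposed in this ticket and does not exist in any released Spark version; the numeric values below are illustrative:

{code}
spark.dynamicAllocation.enabled              true
spark.dynamicAllocation.maxExecutors         20
spark.dynamicAllocation.executorIdleTimeout  60s
# proposed (hypothetical) property from this ticket:
spark.task.skipRandomizeWorkerOffers         true
{code}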




> Add customizable TaskScheduling param, to avoid randomly choosing executor 
> for tasks, and downscale on low micro-batches activity
> ---------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-46659
>                 URL: https://issues.apache.org/jira/browse/SPARK-46659
>             Project: Spark
>          Issue Type: Wish
>          Components: Spark Core
>    Affects Versions: 3.4.0, 3.5.0, 4.0.0
>            Reporter: Arnaud Nauwynck
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
