Arnaud Nauwynck created SPARK-46659:
---------------------------------------

             Summary: Add customizable task-scheduling param to avoid randomly 
choosing executors for tasks, and to downscale on low micro-batch activity
                 Key: SPARK-46659
                 URL: https://issues.apache.org/jira/browse/SPARK-46659
             Project: Spark
          Issue Type: Wish
          Components: Spark Core
    Affects Versions: 3.5.0, 3.4.0, 4.0.0
            Reporter: Arnaud Nauwynck


When using dynamic allocation (but not spark.decommission.enabled=true) with a 
micro-batch workload, very small tasks arrive at regular intervals and are 
processed extremely quickly.
The flow of events being processed may consume less than 1% of the cluster's 
CPU.
Yet the number of executors stays at its high level 
(spark.dynamicAllocation.maxExecutors), even though they are all idle 99% of 
the time.

Unfortunately, in the current code, tasks are assigned randomly to executors, 
so a constant flow of very small tasks artificially keeps all the executors in 
an "active" status: 
every executor receives tasks from time to time, so strictly speaking, none of 
them is ever idle for longer than 
"spark.dynamicAllocation.executorIdleTimeout". 

Therefore, executors are never marked as candidates for decommissioning, and 
they continue to receive tasks forever, while those tasks could easily be 
assigned to any other executor (chosen non-randomly).
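
For reference, here is a minimal sketch (with illustrative values, not 
recommendations) of the dynamic-allocation setup that exhibits this behavior:
{{
import org.apache.spark.sql.SparkSession

// Dynamic allocation on, decommissioning left at its default (disabled).
// A steady trickle of tiny tasks spread across all executors means no
// executor stays idle for executorIdleTimeout, so none is ever released.
val spark = SparkSession.builder()
  .appName("micro-batch-demo")
  .config("spark.dynamicAllocation.enabled", "true")
  // needed when no external shuffle service is available
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.maxExecutors", "100")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .getOrCreate()
}}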


The proposal is therefore to add a new configuration property that suppresses 
the random shuffling of assignable worker offers for tasks.

See this code: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L773
{{
  /**
   * Shuffle offers around to avoid always placing tasks on the same workers.  Exposed to allow
   * overriding in tests, so it can be deterministic.
   */
  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
    Random.shuffle(offers)
  }
}}
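
As the scaladoc says, this hook is already overridable. For illustration only, 
a deterministic override would look like the sketch below; note that 
TaskSchedulerImpl is private[spark], so this assumes code living in the 
org.apache.spark.scheduler package:
{{
package org.apache.spark.scheduler

import org.apache.spark.SparkContext

// Illustrative subclass: returns the offers unshuffled, the same trick the
// tests use to make task placement deterministic.
class NoShuffleTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override protected def shuffleOffers(
      offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
    offers // keep the driver-provided order instead of Random.shuffle
  }
}
}}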

It could be replaced simply by
{{
    val SKIP_RANDOMIZE_WORKER_OFFERS =
      ConfigBuilder("spark.task.skipRandomizeWorkerOffers")
        .version("3.6.0")
        .booleanConf
        .createWithDefault(false)

..

    val skipRandomizeWorkerOffers = conf.get(SKIP_RANDOMIZE_WORKER_OFFERS)

..

  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
    if (skipRandomizeWorkerOffers) {
      offers
    } else {
      Random.shuffle(offers)
    }
  }
}}
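
If adopted, opting in would be a one-line configuration change. The property 
name below is the one proposed in this ticket; it does not exist in Spark 
today:
{{
import org.apache.spark.sql.SparkSession

// Hypothetical: disable the random shuffling of worker offers so small
// tasks concentrate on the first offers and the remaining executors can
// reach executorIdleTimeout and be released.
val spark = SparkSession.builder()
  .config("spark.task.skipRandomizeWorkerOffers", "true")
  .getOrCreate()
}}
This assumes the unshuffled offer order is reasonably stable across scheduling 
rounds, so the same few executors keep absorbing the small tasks while the 
others go idle and can be scaled down.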



