[ https://issues.apache.org/jira/browse/SPARK-27214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
liupengcheng updated SPARK-27214:
---------------------------------
    Description: 
Currently, Spark's locality wait mechanism is not friendly to large jobs. When the number of tasks is large (e.g. 10000+) and there are many executors (e.g. 2000), executors may be launched on nodes where locality is not the best (i.e. not the nodes that hold the HDFS blocks). There are cases where `TaskSetManager.lastLaunchTime` keeps being refreshed by tasks that finish within `spark.locality.wait` but arrive at a low rate (e.g. one task finishing every `spark.locality.wait` seconds), so the locality level is never upgraded and lots of pending tasks wait for a long time.

In this case, when `spark.dynamicAllocation.enabled=true`, many executors may be removed by the driver because they become idle, which ultimately slows down the job.

We encountered this issue in our production Spark cluster; it wasted a lot of resources and slowed down users' applications.

Actually, we can optimize this with the following formula. Suppose numPendingTasks=10000, localityExecutionGainFactor=0.1, probabilityOfNextLocalitySchedule=0.5:
{code:java}
maxStarvingTimeForTasks = numPendingTasks * medianOfTaskExecutionTime *
  localityExecutionGainFactor * probabilityOfNextLocalitySchedule
totalStarvingTime = sum(starvingTimeByTasks)
if (totalStarvingTime > maxStarvingTimeForTasks) {
  // upgrade locality level...
}
{code}

  was:
Currently, Spark's locality wait mechanism is not friendly to large jobs. When the number of tasks is large (e.g. 10000+) and there are many executors (e.g. 2000), executors may be launched on nodes where locality is not the best (i.e. not the nodes that hold the HDFS blocks). There are cases where `TaskSetManager.lastLaunchTime` keeps being refreshed by tasks that finish within `spark.locality.wait` but arrive at a low rate (e.g. one task finishing every `spark.locality.wait` seconds), so the locality level is never upgraded and lots of pending tasks wait for a long time.

In this case, when `spark.dynamicAllocation.enabled=true`, many executors may be removed by the driver because they become idle, which ultimately slows down the job.

Actually, we can optimize this with the following formula. Suppose numPendingTasks=10000, localityExecutionGainFactor=0.1, probabilityOfNextLocalitySchedule=0.5:
```
maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime *
  localityExecutionGainFactor * probabilityOfNextLocalitySchedule / `spark.locality.wait`
if (numStarvingTasks > maxStarvingTasks) {
  upgrading locality level...
}
....
```
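For concreteness, below is a minimal, self-contained sketch of the check proposed in the updated description. The helper names (`medianTaskExecutionTimeMs`, `starvingTimeByTasksMs`) and the numbers in `main` are illustrative assumptions, not existing `TaskSetManager` members:
{code:scala}
// Sketch only: illustrates the proposed starving-time check, not actual Spark code.
object LocalityUpgradeSketch {

  // Tunables assumed by the proposal.
  val localityExecutionGainFactor = 0.1
  val probabilityOfNextLocalitySchedule = 0.5

  /** True when the accumulated waiting time of the pending tasks outweighs the
   *  expected gain of staying at the current (better) locality level. */
  def shouldUpgradeLocality(
      numPendingTasks: Int,
      medianTaskExecutionTimeMs: Double,
      starvingTimeByTasksMs: Seq[Long]): Boolean = {
    val maxStarvingTimeForTasks =
      numPendingTasks * medianTaskExecutionTimeMs *
        localityExecutionGainFactor * probabilityOfNextLocalitySchedule
    val totalStarvingTime = starvingTimeByTasksMs.sum.toDouble
    totalStarvingTime > maxStarvingTimeForTasks
  }

  def main(args: Array[String]): Unit = {
    // Made-up example inputs: 10000 pending tasks, median task time 2000 ms,
    // each pending task has already waited 150 ms.
    val starving = Seq.fill(10000)(150L)
    // Budget: 10000 * 2000 * 0.1 * 0.5 = 1,000,000 ms; waited: 1,500,000 ms -> true
    println(shouldUpgradeLocality(10000, 2000.0, starving))
  }
}
{code}
With this formulation, a single task finishing every `spark.locality.wait` seconds no longer prevents the upgrade, because the decision is driven by the aggregate waiting time of all pending tasks rather than by `lastLaunchTime` alone.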
> Upgrading locality level when lots of pending tasks have been waiting more
> than locality.wait
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27214
>                 URL: https://issues.apache.org/jira/browse/SPARK-27214
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.1.0, 2.4.0
>            Reporter: liupengcheng
>            Priority: Major
>
> Currently, Spark's locality wait mechanism is not friendly to large jobs. When the number of tasks is large (e.g. 10000+) and there are many executors (e.g. 2000), executors may be launched on nodes where locality is not the best (i.e. not the nodes that hold the HDFS blocks). There are cases where `TaskSetManager.lastLaunchTime` keeps being refreshed by tasks that finish within `spark.locality.wait` but arrive at a low rate (e.g. one task finishing every `spark.locality.wait` seconds), so the locality level is never upgraded and lots of pending tasks wait for a long time.
> In this case, when `spark.dynamicAllocation.enabled=true`, many executors may be removed by the driver because they become idle, which ultimately slows down the job.
> We encountered this issue in our production Spark cluster; it wasted a lot of resources and slowed down users' applications.
> Actually, we can optimize this with the following formula. Suppose numPendingTasks=10000, localityExecutionGainFactor=0.1, probabilityOfNextLocalitySchedule=0.5:
> {code:java}
> maxStarvingTimeForTasks = numPendingTasks * medianOfTaskExecutionTime *
>   localityExecutionGainFactor * probabilityOfNextLocalitySchedule
> totalStarvingTime = sum(starvingTimeByTasks)
> if (totalStarvingTime > maxStarvingTimeForTasks) {
>   // upgrade locality level...
> }
> {code}
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org