[ 
https://issues.apache.org/jira/browse/SPARK-52589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Cheng updated SPARK-52589:
----------------------------------
    Description: 
_TLDR: After upgrading from Spark 3.5.1 to 3.5.5, we observe that jobs using 
dynamic allocation on Kubernetes with 
{{spark.dynamicAllocation.minExecutors=0}} intermittently hang: when all 
executors are decommissioned, no new executors are allocated even if there is 
pending work. Reverting to 3.5.1 resolves the issue. This appears to be a 
severe regression._

*Repro Steps*
 # Use Spark 3.5.5 on Kubernetes with dynamic allocation enabled and 
{{{}spark.dynamicAllocation.minExecutors=0{}}}.
 # Submit a job that causes all executors to be decommissioned (e.g., idle 
period).
 # Submit more work to the same job/application.
 # Observe that no new executors are allocated, and the job occasionally hangs 
with pending work.
 # Revert to 3.5.1 and observe that the same job works as expected.
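For concreteness, the relevant configuration is sketched below; only dynamic allocation with {{minExecutors=0}} appears essential, and the other values (API server address, max executors, idle timeout) are illustrative:
{code:none}
# Illustrative spark-defaults fragment; <k8s-apiserver> is a placeholder.
spark.master                                     k8s://https://<k8s-apiserver>:6443
spark.dynamicAllocation.enabled                  true
spark.dynamicAllocation.shuffleTracking.enabled  true
spark.dynamicAllocation.minExecutors             0
spark.dynamicAllocation.maxExecutors             8
spark.dynamicAllocation.executorIdleTimeout      60s
{code}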

Following a bump from 3.5.1 to 3.5.5, we started observing that when running 
Spark on Kubernetes with dynamic allocation and 
{{{}spark.dynamicAllocation.minExecutors=0{}}}, executor 
removal/decommissioning can sometimes leave the system in a state where no new 
executors are allocated, even though there is pending work. Our hypothesis is 
that there's a race or missed event between the Kubernetes pod watcher, the 
internal executor state in the driver, and {{{}ExecutorPodsAllocator{}}}, 
resulting in a "ghost executor" that prevents further scaling.

*Summary of our Investigation (via logging ExecutorPodsAllocator):*
 # We noticed that we never receive a log from {{ExecutorPodsAllocator}} confirming that a request for another executor was made to Kubernetes. We added logs to the conditional that gates that code path, and we found that {{podsToAllocateWithRpId.size == 0}}, which is why an additional executor is never requested.
 # Next, we checked why the {{podsToAllocateWithRpId}} map is empty – it only gains an entry when {{(podCountForRpId < targetNum)}}. We added logs, and we confirmed that even though {{podCountForRpId}} should be 0 once the only previous executor is decommissioned (and thus be less than {{targetNum}} = 1), {{podCountForRpId}} is still equal to 1.
 # We then investigated why {{podCountForRpId}} == 1, and we found that {{schedulerKnownNewlyCreatedExecsForRpId}} still contains an entry for the previously created (and now decommissioned) executor. Logging confirmed that it is *not* because {{schedulerBackend.getExecutorIds()}} still contains an entry for a deleted executor post-decommission; the scheduler removed the executor properly. Instead, the culprit is this pruning logic:
{code:java}
val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys).distinct
newlyCreatedExecutors --= k8sKnownExecIds
schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds{code}
Our logging shows that {{k8sKnownExecIds}} is empty on every {{onNewSnapshots()}} cycle for the entire Spark job. The intended behavior is for the original executor to appear in a snapshot and be removed from {{schedulerKnownNewlyCreatedExecs}} – that never happens in these hanging jobs, and there is no timeout logic to clear entries from {{{}schedulerKnownNewlyCreatedExecs{}}}. The pod allocator is permanently waiting for a scheduler-known newly created executor to appear in a snapshot.
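To make the failure mode concrete, here is a toy model of the pruning-and-counting interaction as we understand it. The names mirror {{ExecutorPodsAllocator}}, but this is our simplified reconstruction, not the actual Spark source:
{code:java}
object GhostExecutorModel {
  // State as we understand it: executors the scheduler has registered but that
  // have never yet appeared in a Kubernetes pod snapshot. Executor 7 was
  // created and then decommissioned before any snapshot captured it.
  var schedulerKnownNewlyCreatedExecs: Set[Long] = Set(7L)

  // Returns how many new pods would be requested this cycle.
  def onNewSnapshots(snapshotPodIds: Seq[Long], runningCount: Int, targetNum: Int): Int = {
    // Pruning: an id is only ever removed once it shows up in a snapshot.
    schedulerKnownNewlyCreatedExecs = schedulerKnownNewlyCreatedExecs -- snapshotPodIds
    // Counting: unpruned "ghost" entries still count toward the pod total,
    // so podCountForRpId stays at 1 even with zero running executors...
    val podCountForRpId = runningCount + schedulerKnownNewlyCreatedExecs.size
    // ...and no pods are requested while podCountForRpId >= targetNum.
    math.max(targetNum - podCountForRpId, 0)
  }
}
{code}
Because executor 7 never appears in any snapshot, every cycle computes zero pods to request, even with no executors running and a target of 1 – which matches the hang we observe.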

We can confirm that reverting to 3.5.1 stops these intermittent hangs. We have looked through the commits between the two versions and are stumped: we do not see obviously related changes in the dynamic allocation or pod snapshot logic. We need to resolve these hangs ASAP – any guidance would be appreciated!
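If our reading is correct, one possible direction (purely an illustrative sketch of the idea, not a tested patch – the class and method names below are ours, not Spark's) would be to time-bound entries in {{schedulerKnownNewlyCreatedExecs}} so a ghost entry cannot pin the pod count forever:
{code:java}
import scala.collection.mutable

// Sketch (Scala 2.13): remember when each scheduler-known executor was first
// seen, and let entries that never appear in a snapshot expire after a grace
// period instead of being retained indefinitely.
class TimedExecSet(graceMillis: Long, now: () => Long = () => System.currentTimeMillis()) {
  private val firstSeen = mutable.Map.empty[Long, Long]

  def add(execId: Long): Unit = firstSeen.getOrElseUpdate(execId, now())

  // Normal pruning path: ids observed in a snapshot are removed immediately.
  def removeAll(ids: Iterable[Long]): Unit = ids.foreach(firstSeen.remove)

  // Ids still counted toward the pod total; stale entries are dropped here.
  def liveIds: Set[Long] = {
    val cutoff = now() - graceMillis
    firstSeen.filterInPlace { case (_, seenAt) => seenAt >= cutoff }
    firstSeen.keySet.toSet
  }
}
{code}
The counting step would then use {{liveIds.size}} rather than the raw set size, so a decommissioned executor that never reaches a snapshot eventually stops blocking allocation.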



> Spark 3.5.5 on Kubernetes with Dynamic Allocation Hangs (minExecutors=0) Due 
> to Stale Ghost Executors
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-52589
>                 URL: https://issues.apache.org/jira/browse/SPARK-52589
>             Project: Spark
>          Issue Type: Bug
>          Components: k8s, Spark Core
>    Affects Versions: 3.5.5
>            Reporter: Richard Cheng
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
