[
https://issues.apache.org/jira/browse/SPARK-52589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Richard Cheng updated SPARK-52589:
----------------------------------
Description:
_TLDR: After upgrading from Spark 3.5.1 to 3.5.5, we observe that jobs using
dynamic allocation on Kubernetes with
{{spark.dynamicAllocation.minExecutors=0}} intermittently hang: when all
executors are decommissioned, no new executors are allocated even if there is
pending work. Reverting to 3.5.1 resolves the issue. This appears to be a
severe regression._
*Repro Steps*
# Use Spark 3.5.5 on Kubernetes with dynamic allocation enabled and
{{{}spark.dynamicAllocation.minExecutors=0{}}}.
# Submit a job that causes all executors to be decommissioned (e.g., idle
period).
# Submit more work to the same job/application.
# Observe that no new executors are allocated, and the job occasionally hangs
with pending work.
# After reverting to 3.5.1, the same job works as expected.
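For reference, a configuration along these lines matches the setup described above ({{spark.dynamicAllocation.minExecutors=0}} is the essential setting; the other values are illustrative, not our exact production values):
{code}
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.shuffleTracking.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=10
spark.dynamicAllocation.executorIdleTimeout=60s
{code}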
Following a bump from 3.5.1 to 3.5.5, we started observing that when running
Spark on Kubernetes with dynamic allocation and
{{{}spark.dynamicAllocation.minExecutors=0{}}}, executor
removal/decommissioning can sometimes leave the system in a state where no new
executors are allocated, even though there is pending work. Our hypothesis is
that there's a race or missed event between the Kubernetes pod watcher, the
internal executor state in the driver, and {{{}ExecutorPodsAllocator{}}},
resulting in a "ghost executor" that prevents further scaling.
*Summary of our Investigation (via logging ExecutorPodsAllocator):*
# We noticed that we never see a log from {{ExecutorPodsAllocator}}
confirming that a request for another executor was made to Kubernetes. We added
logs to the conditional that gates that code block, and we found that
{{podsToAllocateWithRpId.size == 0}}, which is why we never request an
additional executor.
# Next, we checked why the {{podsToAllocateWithRpId}} map is empty – it only
adds an element if {{podCountForRpId < targetNum}}. We added logs and confirmed
that even though {{podCountForRpId}} should equal 0 once the only previous
executor is decommissioned (and thus be less than {{targetNum}} = 1),
{{podCountForRpId}} is still equal to 1.
# We investigated why {{podCountForRpId}} == 1, and we found that
{{schedulerKnownNewlyCreatedExecsForRpId}} still contains an entry for the
previously created (and now decommissioned) executor. Logging confirmed that it
is *not* because {{schedulerBackend.getExecutorIds()}} still contains an entry
for a deleted executor post-decommission; the scheduler removed the executor
properly. Instead, the culprit is here:
{code:java}
val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys).distinct
newlyCreatedExecutors --= k8sKnownExecIds
schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds{code}
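To see the failure mode in isolation, here is a toy stand-in for this bookkeeping using plain Scala collections (no Spark classes; the field names mirror the real ones, and the empty snapshot list is an assumption matching what we observe in the logs):
{code:java}
import scala.collection.mutable

// Stand-ins for the allocator's state: executor 1 was created and is
// known to the scheduler, but no pod snapshot containing it ever arrives.
val newlyCreatedExecutors = mutable.Set(1L)
val schedulerKnownNewlyCreatedExecs = mutable.Set(1L)

// onNewSnapshots() fires with no executor pods in any snapshot.
val snapshots: Seq[Map[Long, String]] = Seq.empty

val k8sKnownExecIds = snapshots.flatMap(_.keys).distinct
newlyCreatedExecutors --= k8sKnownExecIds
schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds

// k8sKnownExecIds is empty, so nothing is removed: the "ghost" entry
// for executor 1 survives this (and every subsequent) cycle.
println(schedulerKnownNewlyCreatedExecs.contains(1L)) // true
{code}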
Our logging shows that {{k8sKnownExecIds}} is empty on every
{{onNewSnapshots()}} cycle throughout the entire Spark job. The intended
behavior is for the original executor to appear in a snapshot and be removed
from {{schedulerKnownNewlyCreatedExecs}} – that never happens in these hanging
jobs, and there is no timeout logic to clear executors from
{{{}schedulerKnownNewlyCreatedExecs{}}}. The pod allocator is permanently
waiting for a newly created, scheduler-known executor to appear in a snapshot.
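Putting the pieces together, here is a simplified sketch of the allocation gate we traced. This is a hypothetical simplification, not the real {{ExecutorPodsAllocator}} code; the counting is inferred from our logs, where a stale scheduler-known entry inflates {{podCountForRpId}}:
{code:java}
// Simplified model: podCountForRpId includes executors the scheduler
// reported as newly created, plus running/pending pods.
val schedulerKnownNewlyCreatedExecsForRpId = Set(1L) // stale ghost executor
val runningOrPendingPodCount = 0                     // all pods decommissioned
val targetNum = 1                                    // one executor wanted for pending work

val podCountForRpId =
  runningOrPendingPodCount + schedulerKnownNewlyCreatedExecsForRpId.size

// Pods are only requested while podCountForRpId < targetNum, so the
// ghost entry keeps the request count pinned at zero and the job hangs.
val podsToRequest = math.max(0, targetNum - podCountForRpId)
println(podsToRequest) // 0
{code}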
We can confirm that reverting to 3.5.1 stops these intermittent hanging jobs.
We've looked through the commits between these two versions, and we are stumped
because we don't see obviously related changes in dynamic allocation or pod
snapshot logic. We really need to solve these hangs ASAP – any guidance would
be appreciated!
> Spark 3.5.5 on Kubernetes with Dynamic Allocation Hangs (minExecutors=0) Due
> to Stale Ghost Executors
> -----------------------------------------------------------------------------------------------------
>
> Key: SPARK-52589
> URL: https://issues.apache.org/jira/browse/SPARK-52589
> Project: Spark
> Issue Type: Bug
> Components: k8s, Spark Core
> Affects Versions: 3.5.5
> Reporter: Richard Cheng
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)