[ https://issues.apache.org/jira/browse/SPARK-44389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17949584#comment-17949584 ]

Sam Wheating commented on SPARK-44389:
--------------------------------------

It looks like the revert linked above was only applied to the 3.4 branch; does 
that mean this issue may have been reintroduced in 3.5?

We are seeing similar issues running Spark 3.5.3 applications in a Kubernetes 
environment with a high volume of node rotation: jobs using a large number of 
executors need many stage retries to get through a shuffle stage, due to the 
baseline rate of executor evictions.
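
For context, this is roughly the configuration we run with to enable block 
migration on decommission. It's only a sketch: the config keys are the standard 
spark.decommission.* / spark.storage.decommission.* ones, the values are 
illustrative, and raising spark.stage.maxConsecutiveAttempts just papers over 
the repeated stage retries rather than fixing them:

{code:scala}
// Sketch of the decommissioning-related settings we use on Kubernetes.
// Values are illustrative, not a recommendation; master/deploy settings
// are supplied by spark-submit and omitted here.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("decommission-example")
  // Ask executors to migrate their blocks instead of just dying on eviction.
  .config("spark.decommission.enabled", "true")
  .config("spark.storage.decommission.enabled", "true")
  .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
  .config("spark.storage.decommission.rddBlocks.enabled", "true")
  // Workaround for the repeated stage failures (default is 4); this only
  // hides the problem described in this ticket.
  .config("spark.stage.maxConsecutiveAttempts", "8")
  .getOrCreate()
{code}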

Please let me know if there's anything I can do to help with a fix here.

Somewhat related: it looks like Google has added some internal-only 
configuration in their managed Spark offering to re-fetch shuffle segment 
location metadata after a failed fetch from a decommissioned executor:
[https://cloud.google.com/dataproc-serverless/docs/concepts/autoscaling#spark_dynamic_allocation_issues_and_solutions]

Would something similar to this be helpful here?
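
To make the idea concrete, here is a minimal, purely hypothetical sketch of 
what "re-resolve the location and retry" could look like on the fetch path. 
None of these names (ShuffleLocationService, BlockLocation, DeadExecutorError, 
fetchWithRefresh) are Spark APIs; the point is only the control flow:

{code:scala}
// Hypothetical sketch, not Spark code: on a dead-executor failure,
// re-ask the driver where the block lives now (it may have been migrated
// during decommissioning) and retry once before surfacing a fetch failure.
object FetchRetrySketch {
  final case class BlockLocation(executorId: String, host: String, port: Int)

  // Stand-in for asking the driver / map output tracker for block locations.
  trait ShuffleLocationService {
    def cachedLocation(blockId: String): BlockLocation
    def refreshLocation(blockId: String): BlockLocation
  }

  // Stand-in exception; not the real org.apache.spark.ExecutorDeadException.
  class DeadExecutorError(msg: String) extends RuntimeException(msg)

  def fetchWithRefresh(
      blockId: String,
      locations: ShuffleLocationService,
      fetchFrom: BlockLocation => Array[Byte]): Array[Byte] = {
    try {
      fetchFrom(locations.cachedLocation(blockId))
    } catch {
      case _: DeadExecutorError =>
        // The cached location may be stale: the source executor was
        // decommissioned and the block may have been migrated elsewhere.
        fetchFrom(locations.refreshLocation(blockId))
    }
  }
}
{code}

That is essentially a task-level retry against a refreshed location instead of 
failing the task and retrying the whole stage, which seems to be the shape of 
what the Dataproc note describes.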

 

> ExecutorDeadException when using decommissioning without external shuffle 
> service
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-44389
>                 URL: https://issues.apache.org/jira/browse/SPARK-44389
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 3.4.0
>            Reporter: Volodymyr Kot
>            Priority: Major
>
> Hey, we are trying to use executor decommissioning without external shuffle 
> service. We are trying to understand:
>  # How often should we expect to see ExecutorDeadException? How is 
> information about changes to the location of blocks propagated?
>  # Whether the task should be re-submitted if we hit that during 
> decommissioning?
>  
> Current behavior that we observe:
>  # Executor 1 is decommissioned
>  # Driver successfully removes executor 1's block manager 
> [here|https://github.com/apache/spark/blob/87a5442f7ed96b11051d8a9333476d080054e5a0/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala#L44]
>  # A task is started on executor 2
>  # We hit `ExecutorDeadException` on executor 2 when trying to fetch blocks 
> from executor 1 
> [here|https://github.com/apache/spark/blob/87a5442f7ed96b11051d8a9333476d080054e5a0/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala#L139-L140]
>  # Task on executor 2 fails
>  # Stage fails
>  # Stage is re-submitted and succeeds
> As far as we understand, this happens because executor 2 has a stale [map 
> status 
> cache|https://github.com/apache/spark/blob/87a5442f7ed96b11051d8a9333476d080054e5a0/core/src/main/scala/org/apache/spark/MapOutputTracker.scala#L1235-L1236].
> Is that expected behavior? Shouldn't the task be retried in that case instead 
> of the whole stage failing and being retried? This makes Spark job execution 
> longer, especially if there are a lot of decommission events.
>  
> Also found [this 
> comment|https://github.palantir.build/foundry/spark/blob/aad028ae02011b079e8812f7e63869323cc1ed78/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L113-L115],
> which makes sense for FetchFailures without decommissioning, but with 
> decommissioning the data could have been migrated and we need to fetch the 
> new location. Maybe it makes sense to special-case this codepath to check 
> whether the executor was decommissioned? Since 
> https://issues.apache.org/jira/browse/SPARK-40979 we already store that 
> information.


