[ https://issues.apache.org/jira/browse/SPARK-48394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
wuyi updated SPARK-48394:
-------------------------
    Description: 
There is only one valid mapstatus for the same {{mapIndex}} at the same time in Spark. {{mapIdToMapIndex}} should also follow the same rule to avoid chaos.

The issue leads to shuffle fetch failures and, ultimately, job failure. It happens this way:

1. Stage A computes partition P0 with task t1 (TID, a.k.a. mapId) on executor e1.
2. Executor e1 starts decommissioning.
3. Executor e1 reports a false-positive executor loss to the driver during its decommission.
4. Stage B reuses the shuffle dependency of Stage A and recomputes partition P0 with task t2 on executor e2.
5. When task t2 finishes, we see two entries ((t1 -> P0), (t2 -> P0)) for the same partition in {{mapIdToMapIndex}}, but only one entry (mapStatuses(P0) = MapStatus(t2, e2)) in {{mapStatuses}}.
6. Executor e1 starts to migrate task t1's mapstatus (to executor e3, for example) and calls {{updateMapOutput}} on the driver. Because of the stale entry from step 5, the driver uses the mapId (i.e., t1) to resolve the mapIndex (i.e., P0), and then uses P0 to look up task t2's mapstatus:

    // updateMapOutput
    val mapIndex = mapIdToMapIndex.get(mapId)
    val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))

7. Task t2's mapstatus then has its location updated to executor e3, even though its output is actually still on executor e2. This finally leads to the fetch failure.

  was: There is only one valid mapstatus for the same {{mapIndex}} at the same time in Spark. {{mapIdToMapIndex}} should also follow the same rule to avoid chaos.


> Cleanup mapIdToMapIndex on mapoutput unregister
> -----------------------------------------------
>
>                 Key: SPARK-48394
>                 URL: https://issues.apache.org/jira/browse/SPARK-48394
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0, 4.0.0, 3.5.1
>            Reporter: wuyi
>            Assignee: wuyi
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
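The fix named in the title follows directly from step 5: whenever a map output is unregistered, its stale mapId entry must also be dropped from {{mapIdToMapIndex}}. Below is a minimal, self-contained Scala sketch of that bookkeeping; MiniShuffleStatus, MiniMapStatus, and their methods are hypothetical stand-ins for illustration, not Spark's actual ShuffleStatus/MapStatus classes.

    // Minimal sketch, NOT Spark's actual ShuffleStatus: MiniShuffleStatus,
    // MiniMapStatus, and their methods are hypothetical stand-ins.
    import scala.collection.mutable

    // Stand-in for MapStatus: records which executor holds a map output.
    case class MiniMapStatus(mapId: Long, execId: String)

    class MiniShuffleStatus(numPartitions: Int) {
      // One valid status per map index (partition) at a time.
      val mapStatuses = new Array[MiniMapStatus](numPartitions)
      // Reverse lookup from mapId (task attempt id) to map index.
      val mapIdToMapIndex = mutable.HashMap.empty[Long, Int]

      def addMapOutput(mapIndex: Int, status: MiniMapStatus): Unit = {
        mapStatuses(mapIndex) = status
        mapIdToMapIndex(status.mapId) = mapIndex
      }

      // The cleanup the issue asks for: when a map output is unregistered,
      // drop its mapId entry too, so a later updateMapOutput for the stale
      // task cannot resolve to an index now owned by another task's status.
      def removeMapOutput(mapIndex: Int): Unit = {
        Option(mapStatuses(mapIndex)).foreach(old => mapIdToMapIndex.remove(old.mapId))
        mapStatuses(mapIndex) = null
      }

      // Mirrors the lookup quoted in the description.
      def updateMapOutput(mapId: Long, newExecId: String): Unit = {
        val mapIndex = mapIdToMapIndex.get(mapId)
        val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
        mapStatusOpt.foreach(s => mapStatuses(mapIndex.get) = s.copy(execId = newExecId))
      }
    }

    object Demo extends App {
      val st = new MiniShuffleStatus(1)
      st.addMapOutput(0, MiniMapStatus(1L, "e1")) // t1 computes P0 on e1
      st.removeMapOutput(0)                       // e1's output unregistered
      st.addMapOutput(0, MiniMapStatus(2L, "e2")) // t2 recomputes P0 on e2
      st.updateMapOutput(1L, "e3")                // e1 migrates t1's blocks to e3
      // With the cleanup, t1's stale mapId no longer resolves, so t2's status
      // still (correctly) points at e2:
      assert(st.mapStatuses(0) == MiniMapStatus(2L, "e2"))
    }

Without the {{mapIdToMapIndex.remove}} call in removeMapOutput, the final updateMapOutput in the demo would resolve t1's stale mapId to P0 and repoint t2's status at e3, reproducing the fetch failure described above.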