Emil Ejbyfeldt created SPARK-38101:
--------------------------------------

             Summary: MetadataFetchFailedException due to decommission block 
migrations
                 Key: SPARK-38101
                 URL: https://issues.apache.org/jira/browse/SPARK-38101
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.2.1, 3.1.2, 3.1.3, 3.2.2, 3.3
            Reporter: Emil Ejbyfeldt


As noted in SPARK-34939 there is a race when using broadcast for the map output
statuses. Explanation from SPARK-34939:

> After the map statuses are broadcast, the executors obtain the serialized
> broadcasted map statuses. If any fetch failure happens afterwards, the Spark
> scheduler invalidates the cached map statuses and destroys the broadcasted
> value of the map statuses. Then, when any executor tries to deserialize the
> serialized broadcasted map statuses and access the broadcasted value, an
> IOException will be thrown. Currently we don't catch it in
> MapOutputTrackerWorker and the above exception will fail the application.
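
To illustrate the failure mode described in the quote, here is a minimal sketch
(plain Broadcast API in local mode, not the actual MapOutputTracker code): once
the driver destroys the broadcast, any later attempt to read its value fails; in
the production race this surfaces on executors as an IOException while
deserializing the broadcasted map statuses.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastDestroyRaceSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("broadcast-destroy-race-sketch")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Illustrative stand-in for the broadcasted map output statuses.
    val statuses = sc.broadcast(Array.fill(1000)(0L))

    // Driver side: an invalidation (a fetch failure or, per this ticket, a
    // shuffle-block migration update) destroys the broadcasted statuses.
    statuses.destroy()

    // Any later use of the destroyed broadcast fails. In the production race
    // the failure shows up on executors as an IOException while deserializing
    // the broadcasted map statuses.
    try {
      println(statuses.value.length)
    } catch {
      case e: Exception => println(s"Use after destroy failed: ${e.getMessage}")
    }

    sc.stop()
  }
}
{code}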

But when running with `spark.decommission.enabled=true` and
`spark.storage.decommission.shuffleBlocks.enabled=true` there is another way to
hit this race: when a node is decommissioning and its shuffle blocks are
migrated. After a block has been migrated, an update is sent to the driver for
each block and the map output caches are invalidated.
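
For reference, a minimal sketch (e.g. in spark-shell) of the two settings under
which this second path to the race shows up; everything else about the job's
configuration is omitted:

{code:scala}
import org.apache.spark.SparkConf

// Illustrative only: the decommission settings mentioned above. All other
// settings from the affected job are omitted.
val conf = new SparkConf()
  .set("spark.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
{code}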

Here are driver logs from when we hit the race condition, running with Spark 3.2.0:


{code:java}
2022-01-28 03:20:12,409 INFO memory.MemoryStore: Block broadcast_27 stored as 
values in memory (estimated size 5.5 MiB, free 11.0 GiB)
2022-01-28 03:20:12,410 INFO spark.ShuffleStatus: Updating map output for 
192108 to BlockManagerId(760, ip-10-231-63-204.ec2.internal, 34707, None)
2022-01-28 03:20:12,410 INFO spark.ShuffleStatus: Updating map output for 
179529 to BlockManagerId(743, ip-10-231-34-160.ec2.internal, 44225, None)
2022-01-28 03:20:12,414 INFO spark.ShuffleStatus: Updating map output for 
187194 to BlockManagerId(761, ip-10-231-43-219.ec2.internal, 39943, None)
2022-01-28 03:20:12,415 INFO spark.ShuffleStatus: Updating map output for 
190303 to BlockManagerId(270, ip-10-231-33-206.ec2.internal, 38965, None)
2022-01-28 03:20:12,416 INFO spark.ShuffleStatus: Updating map output for 
192220 to BlockManagerId(270, ip-10-231-33-206.ec2.internal, 38965, None)
2022-01-28 03:20:12,416 INFO spark.ShuffleStatus: Updating map output for 
182306 to BlockManagerId(688, ip-10-231-43-41.ec2.internal, 35967, None)
2022-01-28 03:20:12,417 INFO spark.ShuffleStatus: Updating map output for 
190387 to BlockManagerId(772, ip-10-231-55-173.ec2.internal, 35523, None)
2022-01-28 03:20:12,417 INFO memory.MemoryStore: Block broadcast_27_piece0 
stored as bytes in memory (estimated size 4.0 MiB, free 10.9 GiB)
2022-01-28 03:20:12,417 INFO storage.BlockManagerInfo: Added 
broadcast_27_piece0 in memory on ip-10-231-63-1.ec2.internal:34761 (size: 4.0 
MiB, free: 11.0 GiB)
2022-01-28 03:20:12,418 INFO memory.MemoryStore: Block broadcast_27_piece1 
stored as bytes in memory (estimated size 1520.4 KiB, free 10.9 GiB)
2022-01-28 03:20:12,418 INFO storage.BlockManagerInfo: Added 
broadcast_27_piece1 in memory on ip-10-231-63-1.ec2.internal:34761 (size: 
1520.4 KiB, free: 11.0 GiB)
2022-01-28 03:20:12,418 INFO spark.MapOutputTracker: Broadcast outputstatuses 
size = 416, actual size = 5747443
2022-01-28 03:20:12,419 INFO spark.ShuffleStatus: Updating map output for 
153389 to BlockManagerId(154, ip-10-231-42-104.ec2.internal, 44717, None)
2022-01-28 03:20:12,419 INFO broadcast.TorrentBroadcast: Destroying 
Broadcast(27) (from updateMapOutput at BlockManagerMasterEndpoint.scala:594)
2022-01-28 03:20:12,427 INFO storage.BlockManagerInfo: Added rdd_65_20310 on 
disk on ip-10-231-32-25.ec2.internal:40657 (size: 77.6 MiB)
2022-01-28 03:20:12,427 INFO storage.BlockManagerInfo: Removed 
broadcast_27_piece0 on ip-10-231-63-1.ec2.internal:34761 in memory (size: 4.0 
MiB, free: 11.0 GiB)
{code}


While the broadcast is being constructed, updates keep coming in and the
broadcast is destroyed almost immediately. On this particular job we hit the
race condition many times: it caused ~18 task failures and stage retries within
20 seconds, which made us hit our stage retry limit and caused the job to fail.
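
For context, the stage retry limit mentioned above is, as far as I know,
`spark.stage.maxConsecutiveAttempts` (default 4); shown here only to make the
limit explicit, not as a fix:

{code:scala}
import org.apache.spark.SparkConf

// Assumption: the "stage retry limit" above refers to this setting (default 4).
val conf = new SparkConf()
  .set("spark.stage.maxConsecutiveAttempts", "4")
{code}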

As far as I understand, this is the expected behavior for handling this case
after SPARK-34939. But it seems that, when combined with decommissioning,
hitting the race is a bit too common.

We have observed this behavior running 3.2.0 and 3.2.1, but I think the other
current versions are also affected.


