Repository: spark
Updated Branches:
  refs/heads/master e9faae135 -> 5952ad2b4

[SPARK-21444] Be more defensive when removing broadcasts in MapOutputTracker

## What changes were proposed in this pull request?

In SPARK-21444, sitalkedia reported an issue where the `Broadcast.destroy()` call in `MapOutputTracker`'s `ShuffleStatus.invalidateSerializedMapOutputStatusCache()` was failing with an `IOException`, causing the DAGScheduler to crash and bring down the entire driver.

This is a bug introduced by #17955. In the old code, we removed a broadcast variable by calling `BroadcastManager.unbroadcast` with `blocking=false`, but the new code simply calls `Broadcast.destroy()`, which can fail with an `IOException` if certain blocking RPCs time out.

The fix implemented here is to replace this with a call to `destroy(blocking = false)` and to wrap the entire operation in `Utils.tryLogNonFatalError`.

## How was this patch tested?

I haven't written regression tests for this because it's really hard to inject mocks to simulate RPC failures here. Instead, this class of issue is probably best uncovered with more generalized error injection / network unreliability / fuzz testing tools.

Author: Josh Rosen <joshro...@databricks.com>

Closes #18662 from JoshRosen/SPARK-21444.
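
The fix combines two ideas: request non-blocking cleanup, and contain any non-fatal exception so it cannot propagate into the caller. Below is a minimal, self-contained Scala sketch of that pattern, not Spark's actual code. `SafeCleanup.tryLogNonFatalError` is a hypothetical stand-in modeled on Spark's internal `Utils.tryLogNonFatalError` (which logs through Spark's logger rather than printing to stderr), and `FakeBroadcast` and `CacheHolder` are hypothetical simplifications of `Broadcast` and `ShuffleStatus`. `FakeBroadcast` only records the `blocking` flag; it does not model asynchronous RPCs.

```scala
import java.io.IOException
import scala.util.control.NonFatal

// Hypothetical stand-in for Spark's internal Utils.tryLogNonFatalError.
object SafeCleanup {
  // Runs `block`; a non-fatal exception is reported and swallowed.
  // Fatal errors (e.g. VirtualMachineError, InterruptedException) still propagate.
  def tryLogNonFatalError(block: => Unit): Unit = {
    try {
      block
    } catch {
      case NonFatal(t) =>
        System.err.println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
    }
  }
}

// Hypothetical broadcast handle whose destroy may fail, e.g. on an RPC timeout.
final class FakeBroadcast(failOnDestroy: Boolean) {
  @volatile var destroyed: Boolean = false
  @volatile var lastBlocking: Option[Boolean] = None

  def destroy(blocking: Boolean): Unit = {
    lastBlocking = Some(blocking) // record how cleanup was requested
    if (failOnDestroy) throw new IOException("cleanup RPC timed out")
    destroyed = true
  }
}

// Hypothetical, simplified analogue of ShuffleStatus's cache invalidation.
final class CacheHolder(initial: FakeBroadcast) {
  private var cachedBroadcast: FakeBroadcast = initial

  def cached: Option[FakeBroadcast] = synchronized { Option(cachedBroadcast) }

  def invalidate(): Unit = synchronized {
    if (cachedBroadcast != null) {
      // A failure during cleanup must not escape to the caller.
      SafeCleanup.tryLogNonFatalError {
        // Non-blocking: do not wait on cleanup acknowledgements.
        cachedBroadcast.destroy(blocking = false)
      }
      // Cleared whether or not destroy succeeded.
      cachedBroadcast = null
    }
  }
}
```

Because the reference is cleared after the guarded call, a failed destroy may leave some broadcast data uncleaned, but it no longer throws out of `invalidate()` and into its caller.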
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5952ad2b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5952ad2b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5952ad2b

Branch: refs/heads/master
Commit: 5952ad2b40c82c0ccb2ec16fa09071bf198ff99d
Parents: e9faae1
Author: Josh Rosen <joshro...@databricks.com>
Authored: Mon Jul 17 20:40:32 2017 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Jul 17 20:40:32 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5952ad2b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 5d48bc7..7f760a5 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -194,7 +194,12 @@ private class ShuffleStatus(numPartitions: Int) {
    */
   def invalidateSerializedMapOutputStatusCache(): Unit = synchronized {
     if (cachedSerializedBroadcast != null) {
-      cachedSerializedBroadcast.destroy()
+      // Prevent errors during broadcast cleanup from crashing the DAGScheduler (see SPARK-21444)
+      Utils.tryLogNonFatalError {
+        // Use `blocking = false` so that this operation doesn't hang while trying to send cleanup
+        // RPCs to dead executors.
+        cachedSerializedBroadcast.destroy(blocking = false)
+      }
       cachedSerializedBroadcast = null
     }
     cachedSerializedMapStatus = null