Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22371#discussion_r216167356

    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -138,13 +148,22 @@ private[spark] class IndexShuffleBlockResolver(
           mapId: Int,
           lengths: Array[Long],
           dataTmp: File): Unit = {
    +    shuffleIdToLocks.putIfAbsent(shuffleId, new Array[Object](lengths.length))
    +    val mapLocks = shuffleIdToLocks.get(shuffleId)
    +    val lock = mapLocks.synchronized {
    +      if (mapLocks(mapId) == null) {
    +        mapLocks(mapId) = new Object()
    +      }
    +      mapLocks(mapId)
    +    }
    +
         val indexFile = getIndexFile(shuffleId, mapId)
         val indexTmp = Utils.tempFileWith(indexFile)
         try {
           val dataFile = getDataFile(shuffleId, mapId)
    -      // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
    -      // the following check and rename are atomic.
    -      synchronized {
    +      // We need make sure the following check and rename are atomic, and we only need to
    --- End diff --

    I don't know the logic well enough to really evaluate this, but it looks plausible. This block operates on `indexFile` and `dataFile` and things derived from them, and those appear to be keyed by `shuffleId` and `mapId`, so it sounds plausible that there is no need to synchronize access when different shuffle or map IDs are used.

    I do see that the synchronized block does things like delete `indexFile`, and `indexFile` is also read outside the synchronized block. I wonder if there is an issue there: should the check for the file really happen inside a block that excludes concurrent deletion of that same file? Granted, that is the existing logic here, and it may be OK not to touch it now, but I did wonder when trying to reason about this.
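    For context, the pattern the patch is reaching for is a lazily created lock object per (shuffleId, mapId) pair, so that writers for different map outputs never contend while writers for the same output still serialize. A minimal standalone sketch of that idea (in Java rather than Scala, using `ConcurrentHashMap.computeIfAbsent` instead of the patch's array-plus-synchronized scheme; the class and method names here are hypothetical, not Spark's):

    ```java
    import java.util.concurrent.ConcurrentHashMap;

    public class PerMapLocks {
        // One lock object per (shuffleId, mapId) pair. computeIfAbsent guarantees
        // that concurrent callers asking for the same key all receive the same
        // Object, so synchronizing on it serializes the check-and-rename.
        private final ConcurrentHashMap<Long, Object> locks = new ConcurrentHashMap<>();

        // Pack the two int ids into one long key for the map.
        private static long key(int shuffleId, int mapId) {
            return ((long) shuffleId << 32) | (mapId & 0xFFFFFFFFL);
        }

        public Object lockFor(int shuffleId, int mapId) {
            return locks.computeIfAbsent(key(shuffleId, mapId), k -> new Object());
        }
    }
    ```

    A caller would then guard only the critical section, e.g. `synchronized (locks.lockFor(shuffleId, mapId)) { /* check existing index file, rename tmp files */ }`, which narrows the old resolver-wide `synchronized` to the single map output being committed. Note this sketch does not address the concern raised above: a read of the index file outside the synchronized block would still race with a delete inside it.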