[ https://issues.apache.org/jira/browse/SPARK-2638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071490#comment-14071490 ]
Stephen Boesch commented on SPARK-2638:
---------------------------------------

Other examples:

HttpBroadcast.synchronized {
  SparkEnv.get.blockManager.putSingle(
    blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}

TorrentBroadcast.synchronized {
  SparkEnv.get.blockManager.putSingle(
    broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}

Note: there are well over one hundred other uses of synchronized, but the others seem to be properly scoped, i.e. synchronized on sufficiently confined objects, or encompassing short-lived operations.

> Improve concurrency of fetching Map outputs
> -------------------------------------------
>
>                 Key: SPARK-2638
>                 URL: https://issues.apache.org/jira/browse/SPARK-2638
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.0.0
>         Environment: All
>            Reporter: Stephen Boesch
>            Priority: Minor
>              Labels: MapOutput, concurrency
>             Fix For: 1.1.0
>
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> This issue was noticed while perusing the MapOutputTracker source code.
> Notice that the synchronization is on the containing "fetching" collection,
> which makes ALL fetches wait if any fetch is occurring.
> The fix is to synchronize instead on the shuffleId (interned as a string to
> ensure JVM wide visibility).
>
>   def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
>     val statuses = mapStatuses.get(shuffleId).orNull
>     if (statuses == null) {
>       logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
>       var fetchedStatuses: Array[MapStatus] = null
>       fetching.synchronized { // This is existing code
>       // shuffleId.toString.intern.synchronized { // New Code
>         if (fetching.contains(shuffleId)) {
>           // Someone else is fetching it; wait for them to be done
>           while (fetching.contains(shuffleId)) {
>             try {
>               fetching.wait()
>             } catch {
>               case e: InterruptedException =>
>             }
>           }
>         }
>
> This is only a small code change, but the test cases to prove (a) proper
> functionality and (b) proper performance improvement are not so trivial.
> For (b) it is not worthwhile to add a test case to the codebase. Instead I
> have added a git project that demonstrates the concurrency/performance
> improvement using the fine-grained approach. The github project is at
> https://github.com/javadba/scalatesting.git . Simply run "sbt test".
> Note: it is unclear how/where to include this ancillary testing/verification
> information that will not be included in the git PR. I am open to any
> suggestions, even as far as simply removing references to it.
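
For illustration, the fine-grained approach described above could be sketched roughly as follows. This is not the MapOutputTracker code and not the proposed patch: PerShuffleFetch, getStatuses, lockFor, locks, inFlight, cache and the fetch parameter are all hypothetical names. The sketch also swaps the interned string for a dedicated lock object per shuffleId, so the monitor is not shared with unrelated code that happens to intern the same string. Because Object.wait() must be called on the object whose monitor the thread holds, the sketch waits on the per-shuffle lock rather than on a shared collection.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch of per-shuffleId locking; names are illustrative only.
object PerShuffleFetch {
  // One dedicated lock object per shuffleId, created on first use.
  private val locks = new ConcurrentHashMap[Int, AnyRef]()
  // Shuffle IDs that currently have a fetch in flight.
  private val inFlight = ConcurrentHashMap.newKeySet[Int]()
  // Results of completed fetches (a String stands in for the real statuses).
  private val cache = new ConcurrentHashMap[Int, String]()

  private def lockFor(shuffleId: Int): AnyRef = {
    val fresh = new Object
    val existing = locks.putIfAbsent(shuffleId, fresh)
    if (existing == null) fresh else existing
  }

  def getStatuses(shuffleId: Int)(fetch: Int => String): String = {
    val lock = lockFor(shuffleId)
    // Only callers asking for the SAME shuffleId contend on this monitor.
    val mustFetch = lock.synchronized {
      // Someone else is fetching this shuffleId; wait for them to be done.
      while (inFlight.contains(shuffleId)) lock.wait()
      if (cache.containsKey(shuffleId)) false
      else { inFlight.add(shuffleId); true }
    }
    if (mustFetch) {
      // The slow fetch runs outside the monitor.
      try cache.put(shuffleId, fetch(shuffleId))
      finally lock.synchronized {
        inFlight.remove(shuffleId)
        lock.notifyAll() // wake waiters for this shuffleId only
      }
    }
    cache.get(shuffleId)
  }
}
```

With this layout, a slow fetch for one shuffleId should not delay a caller asking for a different shuffleId, while a second caller for the same shuffleId waits and then reuses the cached result.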