[jira] [Updated] (SPARK-2638) Improve concurrency of fetching Map outputs
[ https://issues.apache.org/jira/browse/SPARK-2638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell updated SPARK-2638:
---
Fix Version/s: 1.3.0 (was: 1.2.0)

Improve concurrency of fetching Map outputs
---
Key: SPARK-2638
URL: https://issues.apache.org/jira/browse/SPARK-2638
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 1.0.0
Environment: All
Reporter: Stephen Boesch
Assignee: Josh Rosen
Priority: Minor
Labels: MapOutput, concurrency
Fix For: 1.3.0
Original Estimate: 0h
Remaining Estimate: 0h

This issue was noticed while perusing the MapOutputTracker source code. Notice that the synchronization is on the containing "fetching" collection, which makes ALL fetches wait while any fetch is in progress.

The fix is to synchronize instead on the shuffleId (interned as a string to ensure JVM-wide visibility).

    def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
      val statuses = mapStatuses.get(shuffleId).orNull
      if (statuses == null) {
        logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
        var fetchedStatuses: Array[MapStatus] = null
        fetching.synchronized {   // This is existing code
        // shuffleId.toString.intern.synchronized {  // New Code
          if (fetching.contains(shuffleId)) {
            // Someone else is fetching it; wait for them to be done
            while (fetching.contains(shuffleId)) {
              try {
                fetching.wait()
              } catch {
                case e: InterruptedException =>
              }
            }

This is only a small code change, but the test cases to prove (a) proper functionality and (b) proper performance improvement are not so trivial. For (b) it is not worthwhile to add a test case to the codebase. Instead I have added a git project that demonstrates the concurrency/performance improvement using the fine-grained approach. The GitHub project is at https://github.com/javadba/scalatesting.git . Simply run "sbt test".
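
For illustration only, here is a minimal sketch of the fine-grained idea, assuming a dedicated lock object per shuffle ID rather than an interned string. One caveat about the excerpt above: it still calls fetching.wait() inside the block, and Object.wait() throws IllegalMonitorStateException unless the calling thread holds that same object's monitor, so changing only the synchronized target would not be enough on its own. The class PerShuffleFetcher, its fetch parameter, and the lockFor helper are hypothetical names invented for this sketch. They are not part of Spark and not the patch attached to this issue.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper, not Spark code: caches one value per shuffle ID and
// serializes fetches per ID, so a fetch for shuffle 1 never blocks shuffle 2.
class PerShuffleFetcher[V <: AnyRef](fetch: Int => V) {
  // One private lock object per shuffle ID (assumption: swapped in for the
  // interned-string lock proposed in the issue).
  private val locks = new ConcurrentHashMap[Int, Object]()
  private val cache = new ConcurrentHashMap[Int, V]()

  // Returns the single lock object registered for this shuffle ID.
  private def lockFor(shuffleId: Int): Object = {
    val fresh = new Object
    val existing = locks.putIfAbsent(shuffleId, fresh)
    if (existing == null) fresh else existing
  }

  def get(shuffleId: Int): V = {
    val cached = cache.get(shuffleId)
    if (cached != null) cached
    else lockFor(shuffleId).synchronized {
      // Re-check under the lock: another thread may have finished the fetch
      // for this ID while we were waiting to enter.
      val again = cache.get(shuffleId)
      if (again != null) again
      else {
        val fetched = fetch(shuffleId)
        cache.put(shuffleId, fetched)
        fetched
      }
    }
  }
}
```

Threads asking for the same shuffle ID queue on that ID's lock and all but the first find the cached value, while threads asking for different IDs proceed independently. Private lock objects also avoid sharing a monitor with unrelated code that might happen to synchronize on the same interned string.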

Note: it is unclear how or where to include this ancillary testing/verification information, which will not be included in the git PR. I am open to any suggestions, even simply removing the references to it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-2638) Improve concurrency of fetching Map outputs
[ https://issues.apache.org/jira/browse/SPARK-2638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell updated SPARK-2638:
---
Fix Version/s: (was: 1.3.0)
[jira] [Updated] (SPARK-2638) Improve concurrency of fetching Map outputs
[ https://issues.apache.org/jira/browse/SPARK-2638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell updated SPARK-2638:
---
Fix Version/s: 1.2.0 (was: 1.1.0)