[jira] [Updated] (SPARK-2638) Improve concurrency of fetching Map outputs

2014-12-26 Thread Patrick Wendell (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Wendell updated SPARK-2638:
---
Fix Version/s: (was: 1.2.0)
   1.3.0

 Improve concurrency of fetching Map outputs
 ---

 Key: SPARK-2638
 URL: https://issues.apache.org/jira/browse/SPARK-2638
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
 Environment: All
Reporter: Stephen Boesch
Assignee: Josh Rosen
Priority: Minor
  Labels: MapOutput, concurrency
 Fix For: 1.3.0

   Original Estimate: 0h
  Remaining Estimate: 0h

 This issue was noticed while reading the MapOutputTracker source code.
 The synchronization is on the enclosing fetching collection, which makes
 ALL fetches wait whenever any fetch is in progress.
 The fix is to synchronize on the shuffleId instead, interned as a String so
 that every thread in the JVM locks on the same String instance.
   def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
     val statuses = mapStatuses.get(shuffleId).orNull
     if (statuses == null) {
       logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
       var fetchedStatuses: Array[MapStatus] = null
       fetching.synchronized {   // This is existing code
       // shuffleId.toString.intern.synchronized {  // New Code
       // (with New Code, wait() and notifyAll() must also be called on that interned String)
         if (fetching.contains(shuffleId)) {
           // Someone else is fetching it; wait for them to be done
           while (fetching.contains(shuffleId)) {
             try {
               fetching.wait()
             } catch {
               case _: InterruptedException =>
             }
           }
         }
         // ... (remainder of getServerStatuses not shown in this report)
       }
     }
   }
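
 The proposed change can be sketched in self-contained form as below, assuming a
 simplified cache instead of Spark's actual MapOutputTracker; PerShuffleStatusCache
 and fetchRemote are hypothetical stand-ins for the tracker and its remote request.
 Two points follow from Java monitor semantics once the lock becomes the interned
 String: wait() and notifyAll() must be called on that same String (waiting on an
 object whose monitor is not held throws IllegalMonitorStateException), and the
 fetching set must itself be thread-safe, since threads working on different
 shuffles no longer share a monitor.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified stand-in for MapOutputTracker (not Spark's class).
// fetchRemote models the slow remote request for a shuffle's map statuses.
class PerShuffleStatusCache(fetchRemote: Int => Seq[Long]) {
  private val statuses = new ConcurrentHashMap[Int, Seq[Long]]()
  // Must be thread-safe: threads fetching different shuffles hold different monitors.
  private val fetching = ConcurrentHashMap.newKeySet[Int]()

  def get(shuffleId: Int): Seq[Long] = {
    val cached = statuses.get(shuffleId)
    if (cached != null) cached else fetchOrWait(shuffleId)
  }

  private def fetchOrWait(shuffleId: Int): Seq[Long] = {
    // Interning yields one String instance per shuffleId across the JVM,
    // so it can serve as a per-shuffle monitor.
    val lock = shuffleId.toString.intern()
    val mustFetch = lock.synchronized {
      // Wait on the same object we synchronized on.
      while (fetching.contains(shuffleId)) lock.wait()
      if (statuses.containsKey(shuffleId)) false
      else { fetching.add(shuffleId); true }
    }
    if (mustFetch) {
      // The slow remote call runs without holding any lock.
      try statuses.put(shuffleId, fetchRemote(shuffleId))
      finally lock.synchronized {
        fetching.remove(shuffleId)
        lock.notifyAll() // wakes only threads waiting on this shuffle
      }
    }
    statuses.get(shuffleId)
  }
}
```

 One caveat of interned Strings as locks: any unrelated code that synchronizes on
 an equal interned String (for example the literal "3") shares the same monitor.
 A private map from shuffleId to a dedicated lock object avoids that.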

 This is only a small code change, but the test cases needed to prove (a) correct
 functionality and (b) the performance improvement are not so trivial.
 For (b), it is not worthwhile to add a test case to the codebase. Instead, I
 have created a Git project that demonstrates the concurrency/performance
 improvement of the fine-grained approach. The GitHub project is
 https://github.com/javadba/scalatesting.git and can be run with sbt test.
 Note: it is unclear how/where to include this ancillary testing/verification
 information, which will not be part of the Git PR. I am open to any
 suggestions, even simply removing references to it.
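
 As a hedged illustration of what a functional test for (a) might check, the
 sketch below swaps in a different technique from the interned-String monitor
 proposed above: one Promise per shuffleId, registered with
 ConcurrentHashMap.putIfAbsent. PromiseStatusCache, PromiseStatusCacheCheck and
 fetchRemote are hypothetical names. The first check counts remote fetches when
 eight threads request the same shuffle (one is expected); the second uses a
 CountDownLatch so that two fetches for different shuffles only meet if both are
 in progress at the same time.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration
import scala.util.Try

// Hypothetical alternative to interned-String monitors: one Promise per shuffleId.
// fetchRemote stands in for the slow remote request for a shuffle's map statuses.
class PromiseStatusCache(fetchRemote: Int => Seq[Long]) {
  private val entries = new ConcurrentHashMap[Int, Promise[Seq[Long]]]()

  def get(shuffleId: Int): Seq[Long] = {
    val mine = Promise[Seq[Long]]()
    val existing = entries.putIfAbsent(shuffleId, mine)
    if (existing != null) {
      // Another caller owns this shuffle's fetch; block only on that one shuffle.
      Await.result(existing.future, Duration.Inf)
    } else {
      // This caller won the race; callers of other shuffles are never blocked.
      val result = Try(fetchRemote(shuffleId))
      if (result.isFailure) entries.remove(shuffleId, mine) // let a later caller retry
      mine.complete(result)
      result.get
    }
  }
}

object PromiseStatusCacheCheck {
  // (a) Eight concurrent callers for one shuffle: returns how many remote fetches ran.
  def remoteFetchesForOneShuffle(): Int = {
    val calls = new AtomicInteger(0)
    val cache = new PromiseStatusCache(id => { calls.incrementAndGet(); Thread.sleep(50); Seq(id.toLong) })
    val threads = (1 to 8).map(_ => new Thread(() => { cache.get(0); () }))
    threads.foreach(_.start())
    threads.foreach(_.join())
    calls.get()
  }

  // Fetches for two different shuffles: true only if both were in progress at once.
  def differentShufflesOverlap(): Boolean = {
    val bothStarted = new CountDownLatch(2)
    val overlapped = new AtomicInteger(0)
    val cache = new PromiseStatusCache(id => {
      bothStarted.countDown()
      if (bothStarted.await(2, TimeUnit.SECONDS)) overlapped.incrementAndGet()
      Seq(id.toLong)
    })
    val threads = Seq(1, 2).map(id => new Thread(() => { cache.get(id); () }))
    threads.foreach(_.start())
    threads.foreach(_.join())
    overlapped.get() == 2
  }
}
```

 The latch-based overlap check does not depend on machine speed the way a
 wall-clock timing assertion would; the 2-second await is only a safety timeout.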



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-2638) Improve concurrency of fetching Map outputs

2014-12-26 Thread Patrick Wendell (JIRA)

Patrick Wendell updated SPARK-2638:
---
Fix Version/s: (was: 1.3.0)

 Improve concurrency of fetching Map outputs
 ---

 Key: SPARK-2638
 URL: https://issues.apache.org/jira/browse/SPARK-2638
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
 Environment: All
Reporter: Stephen Boesch
Assignee: Josh Rosen
Priority: Minor
  Labels: MapOutput, concurrency
   Original Estimate: 0h
  Remaining Estimate: 0h



[jira] [Updated] (SPARK-2638) Improve concurrency of fetching Map outputs

2014-09-15 Thread Patrick Wendell (JIRA)

Patrick Wendell updated SPARK-2638:
---
Fix Version/s: (was: 1.1.0)
   1.2.0

 Improve concurrency of fetching Map outputs
 ---

 Key: SPARK-2638
 URL: https://issues.apache.org/jira/browse/SPARK-2638
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
 Environment: All
Reporter: Stephen Boesch
Assignee: Josh Rosen
Priority: Minor
  Labels: MapOutput, concurrency
 Fix For: 1.2.0

   Original Estimate: 0h
  Remaining Estimate: 0h
