[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50363390
  
@mridulm Thanks for the clarifications! Those make sense and are some 
tricky edge cases. I will begin reviewing the code as soon as possible.




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50311354
  
QA results for PR 1609:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17279/consoleFull




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50308229
  
QA tests have started for PR 1609. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17279/consoleFull




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15449433
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -947,6 +958,34 @@ private[spark] object Utils extends Logging {
     }
     false
   }
+  /*
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+    shutdownStarted = true
+  }
+
+  def inShutdown(): Boolean = {
+    if (shutdownStarted) return true
+    doShutdownCheck()
+    shutdownStarted
+  }
+
+  private[spark] def doShutdownCheck() {
+    var shutdown = false
+    try {
+      val hook = new Thread {
+        override def run() {}
+      }
+      Runtime.getRuntime.addShutdownHook(hook)
+      Runtime.getRuntime.removeShutdownHook(hook)
+    } catch {
+      case ise: IllegalStateException =>
+        shutdown = true
+    } finally {
+      shutdownStarted = shutdown
+    }
+  }
+  */
--- End diff --

This (and the related commented-out inShutdown references) has been left around in
case someone else can suggest an improvement!
The issue is that this works fine in 'normal' use, but it fails while running Spark
tests in local mode.
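
For reference, the probe that the commented-out doShutdownCheck relies on is plain
JDK behaviour: Runtime#addShutdownHook throws IllegalStateException once the JVM has
begun shutting down. A minimal standalone sketch (nothing here is Spark-specific;
the object name is illustrative):

object ShutdownProbe {
  def inShutdown(): Boolean = {
    try {
      // Register and immediately remove a no-op hook; once shutdown has
      // begun, addShutdownHook throws IllegalStateException.
      val hook = new Thread { override def run() {} }
      Runtime.getRuntime.addShutdownHook(hook)
      Runtime.getRuntime.removeShutdownHook(hook)
      false
    } catch {
      case ise: IllegalStateException => true
    }
  }

  def main(args: Array[String]) {
    println("in shutdown (main) = " + inShutdown())    // false here
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run() {
        println("in shutdown (hook) = " + inShutdown()) // true once hooks run
      }
    })
  }
}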




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50307411
  
QA results for PR 1609:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17277/consoleFull




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50307247
  
QA tests have started for PR 1609. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17277/consoleFull




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50307155
  
Jenkins, test this please




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
Github user mridulm closed the pull request at:

https://github.com/apache/spark/pull/1609




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50306633
  
@witgo I did not understand the space issue: stylecheck seems to run fine.

Regarding the actual issues: the JIRA lists some of them, but unfortunately the list
is not exhaustive.
Spark code assumes a few things (a short sketch illustrating the first two follows below):
1) A flush followed by a close should not cause additional data to be written to the
stream - which is not valid in the general case (close can still write more data).
2) Reading an object from a stream will consume all the data written as part of that
object - which is not valid in the general case: additional info could have been
written after the object (like the reset markers in java serde), so stream wrapping
has to account for that.
3)
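
To make 1) and 2) concrete, a minimal sketch using only the JDK's GZIPOutputStream
and ObjectOutputStream (nothing Spark-specific is assumed):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.zip.GZIPOutputStream

object StreamAssumptions {
  def main(args: Array[String]) {
    // 1) close() can write data even after a flush(): GZIP emits its
    // trailer (CRC + length) only on close, so flush-then-close is not
    // a no-op for the underlying stream.
    val gzBytes = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(gzBytes)
    gz.write("payload".getBytes("UTF-8"))
    gz.flush()
    val afterFlush = gzBytes.size()
    gz.close()
    assert(gzBytes.size() > afterFlush)

    // 2) an object's bytes are not self-delimiting: java serde can write
    // additional bookkeeping (a reset marker) after the object itself, so
    // per-object byte accounting must allow for trailing data.
    val objBytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(objBytes)
    oos.writeObject("first")
    oos.flush()
    val afterObject = objBytes.size()
    oos.reset()   // inserts a reset marker into the stream
    oos.flush()
    assert(objBytes.size() > afterObject)
    oos.close()
  }
}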




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
GitHub user mridulm reopened a pull request:

https://github.com/apache/spark/pull/1609

[SPARK-2532] WIP Consolidated shuffle fixes

Status of the PR
- [x] Cherry pick and merge changes from internal branch to spark master
- [x] Remove WIP comments and 2G branch references.
- [x] Tests for BlockObjectWriter
- [ ] Tests for ExternalAppendOnlyMap
- [x] Tests for ShuffleBlockManager




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mridulm/spark consolidated_shuffle_fixes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1609.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1609


commit f1182f8a3d3328248d471038d6ab0db6e6a1396d
Author: Mridul Muralidharan 
Date:   2014-07-27T15:23:05Z

Consolidated shuffle fixes

commit 66d6ec3f99882ad6062c5bff36f2edb82b0c24c0
Author: Mridul Muralidharan 
Date:   2014-07-27T15:40:32Z

Add missing setShutdownStarted hooks

commit 027c7f18c44c57960a2a94eee961f0aa811e7a34
Author: Mridul Muralidharan 
Date:   2014-07-27T16:05:31Z

stylecheck fixes

commit 195c529c1ae5ffa7e8f9cf6af4df8b9536a39d6a
Author: Mridul Muralidharan 
Date:   2014-07-27T23:46:53Z

Fix build, add testcases for DiskBlockManagerSuite

commit 6095545bf55a87ef0b28bc11adc63dcc5b661b6c
Author: Mridul Muralidharan 
Date:   2014-07-27T23:50:45Z

Consolidated fixes

commit 1c1faea69d9709c7e65afc9bdd13a8e0d5488c82
Author: Mridul Muralidharan 
Date:   2014-07-27T23:50:50Z

Merge branch 'consolidated_shuffle_fixes' of github.com:mridulm/spark into 
consolidated_shuffle_fixes

commit fbf20f792baf7ab8d6705c7a9525c2db92bb7ae3
Author: Mridul Muralidharan 
Date:   2014-07-28T06:59:45Z

Disable code to detect programming shutdown via stop's. Make actor/store 
shutdown in DiskBlockManagerSuite more robust






[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50306648
  
Accidental close, apologies!




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15448803
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -935,15 +941,22 @@ private[spark] object Utils extends Logging {
    * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
    * an IllegalStateException.
    */
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+    shutdownStarted = true
+  }
   def inShutdown(): Boolean = {
     try {
+      if (shutdownStarted) return true
--- End diff --

Unfortunately, had to be reverted :-(




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50306230
  
QA results for PR 1609:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17276/consoleFull




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50306070
  
QA tests have started for PR 1609. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17276/consoleFull




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50305416
  
@mridulm Thanks for submitting this! I would like to dig a little deeper into the
specific issues you found, in order to understand the solutions you have provided
(since the specific fixes are interleaved with a lot of new asserts and code paths).

You mention that there was an issue if shuffle writes co-occur with shuffle fetches,
which is true, but this should not typically happen, due to the barrier before the
reduce stage of a shuffle. In what situations does it occur (outside of failure
conditions)?

Did you observe a prior pattern of close/revert/close on the same block 
writer?

How did task failures induce inconsistent state on the map side? Was it due 
to the same close/revert/close pattern?
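
For readers following along, 'revert' here means truncating the file back to the
position recorded when the writer was opened. A generic sketch of the mechanism, not
Spark's implementation - the helper name is hypothetical:

import java.io.{File, FileOutputStream}

object RevertSketch {
  // Append some bytes, then undo them by truncating the file back to the
  // length it had before we opened it (the writer's 'initialPosition').
  def appendThenRevert(file: File, payload: Array[Byte]) {
    val initialPosition = file.length()
    val fos = new FileOutputStream(file, true)
    try {
      fos.write(payload)   // partial writes that we now abandon
      fos.flush()
    } finally {
      fos.getChannel.truncate(initialPosition)
      fos.close()
    }
  }
}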




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15447535
  
--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
@@ -353,26 +368,53 @@ class ExternalAppendOnlyMap[K, V, C](
    */
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
     extends Iterator[(K, C)] {
-    private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+
+    assert (! batchSizes.isEmpty)
--- End diff --

Same as above.




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15447541
  
--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
@@ -353,26 +368,53 @@ class ExternalAppendOnlyMap[K, V, C](
    */
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
     extends Iterator[(K, C)] {
-    private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+
+    assert (! batchSizes.isEmpty)
+    assert (! batchSizes.exists(_ <= 0))
--- End diff --

Same as above.




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15447510
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -935,15 +941,22 @@ private[spark] object Utils extends Logging {
    * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
    * an IllegalStateException.
    */
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+    shutdownStarted = true
+  }
   def inShutdown(): Boolean = {
     try {
+      if (shutdownStarted) return true
--- End diff --

This is a very nice improvement




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15447449
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to decide
+  // if we should delete this file or not. Also used to detect if file was deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-    fos = new FileOutputStream(file, true)
-    ts = new TimeTrackingOutputStream(fos)
-    channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been observed that
+  // sometimes file creation happens 'before' the actual directory has been created
+  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
+  private def init() {
+    init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+    if (closed) throw new IOException("Already closed")
+
+    assert (! initialized)
+    assert (! wasOpenedOnce)
+    var exists = false
+    try {
+      exists = file.exists()
+      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
+        // Was deleted by cleanup thread ?
+        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+      }
+      fos = new FileOutputStream(file, true)
+    } catch {
+      case fEx: FileNotFoundException =>
+        // There seems to be some race in directory creation.
+        // Attempts to fix it dont seem to have worked : working around the problem for now.
+        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+        if (canRetry && ! Utils.inShutdown()) {
+          // try creating the parent directory if that is the issue.
+          // Since there can be race with others, dont bother checking for
+          // success/failure - the call to init() will resolve if fos can be created.
+          file.getParentFile.mkdirs()
+          // Note, if directory did not exist, then file does not either - and so
+          // initialPosition would be zero in either case.
+          init(canRetry = false)
+          return
+        } else throw fEx
+    }
+
+    try {
+      // This is to workaround case where creation of object and actual init
+      // (which can happen much later) happens after a delay and the cleanup thread
+      // cleaned up the file.
+      channel = fos.getChannel
+      val fosPos = channel.position()
+      if (initialPosition != fosPos) {
+        throw new IOException("file cleaned up ? " + file.exists() +
+          ", initialpos = " + initialPosition +
+          "current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
+      }
+
+      ts = new TimeTrackingOutputStream(fos)
+      val bos = new BufferedOutputStream(ts, bufferSize)
+      bs = compressStream(bos)
+      objOut = serializer.newInstance().serializeStream(bs)
+      initialized = true
+      wasOpenedOnce = true;
+    } finally {
+      if (! initialized) {
+        // failed, cleanup state.
+        val tfos = fos
+        updateCloseState()
+        tfos.close()
+      }
+    }
+  }
+
+  private def open(): BlockObjectWriter = {
+    init()
     lastValidPosition = initialPosition
-    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
-    objOut = serializer.newInstance().serializeStream(bs)
-    initialized = true
     this
   }
 
-  override def close() {
-    if (initi
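
As a standalone sketch of the retry-on-mkdirs workaround the init() hunk above
introduces: open the file, and if that fails because the parent directory is missing,
mkdirs() once and retry a single time. Only java.io is assumed; the helper name is
illustrative:

import java.io.{File, FileNotFoundException, FileOutputStream}

object OpenWithRetry {
  def openAppend(file: File, canRetry: Boolean = true): FileOutputStream = {
    try {
      new FileOutputStream(file, true)
    } catch {
      case fEx: FileNotFoundException if canRetry =>
        // Another thread may be racing to create the same directory, so the
        // return value of mkdirs() is not checked - the single retry decides.
        file.getParentFile.mkdirs()
        openAppend(file, canRetry = false)
    }
  }
}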

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15447430
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
[Same BlockObjectWriter.scala init()/open() hunk as quoted in full above.]

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15447436
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
[Same BlockObjectWriter.scala init()/open() hunk as quoted in full above.]

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15447401
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
[Same init() hunk as quoted in full above, up to the line this comment anchors on:]
+      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
--- End diff --

Same as above.




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15447402
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
[Same init() hunk as quoted in full above, up to the line this comment anchors on:]
+        if (canRetry && ! Utils.inShutdown()) {
--- End diff --

Same as above.




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15447397
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
[Same BlockObjectWriter.scala init()/open() hunk as quoted in full above.]

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15447384
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
[Same init() hunk as quoted in full above, up to the lines this comment anchors on:]
+    assert (! initialized)
+    assert (! wasOpenedOnce)
--- End diff --

One more space




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15446921
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -176,10 +406,17 @@ private[spark] class DiskBlockObjectWriter(
     if (!initialized) {
       open()
     }
+    // Not checking if closed on purpose ... introduce it ? No usecase for it right now.
     objOut.writeObject(value)
   }
 
   override def fileSegment(): FileSegment = {
+    assert (! initialized)
--- End diff --

Same as above.




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15446910
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
[Same init() hunk as quoted in full above, up to the line this comment anchors on:]
+      if (! initialized) {
--- End diff --

One more space


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50291239
  
QA results for PR 1609:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17256/consoleFull




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50290580
  
Did you mean @aarondav?




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50290037
  
QA tests have started for PR 1609. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17256/consoleFull




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15442779
  
--- Diff: core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala ---
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import java.io.{IOException, FileOutputStream, OutputStream, File}
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/**
+ * Test various code paths in DiskBlockObjectWriter
+ */
+class DiskBlockObjectWriterSuite extends FunSuite {
+
+  private val conf = new SparkConf
+  private val BUFFER_SIZE = 32 * 1024
+
+  private def tempFile(): File = {
+    val file = File.createTempFile("temp_", "block")
+    // We dont want file to exist ! Just need a temp file name
+    file.delete()
+    file
+  }
+
+  private def createWriter(file: File = tempFile()) :
+      (File, DiskBlockObjectWriter) = {
+    file.deleteOnExit()
+
+    (file, new DiskBlockObjectWriter(BlockId("test_1"), file,
+      new JavaSerializer(conf), BUFFER_SIZE, (out: OutputStream) => out, true))
+  }
+
+
+  test("write after close should throw IOException") {
+    val (file, bow) = createWriter()
+    bow.write("test")
+    bow.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow.commitAndClose()
+
+    intercept[IOException] {
+      bow.write("test2")
+    }
+
+    file.delete()
+  }
+
+  test("write after revert should throw IOException") {
+    val (file, bow) = createWriter()
+    bow.write("test")
+    bow.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow.revertPartialWritesAndClose()
+
+    intercept[IOException] {
+      bow.write("test2")
+    }
+
+    file.delete()
+  }
+
+  test("create even if directory does not exist") {
+    val dir = File.createTempFile("temp_", "dir")
+    dir.delete()
+
+    val file = new File(dir, "temp.file")
+    file.deleteOnExit()
+
+    val bow = new DiskBlockObjectWriter(BlockId("test_1"), file, new JavaSerializer(conf),
+      BUFFER_SIZE, (out: OutputStream) => out, true)
+
+    bow.write("test")
+    assert (file.exists() && file.isFile)
+    bow.commitAndClose()
+    Utils.deleteRecursively(dir)
+  }
+
+  test("revert of new file should delete it") {
+    val (file, bow) = createWriter()
+    bow.write("test")
+    bow.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow.revertPartialWritesAndClose()
+    assert (! file.exists())
+    // file.delete()
+  }
+
+  test("revert of existing file should revert it to previous state") {
+    val (file, bow1) = createWriter()
+
+    bow1.write("test")
+    bow1.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow1.commitAndClose()
+    val length = file.length()
+
+    // reopen same file.
+    val bow2 = createWriter(file)._2
+
+    bow2.write("test3")
+    bow2.write("test4")
+
+    assert (file.exists() && file.isFile)
+
+    bow2.revertPartialWritesAndClose()
+    assert (file.exists())
+    assert (length == file.length())
+    file.delete()
+  }
+
+  test("revert of writer after close should delete if it did not exist earlier") {
+    val (file, bow) = createWriter(tempFile())
+
+    bow.write("test")
+    bow.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow.commitAndClose()
+    val length = file.length()
+
+    assert (file.exists() && file.isFile)
+    assert (length > 0)
+
+    // Now revert the file, after it has been closed : should delete the file
+    // since it did not exist earlier.
+    b

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50289340
  
@adav @andrewor14 would be good if you two take a look at this when it's 
merging correctly.




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15442565
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala 
---
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import java.io.{IOException, FileOutputStream, OutputStream, File}
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/**
+ * Test various code paths in DiskBlockObjectWriter
+ */
+class DiskBlockObjectWriterSuite extends FunSuite {
+
+  private val conf = new SparkConf
+  private val BUFFER_SIZE = 32 * 1024
+
+  private def tempFile(): File = {
+    val file = File.createTempFile("temp_", "block")
+    // We don't want the file to exist; we just need a temp file name.
+    file.delete()
+    file
+  }
+
+  private def createWriter(file: File = tempFile()): (File, DiskBlockObjectWriter) = {
+    file.deleteOnExit()
+
+    (file, new DiskBlockObjectWriter(BlockId("test_1"), file,
+      new JavaSerializer(conf), BUFFER_SIZE, (out: OutputStream) => out, true))
+  }
+
+  test("write after close should throw IOException") {
+    val (file, bow) = createWriter()
+    bow.write("test")
+    bow.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow.commitAndClose()
+
+    intercept[IOException] {
+      bow.write("test2")
+    }
+
+    file.delete()
+  }
+
+  test("write after revert should throw IOException") {
+    val (file, bow) = createWriter()
+    bow.write("test")
+    bow.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow.revertPartialWritesAndClose()
+
+    intercept[IOException] {
+      bow.write("test2")
+    }
+
+    file.delete()
+  }
+
+  test("create even if directory does not exist") {
+    val dir = File.createTempFile("temp_", "dir")
+    dir.delete()
+
+    val file = new File(dir, "temp.file")
+    file.deleteOnExit()
+
+    val bow = new DiskBlockObjectWriter(BlockId("test_1"), file, new JavaSerializer(conf),
+      BUFFER_SIZE, (out: OutputStream) => out, true)
+
+    bow.write("test")
+    assert (file.exists() && file.isFile)
+    bow.commitAndClose()
+    Utils.deleteRecursively(dir)
+  }
+
+  test("revert of new file should delete it") {
+    val (file, bow) = createWriter()
+    bow.write("test")
+    bow.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow.revertPartialWritesAndClose()
+    assert (!file.exists())
+    // file.delete()
+  }
+
+  test("revert of existing file should revert it to previous state") {
+    val (file, bow1) = createWriter()
+
+    bow1.write("test")
+    bow1.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow1.commitAndClose()
+    val length = file.length()
+
+    // reopen same file.
+    val bow2 = createWriter(file)._2
+
+    bow2.write("test3")
+    bow2.write("test4")
+
+    assert (file.exists() && file.isFile)
+
+    bow2.revertPartialWritesAndClose()
+    assert (file.exists())
+    assert (length == file.length())
+    file.delete()
+  }
+
+  test("revert of writer after close should delete if it did not exist earlier") {
+    val (file, bow) = createWriter(tempFile())
+
+    bow.write("test")
+    bow.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow.commitAndClose()
+    val length = file.length()
+
+    assert (file.exists() && file.isFile)
+    assert (length > 0)
+
+    // Now revert the file, after it has been closed: should delete the file
+    // since it did not exist earlier.
+    bow.revertPartialWritesAndClose()
+    assert (!file.exists())
+  }
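
The suite above pins down the commit/revert contract these shuffle fixes rely 
on. As a rough mental model only (this is not the PR's DiskBlockObjectWriter; 
the class and member names below are invented for illustration), the revert 
semantics the tests assert boil down to: remember the file's length when the 
writer is opened, and on revert truncate back to that length, deleting the 
file outright if it did not exist beforehand.

    import java.io.{File, FileOutputStream, RandomAccessFile}

    // Illustrative sketch, not Spark code: mirrors what the tests assert.
    class RevertableWriter(file: File) {
      // -1 marks "the file did not exist when the writer was opened".
      private val initialLength: Long = if (file.exists()) file.length() else -1L
      // Append mode, so pre-existing bytes are preserved until commit/revert.
      private val out = new FileOutputStream(file, true)

      def write(s: String): Unit = out.write(s.getBytes("UTF-8"))

      // Keep everything written so far.
      def commitAndClose(): Unit = out.close()

      // Undo all writes made through this writer.
      def revertPartialWritesAndClose(): Unit = {
        out.close()
        if (initialLength < 0L) {
          file.delete() // the writer created the file: revert removes it
        } else {
          // The file existed before: truncate back to its original length.
          val raf = new RandomAccessFile(file, "rw")
          try raf.setLength(initialLength) finally raf.close()
        }
      }
    }

The real class additionally layers a serializer, buffering, and a compression 
hook (the (out: OutputStream) => out argument in the tests) on top of the raw 
stream, which this sketch deliberately ignores.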

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50278406
  
QA results for PR 1609:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds no public classes.
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17244/consoleFull


[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50277674
  
QA tests have started for PR 1609. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17244/consoleFull


[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50274618
  
QA tests have started for PR 1609. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17243/consoleFull


[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50274622
  
QA results for PR 1609:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds no public classes.
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17243/consoleFull


[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50267540
  
QA tests have started for PR 1609. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17242/consoleFull


[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50267547
  
QA results for PR 1609:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds no public classes.
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17242/consoleFull


[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread mridulm
GitHub user mridulm opened a pull request:

https://github.com/apache/spark/pull/1609

[SPARK-2532] WIP Consolidated shuffle fixes

Status of the PR
- [X] Cherry pick and merge changes from internal branch to spark master
- [X] Remove WIP comments and 2G branch references.
- [X] Tests for BlockObjectWriter
- [ ] Tests for ExternalAppendOnlyMap
- [ ] Tests for MapOutputTracker
- [ ] Tests for ShuffleBlockManager




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mridulm/spark consolidated_shuffle_fixes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1609.patch
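
For review, one common alternative to merging (standard git; the local branch 
name below is just an example) is to fetch the PR's source branch directly:

$ git fetch https://github.com/mridulm/spark consolidated_shuffle_fixes
$ git checkout -b review-pr-1609 FETCH_HEAD

or to apply the posted patch file on top of a local branch:

$ curl -L https://github.com/apache/spark/pull/1609.patch | git am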

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1609


commit f1182f8a3d3328248d471038d6ab0db6e6a1396d
Author: Mridul Muralidharan 
Date:   2014-07-27T15:23:05Z

Consolidated shuffle fixes



