[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-09-03 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-54383344
  
@mridulm Are the issues in this PR taken care of by #1722 and #1678? Do
we still need this PR?





[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-09-03 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-54393908
  
I think it got split into four issues, two of which got committed; not sure
about the other two. And the first one was regressed upon in 1.1 already.
But this PR is probably defunct now. Will close.
On 04-Sep-2014 5:03 am, andrewor14 notificati...@github.com wrote:

 @mridulm https://github.com/mridulm Are the issues in this PR taken
 care of by #1722 https://github.com/apache/spark/pull/1722 and #1678
 https://github.com/apache/spark/pull/1678? Do we still need this PR?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/1609#issuecomment-54383344.






[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-09-03 Thread mridulm
Github user mridulm closed the pull request at:

https://github.com/apache/spark/pull/1609





[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-08-01 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50929329
  
So I looked through this and I also think it would be good to split it into 
smaller patches for 1.1. As far as I can see there are several orthogonal 
improvements here:
- Shuffle file consolidation fixes that Aaron copied in 
https://github.com/apache/spark/pull/1678
- ExternalAppendOnlyMap fixes to deal with writes past end of stream; we 
also need these in ExternalSorter
- Fixes to directory creation in DiskBlockManager (I'm still not sure when 
this would be a problem actually if all accesses to these directories are 
through getFile; needs some investigation)
- Fixes to isSymlink (though as is this seems like it would only compile on 
Java 7)
- Improvements to the API of DiskBlockObjectWriter

Of these, the first two are most critical. So I'd like to get those into 
1.1, and then we can do API refactoring and the other fixes on the master 
branch. For the directory creation fix I'd still like to understand when that 
can be a problem (I'm probably just missing something), but it's also one we 
can add in 1.1 during the QA window.

I'm going to update the JIRA to create sub-tasks for these things so we can 
track where each one is fixed. Thanks again for putting this together Mridul, 
this is very helpful.




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-08-01 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50943106
  
Opened https://github.com/apache/spark/pull/1722 to do the second fix (map 
batch writing) for 1.1, including applying the same fix to ExternalSorter.




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-31 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15662654
  
--- Diff: core/src/main/scala/org/apache/spark/util/Java7Util.scala ---
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.File
+
+/**
+ * Java 7 (or higher) specific util methods.
+ */
+object Java7Util {
+  def isSymlink(file: File) = java.nio.file.Files.isSymbolicLink(file.toPath)
+}
--- End diff --

Will this still compile on Java 6? I don't see any changes to pom.xml or 
anything to exclude it. Or maybe did you mean to call this by reflection?
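
For reference, a reflection-based variant along the following lines would keep the class loadable on Java 6. This is only a hedged sketch: the helper name `isSymlinkCompat` and the canonical-path fallback are illustrative, not taken from the patch.

```scala
import java.io.File

object SymlinkCompat {
  def isSymlinkCompat(file: File): Boolean = {
    try {
      // java.nio.file.Files / File.toPath only exist on Java 7+; on Java 6 the
      // reflective lookups below throw and we fall through to the catch block.
      val filesClass = Class.forName("java.nio.file.Files")
      val pathClass = Class.forName("java.nio.file.Path")
      val toPath = classOf[File].getMethod("toPath")
      val isSymbolicLink = filesClass.getMethod("isSymbolicLink", pathClass)
      isSymbolicLink.invoke(null, toPath.invoke(file)).asInstanceOf[Boolean]
    } catch {
      case _: ClassNotFoundException | _: NoSuchMethodException =>
        // Java 6 fallback heuristic: canonicalize the parent first so only the last
        // path element is tested, then compare the canonical and absolute forms.
        val f = if (file.getParent == null) file
                else new File(file.getParentFile.getCanonicalFile, file.getName)
        !f.getCanonicalFile.equals(f.getAbsoluteFile)
    }
  }
}
```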




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-31 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50804445
  
So FYI I'm going to make a more detailed pass through this soon to see if 
we can get all of it into 1.1. It would be nice to get all these changes in so 
we can QA them along with the other QA we do for 1.1, but if that doesn't work 
out, we can split some of them into smaller patches as Aaron did.




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50600899
  
QA tests have started for PR 1609. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17443/consoleFull




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50604735
  
QA results for PR 1609:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17443/consoleFull




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-30 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15599910
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to 
decide
+  // if we should delete this file or not. Also used to detect if file was 
deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-fos = new FileOutputStream(file, true)
-ts = new TimeTrackingOutputStream(fos)
-channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been 
observed that
+  // sometimes file creation happens 'before' the actual directory has 
been created
+  // So we attempt to retry atleast once with a mkdirs in case directory 
was missing.
--- End diff --

Alright, as long as you can confirm that it still happens without this. I 
just don't see how it could happen from the other code in there. I guess we can 
try disabling it after this fix goes in and see if things break.




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-30 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50704781
  
I created #1678, which only includes the changes directly related to fixing 
the issues with shuffle file consolidation (essentially forking off a piece of 
this PR), intended as a simple candidate for review to make the 1.1 release. 
The smaller PR is not intended as a replacement for this more complete one, 
however -- it is merely an option to fix some of the more severe bugs in time 
for the next major release. If we can get this one in for 1.1 instead, then we 
should.




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50455483
  
All pending fixes and work should be done.
I don't think there are any pieces missing in the merge from the internal branch
to master.
Open for review, thanks!




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50455818
  
QA tests have started for PR 1609. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17352/consoleFull




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50460011
  
QA results for PR 1609:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17352/consoleFull




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15537366
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala ---
@@ -40,7 +40,7 @@ private[spark] class JavaSerializationStream(out: 
OutputStream, counterReset: In
*/
   def writeObject[T: ClassTag](t: T): SerializationStream = {
 objOut.writeObject(t)
-if (counterReset > 0 && counter >= counterReset) {
+if (counterReset >= 0 && counter >= counterReset) {
--- End diff --

This was done only to support adding a marker after each object has been
written.
The only practical reason to do this is to test that part.
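
For context, a minimal sketch of the counter logic under discussion, simplified from the quoted diff (the class name here is made up; ObjectOutputStream.reset() is the standard JDK call that emits the reset marker):

```scala
import java.io.ObjectOutputStream

class CountingSerializationStream(objOut: ObjectOutputStream, counterReset: Int) {
  private var counter = 0

  def writeObject[T](t: T): this.type = {
    objOut.writeObject(t)
    counter += 1
    if (counterReset >= 0 && counter >= counterReset) {
      // reset() writes a reset marker and clears the back-reference table,
      // so with counterReset = 0 a marker follows every single object.
      objOut.reset()
      counter = 0
    }
    this
  }
}
```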




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15540734
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
@@ -116,8 +118,13 @@ class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
 if (shuffle != null && shuffle.writers != null) {
   for (writer <- shuffle.writers) {
-writer.revertPartialWrites()
-writer.close()
+try {
+  writer.revertPartialWritesAndClose()
+} catch {
+  // Ensure that all revert's get done - log exception and continue
+  case ex: Exception =>
+logError("Exception reverting/closing writers", ex)
+}
   }
--- End diff --

revert/close can throw an exception, causing other writers to be left hanging.
Hence, log and continue.




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15540782
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
@@ -71,7 +72,8 @@ class HashShuffleWriter[K, V](
 try {
   return Some(commitWritesAndBuildStatus())
 } catch {
-  case e: Exception =>
+  case e: Throwable =>
+success = false // for finally block
 revertWrites()
--- End diff --

If success is not set to false, then releaseWriters will still attempt to
register the output (see the sketch below).
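
For reference, a self-contained sketch of the pattern being described; the names commit, revert, and release are generic placeholders, not the PR's actual methods:

```scala
// The mutable `success` flag is flipped before reverting so the finally block,
// which always runs, can decide whether to register the output
// (e.g. releaseWriters(success)).
def commitOrRevert[T](commit: () => T, revert: () => Unit, release: Boolean => Unit): Option[T] = {
  var success = true
  try {
    Some(commit())
  } catch {
    case e: Throwable =>
      success = false   // seen by the finally block below
      revert()
      throw e
  } finally {
    release(success)
  }
}
```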




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15540934
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to 
decide
+  // if we should delete this file or not. Also used to detect if file was 
deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-fos = new FileOutputStream(file, true)
-ts = new TimeTrackingOutputStream(fos)
-channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been 
observed that
+  // sometimes file creation happens 'before' the actual directory has 
been created
+  // So we attempt to retry atleast once with a mkdirs in case directory 
was missing.
+  private def init() {
+init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+if (closed) throw new IOException("Already closed")
+
+assert (! initialized)
+assert (! wasOpenedOnce)
+var exists = false
+try {
+  exists = file.exists()
+  if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
+// Was deleted by cleanup thread ?
+throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+  ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+  }
+  fos = new FileOutputStream(file, true)
+} catch {
+  case fEx: FileNotFoundException =>
+// There seems to be some race in directory creation.
+// Attempts to fix it dont seem to have worked : working around the problem for now.
+logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+  ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+if (canRetry && ! Utils.inShutdown()) {
+  // try creating the parent directory if that is the issue.
+  // Since there can be race with others, dont bother checking for
+  // success/failure - the call to init() will resolve if fos can be created.
+  file.getParentFile.mkdirs()
+  // Note, if directory did not exist, then file does not either - and so
+  // initialPosition would be zero in either case.
+  init(canRetry = false)
--- End diff --

As mentioned in the comments, this retries once in case the file could not be
created because the parent directory was missing (which is what the
FileNotFoundException usually indicates), except when we are already in shutdown.
This case happens due to some race in Spark between creation of the directory
and allowing files to be created under that directory.

We have fixed a double-checked locking bug below (in DiskBlockManager), but it
looks like that is not sufficient, since this was observed even after that fix.
(In our branch, the logDebug is actually a logError, to flush out these cases.)




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15541003
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to 
decide
+  // if we should delete this file or not. Also used to detect if file was 
deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-fos = new FileOutputStream(file, true)
-ts = new TimeTrackingOutputStream(fos)
-channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been 
observed that
+  // sometimes file creation happens 'before' the actual directory has 
been created
+  // So we attempt to retry atleast once with a mkdirs in case directory 
was missing.
+  private def init() {
+init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+if (closed) throw new IOException("Already closed")
+
+assert (! initialized)
+assert (! wasOpenedOnce)
+var exists = false
+try {
+  exists = file.exists()
+  if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
+// Was deleted by cleanup thread ?
+throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+  ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+  }
+  fos = new FileOutputStream(file, true)
+} catch {
+  case fEx: FileNotFoundException =>
+// There seems to be some race in directory creation.
+// Attempts to fix it dont seem to have worked : working around the problem for now.
+logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+  ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+if (canRetry && ! Utils.inShutdown()) {
+  // try creating the parent directory if that is the issue.
+  // Since there can be race with others, dont bother checking for
+  // success/failure - the call to init() will resolve if fos can be created.
+  file.getParentFile.mkdirs()
+  // Note, if directory did not exist, then file does not either - and so
+  // initialPosition would be zero in either case.
+  init(canRetry = false)
+  return
+} else throw fEx
+}
+
+try {
+  // This is to workaround case where creation of object and actual init
+  // (which can happen much later) happens after a delay and the cleanup thread
+  // cleaned up the file.
+  channel = fos.getChannel
+  val fosPos = channel.position()
+  if (initialPosition != fosPos) {
+throw new IOException("file cleaned up ? " + file.exists() +
+  ", initialpos = " + initialPosition +
+  " current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
+  }
+
+  ts = new TimeTrackingOutputStream(fos)
+  val bos = new BufferedOutputStream(ts, bufferSize)
+  bs = compressStream(bos)
+  objOut = serializer.newInstance().serializeStream(bs)
+  initialized = true
+  wasOpenedOnce = true;
+} finally {
+  if (! initialized) {
+// failed, cleanup state.
+val tfos = fos
+updateCloseState()
+tfos.close()
+  }
+}
+  }
+
+  private def open(): BlockObjectWriter = {
+init()
 lastValidPosition = initialPosition
-bs = compressStream(new BufferedOutputStream(ts, bufferSize))
-objOut = serializer.newInstance().serializeStream(bs)
-initialized = true
 this
   }
 
-  override def close() {
-if (initialized) {
-  if (syncWrites) 

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15541065
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -188,6 +425,39 @@ private[spark] class DiskBlockObjectWriter(
 
   // Only valid if called after commit()
   override def bytesWritten: Long = {
-lastValidPosition - initialPosition
+val retval = lastValidPosition - initialPosition
+
+assert(retval >= 0 || Utils.inShutdown(),
+  "exists = " + file.exists() + ", bytesWritten = " + retval +
+  ", lastValidPosition = " + lastValidPosition + ", initialPosition = " + initialPosition +
+  ", in shutdown = " + Utils.inShutdown())
+
+// TODO: Comment this out when we are done validating : can be expensive due to file.length()
+assert (file.length() >= lastValidPosition || Utils.inShutdown(),
+  "exists = " + file.exists() + ", file len = " + file.length() +
+  ", bytesWritten = " + retval + ", lastValidPosition = " + lastValidPosition +
+  ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown())
+
+if (retval >= 0) retval else 0
   }
 }
+
+object DiskBlockObjectWriter{
+
+  // Unfortunately, cant do it atomically ...
+  private def truncateIfExists(file: File, truncatePos: Long) {
+var fos: FileOutputStream = null
+try {
+  // There is no way to do this atomically iirc.
+  if (file.exists() && file.length() != truncatePos) {
+fos = new FileOutputStream(file, true)
+fos.getChannel.truncate(truncatePos)
+  }
+} finally {
+  if (fos ne null) {
+fos.close()
+  }
+}
+  }
+}
--- End diff --

Hopefully the rest of the code changes in this class are documented (a bit too
heavily, probably? lighten it before the final commit?)




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15541257
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala ---
@@ -236,31 +241,61 @@ object ShuffleBlockManager {
   new PrimitiveVector[Long]()
 }
 
-def numBlocks = mapIdToIndex.size
+/*
+ * This is required for shuffle consolidation to work. In particular 
when updates to file are
+ * happening while parallel requests to fetch block happens.
+ */
+private val blockLengthsByReducer = 
Array.fill[PrimitiveVector[Long]](files.length) {
+  new PrimitiveVector[Long]()
+}
+
+private var numBlocks = 0
--- End diff --

The reason for the change from def to var is perhaps subtle.
Consider this sequence:

add to mapIdToIndex with mapId 0
add to mapIdToIndex with mapId 1
add to mapIdToIndex with mapId 0 (on re-execution)
add to mapIdToIndex with mapId 1 (on re-execution)

Now both 0 and 1 will end up with the same index assigned (since the index was
based on mapIdToIndex.size); the sketch below illustrates this.
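
A self-contained illustration of the failure mode (the names mirror the diff, but the snippet itself is not from the patch):

```scala
import scala.collection.mutable

val mapIdToIndex = mutable.HashMap[Int, Int]()

def recordWithSizeAsIndex(mapId: Int): Int = {
  val index = mapIdToIndex.size   // old scheme: next index = current size of the map
  mapIdToIndex(mapId) = index
  index
}

recordWithSizeAsIndex(0)   // -> 0
recordWithSizeAsIndex(1)   // -> 1
recordWithSizeAsIndex(0)   // re-execution of map 0 -> 2 (key already present, size stays 2)
recordWithSizeAsIndex(1)   // re-execution of map 1 -> 2 again: two map ids share one index
// A separate `numBlocks` var, incremented once per recorded block, avoids the collision.
```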




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15541435
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -353,26 +368,53 @@ class ExternalAppendOnlyMap[K, V, C](
*/
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: 
ArrayBuffer[Long])
 extends Iterator[(K, C)] {
-private val fileStream = new FileInputStream(file)
-private val bufferedStream = new BufferedInputStream(fileStream, 
fileBufferSize)
+
+assert (! batchSizes.isEmpty)
+assert (! batchSizes.exists(_ <= 0))
+private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)
--- End diff --

This gives us the starting offset of each batch, instead of relying on where
the last batch's read ended (see the sketch below).
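
A minimal illustration of the offsets this produces (values made up for the example):

```scala
// scanLeft over the batch sizes yields the absolute start offset of every batch
// plus the end offset of the last one.
val batchSizes = Seq(120L, 80L, 200L)
val batchOffsets = batchSizes.scanLeft(0L)(_ + _)   // Seq(0, 120, 200, 400)

// Reading batch i means seeking to batchOffsets(i) and reading
// batchOffsets(i + 1) - batchOffsets(i) bytes, independent of earlier reads.
val i = 1
val (start, end) = (batchOffsets(i), batchOffsets(i + 1))   // (120, 200)
```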





[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15542308
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -418,7 +459,25 @@ class ExternalAppendOnlyMap[K, V, C](
 
 // TODO: Ensure this gets called even if the iterator isn't drained.
 private def cleanup() {
-  deserializeStream.close()
+  batchIndex = batchOffsets.length
+  val dstrm = deserializeStream
+  val fstrm = fileStream
+  deserializeStream = null
+  fileStream = null
+
+  if (dstrm ne null) {
+try {
+  dstrm.close()
+} catch {
+  case ioEx: IOException => {
+// best case attempt - atleast free the handles
+if (fstrm ne null) {
+  try { fstrm.close() } catch { case ioEx: IOException => }
+}
+throw ioEx
+  }
+}
+  }
--- End diff --

This is just more defensive cleanup compared to earlier, along with setting
batchIndex to an unusable value.
It ensures that any close-related exceptions do not result in unclosed files
(which would impact the ulimit of the Spark process).




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50517754
  
I have added some comments to the PR in the hope that they will aid the review.

I am sure it is still an involved process in spite of this, so please do feel
free to raise as many queries as required: sometimes they trigger the
unearthing of other issues as part of the discussion.
I want to ensure that we do not miss any subtle issue here.




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15553165
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to 
decide
+  // if we should delete this file or not. Also used to detect if file was 
deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-fos = new FileOutputStream(file, true)
-ts = new TimeTrackingOutputStream(fos)
-channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been 
observed that
+  // sometimes file creation happens 'before' the actual directory has 
been created
+  // So we attempt to retry atleast once with a mkdirs in case directory 
was missing.
+  private def init() {
+init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+if (closed) throw new IOException("Already closed")
+
+assert (! initialized)
+assert (! wasOpenedOnce)
+var exists = false
+try {
+  exists = file.exists()
+  if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
+// Was deleted by cleanup thread ?
+throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+  ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+  }
+  fos = new FileOutputStream(file, true)
+} catch {
+  case fEx: FileNotFoundException =>
+// There seems to be some race in directory creation.
+// Attempts to fix it dont seem to have worked : working around the problem for now.
+logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+  ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+if (canRetry && ! Utils.inShutdown()) {
+  // try creating the parent directory if that is the issue.
+  // Since there can be race with others, dont bother checking for
+  // success/failure - the call to init() will resolve if fos can be created.
+  file.getParentFile.mkdirs()
+  // Note, if directory did not exist, then file does not either - and so
+  // initialPosition would be zero in either case.
+  init(canRetry = false)
+  return
+} else throw fEx
+}
+
+try {
+  // This is to workaround case where creation of object and actual init
+  // (which can happen much later) happens after a delay and the cleanup thread
+  // cleaned up the file.
+  channel = fos.getChannel
+  val fosPos = channel.position()
+  if (initialPosition != fosPos) {
+throw new IOException("file cleaned up ? " + file.exists() +
+  ", initialpos = " + initialPosition +
+  " current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
+  }
+
+  ts = new TimeTrackingOutputStream(fos)
+  val bos = new BufferedOutputStream(ts, bufferSize)
+  bs = compressStream(bos)
+  objOut = serializer.newInstance().serializeStream(bs)
+  initialized = true
+  wasOpenedOnce = true;
+} finally {
+  if (! initialized) {
+// failed, cleanup state.
+val tfos = fos
+updateCloseState()
+tfos.close()
+  }
+}
+  }
+
+  private def open(): BlockObjectWriter = {
+init()
 lastValidPosition = initialPosition
-bs = compressStream(new BufferedOutputStream(ts, bufferSize))
-objOut = serializer.newInstance().serializeStream(bs)
-initialized = true
 this
   }
 
-  override def close() {
-if (initialized) {
-  if (syncWrites) 

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15553214
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
@@ -96,9 +98,9 @@ class HashShuffleWriter[K, V](
 var totalBytes = 0L
 var totalTime = 0L
 val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
-  writer.commit()
-  writer.close()
+  writer.commitAndClose()
   val size = writer.fileSegment().length
+  assert (size >= 0)
--- End diff --

Nit: no space after assert (it's not a keyword)




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15553264
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to 
decide
+  // if we should delete this file or not. Also used to detect if file was 
deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-fos = new FileOutputStream(file, true)
-ts = new TimeTrackingOutputStream(fos)
-channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been 
observed that
+  // sometimes file creation happens 'before' the actual directory has 
been created
+  // So we attempt to retry atleast once with a mkdirs in case directory 
was missing.
--- End diff --

Any idea how this could happen? I don't fully understand it; did someone do
`new File(fullPath)` and directly create the file before the directory was made?




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15553299
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to 
decide
+  // if we should delete this file or not. Also used to detect if file was 
deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-fos = new FileOutputStream(file, true)
-ts = new TimeTrackingOutputStream(fos)
-channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been 
observed that
+  // sometimes file creation happens 'before' the actual directory has 
been created
+  // So we attempt to retry atleast once with a mkdirs in case directory 
was missing.
--- End diff --

BTW we could also add a lock around what creates the directories.
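
For reference, a hedged sketch of what such a lock could look like; the names subDirs and localDir loosely follow DiskBlockManager, but this is an illustration, not the patch:

```scala
import java.io.{File, IOException}

// Always take a lock around the check-and-create of a shuffle sub-directory,
// so no file can be opened before mkdirs() has completed.
def getOrCreateSubDir(localDir: File, subDirId: Int, subDirs: Array[File]): File = {
  subDirs.synchronized {
    val existing = subDirs(subDirId)
    if (existing != null) {
      existing
    } else {
      val newDir = new File(localDir, "%02x".format(subDirId))
      if (!newDir.exists() && !newDir.mkdirs()) {
        throw new IOException("Failed to create directory " + newDir)
      }
      subDirs(subDirId) = newDir   // publish only after the directory exists on disk
      newDir
    }
  }
}
```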




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15553689
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -947,6 +958,34 @@ private[spark] object Utils extends Logging {
 }
 false
   }
+  /*
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+shutdownStarted = true
+  }
+
+  def inShutdown(): Boolean = {
+if (shutdownStarted) return true
+doShutdownCheck()
+shutdownStarted
+  }
+
+  private[spark] def doShutdownCheck() {
+var shutdown = false
+try {
+  val hook = new Thread {
+override def run() {}
+  }
+  Runtime.getRuntime.addShutdownHook(hook)
+  Runtime.getRuntime.removeShutdownHook(hook)
+} catch {
+  case ise: IllegalStateException =>
+shutdown = true
+} finally {
+  shutdownStarted = shutdown
+}
+  }
+  */
--- End diff --

IMO we should leave this out of the patch if it's dead code. You can open a 
JIRA with it instead and attach the code there.




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15565447
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to 
decide
+  // if we should delete this file or not. Also used to detect if file was 
deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-fos = new FileOutputStream(file, true)
-ts = new TimeTrackingOutputStream(fos)
-channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been 
observed that
+  // sometimes file creation happens 'before' the actual directory has 
been created
+  // So we attempt to retry atleast once with a mkdirs in case directory 
was missing.
--- End diff --

As I mentioned above, we did fix a double-checked locking (DCL) bug, but that
did not seem sufficient.
I agree this is a rare condition, and the 'fix' is a hack to work around the
problem: but pending identifying the root cause, this is the best we have,
unfortunately.
Any thoughts?




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15565486
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to 
decide
+  // if we should delete this file or not. Also used to detect if file was 
deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-fos = new FileOutputStream(file, true)
-ts = new TimeTrackingOutputStream(fos)
-channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been 
observed that
+  // sometimes file creation happens 'before' the actual directory has 
been created
+  // So we attempt to retry atleast once with a mkdirs in case directory 
was missing.
+  private def init() {
+init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+if (closed) throw new IOException("Already closed")
+
+assert (! initialized)
+assert (! wasOpenedOnce)
+var exists = false
+try {
+  exists = file.exists()
+  if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
+// Was deleted by cleanup thread ?
+throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+  ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+  }
+  fos = new FileOutputStream(file, true)
+} catch {
+  case fEx: FileNotFoundException =>
+// There seems to be some race in directory creation.
+// Attempts to fix it dont seem to have worked : working around the problem for now.
+logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+  ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+if (canRetry && ! Utils.inShutdown()) {
+  // try creating the parent directory if that is the issue.
+  // Since there can be race with others, dont bother checking for
+  // success/failure - the call to init() will resolve if fos can be created.
+  file.getParentFile.mkdirs()
+  // Note, if directory did not exist, then file does not either - and so
+  // initialPosition would be zero in either case.
+  init(canRetry = false)
+  return
+} else throw fEx
+}
+
+try {
+  // This is to workaround case where creation of object and actual init
+  // (which can happen much later) happens after a delay and the cleanup thread
+  // cleaned up the file.
+  channel = fos.getChannel
+  val fosPos = channel.position()
+  if (initialPosition != fosPos) {
+throw new IOException("file cleaned up ? " + file.exists() +
+  ", initialpos = " + initialPosition +
+  " current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
+  }
+
+  ts = new TimeTrackingOutputStream(fos)
+  val bos = new BufferedOutputStream(ts, bufferSize)
+  bs = compressStream(bos)
+  objOut = serializer.newInstance().serializeStream(bs)
+  initialized = true
+  wasOpenedOnce = true;
+} finally {
+  if (! initialized) {
+// failed, cleanup state.
+val tfos = fos
+updateCloseState()
+tfos.close()
+  }
+}
+  }
+
+  private def open(): BlockObjectWriter = {
+init()
 lastValidPosition = initialPosition
-bs = compressStream(new BufferedOutputStream(ts, bufferSize))
-objOut = serializer.newInstance().serializeStream(bs)
-initialized = true
 this
   }
 
-  override def close() {
-if (initialized) {
-  if (syncWrites) 

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15565552
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -947,6 +958,34 @@ private[spark] object Utils extends Logging {
 }
 false
   }
+  /*
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+shutdownStarted = true
+  }
+
+  def inShutdown(): Boolean = {
+if (shutdownStarted) return true
+doShutdownCheck()
+shutdownStarted
+  }
+
+  private[spark] def doShutdownCheck() {
+var shutdown = false
+try {
+  val hook = new Thread {
+override def run() {}
+  }
+  Runtime.getRuntime.addShutdownHook(hook)
+  Runtime.getRuntime.removeShutdownHook(hook)
+} catch {
+  case ise: IllegalStateException =>
+shutdown = true
+} finally {
+  shutdownStarted = shutdown
+}
+  }
+  */
--- End diff --

Sure, will move it out when I push next.
It becomes directly relevant to this patch since there are assertions which
check for either the file/directory being in the expected state or the VM being
in shutdown (in which case cleanup happened/is happening, which caused the file
deletions).
For VM shutdown this is just an optimization, but for a shutdown ordered by the
driver, inShutdown will return false even though the same codepath as shutdown
is invoked by Spark (stop on the various subsystems), resulting in
exceptions/assertion failures in threads which are still running.

Unfortunately, this diff interacts badly with local mode, particularly tests,
since they keep reusing the same VM.
Any ideas on how to 'fix' or resolve this? Thanks.

