[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user aarondav commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50363390 @mridulm Thanks for the clarifications! Those make sense and are some tricky edge cases. I will begin reviewing the code as soon as possible.
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50311354 QA results for PR 1609:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes
For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17279/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50308229 QA tests have started for PR 1609. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17279/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15449433
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -947,6 +958,34 @@ private[spark] object Utils extends Logging {
     }
     false
   }
+  /*
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+    shutdownStarted = true
+  }
+
+  def inShutdown(): Boolean = {
+    if (shutdownStarted) return true
+    doShutdownCheck()
+    shutdownStarted
+  }
+
+  private[spark] def doShutdownCheck() {
+    var shutdown = false
+    try {
+      val hook = new Thread {
+        override def run() {}
+      }
+      Runtime.getRuntime.addShutdownHook(hook)
+      Runtime.getRuntime.removeShutdownHook(hook)
+    } catch {
+      case ise: IllegalStateException =>
+        shutdown = true
+    } finally {
+      shutdownStarted = shutdown
+    }
+  }
+  */
--- End diff --
This (and the related commented-out inShutdown references) has been left around in case someone else can suggest an improvement! The issue is that this works fine in 'normal' use, but it fails while running Spark tests in local mode.
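For readers following the thread, below is a small self-contained sketch of the shutdown-detection idiom being discussed above. It is not the PR's code: the ShutdownProbe object name is illustrative, while setShutdownStarted/inShutdown mirror the snippet quoted in the comment. The idea is that Runtime#addShutdownHook throws IllegalStateException once JVM shutdown has begun, so registering a throwaway no-op hook can be used as a probe, with a volatile flag as a fast path for programmatic stops. As mridulm notes, this behaves well in 'normal' use but was found to fail in local-mode tests, so treat it as an illustration of the idiom rather than a fix.

object ShutdownProbe {
  // Fast path: set explicitly by code that initiates a programmatic stop.
  @volatile private var shutdownStarted = false

  def setShutdownStarted(): Unit = {
    shutdownStarted = true
  }

  // Returns true once the JVM has begun shutting down.
  def inShutdown(): Boolean = {
    if (shutdownStarted) return true
    try {
      // Registering (and immediately removing) a no-op hook only succeeds
      // while the JVM is still running normally.
      val hook = new Thread {
        override def run(): Unit = {}
      }
      Runtime.getRuntime.addShutdownHook(hook)
      Runtime.getRuntime.removeShutdownHook(hook)
      false
    } catch {
      case _: IllegalStateException =>
        shutdownStarted = true
        true
    }
  }
}

A typical call site would guard cleanup work, e.g. if (!ShutdownProbe.inShutdown()) { /* safe to delete temp files */ }.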
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50307411 QA results for PR 1609:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes
For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17277/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50307247 QA tests have started for PR 1609. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17277/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50307155 Jenkins, test this please
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm closed the pull request at: https://github.com/apache/spark/pull/1609
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50306633 @witgo I did not understand the space issue: stylecheck seems to run fine. Regarding the actual issues: the JIRA lists some of them - unfortunately it is not exhaustive. Spark code assumes a few things: 1) A flush followed by a close should not cause additional data to be written to the stream - which is not valid in the general case (close can still write more data). 2) Reading an object from a stream will consume all the data written as part of that object - which is not valid in the general case; additional info could have been written after the object (like reset markers in the Java serde), so stream wrapping has to account for that. 3)
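To make the two assumptions above concrete, here is a small self-contained Scala sketch (not from the PR; the object name and stream choices are illustrative). It shows that close() can append bytes even after a flush() - a GZIP trailer in this example - and that a serializer can interleave bookkeeping data such as Java serialization reset markers between objects, so per-object byte accounting by a wrapping stream is not straightforward.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.zip.GZIPOutputStream

object StreamAssumptionsDemo {
  def main(args: Array[String]): Unit = {
    // 1) flush() then close(): close can still write more data.
    //    GZIPOutputStream only emits remaining compressed data and its
    //    trailer (CRC + length) on finish()/close().
    val compressed = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(compressed)
    gzip.write(Array.fill[Byte](1024)(1))
    gzip.flush()
    val sizeAfterFlush = compressed.size()
    gzip.close()
    val sizeAfterClose = compressed.size()
    println(s"after flush = $sizeAfterFlush bytes, after close = $sizeAfterClose bytes")

    // 2) One writeObject is not necessarily "all the bytes for that object":
    //    reset() writes a TC_RESET marker into the stream between objects.
    val serialized = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(serialized)
    oos.writeObject("first")
    oos.reset()
    oos.writeObject("second")
    oos.close()
    println(s"total serialized size (includes reset marker) = ${serialized.size()} bytes")
  }
}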
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
GitHub user mridulm reopened a pull request: https://github.com/apache/spark/pull/1609
[SPARK-2532] WIP Consolidated shuffle fixes
Status of the PR
- [X] Cherry pick and merge changes from internal branch to spark master
- [X] Remove WIP comments and 2G branch references.
- [X] Tests for BlockObjectWriter
- [ ] Tests for ExternalAppendOnlyMap
- [x] Tests for ShuffleBlockManager
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mridulm/spark consolidated_shuffle_fixes
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/1609.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #1609
commit f1182f8a3d3328248d471038d6ab0db6e6a1396d
Author: Mridul Muralidharan
Date: 2014-07-27T15:23:05Z
Consolidated shuffle fixes
commit 66d6ec3f99882ad6062c5bff36f2edb82b0c24c0
Author: Mridul Muralidharan
Date: 2014-07-27T15:40:32Z
Add missing setShutdownStarted hooks
commit 027c7f18c44c57960a2a94eee961f0aa811e7a34
Author: Mridul Muralidharan
Date: 2014-07-27T16:05:31Z
stylecheck fixes
commit 195c529c1ae5ffa7e8f9cf6af4df8b9536a39d6a
Author: Mridul Muralidharan
Date: 2014-07-27T23:46:53Z
Fix build, add testcases for DiskBlockManagerSuite
commit 6095545bf55a87ef0b28bc11adc63dcc5b661b6c
Author: Mridul Muralidharan
Date: 2014-07-27T23:50:45Z
Consolidated fixes
commit 1c1faea69d9709c7e65afc9bdd13a8e0d5488c82
Author: Mridul Muralidharan
Date: 2014-07-27T23:50:50Z
Merge branch 'consolidated_shuffle_fixes' of github.com:mridulm/spark into consolidated_shuffle_fixes
commit fbf20f792baf7ab8d6705c7a9525c2db92bb7ae3
Author: Mridul Muralidharan
Date: 2014-07-28T06:59:45Z
Disable code to detect programming shutdown via stop's. Make actor/store shutdown in DiskBlockManagerSuite more robust
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50306648 Accidental close, apologies !
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15448803
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -935,15 +941,22 @@ private[spark] object Utils extends Logging {
    * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
    * an IllegalStateException.
    */
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+    shutdownStarted = true
+  }
   def inShutdown(): Boolean = {
     try {
+      if (shutdownStarted) return true
--- End diff --
Unfortunately, had to be reverted :-(
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50306230 QA results for PR 1609:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes
For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17276/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50306070 QA tests have started for PR 1609. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17276/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user aarondav commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50305416 @mridulm Thanks for submitting this! I would like to dig a little deeper into understanding the specific issues you found, in order to understand the solutions you have provided (since the specific solutions seem interleaved with a lot of new asserts and code paths). You mention that there was an issue if shuffle writes co-occur with shuffle fetches, which is true, but should not typically occur due to the barrier before the reduce stage of a shuffle. In what situations does this happen (outside of failure conditions)? Did you observe a prior pattern of close/revert/close on the same block writer? How did task failures induce inconsistent state on the map side? Was it due to the same close/revert/close pattern?
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15447535 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -353,26 +368,53 @@ class ExternalAppendOnlyMap[K, V, C]( */ private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long]) extends Iterator[(K, C)] { -private val fileStream = new FileInputStream(file) -private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize) + +assert (! batchSizes.isEmpty) --- End diff -- Same as above.
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15447541 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -353,26 +368,53 @@ class ExternalAppendOnlyMap[K, V, C]( */ private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long]) extends Iterator[(K, C)] { -private val fileStream = new FileInputStream(file) -private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize) + +assert (! batchSizes.isEmpty) +assert (! batchSizes.exists(_ <= 0)) --- End diff -- Same as above.
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15447510 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -935,15 +941,22 @@ private[spark] object Utils extends Logging { * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing * an IllegalStateException. */ + @volatile private var shutdownStarted = false + private[spark] def setShutdownStarted() { +shutdownStarted = true + } def inShutdown(): Boolean = { try { + if (shutdownStarted) return true --- End diff -- This is a very nice improvement
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15447449 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala --- @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter( private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null + + // Did we create this file or was it already present : used in revert to decide + // if we should delete this file or not. Also used to detect if file was deleted + // between creation of BOW and its actual init + private val initiallyExists = file.exists() && file.isFile private val initialPosition = file.length() private var lastValidPosition = initialPosition + private var initialized = false + // closed explicitly ? + private var closed = false + // Attempt to cleanly close ? (could also be closed via revert) + // Note, a cleanly closed file could be subsequently reverted + private var cleanCloseAttempted = false + // Was the file actually opened atleast once. + // Note: initialized/streams change state with close/revert. + private var wasOpenedOnce = false private var _timeWriting = 0L - override def open(): BlockObjectWriter = { -fos = new FileOutputStream(file, true) -ts = new TimeTrackingOutputStream(fos) -channel = fos.getChannel() + // Due to some directory creation race issues in spark, it has been observed that + // sometimes file creation happens 'before' the actual directory has been created + // So we attempt to retry atleast once with a mkdirs in case directory was missing. + private def init() { +init(canRetry = true) + } + + private def init(canRetry: Boolean) { + +if (closed) throw new IOException("Already closed") + +assert (! initialized) +assert (! wasOpenedOnce) +var exists = false +try { + exists = file.exists() + if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) { +// Was deleted by cleanup thread ? +throw new IOException("file " + file + " cleaned up ? exists = " + exists + + ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition) + } + fos = new FileOutputStream(file, true) +} catch { + case fEx: FileNotFoundException => +// There seems to be some race in directory creation. +// Attempts to fix it dont seem to have worked : working around the problem for now. +logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists + + ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx) +if (canRetry && ! Utils.inShutdown()) { + // try creating the parent directory if that is the issue. + // Since there can be race with others, dont bother checking for + // success/failure - the call to init() will resolve if fos can be created. + file.getParentFile.mkdirs() + // Note, if directory did not exist, then file does not either - and so + // initialPosition would be zero in either case. + init(canRetry = false) + return +} else throw fEx +} + +try { + // This is to workaround case where creation of object and actual init + // (which can happen much later) happens after a delay and the cleanup thread + // cleaned up the file. + channel = fos.getChannel + val fosPos = channel.position() + if (initialPosition != fosPos) { +throw new IOException("file cleaned up ? " + file.exists() + + ", initialpos = " + initialPosition + + "current len = " + fosPos + ", in shutdown ? 
" + Utils.inShutdown) + } + + ts = new TimeTrackingOutputStream(fos) + val bos = new BufferedOutputStream(ts, bufferSize) + bs = compressStream(bos) + objOut = serializer.newInstance().serializeStream(bs) + initialized = true + wasOpenedOnce = true; +} finally { + if (! initialized) { +// failed, cleanup state. +val tfos = fos +updateCloseState() +tfos.close() + } +} + } + + private def open(): BlockObjectWriter = { +init() lastValidPosition = initialPosition -bs = compressStream(new BufferedOutputStream(ts, bufferSize)) -objOut = serializer.newInstance().serializeStream(bs) -initialized = true this } - override def close() { -if (initi
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15447430 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala --- @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter( private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null + + // Did we create this file or was it already present : used in revert to decide + // if we should delete this file or not. Also used to detect if file was deleted + // between creation of BOW and its actual init + private val initiallyExists = file.exists() && file.isFile private val initialPosition = file.length() private var lastValidPosition = initialPosition + private var initialized = false + // closed explicitly ? + private var closed = false + // Attempt to cleanly close ? (could also be closed via revert) + // Note, a cleanly closed file could be subsequently reverted + private var cleanCloseAttempted = false + // Was the file actually opened atleast once. + // Note: initialized/streams change state with close/revert. + private var wasOpenedOnce = false private var _timeWriting = 0L - override def open(): BlockObjectWriter = { -fos = new FileOutputStream(file, true) -ts = new TimeTrackingOutputStream(fos) -channel = fos.getChannel() + // Due to some directory creation race issues in spark, it has been observed that + // sometimes file creation happens 'before' the actual directory has been created + // So we attempt to retry atleast once with a mkdirs in case directory was missing. + private def init() { +init(canRetry = true) + } + + private def init(canRetry: Boolean) { + +if (closed) throw new IOException("Already closed") + +assert (! initialized) +assert (! wasOpenedOnce) +var exists = false +try { + exists = file.exists() + if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) { +// Was deleted by cleanup thread ? +throw new IOException("file " + file + " cleaned up ? exists = " + exists + + ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition) + } + fos = new FileOutputStream(file, true) +} catch { + case fEx: FileNotFoundException => +// There seems to be some race in directory creation. +// Attempts to fix it dont seem to have worked : working around the problem for now. +logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists + + ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx) +if (canRetry && ! Utils.inShutdown()) { + // try creating the parent directory if that is the issue. + // Since there can be race with others, dont bother checking for + // success/failure - the call to init() will resolve if fos can be created. + file.getParentFile.mkdirs() + // Note, if directory did not exist, then file does not either - and so + // initialPosition would be zero in either case. + init(canRetry = false) + return +} else throw fEx +} + +try { + // This is to workaround case where creation of object and actual init + // (which can happen much later) happens after a delay and the cleanup thread + // cleaned up the file. + channel = fos.getChannel + val fosPos = channel.position() + if (initialPosition != fosPos) { +throw new IOException("file cleaned up ? " + file.exists() + + ", initialpos = " + initialPosition + + "current len = " + fosPos + ", in shutdown ? 
" + Utils.inShutdown) + } + + ts = new TimeTrackingOutputStream(fos) + val bos = new BufferedOutputStream(ts, bufferSize) + bs = compressStream(bos) + objOut = serializer.newInstance().serializeStream(bs) + initialized = true + wasOpenedOnce = true; +} finally { + if (! initialized) { +// failed, cleanup state. +val tfos = fos +updateCloseState() +tfos.close() + } +} + } + + private def open(): BlockObjectWriter = { +init() lastValidPosition = initialPosition -bs = compressStream(new BufferedOutputStream(ts, bufferSize)) -objOut = serializer.newInstance().serializeStream(bs) -initialized = true this } - override def close() { -if (initi
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15447436 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala --- @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter( private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null + + // Did we create this file or was it already present : used in revert to decide + // if we should delete this file or not. Also used to detect if file was deleted + // between creation of BOW and its actual init + private val initiallyExists = file.exists() && file.isFile private val initialPosition = file.length() private var lastValidPosition = initialPosition + private var initialized = false + // closed explicitly ? + private var closed = false + // Attempt to cleanly close ? (could also be closed via revert) + // Note, a cleanly closed file could be subsequently reverted + private var cleanCloseAttempted = false + // Was the file actually opened atleast once. + // Note: initialized/streams change state with close/revert. + private var wasOpenedOnce = false private var _timeWriting = 0L - override def open(): BlockObjectWriter = { -fos = new FileOutputStream(file, true) -ts = new TimeTrackingOutputStream(fos) -channel = fos.getChannel() + // Due to some directory creation race issues in spark, it has been observed that + // sometimes file creation happens 'before' the actual directory has been created + // So we attempt to retry atleast once with a mkdirs in case directory was missing. + private def init() { +init(canRetry = true) + } + + private def init(canRetry: Boolean) { + +if (closed) throw new IOException("Already closed") + +assert (! initialized) +assert (! wasOpenedOnce) +var exists = false +try { + exists = file.exists() + if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) { +// Was deleted by cleanup thread ? +throw new IOException("file " + file + " cleaned up ? exists = " + exists + + ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition) + } + fos = new FileOutputStream(file, true) +} catch { + case fEx: FileNotFoundException => +// There seems to be some race in directory creation. +// Attempts to fix it dont seem to have worked : working around the problem for now. +logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists + + ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx) +if (canRetry && ! Utils.inShutdown()) { + // try creating the parent directory if that is the issue. + // Since there can be race with others, dont bother checking for + // success/failure - the call to init() will resolve if fos can be created. + file.getParentFile.mkdirs() + // Note, if directory did not exist, then file does not either - and so + // initialPosition would be zero in either case. + init(canRetry = false) + return +} else throw fEx +} + +try { + // This is to workaround case where creation of object and actual init + // (which can happen much later) happens after a delay and the cleanup thread + // cleaned up the file. + channel = fos.getChannel + val fosPos = channel.position() + if (initialPosition != fosPos) { +throw new IOException("file cleaned up ? " + file.exists() + + ", initialpos = " + initialPosition + + "current len = " + fosPos + ", in shutdown ? 
" + Utils.inShutdown) + } + + ts = new TimeTrackingOutputStream(fos) + val bos = new BufferedOutputStream(ts, bufferSize) + bs = compressStream(bos) + objOut = serializer.newInstance().serializeStream(bs) + initialized = true + wasOpenedOnce = true; +} finally { + if (! initialized) { +// failed, cleanup state. +val tfos = fos +updateCloseState() +tfos.close() + } +} + } + + private def open(): BlockObjectWriter = { +init() lastValidPosition = initialPosition -bs = compressStream(new BufferedOutputStream(ts, bufferSize)) -objOut = serializer.newInstance().serializeStream(bs) -initialized = true this } - override def close() { -if (initi
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15447401 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala --- @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter( private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null + + // Did we create this file or was it already present : used in revert to decide + // if we should delete this file or not. Also used to detect if file was deleted + // between creation of BOW and its actual init + private val initiallyExists = file.exists() && file.isFile private val initialPosition = file.length() private var lastValidPosition = initialPosition + private var initialized = false + // closed explicitly ? + private var closed = false + // Attempt to cleanly close ? (could also be closed via revert) + // Note, a cleanly closed file could be subsequently reverted + private var cleanCloseAttempted = false + // Was the file actually opened atleast once. + // Note: initialized/streams change state with close/revert. + private var wasOpenedOnce = false private var _timeWriting = 0L - override def open(): BlockObjectWriter = { -fos = new FileOutputStream(file, true) -ts = new TimeTrackingOutputStream(fos) -channel = fos.getChannel() + // Due to some directory creation race issues in spark, it has been observed that + // sometimes file creation happens 'before' the actual directory has been created + // So we attempt to retry atleast once with a mkdirs in case directory was missing. + private def init() { +init(canRetry = true) + } + + private def init(canRetry: Boolean) { + +if (closed) throw new IOException("Already closed") + +assert (! initialized) +assert (! wasOpenedOnce) +var exists = false +try { + exists = file.exists() + if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) { --- End diff -- Same as above. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15447402 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala --- @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter( private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null + + // Did we create this file or was it already present : used in revert to decide + // if we should delete this file or not. Also used to detect if file was deleted + // between creation of BOW and its actual init + private val initiallyExists = file.exists() && file.isFile private val initialPosition = file.length() private var lastValidPosition = initialPosition + private var initialized = false + // closed explicitly ? + private var closed = false + // Attempt to cleanly close ? (could also be closed via revert) + // Note, a cleanly closed file could be subsequently reverted + private var cleanCloseAttempted = false + // Was the file actually opened atleast once. + // Note: initialized/streams change state with close/revert. + private var wasOpenedOnce = false private var _timeWriting = 0L - override def open(): BlockObjectWriter = { -fos = new FileOutputStream(file, true) -ts = new TimeTrackingOutputStream(fos) -channel = fos.getChannel() + // Due to some directory creation race issues in spark, it has been observed that + // sometimes file creation happens 'before' the actual directory has been created + // So we attempt to retry atleast once with a mkdirs in case directory was missing. + private def init() { +init(canRetry = true) + } + + private def init(canRetry: Boolean) { + +if (closed) throw new IOException("Already closed") + +assert (! initialized) +assert (! wasOpenedOnce) +var exists = false +try { + exists = file.exists() + if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) { +// Was deleted by cleanup thread ? +throw new IOException("file " + file + " cleaned up ? exists = " + exists + + ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition) + } + fos = new FileOutputStream(file, true) +} catch { + case fEx: FileNotFoundException => +// There seems to be some race in directory creation. +// Attempts to fix it dont seem to have worked : working around the problem for now. +logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists + + ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx) +if (canRetry && ! Utils.inShutdown()) { --- End diff -- Same as above. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15447397 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala --- @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter( private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null + + // Did we create this file or was it already present : used in revert to decide + // if we should delete this file or not. Also used to detect if file was deleted + // between creation of BOW and its actual init + private val initiallyExists = file.exists() && file.isFile private val initialPosition = file.length() private var lastValidPosition = initialPosition + private var initialized = false + // closed explicitly ? + private var closed = false + // Attempt to cleanly close ? (could also be closed via revert) + // Note, a cleanly closed file could be subsequently reverted + private var cleanCloseAttempted = false + // Was the file actually opened atleast once. + // Note: initialized/streams change state with close/revert. + private var wasOpenedOnce = false private var _timeWriting = 0L - override def open(): BlockObjectWriter = { -fos = new FileOutputStream(file, true) -ts = new TimeTrackingOutputStream(fos) -channel = fos.getChannel() + // Due to some directory creation race issues in spark, it has been observed that + // sometimes file creation happens 'before' the actual directory has been created + // So we attempt to retry atleast once with a mkdirs in case directory was missing. + private def init() { +init(canRetry = true) + } + + private def init(canRetry: Boolean) { + +if (closed) throw new IOException("Already closed") + +assert (! initialized) +assert (! wasOpenedOnce) +var exists = false +try { + exists = file.exists() + if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) { +// Was deleted by cleanup thread ? +throw new IOException("file " + file + " cleaned up ? exists = " + exists + + ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition) + } + fos = new FileOutputStream(file, true) +} catch { + case fEx: FileNotFoundException => +// There seems to be some race in directory creation. +// Attempts to fix it dont seem to have worked : working around the problem for now. +logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists + + ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx) +if (canRetry && ! Utils.inShutdown()) { + // try creating the parent directory if that is the issue. + // Since there can be race with others, dont bother checking for + // success/failure - the call to init() will resolve if fos can be created. + file.getParentFile.mkdirs() + // Note, if directory did not exist, then file does not either - and so + // initialPosition would be zero in either case. + init(canRetry = false) + return +} else throw fEx +} + +try { + // This is to workaround case where creation of object and actual init + // (which can happen much later) happens after a delay and the cleanup thread + // cleaned up the file. + channel = fos.getChannel + val fosPos = channel.position() + if (initialPosition != fosPos) { +throw new IOException("file cleaned up ? " + file.exists() + + ", initialpos = " + initialPosition + + "current len = " + fosPos + ", in shutdown ? 
" + Utils.inShutdown) + } + + ts = new TimeTrackingOutputStream(fos) + val bos = new BufferedOutputStream(ts, bufferSize) + bs = compressStream(bos) + objOut = serializer.newInstance().serializeStream(bs) + initialized = true + wasOpenedOnce = true; +} finally { + if (! initialized) { +// failed, cleanup state. +val tfos = fos +updateCloseState() +tfos.close() + } +} + } + + private def open(): BlockObjectWriter = { +init() lastValidPosition = initialPosition -bs = compressStream(new BufferedOutputStream(ts, bufferSize)) -objOut = serializer.newInstance().serializeStream(bs) -initialized = true this } - override def close() { -if (initi
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15447384 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala --- @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter( private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null + + // Did we create this file or was it already present : used in revert to decide + // if we should delete this file or not. Also used to detect if file was deleted + // between creation of BOW and its actual init + private val initiallyExists = file.exists() && file.isFile private val initialPosition = file.length() private var lastValidPosition = initialPosition + private var initialized = false + // closed explicitly ? + private var closed = false + // Attempt to cleanly close ? (could also be closed via revert) + // Note, a cleanly closed file could be subsequently reverted + private var cleanCloseAttempted = false + // Was the file actually opened atleast once. + // Note: initialized/streams change state with close/revert. + private var wasOpenedOnce = false private var _timeWriting = 0L - override def open(): BlockObjectWriter = { -fos = new FileOutputStream(file, true) -ts = new TimeTrackingOutputStream(fos) -channel = fos.getChannel() + // Due to some directory creation race issues in spark, it has been observed that + // sometimes file creation happens 'before' the actual directory has been created + // So we attempt to retry atleast once with a mkdirs in case directory was missing. + private def init() { +init(canRetry = true) + } + + private def init(canRetry: Boolean) { + +if (closed) throw new IOException("Already closed") + +assert (! initialized) +assert (! wasOpenedOnce) --- End diff -- One more spaces --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15446921 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala --- @@ -176,10 +406,17 @@ private[spark] class DiskBlockObjectWriter( if (!initialized) { open() } +// Not checking if closed on purpose ... introduce it ? No usecase for it right now. objOut.writeObject(value) } override def fileSegment(): FileSegment = { +assert (! initialized) --- End diff -- Same as above.
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15446910 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala --- @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter( private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null + + // Did we create this file or was it already present : used in revert to decide + // if we should delete this file or not. Also used to detect if file was deleted + // between creation of BOW and its actual init + private val initiallyExists = file.exists() && file.isFile private val initialPosition = file.length() private var lastValidPosition = initialPosition + private var initialized = false + // closed explicitly ? + private var closed = false + // Attempt to cleanly close ? (could also be closed via revert) + // Note, a cleanly closed file could be subsequently reverted + private var cleanCloseAttempted = false + // Was the file actually opened atleast once. + // Note: initialized/streams change state with close/revert. + private var wasOpenedOnce = false private var _timeWriting = 0L - override def open(): BlockObjectWriter = { -fos = new FileOutputStream(file, true) -ts = new TimeTrackingOutputStream(fos) -channel = fos.getChannel() + // Due to some directory creation race issues in spark, it has been observed that + // sometimes file creation happens 'before' the actual directory has been created + // So we attempt to retry atleast once with a mkdirs in case directory was missing. + private def init() { +init(canRetry = true) + } + + private def init(canRetry: Boolean) { + +if (closed) throw new IOException("Already closed") + +assert (! initialized) +assert (! wasOpenedOnce) +var exists = false +try { + exists = file.exists() + if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) { +// Was deleted by cleanup thread ? +throw new IOException("file " + file + " cleaned up ? exists = " + exists + + ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition) + } + fos = new FileOutputStream(file, true) +} catch { + case fEx: FileNotFoundException => +// There seems to be some race in directory creation. +// Attempts to fix it dont seem to have worked : working around the problem for now. +logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists + + ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx) +if (canRetry && ! Utils.inShutdown()) { + // try creating the parent directory if that is the issue. + // Since there can be race with others, dont bother checking for + // success/failure - the call to init() will resolve if fos can be created. + file.getParentFile.mkdirs() + // Note, if directory did not exist, then file does not either - and so + // initialPosition would be zero in either case. + init(canRetry = false) + return +} else throw fEx +} + +try { + // This is to workaround case where creation of object and actual init + // (which can happen much later) happens after a delay and the cleanup thread + // cleaned up the file. + channel = fos.getChannel + val fosPos = channel.position() + if (initialPosition != fosPos) { +throw new IOException("file cleaned up ? " + file.exists() + + ", initialpos = " + initialPosition + + "current len = " + fosPos + ", in shutdown ? 
" + Utils.inShutdown) + } + + ts = new TimeTrackingOutputStream(fos) + val bos = new BufferedOutputStream(ts, bufferSize) + bs = compressStream(bos) + objOut = serializer.newInstance().serializeStream(bs) + initialized = true + wasOpenedOnce = true; +} finally { + if (! initialized) { --- End diff -- One more Spaces --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50291239 QA results for PR 1609:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes
For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17256/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50290580 Did you mean @aarondav?
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50290037 QA tests have started for PR 1609. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17256/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15442779 --- Diff: core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala --- @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.scalatest.FunSuite +import java.io.{IOException, FileOutputStream, OutputStream, File} +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** + * Test various code paths in DiskBlockObjectWriter + */ +class DiskBlockObjectWriterSuite extends FunSuite { + + private val conf = new SparkConf + private val BUFFER_SIZE = 32 * 1024 + + private def tempFile(): File = { +val file = File.createTempFile("temp_", "block") +// We dont want file to exist ! Just need a temp file name +file.delete() +file + } + + private def createWriter(file: File = tempFile()) : + (File, DiskBlockObjectWriter) = { +file.deleteOnExit() + +(file, new DiskBlockObjectWriter(BlockId("test_1"), file, + new JavaSerializer(conf), BUFFER_SIZE, (out: OutputStream) => out, true)) + } + + + test("write after close should throw IOException") { +val (file, bow) = createWriter() +bow.write("test") +bow.write("test1") +assert (file.exists() && file.isFile) + +bow.commitAndClose() + +intercept[IOException] { + bow.write("test2") +} + +file.delete() + } + + test("write after revert should throw IOException") { +val (file, bow) = createWriter() +bow.write("test") +bow.write("test1") +assert (file.exists() && file.isFile) + +bow.revertPartialWritesAndClose() + +intercept[IOException] { + bow.write("test2") +} + +file.delete() + } + + test("create even if directory does not exist") { +val dir = File.createTempFile("temp_", "dir") +dir.delete() + +val file = new File(dir, "temp.file") +file.deleteOnExit() + +val bow = new DiskBlockObjectWriter(BlockId("test_1"), file, new JavaSerializer(conf), + BUFFER_SIZE, (out: OutputStream) => out, true) + +bow.write("test") +assert (file.exists() && file.isFile) +bow.commitAndClose() +Utils.deleteRecursively(dir) + } + + test("revert of new file should delete it") { +val (file, bow) = createWriter() +bow.write("test") +bow.write("test1") +assert (file.exists() && file.isFile) + +bow.revertPartialWritesAndClose() +assert (! file.exists()) +// file.delete() + } + + test("revert of existing file should revert it to previous state") { +val (file, bow1) = createWriter() + +bow1.write("test") +bow1.write("test1") +assert (file.exists() && file.isFile) + +bow1.commitAndClose() +val length = file.length() + +// reopen same file. 
+val bow2 = createWriter(file)._2 + +bow2.write("test3") +bow2.write("test4") + +assert (file.exists() && file.isFile) + +bow2.revertPartialWritesAndClose() +assert (file.exists()) +assert (length == file.length()) +file.delete() + } + + test("revert of writer after close should delete if it did not exist earlier") { +val (file, bow) = createWriter(tempFile()) + +bow.write("test") +bow.write("test1") +assert (file.exists() && file.isFile) + +bow.commitAndClose() +val length = file.length() + +assert (file.exists() && file.isFile) +assert (length > 0) + +// Now revert the file, after it has been closed : should delete the file +// since it did not exist earlier. +b
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50289340 @adav @andrewor14 would be good if you two take a look at this when it's merging correctly.
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15442565 --- Diff: core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala --- @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.scalatest.FunSuite +import java.io.{IOException, FileOutputStream, OutputStream, File} +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** + * Test various code paths in DiskBlockObjectWriter + */ +class DiskBlockObjectWriterSuite extends FunSuite { + + private val conf = new SparkConf + private val BUFFER_SIZE = 32 * 1024 + + private def tempFile(): File = { +val file = File.createTempFile("temp_", "block") +// We dont want file to exist ! Just need a temp file name +file.delete() +file + } + + private def createWriter(file: File = tempFile()) : + (File, DiskBlockObjectWriter) = { +file.deleteOnExit() + +(file, new DiskBlockObjectWriter(BlockId("test_1"), file, + new JavaSerializer(conf), BUFFER_SIZE, (out: OutputStream) => out, true)) + } + + + test("write after close should throw IOException") { +val (file, bow) = createWriter() +bow.write("test") +bow.write("test1") +assert (file.exists() && file.isFile) + +bow.commitAndClose() + +intercept[IOException] { + bow.write("test2") +} + +file.delete() + } + + test("write after revert should throw IOException") { +val (file, bow) = createWriter() +bow.write("test") +bow.write("test1") +assert (file.exists() && file.isFile) + +bow.revertPartialWritesAndClose() + +intercept[IOException] { + bow.write("test2") +} + +file.delete() + } + + test("create even if directory does not exist") { +val dir = File.createTempFile("temp_", "dir") +dir.delete() + +val file = new File(dir, "temp.file") +file.deleteOnExit() + +val bow = new DiskBlockObjectWriter(BlockId("test_1"), file, new JavaSerializer(conf), + BUFFER_SIZE, (out: OutputStream) => out, true) + +bow.write("test") +assert (file.exists() && file.isFile) +bow.commitAndClose() +Utils.deleteRecursively(dir) + } + + test("revert of new file should delete it") { +val (file, bow) = createWriter() +bow.write("test") +bow.write("test1") +assert (file.exists() && file.isFile) + +bow.revertPartialWritesAndClose() +assert (! file.exists()) +// file.delete() + } + + test("revert of existing file should revert it to previous state") { +val (file, bow1) = createWriter() + +bow1.write("test") +bow1.write("test1") +assert (file.exists() && file.isFile) + +bow1.commitAndClose() +val length = file.length() + +// reopen same file. 
+val bow2 = createWriter(file)._2 + +bow2.write("test3") +bow2.write("test4") + +assert (file.exists() && file.isFile) + +bow2.revertPartialWritesAndClose() +assert (file.exists()) +assert (length == file.length()) +file.delete() + } + + test("revert of writer after close should delete if it did not exist earlier") { +val (file, bow) = createWriter(tempFile()) + +bow.write("test") +bow.write("test1") +assert (file.exists() && file.isFile) + +bow.commitAndClose() +val length = file.length() + +assert (file.exists() && file.isFile) +assert (length > 0) + +// Now revert the file, after it has been closed : should delete the file +// since it did not exist earlier. +bo
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50278406 QA results for PR 1609:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes
For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17244/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50277674 QA tests have started for PR 1609. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17244/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50274618 QA tests have started for PR 1609. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17243/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50274622 QA results for PR 1609:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes
For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17243/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50267540 QA tests have started for PR 1609. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17242/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50267547 QA results for PR 1609:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes
For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17242/consoleFull
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
GitHub user mridulm opened a pull request: https://github.com/apache/spark/pull/1609
[SPARK-2532] WIP Consolidated shuffle fixes
Status of the PR
- [X] Cherry pick and merge changes from internal branch to spark master
- [X] Remove WIP comments and 2G branch references.
- [X] Tests for BlockObjectWriter
- [ ] Tests for ExternalAppendOnlyMap
- [ ] Tests for MapOutputTracker
- [ ] Tests for ShuffleBlockManager
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mridulm/spark consolidated_shuffle_fixes
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/1609.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #1609
commit f1182f8a3d3328248d471038d6ab0db6e6a1396d
Author: Mridul Muralidharan
Date: 2014-07-27T15:23:05Z
Consolidated shuffle fixes