[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-54383344 @mridulm Are the issues in this PR taken care of by #1722 and #1678? Do we still need this PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-54393908 I think it got split into four issues, two of which got committed; not sure of the other two. And the first one was regressed upon in 1.1 already. But this PR is probably defunct now. Will close. On 04-Sep-2014 5:03 am, andrewor14 notificati...@github.com wrote: @mridulm https://github.com/mridulm Are the issues in this PR taken care of by #1722 https://github.com/apache/spark/pull/1722 and #1678 https://github.com/apache/spark/pull/1678? Do we still need this PR? Reply to this email directly or view it on GitHub https://github.com/apache/spark/pull/1609#issuecomment-54383344.
Github user mridulm closed the pull request at: https://github.com/apache/spark/pull/1609
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50929329 So I looked through this and I also think it would be good to split it into smaller patches for 1.1. As far as I can see there are several orthogonal improvements here:
- Shuffle file consolidation fixes that Aaron copied in https://github.com/apache/spark/pull/1678
- ExternalAppendOnlyMap fixes to deal with writes past end of stream; we also need these in ExternalSorter
- Fixes to directory creation in DiskBlockManager (I'm still not sure when this would actually be a problem if all accesses to these directories are through getFile; needs some investigation)
- Fixes to isSymlink (though as is, this seems like it would only compile on Java 7)
- Improvements to the API of DiskBlockObjectWriter
Of these, the first two are most critical. So I'd like to get those into 1.1, and then we can do the API refactoring and the other fixes on the master branch. For the directory creation fix I'd still like to understand when that can be a problem (I'm probably just missing something), but it's also one we can add in 1.1 during the QA window. I'm going to update the JIRA to create sub-tasks for these things so we can track where each one is fixed. Thanks again for putting this together Mridul, this is very helpful.
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50943106 Opened https://github.com/apache/spark/pull/1722 to do the second fix (map batch writing) for 1.1, including applying the same fix to ExternalSorter.
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15662654
--- Diff: core/src/main/scala/org/apache/spark/util/Java7Util.scala ---
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.File
+
+/**
+ * Java 7 (or higher) specific util methods.
+ */
+object Java7Util {
+  def isSymlink(file: File) = java.nio.file.Files.isSymbolicLink(file.toPath)
+}
--- End diff --
Will this still compile on Java 6? I don't see any changes to pom.xml or anything to exclude it. Or maybe did you mean to call this by reflection?
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50804445 So FYI I'm going to make a more detailed pass through this soon to see if we can get all of it into 1.1. It would be nice to get all these changes in so we can QA them along with the other QA we do for 1.1, but if that doesn't work out, we can split some of them into smaller patches as Aaron did.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50600899 QA tests have started for PR 1609. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17443/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50604735 QA results for PR 1609:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.
For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17443/consoleFull
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15599910
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to decide
+  // if we should delete this file or not. Also used to detect if file was deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+  private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened at least once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L

-  override def open(): BlockObjectWriter = {
-    fos = new FileOutputStream(file, true)
-    ts = new TimeTrackingOutputStream(fos)
-    channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been observed that
+  // sometimes file creation happens 'before' the actual directory has been created.
+  // So we attempt to retry at least once with a mkdirs in case the directory was missing.
--- End diff --
Alright, as long as you can confirm that it still happens without this. I just don't see how it could happen from the other code in there. I guess we can try disabling it after this fix goes in and see if things break.
Github user aarondav commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50704781 I created #1678, which only includes the changes directly related to fixing the issues with shuffle file consolidation (essentially forking off a piece of this PR), intended as a simple candidate for review to make the 1.1 release. The smaller PR is not intended as a replacement for this more complete one, however -- it is merely an option to fix some of the more severe bugs in time for the next major release. If we can get this one in for 1.1 instead, then we should.
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50455483 All pending fix work has been done. I don't think there are any pieces missing in the merge from the internal branch to master. Open for review, thanks!
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50455818 QA tests have started for PR 1609. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17352/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50460011 QA results for PR 1609:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.
For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17352/consoleFull
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15537366
--- Diff: core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala ---
@@ -40,7 +40,7 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
   */
  def writeObject[T: ClassTag](t: T): SerializationStream = {
    objOut.writeObject(t)
-    if (counterReset > 0 && counter >= counterReset) {
+    if (counterReset >= 0 && counter >= counterReset) {
--- End diff --
This was done only to support adding a marker after each object has been written. The only practical reason to do this is to test that part.
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15540734
--- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
@@ -116,8 +118,13 @@ class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
     if (shuffle != null && shuffle.writers != null) {
       for (writer <- shuffle.writers) {
-        writer.revertPartialWrites()
-        writer.close()
+        try {
+          writer.revertPartialWritesAndClose()
+        } catch {
+          // Ensure that all reverts get done - log exception and continue
+          case ex: Exception =>
+            logError("Exception reverting/closing writers", ex)
+        }
       }
--- End diff --
revert/close can throw an exception, causing other writers to be left hanging. Hence, log and continue.
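The log-and-continue pattern described above can be sketched generically: attempt every cleanup action independently and collect failures instead of letting the first one abort the loop. This is an illustrative sketch, not the PR's code; `bestEffortCloseAll` is a hypothetical helper.

```scala
import java.io.Closeable

// Best-effort cleanup: try to close every resource, collecting failures
// instead of rethrowing, so one failing writer cannot leave the rest open.
def bestEffortCloseAll(resources: Seq[Closeable]): Seq[Exception] =
  resources.flatMap { r =>
    try { r.close(); None }
    catch { case e: Exception => Some(e) } // stand-in for logError-and-continue
  }
```

A caller can inspect (or log) the returned exceptions afterwards and, if desired, rethrow the first one once all resources have been attempted.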
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15540782
--- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
@@ -71,7 +72,8 @@ class HashShuffleWriter[K, V](
     try {
       return Some(commitWritesAndBuildStatus())
     } catch {
-      case e: Exception =>
+      case e: Throwable =>
+        success = false // for finally block
        revertWrites()
--- End diff --
If success != false, then releaseWriters will attempt to register.
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15540934
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to decide
+  // if we should delete this file or not. Also used to detect if file was deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+  private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened at least once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L

-  override def open(): BlockObjectWriter = {
-    fos = new FileOutputStream(file, true)
-    ts = new TimeTrackingOutputStream(fos)
-    channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been observed that
+  // sometimes file creation happens 'before' the actual directory has been created.
+  // So we attempt to retry at least once with a mkdirs in case the directory was missing.
+  private def init() {
+    init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+    if (closed) throw new IOException("Already closed")
+
+    assert (!initialized)
+    assert (!wasOpenedOnce)
+    var exists = false
+    try {
+      exists = file.exists()
+      if (!exists && initiallyExists && 0 != initialPosition && !Utils.inShutdown) {
+        // Was deleted by cleanup thread ?
+        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+      }
+      fos = new FileOutputStream(file, true)
+    } catch {
+      case fEx: FileNotFoundException =>
+        // There seems to be some race in directory creation.
+        // Attempts to fix it dont seem to have worked : working around the problem for now.
+        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+        if (canRetry && !Utils.inShutdown()) {
+          // try creating the parent directory if that is the issue.
+          // Since there can be race with others, dont bother checking for
+          // success/failure - the call to init() will resolve if fos can be created.
+          file.getParentFile.mkdirs()
+          // Note, if directory did not exist, then file does not either - and so
+          // initialPosition would be zero in either case.
+          init(canRetry = false)
--- End diff --
As mentioned in the comments, this tries to retry once in case the file could not be created due to the absence of its directory (which is what the FileNotFoundException usually indicates), except when we are already in shutdown. This case happens due to some race in Spark between creation of a directory and allowing files to be created under that directory. We have fixed a double-checked locking bug below (in DiskBlockManager), but it looks like that is not sufficient, since this was observed even after that fix. (In our branch, the logDebug is actually logError to flush out these cases.)
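The retry-once idea described above can be reduced to a small sketch (illustrative, not the PR's exact code): on a FileNotFoundException, create the missing parent directory and retry a single time. `openWithRetry` is a hypothetical helper name.

```scala
import java.io.{File, FileNotFoundException, FileOutputStream}

// Sketch of the retry-once-with-mkdirs workaround. The mkdirs result is
// deliberately ignored: another thread may win the creation race, and the
// retried open decides the outcome either way.
def openWithRetry(file: File, canRetry: Boolean = true): FileOutputStream =
  try new FileOutputStream(file, true)
  catch {
    case e: FileNotFoundException if canRetry =>
      file.getParentFile.mkdirs()
      openWithRetry(file, canRetry = false)
  }
```

Bounding the recursion to one retry matters: if the second open also fails, the FileNotFoundException propagates instead of looping.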
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15541003
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to decide
+  // if we should delete this file or not. Also used to detect if file was deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+  private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened at least once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L

-  override def open(): BlockObjectWriter = {
-    fos = new FileOutputStream(file, true)
-    ts = new TimeTrackingOutputStream(fos)
-    channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been observed that
+  // sometimes file creation happens 'before' the actual directory has been created.
+  // So we attempt to retry at least once with a mkdirs in case the directory was missing.
+  private def init() {
+    init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+    if (closed) throw new IOException("Already closed")
+
+    assert (!initialized)
+    assert (!wasOpenedOnce)
+    var exists = false
+    try {
+      exists = file.exists()
+      if (!exists && initiallyExists && 0 != initialPosition && !Utils.inShutdown) {
+        // Was deleted by cleanup thread ?
+        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+      }
+      fos = new FileOutputStream(file, true)
+    } catch {
+      case fEx: FileNotFoundException =>
+        // There seems to be some race in directory creation.
+        // Attempts to fix it dont seem to have worked : working around the problem for now.
+        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+        if (canRetry && !Utils.inShutdown()) {
+          // try creating the parent directory if that is the issue.
+          // Since there can be race with others, dont bother checking for
+          // success/failure - the call to init() will resolve if fos can be created.
+          file.getParentFile.mkdirs()
+          // Note, if directory did not exist, then file does not either - and so
+          // initialPosition would be zero in either case.
+          init(canRetry = false)
+          return
+        } else throw fEx
+    }
+
+    try {
+      // This is to workaround case where creation of object and actual init
+      // (which can happen much later) happens after a delay and the cleanup thread
+      // cleaned up the file.
+      channel = fos.getChannel
+      val fosPos = channel.position()
+      if (initialPosition != fosPos) {
+        throw new IOException("file cleaned up ? " + file.exists() +
+          ", initialpos = " + initialPosition +
+          ", current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
+      }
+
+      ts = new TimeTrackingOutputStream(fos)
+      val bos = new BufferedOutputStream(ts, bufferSize)
+      bs = compressStream(bos)
+      objOut = serializer.newInstance().serializeStream(bs)
+      initialized = true
+      wasOpenedOnce = true;
+    } finally {
+      if (!initialized) {
+        // failed, cleanup state.
+        val tfos = fos
+        updateCloseState()
+        tfos.close()
+      }
+    }
+  }
+
+  private def open(): BlockObjectWriter = {
+    init()
     lastValidPosition = initialPosition
-    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
-    objOut = serializer.newInstance().serializeStream(bs)
-    initialized = true
     this
   }

-  override def close() {
-    if (initialized) {
-      if (syncWrites)
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15541065
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -188,6 +425,39 @@ private[spark] class DiskBlockObjectWriter(
   // Only valid if called after commit()
   override def bytesWritten: Long = {
-    lastValidPosition - initialPosition
+    val retval = lastValidPosition - initialPosition
+
+    assert(retval >= 0 || Utils.inShutdown(),
+      "exists = " + file.exists() + ", bytesWritten = " + retval +
+      ", lastValidPosition = " + lastValidPosition + ", initialPosition = " + initialPosition +
+      ", in shutdown = " + Utils.inShutdown())
+
+    // TODO: Comment this out when we are done validating : can be expensive due to file.length()
+    assert(file.length() >= lastValidPosition || Utils.inShutdown(),
+      "exists = " + file.exists() + ", file len = " + file.length() +
+      ", bytesWritten = " + retval + ", lastValidPosition = " + lastValidPosition +
+      ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown())
+
+    if (retval >= 0) retval else 0
   }
 }
+
+object DiskBlockObjectWriter {
+
+  // Unfortunately, cant do it atomically ...
+  private def truncateIfExists(file: File, truncatePos: Long) {
+    var fos: FileOutputStream = null
+    try {
+      // There is no way to do this atomically iirc.
+      if (file.exists() && file.length() != truncatePos) {
+        fos = new FileOutputStream(file, true)
+        fos.getChannel.truncate(truncatePos)
+      }
+    } finally {
+      if (fos ne null) {
+        fos.close()
+      }
+    }
+  }
+}
--- End diff --
Hopefully the rest of the code changes in this class are documented (a bit too heavily, probably; lighten it before the final commit?).
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15541257
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala ---
@@ -236,31 +241,61 @@ object ShuffleBlockManager {
       new PrimitiveVector[Long]()
     }

-    def numBlocks = mapIdToIndex.size
+    /*
+     * This is required for shuffle consolidation to work. In particular when updates to file are
+     * happening while parallel requests to fetch block happens.
+     */
+    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }
+
+    private var numBlocks = 0
--- End diff --
The reason for the change from def to var is perhaps subtle. Consider this sequence:
- add to mapIdToIndex with mapId 0
- add to mapIdToIndex with mapId 1
- add to mapIdToIndex with mapId 0 (on re-execution)
- add to mapIdToIndex with mapId 1 (on re-execution)
Now both 0 and 1 will end up with the same index assigned (since it was based on mapIdToIndex.size).
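The collision described above is easy to reproduce with a toy map. This is an illustrative sketch (names `addBySize`/`addByCounter` are hypothetical): deriving the next index from the map's size double-assigns indices once re-executed map tasks overwrite existing keys, while an explicit counter hands out a distinct index on every add.

```scala
import scala.collection.mutable

val mapIdToIndex = mutable.HashMap[Int, Int]()

// Buggy scheme: derive the next index from the current size of the map.
def addBySize(mapId: Int): Unit = { mapIdToIndex(mapId) = mapIdToIndex.size }

addBySize(0) // size 0 -> index 0
addBySize(1) // size 1 -> index 1
addBySize(0) // re-execution: size is 2, mapId 0 now gets index 2
addBySize(1) // re-execution: key 1 already present, size still 2 -> index 2
assert(mapIdToIndex(0) == mapIdToIndex(1)) // collision: both map to the same slot

// With an explicit counter (the role of `var numBlocks` in the diff),
// every add gets a distinct index even across re-executions.
var numBlocks = 0
val fixed = mutable.HashMap[Int, Int]()
def addByCounter(mapId: Int): Unit = { fixed(mapId) = numBlocks; numBlocks += 1 }
Seq(0, 1, 0, 1).foreach(addByCounter)
assert(fixed(0) != fixed(1)) // indices stay distinct
```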
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15541435
--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
@@ -353,26 +368,53 @@ class ExternalAppendOnlyMap[K, V, C](
   */
  private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
    extends Iterator[(K, C)] {
-    private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+
+    assert (!batchSizes.isEmpty)
+    assert (!batchSizes.exists(_ <= 0))
+    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)
--- End diff --
This is to give us the starting offset for each batch, and not rely on where the last batch's read ended.
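The `scanLeft` in the diff turns per-batch byte counts into absolute start offsets; a small standalone example of the idea (the sizes here are made up):

```scala
// Per-batch sizes on disk -> absolute starting offset of each batch.
val batchSizes = Seq(10L, 25L, 5L)
val batchOffsets = batchSizes.scanLeft(0L)(_ + _)
// batchOffsets(i) is where batch i starts; the extra final element is the
// total file length. Seeking to batchOffsets(i) before reading batch i is
// therefore independent of where the previous batch's deserialization
// stream happened to stop reading (e.g. due to read-ahead buffering).
assert(batchOffsets == Seq(0L, 10L, 35L, 40L))
```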
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15542308
--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
@@ -418,7 +459,25 @@ class ExternalAppendOnlyMap[K, V, C](
     // TODO: Ensure this gets called even if the iterator isn't drained.
     private def cleanup() {
-      deserializeStream.close()
+      batchIndex = batchOffsets.length
+      val dstrm = deserializeStream
+      val fstrm = fileStream
+      deserializeStream = null
+      fileStream = null
+
+      if (dstrm ne null) {
+        try {
+          dstrm.close()
+        } catch {
+          case ioEx: IOException => {
+            // best case attempt - at least free the handles
+            if (fstrm ne null) {
+              try { fstrm.close() } catch { case ioEx: IOException => }
+            }
+            throw ioEx
+          }
+        }
+      }
--- End diff --
This is just more defensive cleanup compared to earlier, plus setting batchIndex to an unusable value, to ensure that any close-related exceptions do not result in unclosed files (which impacts the ulimit of the Spark process).
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50517754 I have added some comments to the PR in the hope that they will aid the review. I am sure it is still an involved process in spite of this, so please do feel free to raise as many queries as required: sometimes they might trigger unearthing some other issues as part of the discussion. I want to ensure that we do not miss any subtle issue here.
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15553165

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to decide
+  // if we should delete this file or not. Also used to detect if file was deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+  private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened at least once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L

-  override def open(): BlockObjectWriter = {
-    fos = new FileOutputStream(file, true)
-    ts = new TimeTrackingOutputStream(fos)
-    channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been observed that
+  // sometimes file creation happens 'before' the actual directory has been created.
+  // So we attempt to retry at least once with a mkdirs in case the directory was missing.
+  private def init() {
+    init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+    if (closed) throw new IOException("Already closed")
+
+    assert(!initialized)
+    assert(!wasOpenedOnce)
+    var exists = false
+    try {
+      exists = file.exists()
+      if (!exists && initiallyExists && 0 != initialPosition && !Utils.inShutdown()) {
+        // Was deleted by cleanup thread ?
+        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+      }
+      fos = new FileOutputStream(file, true)
+    } catch {
+      case fEx: FileNotFoundException =>
+        // There seems to be some race in directory creation.
+        // Attempts to fix it don't seem to have worked : working around the problem for now.
+        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+        if (canRetry && !Utils.inShutdown()) {
+          // try creating the parent directory if that is the issue.
+          // Since there can be race with others, don't bother checking for
+          // success/failure - the call to init() will resolve if fos can be created.
+          file.getParentFile.mkdirs()
+          // Note, if directory did not exist, then file does not either - and so
+          // initialPosition would be zero in either case.
+          init(canRetry = false)
+          return
+        } else throw fEx
+    }
+
+    try {
+      // This is to workaround the case where creation of the object and actual init
+      // (which can happen much later) happens after a delay and the cleanup thread
+      // cleaned up the file.
+      channel = fos.getChannel
+      val fosPos = channel.position()
+      if (initialPosition != fosPos) {
+        throw new IOException("file cleaned up ? " + file.exists() +
+          ", initialpos = " + initialPosition +
+          ", current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown())
+      }
+
+      ts = new TimeTrackingOutputStream(fos)
+      val bos = new BufferedOutputStream(ts, bufferSize)
+      bs = compressStream(bos)
+      objOut = serializer.newInstance().serializeStream(bs)
+      initialized = true
+      wasOpenedOnce = true
+    } finally {
+      if (!initialized) {
+        // failed, cleanup state.
+        val tfos = fos
+        updateCloseState()
+        tfos.close()
+      }
+    }
+  }
+
+  private def open(): BlockObjectWriter = {
+    init()
     lastValidPosition = initialPosition
-    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
-    objOut = serializer.newInstance().serializeStream(bs)
-    initialized = true
     this
   }

-  override def close() {
-    if (initialized) {
-      if (syncWrites)
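The mkdirs-and-retry workaround in `init()` above can be distilled into a short sketch. This is an illustration under the assumptions stated in the comments; the method name `openWithRetry` is hypothetical and not part of the PR.

```scala
import java.io.{File, FileNotFoundException, FileOutputStream}

object RetryOpen {
  // Sketch of the workaround above: if opening the file fails because its
  // parent directory is missing (the observed directory-creation race),
  // create the parents once and retry; a second failure propagates.
  def openWithRetry(file: File): FileOutputStream = {
    try new FileOutputStream(file, true)
    catch {
      case _: FileNotFoundException =>
        // Racing with other threads is fine - whether the retry below
        // succeeds is what decides if the directory problem was resolved.
        file.getParentFile.mkdirs()
        new FileOutputStream(file, true)
    }
  }
}
```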
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15553214

--- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
@@ -96,9 +98,9 @@ class HashShuffleWriter[K, V](
     var totalBytes = 0L
     var totalTime = 0L
     val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
-      writer.commit()
-      writer.close()
+      writer.commitAndClose()
       val size = writer.fileSegment().length
+      assert (size >= 0)
--- End diff --

Nit: no space after assert (it's not a keyword)
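As a sketch of why `commit()` and `close()` are merged into a single `commitAndClose()` in this hunk: with separate calls, bytes written between commit and close reach the file but are never reflected in the reported segment length. The class below is a hypothetical stand-in, not the PR's DiskBlockObjectWriter.

```scala
// Hypothetical single-purpose writer illustrating the commitAndClose()
// contract: the committed length is captured and the writer closed in one
// step, so the reported segment length can never trail the written data.
class SketchWriter {
  private var bytesWritten = 0L
  private var committedLength = 0L
  private var open = true
  def write(n: Int): Unit = { require(open, "writer is closed"); bytesWritten += n }
  def commitAndClose(): Unit = { committedLength = bytesWritten; open = false }
  def segmentLength: Long = committedLength
}
```

With this shape, an assertion like `assert(size >= 0)` on `segmentLength` holds trivially, and there is no window where data is flushed but uncommitted.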
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15553264

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
(same hunk as quoted above; the comment targets these lines)
+  // Due to some directory creation race issues in spark, it has been observed that
+  // sometimes file creation happens 'before' the actual directory has been created.
+  // So we attempt to retry at least once with a mkdirs in case the directory was missing.
--- End diff --

Any idea on how this could happen? I don't fully understand it; did someone do `new File(fullPath)` and correctly create it before the directory was made?
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15553299

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
(same hunk as quoted above; the comment targets the directory-creation-race comment)
--- End diff --

BTW we could also add a lock around what creates the directories.
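A minimal sketch of that suggestion follows. The object name `DirCreator` is hypothetical; the point is routing all directory creation through one lock so no two writers race between the existence check and `mkdirs()`.

```scala
import java.io.File

object DirCreator {
  private val lock = new Object
  // Serializing the exists-check and mkdirs() under a single lock removes
  // the window in which two threads race to create the same parent
  // directory (the suspected cause of the FileNotFoundException above).
  def ensureDir(dir: File): Boolean = lock.synchronized {
    dir.isDirectory || dir.mkdirs()
  }
}
```

`File.mkdirs()` itself returns false if another thread created the directory first, which is why the unlocked version needs the retry hack; the lock makes the return value trustworthy again.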
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15553689

--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -947,6 +958,34 @@ private[spark] object Utils extends Logging {
     }
     false
   }
+  /*
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+    shutdownStarted = true
+  }
+
+  def inShutdown(): Boolean = {
+    if (shutdownStarted) return true
+    doShutdownCheck()
+    shutdownStarted
+  }
+
+  private[spark] def doShutdownCheck() {
+    var shutdown = false
+    try {
+      val hook = new Thread {
+        override def run() {}
+      }
+      Runtime.getRuntime.addShutdownHook(hook)
+      Runtime.getRuntime.removeShutdownHook(hook)
+    } catch {
+      case ise: IllegalStateException =>
+        shutdown = true
+    } finally {
+      shutdownStarted = shutdown
+    }
+  }
+  */
--- End diff --

IMO we should leave this out of the patch if it's dead code. You can open a JIRA with it instead and attach the code there.
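For reference, a cleaned-up, runnable sketch of the probe in that commented-out block: the JVM rejects `addShutdownHook` with an `IllegalStateException` once shutdown has begun, so registering and immediately removing a throwaway hook doubles as a shutdown test. The object name `ShutdownProbe` is hypothetical.

```scala
object ShutdownProbe {
  // Returns true once the JVM has started shutting down; while running
  // normally, the dummy hook registers and unregisters without error.
  def inShutdown(): Boolean = {
    try {
      val hook = new Thread { override def run(): Unit = {} }
      Runtime.getRuntime.addShutdownHook(hook)
      Runtime.getRuntime.removeShutdownHook(hook)
      false
    } catch {
      case _: IllegalStateException => true
    }
  }
}
```

Note this probes only JVM shutdown, which is exactly the limitation mridulm raises below: a driver-ordered stop does not trip it.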
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15565447

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
(same hunk as quoted above; the comment targets the directory-creation-race comment)
--- End diff --

As I mentioned above, we did fix a DCL bug, but that did not seem sufficient. I agree this is a rare condition, and the 'fix' is a hack to work around the problem; but pending identification of the root cause, this is the best we have, unfortunately. Any thoughts?
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15565486

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
(same init()/open() hunk as quoted above in full)
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15565552

--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
(same commented-out shutdown-check block as quoted above)
--- End diff --

Sure, will move it out when I push next. It is directly relevant to this patch since there are assertions which check that either the file/directory is in the expected state or the VM is in shutdown (and so cleanup happened/is happening - which caused the file deletions). For VM shutdown this is just an optimization; but for a shutdown ordered by the driver, inShutdown will return false even though the same codepath as shutdown is invoked by spark (stop on various subsystems), resulting in exceptions/assertion failures in threads which are still running. Unfortunately, this diff interacts badly with local mode - particularly tests, since they keep reusing the same VM. Any ideas on how to 'fix' or resolve this? Thanks