[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter's readingItera...
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r236083618

    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
    @@ -727,9 +727,10 @@ private[spark] class ExternalSorter[K, V, C](
         spills.clear()
         forceSpillFiles.foreach(s => s.file.delete())
         forceSpillFiles.clear()
    -    if (map != null || buffer != null) {
    +    if (map != null || buffer != null || readingIterator != null) {
           map = null // So that the memory can be garbage-collected
           buffer = null // So that the memory can be garbage-collected
    +      readingIterator = null // So that the memory can be garbage-collected
    --- End diff --

I've added a [test case for CompletionIterator](https://github.com/apache/spark/pull/23083/files#diff-444ed6b5e5333c3359cecec7d082396dR50).

Regarding `ExternalSorter`: only private API has been changed, and there are no similar test cases verifying that the private `map` and `buffer` fields are set to `null` after the sorter stops. Given that, don't you think the existing tests already cover the `readingIterator` case too?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/23083

Hi @cloud-fan

> Looking at the code, we are trying to fix 2 memory leaks: the task completion listener in ShuffleBlockFetcherIterator, and the CompletionIterator. If that's case, can you say that in the PR description?

I've updated the description and the title of this PR accordingly.
[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r236057101

    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
    @@ -727,9 +727,10 @@ private[spark] class ExternalSorter[K, V, C](
         spills.clear()
         forceSpillFiles.foreach(s => s.file.delete())
         forceSpillFiles.clear()
    -    if (map != null || buffer != null) {
    +    if (map != null || buffer != null || readingIterator != null) {
           map = null // So that the memory can be garbage-collected
           buffer = null // So that the memory can be garbage-collected
    +      readingIterator = null // So that the memory can be garbage-collected
    --- End diff --

@advancedxy I've tried removing all the modifications except this one and got OutOfMemoryErrors once again. Here are the details:

1. There are now 4 `ExternalSorter`s remaining: 2 of them are not closed ...
   ![1_readingiterator_isnull_nonclosed_externalsorter](https://user-images.githubusercontent.com/1523889/48973288-2218d180-f04d-11e8-9329-27b3edf33c48.png)
   ... and 2 of them are closed, as expected.
   ![2_readingiterator_isnull_closed_externalsorter](https://user-images.githubusercontent.com/1523889/48973295-483e7180-f04d-11e8-83cf-23361515363f.png)
2. There are 2 remaining `SpillableIterator`s (which consume a significant part of the memory) belonging to already closed `ExternalSorter`s.
   ![4_readingiterator_isnull_spillableiterator_of_closed_externalsorter](https://user-images.githubusercontent.com/1523889/48973318-cf8be500-f04d-11e8-912f-74be7420ca95.png)
3. These `SpillableIterator`s are referenced by `CompletionIterator`s ...
   ![6_completioniterator_of_blockstoreshufflereader](https://user-images.githubusercontent.com/1523889/48973357-a6b81f80-f04e-11e8-810f-dc8941430f34.png)
   ... which in turn seem to be referenced by the `cur` field ...
   ![7_coalescedrdd_compute_flatmap](https://user-images.githubusercontent.com/1523889/48973491-7e7df000-f051-11e8-8864-7e9e7f3f994b.png)
   ... of the standard `Iterator`'s `flatMap`, which is used in the `compute` method of `CoalescedRDD`.
   ![image](https://user-images.githubusercontent.com/1523889/48973401-7fae1d80-f04f-11e8-8cf2-043c808173d9.png)

The standard `Iterator`'s `flatMap` does not clear its `cur` field before obtaining the next value for it, and that next value consumes quite a lot of memory too.
![image](https://user-images.githubusercontent.com/1523889/48973418-dfa4c400-f04f-11e8-8f0e-b464567d43de.png)
... In Spark's case this means that the previous iterator, still holding its memory, stays referenced while the next one is being fetched.
![8_coalescedrdd_compute_flatmap_cur_isnotassigned](https://user-images.githubusercontent.com/1523889/48974089-0000-f05f-11e8-8319-f7d1f778f381.png)

So I've restored the changes to `CompletionIterator` that reassign its sub-iterator reference to the `empty` iterator ...
![image](https://user-images.githubusercontent.com/1523889/48973472-27781b00-f051-11e8-86e1-cd6b87cd114b.png)
... and that has helped.

P.S. I believe that clearing the `cur` field of the standard `flatMap` iterator before calling `nextCur` could help too:

```scala
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
  private var cur: Iterator[B] = empty
  private def nextCur(): Unit = { cur = f(self.next()).toIterator }
  def hasNext: Boolean = {
    // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
    // but slightly shorter bytecode (better JVM inlining!)
    while (!cur.hasNext) {
      cur = empty // proposed: drop the exhausted iterator before fetching the next one
      if (!self.hasNext) return false
      nextCur()
    }
    true
  }
  def next(): B = (if (hasNext) cur else empty).next()
}
```
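The fix described above, making the completion iterator drop its sub-iterator once it is exhausted, can be sketched in plain Scala. `ReleasingCompletionIterator` is a hypothetical, simplified class, not Spark's `CompletionIterator`; it only assumes that the completion callback should run exactly once and that the exhausted sub-iterator may be replaced by `Iterator.empty`.

```scala
// Hypothetical, simplified model of the idea above; not Spark's CompletionIterator.
final class ReleasingCompletionIterator[A](underlying: Iterator[A], completion: () => Unit)
    extends Iterator[A] {

  // `underlying` is only used to initialise `sub`, so no extra field keeps it alive.
  private var sub: Iterator[A] = underlying
  private var completed = false

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      // Drop the reference to the exhausted iterator: whatever it retains (for example
      // a sorter's in-memory buffers) becomes collectable even while an outer iterator,
      // such as the `cur` field of flatMap, still points at this wrapper.
      sub = Iterator.empty
      completion()
    }
    more
  }

  override def next(): A = sub.next()
}
```

With a plain `Iterator.flatMap`, the outer iterator may still reference this wrapper in `cur` while it fetches the next inner iterator; since the wrapper has already swapped its `sub` for `Iterator.empty`, only the small wrapper object stays reachable.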
[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r235769204

    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -103,11 +116,26 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        val taskListener = new TaskCompletionListener {
    +          override def onTaskCompletion(context: TaskContext): Unit = sorter.stop()
    +        }
             // Use completion callback to stop sorter if task was finished/cancelled.
    -        context.addTaskCompletionListener[Unit](_ => {
    -          sorter.stop()
    -        })
    -        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
    +        context.addTaskCompletionListener(taskListener)
    +        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](
    +          sorter.iterator,
    +          {
    +            sorter.stop()
    +            // remove task completion listener as soon as the sorter stops to prevent holding
    +            // its references till the end of the task which may lead to memory leaks, for
    +            // example, in case of processing multiple ShuffledRDDPartitions by a single task
    +            // like in case of CoalescedRDD occurred after the ShuffledRDD in the same stage
    +            // (e.g. rdd.repartition(1000).coalesce(10));
    +            // note that holding sorter references till the end of the task also holds
    +            // references to PartitionedAppendOnlyMap and PartitionedPairBuffer too and these
    +            // ones may consume a significant part of the available memory
    +            context.remoteTaskCompletionListener(taskListener)
    --- End diff --

Great question! Honestly, I don't have a good solution right now. The TaskCompletionListener stops the sorter in case of task failures, cancellations, etc., i.e. on abnormal termination. In the "happy path" case the task completion listener is not needed.
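The split described above (a listener registered as a safety net for failed or cancelled tasks, then unregistered once the sorter has been stopped on the happy path) can be modelled without Spark. `CompletionListener`, `MiniTaskContext`, `FakeSorter` and `readSorted` are hypothetical stand-ins, not Spark APIs; the sketch counts `stop()` calls only to show which path stopped the sorter.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for TaskCompletionListener and TaskContext; not Spark APIs.
trait CompletionListener { def onTaskCompletion(): Unit }

final class MiniTaskContext {
  private val listeners = mutable.LinkedHashSet.empty[CompletionListener]

  def addListener(l: CompletionListener): Unit = synchronized { listeners += l }
  def removeListener(l: CompletionListener): Unit = synchronized { listeners -= l }
  def registeredCount: Int = synchronized { listeners.size }

  // Called when the task ends, whether it succeeded, failed or was cancelled.
  def markTaskCompleted(): Unit = {
    val snapshot = synchronized { listeners.toList }
    snapshot.reverse.foreach(_.onTaskCompletion())
  }
}

// Counts stop() calls so the example can check who stopped the sorter.
final class FakeSorter(data: Seq[Int]) {
  var stops = 0
  def stop(): Unit = stops += 1
  def iterator: Iterator[Int] = data.sorted.iterator
}

def readSorted(ctx: MiniTaskContext, sorter: FakeSorter): Iterator[Int] = {
  // Safety net: stops the sorter if the task ends before the iterator is drained.
  val listener = new CompletionListener {
    def onTaskCompletion(): Unit = sorter.stop()
  }
  ctx.addListener(listener)
  val sorted = sorter.iterator
  new Iterator[Int] {
    private var done = false
    def hasNext: Boolean = {
      val more = sorted.hasNext
      if (!more && !done) {
        done = true
        sorter.stop()
        // Happy path: unregister, so the context no longer references the sorter.
        ctx.removeListener(listener)
      }
      more
    }
    def next(): Int = sorted.next()
  }
}
```

When the iterator is fully drained, the listener is already gone by the time the task ends; when it is abandoned part-way (as with a failed task), the listener still stops the sorter.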
[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r235759169

    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -72,7 +73,8 @@ final class ShuffleBlockFetcherIterator(
         maxBlocksInFlightPerAddress: Int,
         maxReqSizeShuffleToMem: Long,
         detectCorrupt: Boolean)
    -  extends Iterator[(BlockId, InputStream)] with DownloadFileManager with Logging {
    +  extends Iterator[(BlockId, InputStream)] with DownloadFileManager with TaskCompletionListener
    --- End diff --

Introduced [the corresponding field](https://github.com/apache/spark/pull/23083/files#diff-27109eb30a77542d377c936e0d134420R156).

```scala
/**
 * Task completion callback to be called in both success as well as failure cases to cleanup.
 * It may not be called at all in case the `cleanup` method has already been called before
 * task completion.
 */
private[this] val cleanupTaskCompletionListener = (_: TaskContext) => cleanup()
```
[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r235757779

    --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
    @@ -99,6 +99,13 @@ private[spark] class TaskContextImpl(
         this
       }
    
    +  override def remoteTaskCompletionListener(listener: TaskCompletionListener)
    +      : this.type = synchronized {
    +    onCompleteCallbacks -= listener
    --- End diff --

Replaced ArrayBuffer with LinkedHashSet. Thank you!

Another interesting question is why these collections are traversed in reverse order in [invokeListeners](https://github.com/apache/spark/pull/23083/files#diff-df60223d4208aff4c67335528a55154cR135):

```scala
private def invokeListeners[T](
    listeners: Seq[T],
    name: String,
    error: Option[Throwable])(
    callback: T => Unit): Unit = {
  val errorMsgs = new ArrayBuffer[String](2)
  // Process callbacks in the reverse order of registration
  listeners.reverse.foreach { listener =>
    ...
  }
}
```

I believe @hvanhovell could help us understand. @hvanhovell, could you please remind us why task completion and error listeners are traversed in reverse order (you seem to be the one who added the corresponding line)?
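The two points raised above, removing a listener from a `LinkedHashSet` instead of an `ArrayBuffer` and invoking listeners in reverse registration order, can be illustrated with a small generic registry. `ListenerRegistry` is a hypothetical name; Spark's `TaskContextImpl` keeps separate collections for completion and failure listeners and wraps each callback in its own error handling.

```scala
import scala.collection.mutable

// Hypothetical registry illustrating the two points above; not TaskContextImpl.
final class ListenerRegistry[L] {
  // Keeps registration order, removes by hash lookup rather than a linear scan,
  // and ignores a second registration of an equal listener.
  private val listeners = mutable.LinkedHashSet.empty[L]

  def add(listener: L): Unit = synchronized { listeners += listener }
  def remove(listener: L): Unit = synchronized { listeners -= listener }

  // Later registrations run first, mirroring `listeners.reverse.foreach` quoted above.
  def invokeAll(callback: L => Unit): Unit = {
    val snapshot = synchronized { listeners.toList }
    snapshot.reverse.foreach(callback)
  }
}
```

Iterating over a snapshot means a callback that removes itself (or another listener) does not modify the set while it is being traversed. One behavioural difference to keep in mind: a set ignores a second registration of the same listener instance, whereas an `ArrayBuffer` would invoke it twice.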
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter Leak
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/23083

> So do you mean CoGroupRDDs with multiple input sources will have similar problems?

Yes, but slightly different ones.

> If so, can you create another Jira?

Will do it shortly.
[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r235319165

    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -72,7 +73,8 @@ final class ShuffleBlockFetcherIterator(
         maxBlocksInFlightPerAddress: Int,
         maxReqSizeShuffleToMem: Long,
         detectCorrupt: Boolean)
    -  extends Iterator[(BlockId, InputStream)] with DownloadFileManager with Logging {
    +  extends Iterator[(BlockId, InputStream)] with DownloadFileManager with TaskCompletionListener
    --- End diff --

The main reason is that the TaskCompletionListener is added in one place (the `initialize` method) and needs to be removed in another (the `cleanup` method).

![image](https://user-images.githubusercontent.com/1523889/48833554-abe64780-ed8c-11e8-9da0-ef826918a275.png)

I'll introduce a field for the TaskCompletionListener instead. Thank you!
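Why a field helps can be shown in a few lines: removing an element from a collection relies on equality, and function values compare by reference, so `cleanup` has to pass the very instance that `initialize` registered. `CleanupOwner` and the `registry` set are hypothetical and only mirror the `initialize`/`cleanup` split described above; this is not `ShuffleBlockFetcherIterator` code.

```scala
import scala.collection.mutable

// Hypothetical owner mirroring the initialize/cleanup split; not ShuffleBlockFetcherIterator.
final class CleanupOwner(registry: mutable.Set[Any => Unit]) {
  private var cleanups = 0

  // Created once and kept in a field, so cleanup() can hand the same instance to `-=`.
  private val cleanupListener: Any => Unit = _ => cleanup()

  // Registers the listener (in Spark this would happen in `initialize`).
  def initialize(): Unit = registry += cleanupListener

  // Unregisters the exact instance added above; a freshly written lambda would not match.
  def cleanup(): Unit = {
    cleanups += 1
    registry -= cleanupListener
  }

  def cleanupCount: Int = cleanups
}
```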
[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r235314949

    --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
    @@ -99,6 +99,13 @@ private[spark] class TaskContextImpl(
         this
       }
    
    +  override def remoteTaskCompletionListener(listener: TaskCompletionListener)
    +      : this.type = synchronized {
    +    onCompleteCallbacks -= listener
    --- End diff --

Should we do the same thing (i.e. change ArrayBuffer to LinkedHashSet) for onFailureCallbacks too?

```scala
/** List of callback functions to execute when the task completes. */
@transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]

/** List of callback functions to execute when the task fails. */
@transient private val onFailureCallbacks = new ArrayBuffer[TaskFailureListener]
```
[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r235308451

    --- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
    @@ -127,9 +127,21 @@ abstract class TaskContext extends Serializable {
         // Note that due to this scala bug: https://github.com/scala/bug/issues/11016, we need to make
         // this function polymorphic for every scala version >= 2.12, otherwise an overloaded method
         // resolution error occurs at compile time.
    -    addTaskCompletionListener(new TaskCompletionListener {
    -      override def onTaskCompletion(context: TaskContext): Unit = f(context)
    -    })
    +    addTaskCompletionListener(TaskCompletionListenerWrapper(f))
    +  }
    +
    +  /**
    +   * Removes a (Java friendly) listener that is no longer needed to be executed on task completion.
    +   */
    +  def remoteTaskCompletionListener(listener: TaskCompletionListener): TaskContext
    --- End diff --

Yep, it seems `v` was replaced with `t` on my keyboard :) Thanks a lot!
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter Leak
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/23083

Hi @davies, @advancedxy, @rxin,

You seem to be the last ones who touched the relevant parts of the files in this PR. Could you kindly take a look at it?
[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak
GitHub user szhem opened a pull request:

https://github.com/apache/spark/pull/23083

[SPARK-26114][CORE] ExternalSorter Leak

## What changes were proposed in this pull request?

This pull request fixes the [SPARK-26114](https://issues.apache.org/jira/browse/SPARK-26114) issue, which occurs when the number of partitions is reduced by means of `coalesce` without shuffling after shuffle-based transformations.

For the following data

```scala
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data (1000 partitions x 100000 records each)
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 100000)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
```

and the following job

```scala
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(10, false)
  .count
```

... executed as follows

```bash
spark-shell \
  --num-executors=5 \
  --executor-cores=2 \
  --master=yarn \
  --deploy-mode=client \
  --conf spark.executor.memory=1g \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
```

... executors always fail with OutOfMemoryErrors.

The main issue is multiple leaks of ExternalSorter references. For example, with 2 tasks per executor, 2 simultaneous instances of ExternalSorter per executor are expected, but the heap dump generated on OutOfMemoryError shows more of them.

![run1-noparams-dominator-tree-externalsorter](https://user-images.githubusercontent.com/1523889/48703665-782ce580-ec05-11e8-95a9-d6c94e8285ab.png)

P.S. This PR does not cover the case of CoGroupedRDDs, which use ExternalAppendOnlyMap internally; ExternalAppendOnlyMap itself can lead to OutOfMemoryErrors in many places.

## How was this patch tested?

- Existing unit tests
- New unit tests
- Job executions on the live environment

Here is the screenshot before applying this patch
![run3-noparams-failure-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700395-f769eb80-ebfc-11e8-831b-e94c757d416c.png)

Here is the screenshot after applying this patch
![run3-noparams-success-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700610-7a8b4180-ebfd-11e8-9761-baaf38a58e66.png)

And when the number of executors is reduced even further, the job is still stable
![run3-noparams-success-ui-2x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700619-82e37c80-ebfd-11e8-98ed-a38e1f1f1fd9.png)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/szhem/spark SPARK-26114-externalsorter-leak

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23083.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #23083

commit 9ca5ddbeb2022259ec79e60b3e81332466947d90
Author: Sergey Zhemzhitsky
Date: 2018-11-05T22:31:59Z

Allow memory consumers and spillables to be optionally unregistered and not trackable on freeing up the memory

commit bf57de2c9d23c3cbac23817a3d0b6158739ae8aa
Author: Sergey Zhemzhitsky
Date: 2018-11-05T22:34:15Z

allow to remove already registered task completion listeners if they dont need to be fired at task completion

commit e3531acf2751d4ba63e7fa353b66c70d534271df
Author: Sergey Zhemzhitsky
Date: 2018-11-05T22:35:31Z

clean up readingIterator on stop

commit baa9656e9d9fa2d006ed6fb98257a213bcace588
Author: Sergey Zhemzhitsky
Date: 2018-11-05T22:37:07Z

prevent capturing and holding sub-iterators after completion

commit 3cc54522545f43e0ee9ff9443ba3ea67e5fb9d5b
Author: Sergey Zhemzhitsky
Date: 2018-11-05T22:40:13Z

cleaning up resourses as soon as they no longer needed, dont waiting till the end of the task

commit d36323e7db3d8dc5ec01a2ae46752342ff01fac5
Author: Sergey Zhemzhitsky
Date: 2018-11-18T01:01:15Z

adding some unit tests

commit 12075ec265f0d09cd52865bb91155898b9ede523
Author: Sergey Zhemzhitsky
Da
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410

Hello @mallman, @sujithjay, @felixcheung, @jkbradley, @mengxr,

It has been about a year since this pull request was opened. I'm just wondering whether there is any chance of getting feedback on this PR (understanding that all of you have little or probably no time, given your own more important activities) and getting it either rejected or merged.
[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19373

Hello @sujithjay, @felixcheung, @jkbradley, @mengxr,

It has been more than a year since this pull request was opened. I'm just wondering whether there is any chance of this PR being reviewed by someone (understanding that all of you have little or probably no time, given your own more important activities) and either rejected or merged.
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410

I've tested the checkpointers mentioned above with `spark.cleaner.referenceTracking.cleanCheckpoints` set to `true` and without explicit removal of checkpoint files. It seems that some hard references remain somewhere: old checkpoint files are not deleted at all, and `ContextCleaner.doCleanCheckpoint` never seems to be called.
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410

Hi @asolimando,

I believe that the solution with weak references will work, and probably the one with `ContextCleaner` too, but there are some points I'd like to discuss, if you don't mind.

- Let's take `ContextCleaner` first. In that case we would have a pretty simple solution, but the backward compatibility of `PeriodicRDDCheckpointer` and `PeriodicGraphCheckpointer` would be lost, because
  - these classes would have to be updated so that they no longer delete checkpoint files;
  - users would always have to set the `spark.cleaner.referenceTracking.cleanCheckpoints` property in order to clean up unnecessary checkpoints;
  - users who haven't set `spark.cleaner.referenceTracking.cleanCheckpoints` previously (and I believe that will be most of them) would be affected by this new, unexpected behaviour.
- In the case of a custom solution based on weak references, a reference queue would have to be polled at some place and moment to remove unnecessary checkpoint files:
  - polling the reference queue in a separate thread would complicate the code;
  - polling the reference queue synchronously does not guarantee that all the unnecessary checkpoint files are deleted.

In the case of a reference queue, could you please recommend a convenient place in the source code to poll it?

As for me, setting `spark.cleaner.referenceTracking.cleanCheckpoints` to `true` by default should work:
- it would allow us to simplify `PeriodicRDDCheckpointer` and `PeriodicGraphCheckpointer` too, by deleting the code that cleans up checkpoint files;
- it sounds reasonable and should not break the code of users who didn't set it previously;
- but it would probably break backward compatibility (although this PR tries to preserve it).

What do you think? Will the community accept setting `spark.cleaner.referenceTracking.cleanCheckpoints` to `true` by default?
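For comparison, the weak-reference alternative discussed above could look roughly like this. It is a hypothetical, JDK-only sketch: `WeakCheckpointCleaner` and `CheckpointRef` are made-up names, and `deleteDir` stands in for real file-system calls. It polls the `ReferenceQueue` synchronously, so it only removes directories whose owners the garbage collector has already cleared, which is exactly the limitation noted in the list above.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// `referent` is only passed to the superclass constructor, so no strong reference to it is kept.
final class CheckpointRef(referent: AnyRef, val checkpointDir: String, queue: ReferenceQueue[AnyRef])
    extends WeakReference[AnyRef](referent, queue)

// Hypothetical cleaner: deletes a checkpoint directory once its owner becomes unreachable.
final class WeakCheckpointCleaner(deleteDir: String => Unit) {
  private val queue = new ReferenceQueue[AnyRef]
  // The reference objects themselves must stay strongly reachable until they are processed.
  private val pending = mutable.Set.empty[CheckpointRef]

  def register(owner: AnyRef, checkpointDir: String): Unit =
    synchronized { pending += new CheckpointRef(owner, checkpointDir, queue) }

  def pendingCount: Int = synchronized { pending.size }

  // Synchronous polling: handles only what the GC has enqueued so far and returns the count.
  def pollOnce(): Int = {
    var cleaned = 0
    var ref = queue.poll()
    while (ref != null) {
      ref match {
        case cp: CheckpointRef =>
          synchronized { pending -= cp }
          deleteDir(cp.checkpointDir)
          cleaned += 1
        case _ => // not created by this cleaner
      }
      ref = queue.poll()
    }
    cleaned
  }
}
```

Calling `pollOnce()` from, say, each checkpointer `update` would be the synchronous option; a daemon thread blocking on `queue.remove()` would be the asynchronous one that the list above says would complicate the code.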
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410

Hi @mallman,

I believe that `ContextCleaner` currently does not delete checkpoint data in case of unexpected failures. Also, since it works at the end of the job, there is still a chance that a job processing a fairly large graph with many iterations affects other running jobs by consuming a lot of disk space while it runs.
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410

@mallman Just my two cents regarding built-in solutions:

The periodic checkpointer deletes checkpoint files so as not to fill up the hard drive. Although disk storage is cheap, it's not free. For example, in my case (a graph with >1B vertices and about the same number of edges), the checkpoint directory with a single checkpoint took about 150-200GB. The checkpoint interval was set to 5, and the job completed in about 100 iterations. So without cleaning up unnecessary checkpoints, the checkpoint directory could have grown to 6TB (which is quite a lot) in my case.
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410

Hi @mallman!

With

```
StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_SER_2
```

... tests pass. They still fail with

```
StorageLevel.MEMORY_ONLY
StorageLevel.MEMORY_ONLY_SER
```

Although this works, I'm not sure that changing the caching level of the graph is really a good option, as Spark starts complaining [here](https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala#L111) and [here](https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala#L131):

```
18/06/27 16:08:46.802 Executor task launch worker for task 3 WARN ShippableVertexPartitionOps: Diffing two VertexPartitions with different indexes is slow.
18/06/27 16:08:47.000 Executor task launch worker for task 4 WARN ShippableVertexPartitionOps: Diffing two VertexPartitions with different indexes is slow.
18/06/27 16:08:47.164 Executor task launch worker for task 5 WARN ShippableVertexPartitionOps: Diffing two VertexPartitions with different indexes is slow.
18/06/27 16:08:48.724 Executor task launch worker for task 18 WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
18/06/27 16:08:48.749 Executor task launch worker for task 18 WARN ShippableVertexPartitionOps: Diffing two VertexPartitions with different indexes is slow.
18/06/27 16:08:48.868 Executor task launch worker for task 19 WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
18/06/27 16:08:48.899 Executor task launch worker for task 19 WARN ShippableVertexPartitionOps: Diffing two VertexPartitions with different indexes is slow.
18/06/27 16:08:49.008 Executor task launch worker for task 20 WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
18/06/27 16:08:49.028 Executor task launch worker for task 20 WARN ShippableVertexPartitionOps: Diffing two VertexPartitions with different indexes is slow.
```

P.S. To emulate a lack of memory resources, I just set the following options, as is done [here](https://github.com/apache/spark/pull/19410/files?utf8=%E2%9C%93&diff=unified#diff-c2823ca69af75fc6cdfd1ebbf25c2aefR85):

```
spark.testing.reservedMemory
spark.testing.memory
```
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410

Just a kind reminder...
[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19373

Just a kind reminder...
[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19373

> so the fix might be to change to call checkpoint() to checkpoint(eager: true) - this ensures by the time checkpoint call is returned the checkpointing is completed.

Even if checkpointing has completed, `PeriodicRDDCheckpointer` later removes the files of checkpointed and materialized RDDs, so another RDD may still depend on the already removed files.
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410

Hello @viirya, @mallman, @felixcheung,

You reviewed the graph checkpointing introduced in #15125, and this PR changes its behaviour slightly. Could you please review this PR too, if possible?
[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19373

@felixcheung,

Unfortunately, RDDs, which `PeriodicRDDCheckpointer` is based on, do not have `checkpoint(eager: true)` yet; that is Dataset functionality. I've experimented with a similar method for RDDs ...

```scala
def checkpoint(eager: Boolean): RDD[T] = {
  checkpoint()
  if (eager) {
    count()
  }
  this
}
```

... and it does not work for `PeriodicRDDCheckpointer` in some scenarios. Please consider the following example:

```scala
val checkpointInterval = 2
val checkpointer = new PeriodicRDDCheckpointer[(Int, Int)](checkpointInterval, sc)
val rdd1 = sc.makeRDD((0 until 10).map(i => i -> i))

// rdd1 is not materialized yet, checkpointer(update=1, checkpointInterval=2)
checkpointer.update(rdd1)
// rdd2 depends on rdd1
val rdd2 = rdd1.filter(_ => true)

// rdd1 is materialized, checkpointer(update=2, checkpointInterval=2)
checkpointer.update(rdd1)
// rdd3 depends on rdd1
val rdd3 = rdd1.filter(_ => true)

// rdd3 is not materialized yet, checkpointer(update=3, checkpointInterval=2)
checkpointer.update(rdd3)
// rdd3 is materialized, rdd1's files are removed, checkpointer(update=4, checkpointInterval=2)
checkpointer.update(rdd3)

// fails with FileNotFoundException because
// rdd1's files were removed on the previous step and
// rdd2 depends on rdd1
rdd2.count()
```

It fails with `FileNotFoundException` even with `eager` checkpointing, and passes when the parent checkpointed RDDs are preserved, as is done in this PR.
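The argument above, that eager materialization does not help because the files are deleted later, can be reduced to a toy model. `ToyRdd`, `siblingFailsAfterDeletion` and the `storage` set are hypothetical and only mimic the rule that a checkpointed dataset reads its own checkpoint files while a non-checkpointed one recomputes through its parent; nothing here is Spark code.

```scala
import java.io.FileNotFoundException
import scala.collection.mutable

// Toy model, not Spark: `storage` holds the checkpoint directories that currently exist.
final class ToyRdd(val name: String, parent: Option[ToyRdd], storage: mutable.Set[String]) {
  private var checkpointDir: Option[String] = None

  // "Eager" checkpoint: the files exist as soon as this returns.
  def checkpointEagerly(): Unit = {
    val dir = s"/checkpoints/$name"
    storage += dir
    checkpointDir = Some(dir)
  }

  // A checkpointed dataset reads its own files; otherwise it recomputes via its parent.
  def compute(): Unit = checkpointDir match {
    case Some(dir) => if (!storage.contains(dir)) throw new FileNotFoundException(dir)
    case None      => parent.foreach(_.compute())
  }
}

// Replays the scenario above: rdd2 and rdd3 both depend on rdd1, then rdd1's files are deleted.
def siblingFailsAfterDeletion(): Boolean = {
  val storage = mutable.Set.empty[String]
  val rdd1 = new ToyRdd("rdd1", None, storage)
  rdd1.checkpointEagerly()
  val rdd2 = new ToyRdd("rdd2", Some(rdd1), storage)
  val rdd3 = new ToyRdd("rdd3", Some(rdd1), storage)
  rdd3.checkpointEagerly()
  storage -= "/checkpoints/rdd1" // what a periodic checkpointer does once rdd3 is checkpointed
  rdd3.compute()                 // fine: rdd3 only needs its own checkpoint
  try { rdd2.compute(); false }
  catch { case _: FileNotFoundException => true }
}
```

In this model, keeping `/checkpoints/rdd1` around while `rdd2` may still need it (the "preserve parent checkpoints" idea mentioned above) is what would make `rdd2.compute()` succeed.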
[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19373

BTW, what do you think, guys: maybe it would be better to merge the changes from #19410 into this one? #19410 is about almost the same issue and fixes the described behaviour for GraphX.
[GitHub] spark pull request #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in...
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/19373#discussion_r177484476

    --- Diff: core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala ---
    @@ -73,8 +76,6 @@ import org.apache.spark.util.PeriodicCheckpointer
      *
      * @param checkpointInterval RDDs will be checkpointed at this interval
      * @tparam T RDD element type
    - *
    - * TODO: Move this out of MLlib?
      */
     private[spark] class PeriodicRDDCheckpointer[T](
    --- End diff --

@sujithjay, thanks a lot for noticing! I've just updated the docs a little to clarify the new behaviour.
[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...
Github user szhem commented on the issue: https://github.com/apache/spark/pull/19373 @felixcheung > It is deleting earlier checkpoint after the current checkpoint is called though? Currently PeriodicCheckpointer can fail when checkpointing RDDs which depend on each other, like in the sample below.
```
// create a periodic checkpointer with interval of 2
val checkpointer = new PeriodicRDDCheckpointer[Double](2, sc)
val rdd1 = createRDD(sc)
checkpointer.update(rdd1)
// on the second update rdd1 is checkpointed
checkpointer.update(rdd1)
// on action checkpointed rdd is materialized and its lineage is truncated
rdd1.count()
// rdd2 depends on rdd1
val rdd2 = rdd1.filter(_ => true)
checkpointer.update(rdd2)
// on the second update rdd2 is checkpointed and checkpoint files of rdd1 are deleted
checkpointer.update(rdd2)
// on action it's necessary to read already removed checkpoint files of rdd1
rdd2.count()
```
The problem is that the files of an already checkpointed and materialized RDD are deleted while another RDD still depends on it. If RDDs are cached before checkpointing (as is often recommended), this issue is unlikely to be visible, because the checkpointed RDD will be read from the cache and not from the materialized files. A good example of such behaviour is described in PR #19410, where GraphX fails with `FileNotFoundException` when memory is insufficient: cached blocks of checkpointed and materialized RDDs are evicted from memory, causing them to be read from already deleted files. > is this just an issue with DataSet.checkpoint(eager = true)? This PR does not modify the Dataset API and mainly affects `PeriodicCheckpointer` and `PeriodicRDDCheckpointer`. It was created as a preliminary PR for #19410 (where GraphX fails when reading cached RDDs that have already been evicted from memory).
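To make the caching point concrete, here is a minimal Spark-free sketch. `ToyBlockReader` and its in-memory sets are hypothetical stand-ins, not Spark's block manager; they only model the read order described in the comment: cached blocks are served first, and checkpoint files are consulted only on a cache miss.

```scala
import scala.collection.mutable

// Hypothetical model of the read path: cache first, then checkpoint files.
// This is not Spark's BlockManager API, only an illustration of the lookup order.
final class ToyBlockReader(val cache: mutable.Set[String], val files: mutable.Set[String]) {
  def read(name: String): String =
    if (cache.contains(name)) s"$name from cache"
    else if (files.contains(name)) s"$name from checkpoint files"
    else throw new java.io.FileNotFoundException(s"checkpoint files of $name")
}
```

With rdd1's checkpoint files already deleted, reads keep succeeding while rdd1 stays cached; evicting it (as happens under memory pressure) turns the same read into a `FileNotFoundException`.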
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user szhem commented on the issue: https://github.com/apache/spark/pull/19410 I would be happy if anyone could take a look at this PR.
[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...
Github user szhem commented on the issue: https://github.com/apache/spark/pull/19373 I would be happy if anyone could take a look at this PR.
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user szhem commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r143306215 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -57,6 +60,15 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) */ private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId) + /** + * Checks whether there are files to be committed to an absolute output location. + * + * As the committing and aborting the job occurs on driver where `addedAbsPathFiles` is always + * null, it is necessary to check whether the output path is specified, that may not be the case + * for committers not writing to distributed file systems. --- End diff -- Thanks a lot, guys! I've just updated the comment
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user szhem commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r143305853 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -57,6 +60,15 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) */ private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId) + /** + * Checks whether there are files to be committed to an absolute output location. + * + * As the committing and aborting the job occurs on driver where `addedAbsPathFiles` is always --- End diff -- done
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user szhem commented on the issue: https://github.com/apache/spark/pull/19294 @mridulm sql-related tests were removed.
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user szhem commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r142319273 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -57,6 +57,15 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) */ private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId) + /** + * Checks whether there are files to be committed to an absolute output location. + * + * As the committing and aborting the job occurs on driver where `addedAbsPathFiles` is always + * null, it is necessary to check whether the output path is specified, that may not be the case + * for committers not writing to distributed file systems. + */ + private def hasAbsPathFiles: Boolean = path != null --- End diff -- done
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user szhem commented on the issue: https://github.com/apache/spark/pull/19294 @gatorsmile I believe that in the Spark SQL code path `path` cannot be null, because in that case `FileFormatWriter` [fails even before](https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L118) `setupJob` is called on the committer ([which in turn calls setupCommitter](https://github.com/apache/spark/blob/e47f48c737052564e92903de16ff16707fae32c3/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L124)). The interesting part is that [FileOutputCommitter allows null output paths](https://github.com/apache/hadoop/blob/5af572b6443715b7a741296c1bd520a1840f9a7c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java#L96), and the line you highlighted is executed only in that case.
[GitHub] spark pull request #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case ...
GitHub user szhem opened a pull request: https://github.com/apache/spark/pull/19410 [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insufficient memory and checkpoints enabled ## What changes were proposed in this pull request? Fix for [SPARK-22184](https://issues.apache.org/jira/browse/SPARK-22184) JIRA issue (it also includes the related #19373). When checkpoints are enabled, GraphX jobs can fail with `FileNotFoundException`. The failure can happen during Pregel iterations or when Pregel completes, but only when memory is insufficient: checkpointed RDDs are evicted from memory and have to be read from disk, where their checkpoint files have already been removed. This PR proposes to preserve all the checkpoints that the last checkpoint of `messages` and `graph` depends on during the iterations, and all the checkpoints of `messages` and `graph` that the resulting `graph` depends on at the end of Pregel iterations. ## How was this patch tested? Unit tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/szhem/spark SPARK-22184-graphx-early-checkpoints-removal Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19410.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19410 commit f2386b61a47abf19b8ca6cea7e0e5c7da9baf7d6 Author: Sergey Zhemzhitsky Date: 2017-09-27T21:33:18Z [SPARK-22150][CORE] preventing too early removal of checkpoints in case of dependant RDDs commit aa2bedae74999694b0a9992986e85d3f9feab5f6 Author: Sergey Zhemzhitsky Date: 2017-10-02T13:10:48Z [SPARK-22150][CORE] checking whether two checkpoints have the same checkpointed RDD as their parent to prevent early removal commit 6406aea3bc87c1f3a9460bbc2ae1af67d7c0c294 Author: Sergey Zhemzhitsky Date: 2017-10-02T13:22:19Z [SPARK-22150][CORE] respecting scala style settings commit
4a55cda79e61e7eec67ae9545beb0c38eca7b11b Author: Sergey Zhemzhitsky Date: 2017-10-02T14:43:27Z [SPARK-22184][CORE][GRAPHX] retain all the checkpoints the last one depends on
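The retention rule proposed in this PR can be sketched as a lineage walk. This is a hypothetical, Spark-free model, not the PR's implementation: `Node`, `hasCheckpointFiles`, `checkpointsToRetain` and `safeToDelete` are illustrative names. It assumes that lineage is truncated at any node whose checkpoint files have been written, so recomputing the latest (not yet materialized) checkpoint needs the nearest materialized checkpoint on every path above it. A GraphX graph is modelled as a node with several parents (for example, vertices and edges).

```scala
// Hypothetical lineage node; `hasCheckpointFiles` means the node's checkpoint
// has been materialized, so lineage above it is truncated.
final case class Node(name: String, parents: Seq[Node], hasCheckpointFiles: Boolean)

object Retention {
  // Nearest materialized checkpoints reachable from `n` (including `n` itself).
  private def nearestCheckpoints(n: Node): Set[String] =
    if (n.hasCheckpointFiles) Set(n.name)
    else n.parents.flatMap(nearestCheckpoints).toSet

  // Checkpoints that must survive so that `last` can still be computed:
  // `last` itself plus the nearest materialized checkpoints on each path above it.
  def checkpointsToRetain(last: Node): Set[String] =
    last.parents.flatMap(nearestCheckpoints).toSet + last.name

  // Checkpoints from `queue` that are safe to delete.
  def safeToDelete(queue: Seq[String], last: Node): Seq[String] = {
    val keep = checkpointsToRetain(last)
    queue.filterNot(keep.contains)
  }
}
```

For example, if a new graph `g2` is built from vertices derived (through uncheckpointed steps) from checkpoints `v0` and `e0`, then `v0` and `e0` are kept even though `g2` was checkpointed after them, and only checkpoints outside `g2`'s lineage become deletable.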
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user szhem commented on the issue: https://github.com/apache/spark/pull/19294 @mridulm Regarding FileFormatWriter, I've implemented some basic tests which show that

1. [FileFormatWriter fails](https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L118) even before setupJob is called on the committer [if the path is null](https://github.com/apache/spark/pull/19294/files#diff-bc98a3d91cf4f95f4f473146400044aaR40)

    FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

2. [FileFormatWriter succeeds](https://github.com/apache/spark/pull/19294/files#diff-bc98a3d91cf4f95f4f473146400044aaR70) with default partitioning [when customPath is not defined](https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L501) (the second branch of the `if` statement)

    val currentPath = if (customPath.isDefined) {
      committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
    } else {
      committer.newTaskTempFile(taskAttemptContext, partDir, ext)
    }

3. [FileFormatWriter succeeds](https://github.com/apache/spark/pull/19294/files#diff-bc98a3d91cf4f95f4f473146400044aaR107) with custom partitioning [when customPath is defined](https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L499) (the first branch of the `if` statement)

    val currentPath = if (customPath.isDefined) {
      committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
    } else {
      committer.newTaskTempFile(taskAttemptContext, partDir, ext)
    }

Is there anything else I can help with to make sure nothing else was affected?
[GitHub] spark pull request #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in...
GitHub user szhem opened a pull request: https://github.com/apache/spark/pull/19373 [SPARK-22150][CORE] PeriodicCheckpointer fails in case of dependant RDDs ## What changes were proposed in this pull request? Fix for [SPARK-22150](https://issues.apache.org/jira/browse/SPARK-22150) JIRA issue. When checkpointing RDDs which depend on previously checkpointed RDDs (for example, in iterative algorithms), PeriodicCheckpointer removes already checkpointed and materialized RDDs too early, leading to FileNotFoundExceptions. Consider the following snippet:

    // create a periodic checkpointer with interval of 2
    val checkpointer = new PeriodicRDDCheckpointer[Double](2, sc)
    val rdd1 = createRDD(sc)
    checkpointer.update(rdd1)
    // on the second update rdd1 is checkpointed
    checkpointer.update(rdd1)
    // on action checkpointed rdd is materialized and its lineage is truncated
    rdd1.count()
    // rdd2 depends on rdd1
    val rdd2 = rdd1.filter(_ => true)
    checkpointer.update(rdd2)
    // on the second update rdd2 is checkpointed and checkpoint files of rdd1 are deleted
    checkpointer.update(rdd2)
    // on action it's necessary to read already removed checkpoint files of rdd1
    rdd2.count()

## How was this patch tested? Unit tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/szhem/spark SPARK-22150-early-checkpoints Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19373.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19373 commit 0c3338cd645f5824f08fe37fd7174e25c416529b Author: Sergey Zhemzhitsky Date: 2017-09-27T21:33:18Z [SPARK-22150][CORE] preventing too early removal of checkpoints in case of dependant RDDs
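The snippet in this PR description can be replayed in a minimal Spark-free model. Everything here (`Data`, `ToyStore`, `ToyCheckpointer`, `replay`) is hypothetical and only mimics the behaviour described: checkpointing is requested on every `interval`-th update, files are written by the next action, and the oldest checkpoint is deleted once it is materialized. The `retainParents` flag adds an illustrative guard that keeps the oldest checkpoint while a newer, not-yet-materialized checkpoint depends on it; it is an assumption for illustration, not the PR's actual code.

```scala
import java.io.FileNotFoundException
import scala.collection.mutable

// Hypothetical stand-in for an RDD with a single parent.
final class Data(val name: String, val parent: Option[Data]) {
  var marked = false       // checkpoint has been requested
  var materialized = false // checkpoint files have been written by an action
}

// Hypothetical checkpoint storage plus an "action" that computes a dataset.
final class ToyStore {
  val files = mutable.Set.empty[String]

  def compute(d: Data): Unit =
    if (d.materialized) {
      // lineage is truncated: the data can only come from its checkpoint files
      if (!files.contains(d.name)) throw new FileNotFoundException(d.name)
    } else {
      d.parent.foreach(compute)
      if (d.marked) { d.materialized = true; files += d.name }
    }
}

final class ToyCheckpointer(interval: Int, store: ToyStore, retainParents: Boolean) {
  private val queue = mutable.Queue.empty[Data]
  private var updates = 0

  private def dependsOn(child: Data, ancestor: Data): Boolean =
    child.parent.exists(p => (p eq ancestor) || dependsOn(p, ancestor))

  def update(d: Data): Unit = {
    updates += 1
    if (updates % interval == 0 && !d.marked) { d.marked = true; queue.enqueue(d) }
    // Delete the oldest checkpoint once it is materialized, unless (with the
    // guard) the newest checkpoint is still unmaterialized and depends on it.
    var canDelete = true
    while (queue.size > 1 && canDelete) {
      val oldest = queue.head
      val newest = queue.last
      val stillNeeded = retainParents && !newest.materialized && dependsOn(newest, oldest)
      if (oldest.materialized && !stillNeeded) {
        store.files -= queue.dequeue().name
      } else {
        canDelete = false
      }
    }
  }
}

// Replays the PR snippet; returns true if the final rdd2 action succeeds.
def replay(retainParents: Boolean): Boolean = {
  val store = new ToyStore
  val checkpointer = new ToyCheckpointer(2, store, retainParents)
  val rdd1 = new Data("rdd1", None)
  checkpointer.update(rdd1)
  checkpointer.update(rdd1) // rdd1 is marked for checkpointing
  store.compute(rdd1)       // rdd1's checkpoint files are written
  val rdd2 = new Data("rdd2", Some(rdd1))
  checkpointer.update(rdd2)
  checkpointer.update(rdd2) // rdd2 is marked; rdd1's files may be deleted here
  try { store.compute(rdd2); true }
  catch { case _: FileNotFoundException => false }
}
```

Without the guard, the fourth update deletes `rdd1`'s files before `rdd2` has been materialized, so `replay(retainParents = false)` returns `false`; with the guard, deletion is postponed until a later update finds `rdd2` materialized.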
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user szhem commented on the issue: https://github.com/apache/spark/pull/19294 Hello guys, is there a chance for this patch to be merged to master?
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user szhem commented on the issue: https://github.com/apache/spark/pull/19294 @mridulm Updated `FileFormatWriterSuite` [to cover](https://github.com/apache/spark/pull/19294/files#diff-bc98a3d91cf4f95f4f473146400044aa) both branches of the [committer call](https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L498): `newTaskTempFile` as well as `newTaskTempFileAbsPath`.
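A minimal sketch of the branch being covered, with a recording committer in the spirit of a test fake. `ToyCommitter`, `RecordingCommitter` and `currentPath` are hypothetical simplifications: the real `FileCommitProtocol` methods also take a `TaskAttemptContext`, and the generated file names here are made up. Only the dispatch on `customPath` mirrors the quoted `FileFormatWriter` code.

```scala
import scala.collection.mutable

// Hypothetical, simplified committer interface (no TaskAttemptContext).
trait ToyCommitter {
  def newTaskTempFile(dir: Option[String], ext: String): String
  def newTaskTempFileAbsPath(absoluteDir: String, ext: String): String
}

// Records which method was called so a test can assert on the branch taken.
final class RecordingCommitter extends ToyCommitter {
  val calls = mutable.ListBuffer.empty[String]

  def newTaskTempFile(dir: Option[String], ext: String): String = {
    calls += "newTaskTempFile"
    dir.fold(s"part$ext")(d => s"$d/part$ext")
  }

  def newTaskTempFileAbsPath(absoluteDir: String, ext: String): String = {
    calls += "newTaskTempFileAbsPath"
    s"$absoluteDir/part$ext"
  }
}

// Same dispatch as the quoted `if (customPath.isDefined)`, written as a match.
def currentPath(committer: ToyCommitter, customPath: Option[String],
                partDir: Option[String], ext: String): String =
  customPath match {
    case Some(abs) => committer.newTaskTempFileAbsPath(abs, ext)
    case None      => committer.newTaskTempFile(partDir, ext)
  }
```

Calling `currentPath` once without and once with a custom path exercises both branches, and the recorded calls show which committer method each branch reached.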
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user szhem commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r140654204 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -57,6 +57,11 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) */ private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId) + /** + * Checks whether there are files to be committed to an absolute output location. + */ + private def hasAbsPathFiles: Boolean = addedAbsPathFiles != null && addedAbsPathFiles.nonEmpty --- End diff -- Good catch, thank you! According to the `FileCommitProtocol`, `addedAbsPathFiles` is always null on the driver, so we would not be able to commit or remove these files. Replaced it with `private def hasAbsPathFiles: Boolean = path != null`
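The effect of the `path != null` guard on job commit can be sketched without Hadoop. `MemFs` and `ToyCommitProtocol` are hypothetical in-memory stand-ins, not Hadoop's `FileSystem` or Spark's `HadoopMapReduceCommitProtocol`; the `require` in `absPathStagingDir` only imitates the "Can not create a Path from a null string" failure quoted in this PR's test comments.

```scala
import scala.collection.mutable

// Hypothetical in-memory file system keyed by path string.
final class MemFs {
  val files = mutable.Map.empty[String, String]

  def rename(src: String, dst: String): Unit =
    files.remove(src).foreach(content => files.update(dst, content))

  def deleteRecursive(prefix: String): Unit =
    files.keys.filter(_.startsWith(prefix)).toList.foreach(files.remove)
}

// Hypothetical commit step guarded the same way as `hasAbsPathFiles`.
final class ToyCommitProtocol(jobId: String, path: String, fs: MemFs) {
  private def hasAbsPathFiles: Boolean = path != null

  // Imitates the failure of building a staging path from a null output path.
  private def absPathStagingDir: String = {
    require(path != null, "Can not create a Path from a null string")
    s"$path/_temporary-$jobId"
  }

  def commitJob(filesToMove: Map[String, String]): Unit =
    if (hasAbsPathFiles) {
      for ((src, dst) <- filesToMove) fs.rename(src, dst)
      fs.deleteRecursive(absPathStagingDir)
    }
}
```

With a null `path`, the guarded `commitJob` returns without touching the staging directory; without the `if (hasAbsPathFiles)` check it would evaluate `absPathStagingDir` and fail, which is the situation output formats writing to external systems run into.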
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user szhem commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r140652214 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -130,17 +135,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]]) .foldLeft(Map[String, String]())(_ ++ _) logDebug(s"Committing files staged for absolute locations $filesToMove") -val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration) -for ((src, dst) <- filesToMove) { - fs.rename(new Path(src), new Path(dst)) +if (hasAbsPathFiles) { + val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration) + for ((src, dst) <- filesToMove) { +fs.rename(new Path(src), new Path(dst)) + } + fs.delete(absPathStagingDir, true) } -fs.delete(absPathStagingDir, true) --- End diff -- Wouldn't it be better to fix it in a separate PR?
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user szhem commented on the issue: https://github.com/apache/spark/pull/19294 @mridulm > incorporating a test for the sql part will also help in this matter. What should the expected behaviour be for SQL? I'm asking because [the sql part seems to fail even before setupJob is called on the committer](https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L118): `FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))`
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user szhem commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r140165731 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -130,17 +130,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]]) .foldLeft(Map[String, String]())(_ ++ _) logDebug(s"Committing files staged for absolute locations $filesToMove") -val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration) -for ((src, dst) <- filesToMove) { - fs.rename(new Path(src), new Path(dst)) +if (addedAbsPathFiles != null && addedAbsPathFiles.nonEmpty) { --- End diff -- Introduced method

    /**
     * Checks whether there are files to be committed to an absolute output location.
     */
    private def hasAbsPathFiles: Boolean = addedAbsPathFiles != null && addedAbsPathFiles.nonEmpty
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user szhem commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r140076614 --- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala --- @@ -568,6 +568,51 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { assert(FakeWriterWithCallback.exception.getMessage contains "failed to write") } + test("saveAsNewAPIHadoopDataset should use current working directory " + +"for files to be committed to an absolute output location when empty output path specified") { +val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1) + +val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration)) +job.setOutputKeyClass(classOf[Integer]) +job.setOutputValueClass(classOf[Integer]) +job.setOutputFormatClass(classOf[NewFakeFormat]) +val jobConfiguration = job.getConfiguration + +val fs = FileSystem.get(jobConfiguration) +fs.setWorkingDirectory(new Path(getClass.getResource(".").toExternalForm)) +try { + // just test that the job does not fail with + // java.lang.IllegalArgumentException: Can not create a Path from a null string + pairs.saveAsNewAPIHadoopDataset(jobConfiguration) +} finally { + // close to prevent filesystem caching across different tests + fs.close() +} + } + + test("saveAsHadoopDataset should use current working directory " + +"for files to be committed to an absolute output location when empty output path specified") { +val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1) + +val conf = new JobConf() +conf.setOutputKeyClass(classOf[Integer]) +conf.setOutputValueClass(classOf[Integer]) +conf.setOutputFormat(classOf[FakeOutputFormat]) +conf.setOutputCommitter(classOf[FakeOutputCommitter]) + +val fs = FileSystem.get(conf) +fs.setWorkingDirectory(new Path(getClass.getResource(".").toExternalForm)) +try { + FakeOutputCommitter.ran = false + pairs.saveAsHadoopDataset(conf) +} finally { + // close to prevent filesystem caching across different 
tests + fs.close() --- End diff -- I've updated the PR not to use the filesystem at all.
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user szhem commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r140076564 --- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala --- @@ -568,6 +568,51 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { assert(FakeWriterWithCallback.exception.getMessage contains "failed to write") } + test("saveAsNewAPIHadoopDataset should use current working directory " + +"for files to be committed to an absolute output location when empty output path specified") { +val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1) + +val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration)) +job.setOutputKeyClass(classOf[Integer]) +job.setOutputValueClass(classOf[Integer]) +job.setOutputFormatClass(classOf[NewFakeFormat]) +val jobConfiguration = job.getConfiguration + +val fs = FileSystem.get(jobConfiguration) +fs.setWorkingDirectory(new Path(getClass.getResource(".").toExternalForm)) +try { + // just test that the job does not fail with + // java.lang.IllegalArgumentException: Can not create a Path from a null string + pairs.saveAsNewAPIHadoopDataset(jobConfiguration) +} finally { + // close to prevent filesystem caching across different tests + fs.close() --- End diff -- I was counting on indirect filesystem caching, so that the filesystem instance would be exactly the same both in the tests and in `SparkHadoopWriter`; calling `newInstance` rules out that possibility. I've now updated the PR not to use the filesystem at all.
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
GitHub user szhem opened a pull request: https://github.com/apache/spark/pull/19294 [SPARK-21549][CORE] Respect OutputFormats with no output directory provided ## What changes were proposed in this pull request? Fix for https://issues.apache.org/jira/browse/SPARK-21549 JIRA issue. Since version 2.2, Spark does not respect OutputFormats with no output path provided. Examples of such formats are [Cassandra OutputFormat](https://github.com/finn-no/cassandra-hadoop/blob/08dfa3a7ac727bb87269f27a1c82ece54e3f67e6/src/main/java/org/apache/cassandra/hadoop2/AbstractColumnFamilyOutputFormat.java), [Aerospike OutputFormat](https://github.com/aerospike/aerospike-hadoop/blob/master/mapreduce/src/main/java/com/aerospike/hadoop/mapreduce/AerospikeOutputFormat.java), etc., which cannot roll back the results written to external systems on job failure. Spark requires an output directory in order to commit files to an absolute output location, which does not apply to output formats that write data to external systems. This pull request proposes to use the FileSystem's working directory, which is usually the user's home directory on distributed file systems, if no output directory is provided in the job configuration. ## How was this patch tested?
Unit tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/szhem/spark SPARK-21549-abs-output-commits Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19294.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19294 commit b99344845a73b33d4ec319b6484c3104306c34ee Author: Sergey Zhemzhitsky Date: 2017-09-20T13:07:20Z [SPARK-21549][CORE] Respect empty output paths for files to be committed to an absolute output location in case of custom output formats commit 5c1474ab78f46a73236d971a23d9b112d8613405 Author: Sergey Zhemzhitsky Date: 2017-09-20T13:13:58Z [SPARK-21549][CORE] Respect empty output paths for files to be committed to an absolute location - reformatting imports
[GitHub] spark pull request #17740: [SPARK-SPARK-20404][CORE] Using Option(name) inst...
GitHub user szhem opened a pull request: https://github.com/apache/spark/pull/17740 [SPARK-SPARK-20404][CORE] Using Option(name) instead of Some(name) Using Option(name) instead of Some(name) to prevent runtime failures when using accumulators created like the following:
```
sparkContext.accumulator(0, null)
```
You can merge this pull request into a Git repository by running: $ git pull https://github.com/szhem/spark SPARK-20404-null-acc-names Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17740.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17740 commit 9c6e4d685cebe034d12c085ae9f97f5187cec36b Author: Sergey Zhemzhitsky Date: 2017-04-24T08:58:12Z [SPARK-SPARK-20404][CORE] Using Option(name) instead of Some(name) when creating accumulators to prevent failures at runtime when using null names
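The difference behind this change is plain Scala semantics: `Option.apply` maps `null` to `None`, whereas `Some.apply` wraps `null` as a defined value. A minimal illustration follows; `nameLength` is a hypothetical consumer that trusts a defined name, not Spark code.

```scala
// Some(null) is a *defined* Option holding null; Option(null) is None.
val viaSome: Option[String] = Some(null)
val viaOption: Option[String] = Option(null)

// Hypothetical consumer that assumes a defined name is non-null.
def nameLength(name: Option[String]): Int = name.map(_.length).getOrElse(0)
```

`nameLength(viaOption)` falls back to `0`, while `nameLength(viaSome)` dereferences the wrapped `null` and throws a `NullPointerException`, which is the kind of runtime failure a `null` accumulator name can trigger when it is wrapped with `Some`.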