[jira] [Created] (SPARK-4617) Fix spark.yarn.applicationMaster.waitTries doc
Sandy Ryza created SPARK-4617: - Summary: Fix spark.yarn.applicationMaster.waitTries doc Key: SPARK-4617 URL: https://issues.apache.org/jira/browse/SPARK-4617 Project: Spark Issue Type: Bug Components: Documentation, YARN Affects Versions: 1.2.0 Reporter: Sandy Ryza Assignee: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-4352) Incorporate locality preferences in dynamic allocation requests
[ https://issues.apache.org/jira/browse/SPARK-4352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-4352: -- Description: Currently, achieving data locality in Spark is difficult u preferredNodeLocalityData provides a sort of hacky workaround that has been broken since 1.0. Incorporate locality preferences in dynamic allocation requests --- Key: SPARK-4352 URL: https://issues.apache.org/jira/browse/SPARK-4352 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.2.0 Reporter: Sandy Ryza Currently, achieving data locality in Spark is difficult u preferredNodeLocalityData provides a sort of hacky workaround that has been broken since 1.0. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-4352) Incorporate locality preferences in dynamic allocation requests
[ https://issues.apache.org/jira/browse/SPARK-4352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-4352: -- Description: Currently, achieving data locality in Spark is difficult unless an application takes resources on every node in the cluster. preferredNodeLocalityData provides a sort of hacky workaround that has been broken since 1.0. With dynamic executor allocation, Spark requests executors in response to demand from the application. When this occurs, it would be useful to look at the pending tasks and communicate their location preferences to the cluster resource manager. was: Currently, achieving data locality in Spark is difficult u preferredNodeLocalityData provides a sort of hacky workaround that has been broken since 1.0. Incorporate locality preferences in dynamic allocation requests --- Key: SPARK-4352 URL: https://issues.apache.org/jira/browse/SPARK-4352 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.2.0 Reporter: Sandy Ryza Currently, achieving data locality in Spark is difficult unless an application takes resources on every node in the cluster. preferredNodeLocalityData provides a sort of hacky workaround that has been broken since 1.0. With dynamic executor allocation, Spark requests executors in response to demand from the application. When this occurs, it would be useful to look at the pending tasks and communicate their location preferences to the cluster resource manager. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-4447) Remove layers of abstraction in YARN code no longer needed after dropping yarn-alpha
[ https://issues.apache.org/jira/browse/SPARK-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza reassigned SPARK-4447: - Assignee: Patrick Wendell Remove layers of abstraction in YARN code no longer needed after dropping yarn-alpha Key: SPARK-4447 URL: https://issues.apache.org/jira/browse/SPARK-4447 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 1.3.0 Reporter: Sandy Ryza Assignee: Patrick Wendell For example, YarnRMClient and YarnRMClientImpl can be merged, and YarnAllocator and YarnAllocationHandler can be merged. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-4447) Remove layers of abstraction in YARN code no longer needed after dropping yarn-alpha
[ https://issues.apache.org/jira/browse/SPARK-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza reassigned SPARK-4447: - Assignee: Sandy Ryza (was: Patrick Wendell) Remove layers of abstraction in YARN code no longer needed after dropping yarn-alpha Key: SPARK-4447 URL: https://issues.apache.org/jira/browse/SPARK-4447 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 1.3.0 Reporter: Sandy Ryza Assignee: Sandy Ryza For example, YarnRMClient and YarnRMClientImpl can be merged, and YarnAllocator and YarnAllocationHandler can be merged. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory
[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14222572#comment-14222572 ] Sandy Ryza commented on SPARK-4452: --- [~tianshuo], I took a look at the patch, and the general approach looks reasonable to me. A couple of additional thoughts that apply both to the current approach and Tianshuo's patch: * When we chain an ExternalAppendOnlyMap to an ExternalSorter for processing combined map outputs in sort-based shuffle, we end up double counting, no? Both data structures will be holding references to the same objects and estimating their size based on these objects. * We could make the in-memory iterators destructive as well, right? I.e. if the data structures can release references to objects as they yield them, then we can give memory back to the shuffle memory manager and make it available to other data structures in the same thread. If we can avoid double counting and holding on to unneeded objects, it would obviate some of the need for intra-thread limits / forced spilling. Shuffle data structures can starve others on the same thread for memory Key: SPARK-4452 URL: https://issues.apache.org/jira/browse/SPARK-4452 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Tianshuo Deng Assignee: Tianshuo Deng Priority: Critical When an Aggregator is used with ExternalSorter in a task, Spark will create many small files, which can cause a "too many files open" error during merging. Currently, ShuffleMemoryManager does not work well when there are 2 spillable objects in a thread, which are ExternalSorter and ExternalAppendOnlyMap (used by Aggregator) in this case. Here is an example: Due to the usage of map-side aggregation, ExternalAppendOnlyMap is created first to read the RDD. It may ask for as much memory as it can, which is totalMem/numberOfThreads. Then later on, when ExternalSorter is created in the same thread, the ShuffleMemoryManager could refuse to allocate more memory to it, since the memory is already given to the previously requested object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling small files (due to the lack of memory). I'm currently working on a PR to address this issue. It will include the following changes: 1. The ShuffleMemoryManager should not only track the memory usage for each thread, but also the object that holds the memory. 2. The ShuffleMemoryManager should be able to trigger the spilling of a spillable object. In this way, if a new object in a thread is requesting memory, the old occupant could be evicted/spilled. Previously, the spillable objects triggered spilling by themselves, so one might not trigger spilling even if another object in the same thread needed more memory. After this change, the ShuffleMemoryManager can trigger the spilling of an object if it needs to. 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, ExternalAppendOnlyMap returned a destructive iterator and could not be spilled after the iterator was returned. This should be changed so that even after the iterator is returned, the ShuffleMemoryManager can still spill it. Currently, I have a working branch in progress: https://github.com/tsdeng/spark/tree/enhance_memory_manager. Already made change 3 and have a prototype of changes 1 and 2 to evict spillables from the memory manager, still in progress. I will send a PR when it's done. Any feedback or thoughts on this change are highly appreciated!
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
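The second bullet in the comment above, made concrete: a destructive in-memory iterator that drops its reference to each element as it yields it and hands the corresponding size estimate back to a memory manager. This is a minimal sketch; SimpleMemoryManager, the flat per-element accounting, and all names here are illustrative assumptions, not Spark's actual ShuffleMemoryManager API.
{code}
import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for a shuffle memory manager (assumption, not Spark's API).
class SimpleMemoryManager {
  private var used = 0L
  def release(bytes: Long): Unit = synchronized { used = math.max(used - bytes, 0L) }
}

// Yields each element once, nulling out the buffer slot so the object can be
// garbage collected, and returns its estimated memory to the manager.
class DestructiveIterator[T](buffer: ArrayBuffer[T],
                             bytesPerElement: Long, // assumed flat per-element estimate
                             manager: SimpleMemoryManager) extends Iterator[T] {
  private var pos = 0
  override def hasNext: Boolean = pos < buffer.length
  override def next(): T = {
    val elem = buffer(pos)
    buffer(pos) = null.asInstanceOf[T] // release our reference as we go
    pos += 1
    manager.release(bytesPerElement)   // memory becomes available to other structures
    elem
  }
}
{code}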
[jira] [Created] (SPARK-4569) Rename externalSorting in Aggregator
Sandy Ryza created SPARK-4569: - Summary: Rename externalSorting in Aggregator Key: SPARK-4569 URL: https://issues.apache.org/jira/browse/SPARK-4569 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 1.2.0 Reporter: Sandy Ryza Priority: Trivial While technically all spilling in Spark does result in sorting, calling this variable externalSorting makes it seem like ExternalSorter will be used, when in fact it just indicates whether spilling is enabled. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-1956) Enable shuffle consolidation by default
[ https://issues.apache.org/jira/browse/SPARK-1956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14221453#comment-14221453 ] Sandy Ryza commented on SPARK-1956: --- This is less important now that sort-based shuffle is the default, but how do we feel about turning this on by default now? [~mridulm80], have the issues you're aware of been resolved? Enable shuffle consolidation by default --- Key: SPARK-1956 URL: https://issues.apache.org/jira/browse/SPARK-1956 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Affects Versions: 1.0.0 Reporter: Sandy Ryza The only drawbacks are on ext3, and most everyone has ext4 at this point. I think it's better to aim the default at the common case. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
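For reference, the flag in question is spark.shuffle.consolidateFiles, which applied to the hash-based shuffle. A minimal sketch of enabling it explicitly, pending a default change (the application name is illustrative):
{code}
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("consolidation-example") // illustrative
  .set("spark.shuffle.consolidateFiles", "true") // the default this issue proposes flipping
val sc = new SparkContext(conf)
{code}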
[jira] [Created] (SPARK-4550) In sort-based shuffle, store map outputs as serialized
Sandy Ryza created SPARK-4550: - Summary: In sort-based shuffle, store map outputs as serialized Key: SPARK-4550 URL: https://issues.apache.org/jira/browse/SPARK-4550 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Affects Versions: 1.2.0 Reporter: Sandy Ryza One drawback with sort-based shuffle compared to hash-based shuffle is that it ends up storing many more Java objects in memory. If Spark could store map outputs in serialized form, it could * spill less often because the serialized form is more compact * reduce GC pressure This will only work when the serialized representations of objects are independent from each other and occupy contiguous segments of memory. E.g. when Kryo reference tracking is left on, objects may contain pointers to objects farther back in the stream, which means that the sort can't relocate objects without corrupting them. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
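The Kryo caveat in the description corresponds to an existing configuration knob: reference tracking has to be off for serialized records to be independently relocatable. A minimal sketch using Spark's own config keys (disabling reference tracking is only safe when the data contains no circular references):
{code}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // With reference tracking on, a record may encode a back-pointer into an
  // earlier record in the stream, so a sort could not relocate it safely.
  .set("spark.kryo.referenceTracking", "false")
{code}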
[jira] [Updated] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form
[ https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-4550: -- Summary: In sort-based shuffle, store map outputs in serialized form (was: In sort-based shuffle, store map outputs as serialized) In sort-based shuffle, store map outputs in serialized form --- Key: SPARK-4550 URL: https://issues.apache.org/jira/browse/SPARK-4550 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Affects Versions: 1.2.0 Reporter: Sandy Ryza One drawback with sort-based shuffle compared to hash-based shuffle is that it ends up storing many more Java objects in memory. If Spark could store map outputs in serialized form, it could * spill less often because the serialized form is more compact * reduce GC pressure This will only work when the serialized representations of objects are independent from each other and occupy contiguous segments of memory. E.g. when Kryo reference tracking is left on, objects may contain pointers to objects farther back in the stream, which means that the sort can't relocate objects without corrupting them. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4550) In sort-based shuffle, store map outputs in serialized form
[ https://issues.apache.org/jira/browse/SPARK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14221710#comment-14221710 ] Sandy Ryza commented on SPARK-4550: --- We don't, though it would allow us to be much more efficient in certain situations. The way sort-based shuffle works right now, the map side only sorts by the partition, so we can store this number alongside the serialized record and not need to compare keys at all. SPARK-2926 proposes sorting by keys on the map side. For that, we'd need to deserialize keys before comparing them. There might be situations where this is slower than not serializing them in the first place. But even in those situations, we'd get more reliability by stressing GC less. It would probably be good to define raw comparators for common raw-comparable key types like ints and strings. In sort-based shuffle, store map outputs in serialized form --- Key: SPARK-4550 URL: https://issues.apache.org/jira/browse/SPARK-4550 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Affects Versions: 1.2.0 Reporter: Sandy Ryza One drawback with sort-based shuffle compared to hash-based shuffle is that it ends up storing many more Java objects in memory. If Spark could store map outputs in serialized form, it could * spill less often because the serialized form is more compact * reduce GC pressure This will only work when the serialized representations of objects are independent from each other and occupy contiguous segments of memory. E.g. when Kryo reference tracking is left on, objects may contain pointers to objects farther back in the stream, which means that the sort can't relocate objects without corrupting them. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
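A raw comparator of the kind suggested in the comment compares keys directly in their serialized bytes. Below is a hypothetical sketch for ints encoded big-endian; this is not Spark's API, just an illustration of ordering records without deserializing them:
{code}
// Hypothetical raw comparator for 4-byte big-endian ints.
object RawIntComparator {
  private def readInt(buf: Array[Byte], off: Int): Int =
    ((buf(off) & 0xff) << 24) | ((buf(off + 1) & 0xff) << 16) |
    ((buf(off + 2) & 0xff) << 8) | (buf(off + 3) & 0xff)

  // Compare two serialized ints in place, without materializing objects.
  def compare(a: Array[Byte], aOff: Int, b: Array[Byte], bOff: Int): Int =
    Integer.compare(readInt(a, aOff), readInt(b, bOff))
}
{code}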
[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored
[ https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14220620#comment-14220620 ] Sandy Ryza commented on SPARK-2089: --- Another possible solution here is SPARK-4352. Requesting executors at locations after jobs have started would mean that users don't need to specify locations at all. With YARN, preferredNodeLocalityData isn't honored --- Key: SPARK-2089 URL: https://issues.apache.org/jira/browse/SPARK-2089 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.0.0 Reporter: Sandy Ryza Assignee: Sandy Ryza Priority: Critical When running in YARN cluster mode, apps can pass preferred locality data when constructing a Spark context that will dictate where to request executor containers. This is currently broken because of a race condition. The Spark-YARN code runs the user class and waits for it to start up a SparkContext. During its initialization, the SparkContext will create a YarnClusterScheduler, which notifies a monitor in the Spark-YARN code that it's ready. The Spark-YARN code then immediately fetches the preferredNodeLocationData from the SparkContext and uses it to start requesting containers. But in the SparkContext constructor that takes the preferredNodeLocationData, setting preferredNodeLocationData comes after the rest of the initialization, so, if the Spark-YARN code comes around quickly enough after being notified, the data that's fetched is the empty unset version. This occurred during all of my runs. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory
[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14216933#comment-14216933 ] Sandy Ryza commented on SPARK-4452: --- One issue with a limits-by-object approach is that it could result in extra wasted memory over the current approach for tasks that produce less shuffle data than they read. E.g. consider rdd.reduceByKey(...).map(...).reduceByKey(...)... The object aggregating inputs used to have access to the full memory allotted to the task, but now it only gets half the memory. In situations where the object aggregating outputs doesn't need as much memory (because there is less output data), some of the memory that previously would have been used goes unused. A forced spilling approach seems like it could give some of the advantages that preemption provides in cluster scheduling - better utilization through enabling objects to use more than their fair amount until it turns out other objects need those resources. Shuffle data structures can starve others on the same thread for memory Key: SPARK-4452 URL: https://issues.apache.org/jira/browse/SPARK-4452 Project: Spark Issue Type: Bug Affects Versions: 1.1.0 Reporter: Tianshuo Deng Assignee: Tianshuo Deng Priority: Blocker When an Aggregator is used with ExternalSorter in a task, Spark will create many small files, which can cause a "too many files open" error during merging. Currently, ShuffleMemoryManager does not work well when there are 2 spillable objects in a thread, which are ExternalSorter and ExternalAppendOnlyMap (used by Aggregator) in this case. Here is an example: Due to the usage of map-side aggregation, ExternalAppendOnlyMap is created first to read the RDD. It may ask for as much memory as it can, which is totalMem/numberOfThreads. Then later on, when ExternalSorter is created in the same thread, the ShuffleMemoryManager could refuse to allocate more memory to it, since the memory is already given to the previously requested object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling small files (due to the lack of memory). I'm currently working on a PR to address this issue. It will include the following changes: 1. The ShuffleMemoryManager should not only track the memory usage for each thread, but also the object that holds the memory. 2. The ShuffleMemoryManager should be able to trigger the spilling of a spillable object. In this way, if a new object in a thread is requesting memory, the old occupant could be evicted/spilled. Previously, the spillable objects triggered spilling by themselves, so one might not trigger spilling even if another object in the same thread needed more memory. After this change, the ShuffleMemoryManager can trigger the spilling of an object if it needs to. 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, ExternalAppendOnlyMap returned a destructive iterator and could not be spilled after the iterator was returned. This should be changed so that even after the iterator is returned, the ShuffleMemoryManager can still spill it. Currently, I have a working branch in progress: https://github.com/tsdeng/spark/tree/enhance_memory_manager. Already made change 3 and have a prototype of changes 1 and 2 to evict spillables from the memory manager, still in progress. I will send a PR when it's done. Any feedback or thoughts on this change are highly appreciated!
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
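The pipeline from the comment, spelled out (the path and the second aggregation are illustrative): in the stage that computes the second reduceByKey, a single task both combines the first shuffle's output (ExternalAppendOnlyMap) and prepares output for the second (ExternalSorter), so the two structures compete within one thread's allotment.
{code}
val result = sc.textFile("hdfs:///input/text")         // illustrative path
  .flatMap(_.split(" "))
  .map(word => (word, 1L))
  .reduceByKey(_ + _)                                   // aggregation #1
  .map { case (word, count) => (word.length, count) }
  .reduceByKey(_ + _)                                   // aggregation #2, consumed and produced by the same tasks
{code}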
[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory
[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14217340#comment-14217340 ] Sandy Ryza commented on SPARK-4452: --- [~matei] my point is not that forced spilling allows us to avoid setting limits, but that it allows those limits to be soft: if an entity (thread or object) is not requesting the 1/N memory reserved for it, that memory can be given to other entities that need it. Then, if the entity later requests the memory reserved for it, the other entities above their fair allocation can be forced to spill. (I don't necessarily mean to argue that this advantage is worth the added complexity.) Shuffle data structures can starve others on the same thread for memory Key: SPARK-4452 URL: https://issues.apache.org/jira/browse/SPARK-4452 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Tianshuo Deng Assignee: Tianshuo Deng Priority: Critical When an Aggregator is used with ExternalSorter in a task, Spark will create many small files, which can cause a "too many files open" error during merging. Currently, ShuffleMemoryManager does not work well when there are 2 spillable objects in a thread, which are ExternalSorter and ExternalAppendOnlyMap (used by Aggregator) in this case. Here is an example: Due to the usage of map-side aggregation, ExternalAppendOnlyMap is created first to read the RDD. It may ask for as much memory as it can, which is totalMem/numberOfThreads. Then later on, when ExternalSorter is created in the same thread, the ShuffleMemoryManager could refuse to allocate more memory to it, since the memory is already given to the previously requested object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling small files (due to the lack of memory). I'm currently working on a PR to address this issue. It will include the following changes: 1. The ShuffleMemoryManager should not only track the memory usage for each thread, but also the object that holds the memory. 2. The ShuffleMemoryManager should be able to trigger the spilling of a spillable object. In this way, if a new object in a thread is requesting memory, the old occupant could be evicted/spilled. Previously, the spillable objects triggered spilling by themselves, so one might not trigger spilling even if another object in the same thread needed more memory. After this change, the ShuffleMemoryManager can trigger the spilling of an object if it needs to. 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, ExternalAppendOnlyMap returned a destructive iterator and could not be spilled after the iterator was returned. This should be changed so that even after the iterator is returned, the ShuffleMemoryManager can still spill it. Currently, I have a working branch in progress: https://github.com/tsdeng/spark/tree/enhance_memory_manager. Already made change 3 and have a prototype of changes 1 and 2 to evict spillables from the memory manager, still in progress. I will send a PR when it's done. Any feedback or thoughts on this change are highly appreciated! -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
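The soft limits described in this comment could look roughly like the sketch below: entities may grow past their 1/N share while memory is free, and a requester under its share can force over-share entities to spill. All names and the accounting granularity are assumptions for illustration, not Spark's ShuffleMemoryManager.
{code}
import scala.collection.mutable

// An entity (thread or spillable object) that can be told to spill; returns bytes freed.
trait ForciblySpillable { def forceSpill(): Long }

class SoftLimitMemoryManager(total: Long) {
  private val used = mutable.Map.empty[ForciblySpillable, Long]

  def tryAcquire(who: ForciblySpillable, bytes: Long): Boolean = synchronized {
    val fairShare = total / (used.size + 1) // 1/N, counting the requester
    var free = total - used.values.sum
    if (free < bytes && used.getOrElse(who, 0L) < fairShare) {
      // Requester is under its share: reclaim from entities above theirs.
      for ((other, amount) <- used.toSeq
           if (other ne who) && amount > fairShare && free < bytes) {
        val freed = math.min(other.forceSpill(), amount)
        used(other) = amount - freed
        free += freed
      }
    }
    if (free >= bytes) { used(who) = used.getOrElse(who, 0L) + bytes; true }
    else false
  }

  def release(who: ForciblySpillable, bytes: Long): Unit = synchronized {
    used(who) = math.max(used.getOrElse(who, 0L) - bytes, 0L)
  }
}
{code}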
[jira] [Created] (SPARK-4447) Remove layers of abstraction in YARN code no longer needed after dropping yarn-alpha
Sandy Ryza created SPARK-4447: - Summary: Remove layers of abstraction in YARN code no longer needed after dropping yarn-alpha Key: SPARK-4447 URL: https://issues.apache.org/jira/browse/SPARK-4447 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 1.3.0 Reporter: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-4447) Remove layers of abstraction in YARN code no longer needed after dropping yarn-alpha
[ https://issues.apache.org/jira/browse/SPARK-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-4447: -- Description: For example, YarnRMClient and YarnRMClientImpl can be merged, and YarnAllocator and YarnAllocationHandler can be merged. Remove layers of abstraction in YARN code no longer needed after dropping yarn-alpha Key: SPARK-4447 URL: https://issues.apache.org/jira/browse/SPARK-4447 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 1.3.0 Reporter: Sandy Ryza For example, YarnRMClient and YarnRMClientImpl can be merged, and YarnAllocator and YarnAllocationHandler can be merged. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4447) Remove layers of abstraction in YARN code no longer needed after dropping yarn-alpha
[ https://issues.apache.org/jira/browse/SPARK-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14214411#comment-14214411 ] Sandy Ryza commented on SPARK-4447: --- Planning to work on this. Remove layers of abstraction in YARN code no longer needed after dropping yarn-alpha Key: SPARK-4447 URL: https://issues.apache.org/jira/browse/SPARK-4447 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 1.3.0 Reporter: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4452) Enhance Sort-based Shuffle to avoid spilling small files
[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215018#comment-14215018 ] Sandy Ryza commented on SPARK-4452: --- I haven't thought the implications out fully, but it worries me that data structures wouldn't be in charge of their own spilling. It seems like for different data structures, different spilling patterns and sizes could be more efficient, and placing control in the hands of the shuffle memory manager negates this. Another fix (simpler but maybe less flexible) worth considering would be to define a static fraction of memory that goes to each of the aggregator and the external sorter, right? One way that could be implemented is that each time the aggregator gets memory, it allocates some for the sorter. Also, for SPARK-2926, we are working on a tiered merger that would avoid the "too many open files" problem by only merging a limited number of files at once. Enhance Sort-based Shuffle to avoid spilling small files Key: SPARK-4452 URL: https://issues.apache.org/jira/browse/SPARK-4452 Project: Spark Issue Type: Bug Reporter: tianshuo When an Aggregator is used with ExternalSorter in a task, Spark will create many small files, which can cause a "too many files open" error during merging. This happens when using the sort-based shuffle. The issue is caused by multiple factors: 1. There seems to be a bug in setting the elementsRead variable in ExternalSorter, which renders the trackMemoryThreshold (defined in Spillable) useless for triggering spilling; the PR to fix it is https://github.com/apache/spark/pull/3302 2. The current ShuffleMemoryManager does not work well when there are 2 spillable objects in a thread, which are ExternalSorter and ExternalAppendOnlyMap (used by Aggregator) in this case. Here is an example: Due to the usage of map-side aggregation, ExternalAppendOnlyMap is created first to read the RDD. It may ask for as much memory as it can, which is totalMem/numberOfThreads. Then later on, when ExternalSorter is created in the same thread, the ShuffleMemoryManager could refuse to allocate more memory to it, since the memory is already given to the previously requested object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling small files (due to the lack of memory). I'm currently working on a PR to address these two issues. It will include the following changes: 1. The ShuffleMemoryManager should not only track the memory usage for each thread, but also the object that holds the memory. 2. The ShuffleMemoryManager should be able to trigger the spilling of a spillable object. In this way, if a new object in a thread is requesting memory, the old occupant could be evicted/spilled. This avoids problem 2 from happening. Previously, spillable objects triggered spilling by themselves, so one might not trigger spilling even if another object in the same thread needed more memory. After this change, the ShuffleMemoryManager can trigger the spilling of an object if it needs to. 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, ExternalAppendOnlyMap returned a destructive iterator and could not be spilled after the iterator was returned. This should be changed so that even after the iterator is returned, the ShuffleMemoryManager can still spill it. Currently, I have a working branch in progress: https://github.com/tsdeng/spark/tree/enhance_memory_manager. Already made change 3 and have a prototype of changes 1 and 2 to evict spillables from the memory manager, still in progress.
I will send a PR when it's done. Any feedback or thoughts on this change are highly appreciated! -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
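The tiered merger mentioned for SPARK-2926 presumably works along these lines: merge at most a fixed number of spill files at a time into intermediate files, until a final pass can handle what remains, bounding the number of simultaneously open files. A sketch under that assumption, with mergeGroup standing in for the real on-disk merge of sorted runs:
{code}
import java.io.File

// Repeatedly collapse groups of at most maxOpenFiles spill files until one
// final merge suffices. mergeGroup is assumed to merge sorted files into one.
def tieredMerge(spills: Seq[File], maxOpenFiles: Int,
                mergeGroup: Seq[File] => File): File = {
  var remaining = spills
  while (remaining.size > maxOpenFiles) {
    remaining = remaining.grouped(maxOpenFiles).map(mergeGroup).toSeq
  }
  if (remaining.size == 1) remaining.head else mergeGroup(remaining)
}
{code}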
[jira] [Updated] (SPARK-4452) Enhance Sort-based Shuffle to avoid spilling small files
[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-4452: -- Affects Version/s: 1.1.0 Enhance Sort-based Shuffle to avoid spilling small files Key: SPARK-4452 URL: https://issues.apache.org/jira/browse/SPARK-4452 Project: Spark Issue Type: Bug Affects Versions: 1.1.0 Reporter: tianshuo When an Aggregator is used with ExternalSorter in a task, Spark will create many small files, which can cause a "too many files open" error during merging. This happens when using the sort-based shuffle. The issue is caused by multiple factors: 1. There seems to be a bug in setting the elementsRead variable in ExternalSorter, which renders the trackMemoryThreshold (defined in Spillable) useless for triggering spilling; the PR to fix it is https://github.com/apache/spark/pull/3302 2. The current ShuffleMemoryManager does not work well when there are 2 spillable objects in a thread, which are ExternalSorter and ExternalAppendOnlyMap (used by Aggregator) in this case. Here is an example: Due to the usage of map-side aggregation, ExternalAppendOnlyMap is created first to read the RDD. It may ask for as much memory as it can, which is totalMem/numberOfThreads. Then later on, when ExternalSorter is created in the same thread, the ShuffleMemoryManager could refuse to allocate more memory to it, since the memory is already given to the previously requested object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling small files (due to the lack of memory). I'm currently working on a PR to address these two issues. It will include the following changes: 1. The ShuffleMemoryManager should not only track the memory usage for each thread, but also the object that holds the memory. 2. The ShuffleMemoryManager should be able to trigger the spilling of a spillable object. In this way, if a new object in a thread is requesting memory, the old occupant could be evicted/spilled. This avoids problem 2 from happening. Previously, spillable objects triggered spilling by themselves, so one might not trigger spilling even if another object in the same thread needed more memory. After this change, the ShuffleMemoryManager can trigger the spilling of an object if it needs to. 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, ExternalAppendOnlyMap returned a destructive iterator and could not be spilled after the iterator was returned. This should be changed so that even after the iterator is returned, the ShuffleMemoryManager can still spill it. Currently, I have a working branch in progress: https://github.com/tsdeng/spark/tree/enhance_memory_manager. Already made change 3 and have a prototype of changes 1 and 2 to evict spillables from the memory manager, still in progress. I will send a PR when it's done. Any feedback or thoughts on this change are highly appreciated! -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4452) Enhance Sort-based Shuffle to avoid spilling small files
[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215047#comment-14215047 ] Sandy Ryza commented on SPARK-4452: --- A third possible fix would be to have the shuffle memory manager consider fairness in terms of spillable data structures instead of threads. Enhance Sort-based Shuffle to avoid spilling small files Key: SPARK-4452 URL: https://issues.apache.org/jira/browse/SPARK-4452 Project: Spark Issue Type: Bug Affects Versions: 1.1.0 Reporter: tianshuo When an Aggregator is used with ExternalSorter in a task, Spark will create many small files, which can cause a "too many files open" error during merging. This happens when using the sort-based shuffle. The issue is caused by multiple factors: 1. There seems to be a bug in setting the elementsRead variable in ExternalSorter, which renders the trackMemoryThreshold (defined in Spillable) useless for triggering spilling; the PR to fix it is https://github.com/apache/spark/pull/3302 2. The current ShuffleMemoryManager does not work well when there are 2 spillable objects in a thread, which are ExternalSorter and ExternalAppendOnlyMap (used by Aggregator) in this case. Here is an example: Due to the usage of map-side aggregation, ExternalAppendOnlyMap is created first to read the RDD. It may ask for as much memory as it can, which is totalMem/numberOfThreads. Then later on, when ExternalSorter is created in the same thread, the ShuffleMemoryManager could refuse to allocate more memory to it, since the memory is already given to the previously requested object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling small files (due to the lack of memory). I'm currently working on a PR to address these two issues. It will include the following changes: 1. The ShuffleMemoryManager should not only track the memory usage for each thread, but also the object that holds the memory. 2. The ShuffleMemoryManager should be able to trigger the spilling of a spillable object. In this way, if a new object in a thread is requesting memory, the old occupant could be evicted/spilled. This avoids problem 2 from happening. Previously, spillable objects triggered spilling by themselves, so one might not trigger spilling even if another object in the same thread needed more memory. After this change, the ShuffleMemoryManager can trigger the spilling of an object if it needs to. 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, ExternalAppendOnlyMap returned a destructive iterator and could not be spilled after the iterator was returned. This should be changed so that even after the iterator is returned, the ShuffleMemoryManager can still spill it. Currently, I have a working branch in progress: https://github.com/tsdeng/spark/tree/enhance_memory_manager. Already made change 3 and have a prototype of changes 1 and 2 to evict spillables from the memory manager, still in progress. I will send a PR when it's done. Any feedback or thoughts on this change are highly appreciated! -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4456) Document why spilling depends on both elements read and memory used
Sandy Ryza created SPARK-4456: - Summary: Document why spilling depends on both elements read and memory used Key: SPARK-4456 URL: https://issues.apache.org/jira/browse/SPARK-4456 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.2.0 Reporter: Sandy Ryza Priority: Minor It's not clear to me why the number of records should have an effect on when a data structure spills. It would probably be useful to document this. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
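For context, the condition in question couples a record-count gate with the memory estimate, roughly as paraphrased below (the constants are from memory and the real Spillable code may differ). The count gate plausibly exists because the size estimate is sampled and relatively expensive, so it is only consulted periodically; that is exactly the kind of rationale this issue asks to have written down.
{code}
// Paraphrase of the spill check, not the verbatim Spillable implementation.
def shouldSpill(elementsRead: Long, currentMemory: Long, myMemoryThreshold: Long): Boolean =
  elementsRead > 1000 &&      // skip tiny collections entirely
  elementsRead % 32 == 0 &&   // only consult the (costly) size estimate periodically
  currentMemory >= myMemoryThreshold
{code}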
[jira] [Created] (SPARK-4457) Document how to build for Hadoop versions greater than 2.4
Sandy Ryza created SPARK-4457: - Summary: Document how to build for Hadoop versions greater than 2.4 Key: SPARK-4457 URL: https://issues.apache.org/jira/browse/SPARK-4457 Project: Spark Issue Type: Improvement Components: Documentation Affects Versions: 1.2.0 Reporter: Sandy Ryza Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory
[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-4452: -- Summary: Shuffle data structures can starve others on the same thread for memory (was: Enhance Sort-based Shuffle to avoid spilling small files) Shuffle data structures can starve others on the same thread for memory Key: SPARK-4452 URL: https://issues.apache.org/jira/browse/SPARK-4452 Project: Spark Issue Type: Bug Affects Versions: 1.1.0 Reporter: tianshuo When an Aggregator is used with ExternalSorter in a task, Spark will create many small files, which can cause a "too many files open" error during merging. This happens when using the sort-based shuffle. The issue is caused by multiple factors: 1. There seems to be a bug in setting the elementsRead variable in ExternalSorter, which renders the trackMemoryThreshold (defined in Spillable) useless for triggering spilling; the PR to fix it is https://github.com/apache/spark/pull/3302 2. The current ShuffleMemoryManager does not work well when there are 2 spillable objects in a thread, which are ExternalSorter and ExternalAppendOnlyMap (used by Aggregator) in this case. Here is an example: Due to the usage of map-side aggregation, ExternalAppendOnlyMap is created first to read the RDD. It may ask for as much memory as it can, which is totalMem/numberOfThreads. Then later on, when ExternalSorter is created in the same thread, the ShuffleMemoryManager could refuse to allocate more memory to it, since the memory is already given to the previously requested object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling small files (due to the lack of memory). I'm currently working on a PR to address these two issues. It will include the following changes: 1. The ShuffleMemoryManager should not only track the memory usage for each thread, but also the object that holds the memory. 2. The ShuffleMemoryManager should be able to trigger the spilling of a spillable object. In this way, if a new object in a thread is requesting memory, the old occupant could be evicted/spilled. This avoids problem 2 from happening. Previously, spillable objects triggered spilling by themselves, so one might not trigger spilling even if another object in the same thread needed more memory. After this change, the ShuffleMemoryManager can trigger the spilling of an object if it needs to. 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, ExternalAppendOnlyMap returned a destructive iterator and could not be spilled after the iterator was returned. This should be changed so that even after the iterator is returned, the ShuffleMemoryManager can still spill it. Currently, I have a working branch in progress: https://github.com/tsdeng/spark/tree/enhance_memory_manager. Already made change 3 and have a prototype of changes 1 and 2 to evict spillables from the memory manager, still in progress. I will send a PR when it's done. Any feedback or thoughts on this change are highly appreciated! -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory
[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215269#comment-14215269 ] Sandy Ryza commented on SPARK-4452: --- Updated the title to reflect the specific problem. Shuffle data structures can starve others on the same thread for memory Key: SPARK-4452 URL: https://issues.apache.org/jira/browse/SPARK-4452 Project: Spark Issue Type: Bug Affects Versions: 1.1.0 Reporter: tianshuo When an Aggregator is used with ExternalSorter in a task, Spark will create many small files, which can cause a "too many files open" error during merging. This happens when using the sort-based shuffle. The issue is caused by multiple factors: 1. There seems to be a bug in setting the elementsRead variable in ExternalSorter, which renders the trackMemoryThreshold (defined in Spillable) useless for triggering spilling; the PR to fix it is https://github.com/apache/spark/pull/3302 2. The current ShuffleMemoryManager does not work well when there are 2 spillable objects in a thread, which are ExternalSorter and ExternalAppendOnlyMap (used by Aggregator) in this case. Here is an example: Due to the usage of map-side aggregation, ExternalAppendOnlyMap is created first to read the RDD. It may ask for as much memory as it can, which is totalMem/numberOfThreads. Then later on, when ExternalSorter is created in the same thread, the ShuffleMemoryManager could refuse to allocate more memory to it, since the memory is already given to the previously requested object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling small files (due to the lack of memory). I'm currently working on a PR to address these two issues. It will include the following changes: 1. The ShuffleMemoryManager should not only track the memory usage for each thread, but also the object that holds the memory. 2. The ShuffleMemoryManager should be able to trigger the spilling of a spillable object. In this way, if a new object in a thread is requesting memory, the old occupant could be evicted/spilled. This avoids problem 2 from happening. Previously, spillable objects triggered spilling by themselves, so one might not trigger spilling even if another object in the same thread needed more memory. After this change, the ShuffleMemoryManager can trigger the spilling of an object if it needs to. 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, ExternalAppendOnlyMap returned a destructive iterator and could not be spilled after the iterator was returned. This should be changed so that even after the iterator is returned, the ShuffleMemoryManager can still spill it. Currently, I have a working branch in progress: https://github.com/tsdeng/spark/tree/enhance_memory_manager. Already made change 3 and have a prototype of changes 1 and 2 to evict spillables from the memory manager, still in progress. I will send a PR when it's done. Any feedback or thoughts on this change are highly appreciated! -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory
[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215436#comment-14215436 ] Sandy Ryza commented on SPARK-4452: --- [~andrewor14], IIUC, (2) shouldn't happen in hash-based shuffle at all, because hash-based shuffle doesn't use multiple spillable data structures in each task. Shuffle data structures can starve others on the same thread for memory Key: SPARK-4452 URL: https://issues.apache.org/jira/browse/SPARK-4452 Project: Spark Issue Type: Bug Affects Versions: 1.1.0 Reporter: Tianshuo Deng When an Aggregator is used with ExternalSorter in a task, Spark will create many small files, which can cause a "too many files open" error during merging. This happens when using the sort-based shuffle. The issue is caused by multiple factors: 1. There seems to be a bug in setting the elementsRead variable in ExternalSorter, which renders the trackMemoryThreshold (defined in Spillable) useless for triggering spilling; the PR to fix it is https://github.com/apache/spark/pull/3302 2. The current ShuffleMemoryManager does not work well when there are 2 spillable objects in a thread, which are ExternalSorter and ExternalAppendOnlyMap (used by Aggregator) in this case. Here is an example: Due to the usage of map-side aggregation, ExternalAppendOnlyMap is created first to read the RDD. It may ask for as much memory as it can, which is totalMem/numberOfThreads. Then later on, when ExternalSorter is created in the same thread, the ShuffleMemoryManager could refuse to allocate more memory to it, since the memory is already given to the previously requested object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling small files (due to the lack of memory). I'm currently working on a PR to address these two issues. It will include the following changes: 1. The ShuffleMemoryManager should not only track the memory usage for each thread, but also the object that holds the memory. 2. The ShuffleMemoryManager should be able to trigger the spilling of a spillable object. In this way, if a new object in a thread is requesting memory, the old occupant could be evicted/spilled. This avoids problem 2 from happening. Previously, spillable objects triggered spilling by themselves, so one might not trigger spilling even if another object in the same thread needed more memory. After this change, the ShuffleMemoryManager can trigger the spilling of an object if it needs to. 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, ExternalAppendOnlyMap returned a destructive iterator and could not be spilled after the iterator was returned. This should be changed so that even after the iterator is returned, the ShuffleMemoryManager can still spill it. Currently, I have a working branch in progress: https://github.com/tsdeng/spark/tree/enhance_memory_manager. Already made change 3 and have a prototype of changes 1 and 2 to evict spillables from the memory manager, still in progress. I will send a PR when it's done. Any feedback or thoughts on this change are highly appreciated! -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory
[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14215448#comment-14215448 ] Sandy Ryza commented on SPARK-4452: --- Ah, true. Shuffle data structures can starve others on the same thread for memory Key: SPARK-4452 URL: https://issues.apache.org/jira/browse/SPARK-4452 Project: Spark Issue Type: Bug Affects Versions: 1.1.0 Reporter: Tianshuo Deng Priority: Blocker When an Aggregator is used with ExternalSorter in a task, Spark will create many small files, which can cause a "too many files open" error during merging. This happens when using the sort-based shuffle. The issue is caused by multiple factors: 1. There seems to be a bug in setting the elementsRead variable in ExternalSorter, which renders the trackMemoryThreshold (defined in Spillable) useless for triggering spilling; the PR to fix it is https://github.com/apache/spark/pull/3302 2. The current ShuffleMemoryManager does not work well when there are 2 spillable objects in a thread, which are ExternalSorter and ExternalAppendOnlyMap (used by Aggregator) in this case. Here is an example: Due to the usage of map-side aggregation, ExternalAppendOnlyMap is created first to read the RDD. It may ask for as much memory as it can, which is totalMem/numberOfThreads. Then later on, when ExternalSorter is created in the same thread, the ShuffleMemoryManager could refuse to allocate more memory to it, since the memory is already given to the previously requested object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling small files (due to the lack of memory). I'm currently working on a PR to address these two issues. It will include the following changes: 1. The ShuffleMemoryManager should not only track the memory usage for each thread, but also the object that holds the memory. 2. The ShuffleMemoryManager should be able to trigger the spilling of a spillable object. In this way, if a new object in a thread is requesting memory, the old occupant could be evicted/spilled. This avoids problem 2 from happening. Previously, spillable objects triggered spilling by themselves, so one might not trigger spilling even if another object in the same thread needed more memory. After this change, the ShuffleMemoryManager can trigger the spilling of an object if it needs to. 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously, ExternalAppendOnlyMap returned a destructive iterator and could not be spilled after the iterator was returned. This should be changed so that even after the iterator is returned, the ShuffleMemoryManager can still spill it. Currently, I have a working branch in progress: https://github.com/tsdeng/spark/tree/enhance_memory_manager. Already made change 3 and have a prototype of changes 1 and 2 to evict spillables from the memory manager, still in progress. I will send a PR when it's done. Any feedback or thoughts on this change are highly appreciated! -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4375) assembly built with Maven is missing most of repl classes
Sandy Ryza created SPARK-4375: - Summary: assembly built with Maven is missing most of repl classes Key: SPARK-4375 URL: https://issues.apache.org/jira/browse/SPARK-4375 Project: Spark Issue Type: Bug Affects Versions: 1.2.0 Reporter: Sandy Ryza Priority: Blocker -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-4375) assembly built with Maven is missing most of repl classes
[ https://issues.apache.org/jira/browse/SPARK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-4375: -- Description: In particular, the ones in the split scala-2.10/scala-2.11 directories aren't being added assembly built with Maven is missing most of repl classes - Key: SPARK-4375 URL: https://issues.apache.org/jira/browse/SPARK-4375 Project: Spark Issue Type: Bug Affects Versions: 1.2.0 Reporter: Sandy Ryza Priority: Blocker In particular, the ones in the split scala-2.10/scala-2.11 directories aren't being added -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-4375) Assembly built with Maven is missing most of repl classes
[ https://issues.apache.org/jira/browse/SPARK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-4375: -- Summary: Assembly built with Maven is missing most of repl classes (was: assembly built with Maven is missing most of repl classes) Assembly built with Maven is missing most of repl classes - Key: SPARK-4375 URL: https://issues.apache.org/jira/browse/SPARK-4375 Project: Spark Issue Type: Bug Affects Versions: 1.2.0 Reporter: Sandy Ryza Priority: Blocker In particular, the ones in the split scala-2.10/scala-2.11 directories aren't being added -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4375) Assembly built with Maven is missing most of repl classes
[ https://issues.apache.org/jira/browse/SPARK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14209388#comment-14209388 ] Sandy Ryza commented on SPARK-4375: --- This all makes sense to me. Will put up a patch. Assembly built with Maven is missing most of repl classes - Key: SPARK-4375 URL: https://issues.apache.org/jira/browse/SPARK-4375 Project: Spark Issue Type: Bug Affects Versions: 1.2.0 Reporter: Sandy Ryza Priority: Blocker In particular, the ones in the split scala-2.10/scala-2.11 directories aren't being added -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4338) Remove yarn-alpha support
[ https://issues.apache.org/jira/browse/SPARK-4338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14206104#comment-14206104 ] Sandy Ryza commented on SPARK-4338: --- Planning to take a stab at this. Remove yarn-alpha support - Key: SPARK-4338 URL: https://issues.apache.org/jira/browse/SPARK-4338 Project: Spark Issue Type: Sub-task Components: YARN Reporter: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4338) Remove yarn-alpha support
Sandy Ryza created SPARK-4338: - Summary: Remove yarn-alpha support Key: SPARK-4338 URL: https://issues.apache.org/jira/browse/SPARK-4338 Project: Spark Issue Type: Sub-task Components: YARN Reporter: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4352) Incorporate locality preferences in dynamic allocation requests
Sandy Ryza created SPARK-4352: - Summary: Incorporate locality preferences in dynamic allocation requests Key: SPARK-4352 URL: https://issues.apache.org/jira/browse/SPARK-4352 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.2.0 Reporter: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does
[ https://issues.apache.org/jira/browse/SPARK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14205346#comment-14205346 ] Sandy Ryza commented on SPARK-4290: --- SparkFiles.get needs to be called, but it will only download them once per executor. The distributed cache is lazy in this way too. Provide an equivalent functionality of distributed cache as MR does --- Key: SPARK-4290 URL: https://issues.apache.org/jira/browse/SPARK-4290 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Xuefu Zhang MapReduce allows a client to specify files to be put in the distributed cache for a job, and the framework guarantees that the file will be available in the local file system of a node where a task of the job runs, before the task actually starts. While this might be achieved with YARN via hacks, it's not available in other clusters. It would be nice to have an equivalent functionality like this in Spark. It would also complement Spark's broadcast variable, which may not be suitable in certain scenarios. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
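A minimal usage sketch of the mechanism discussed in the comment (the file path is illustrative): SparkContext.addFile registers the file, and SparkFiles.get on an executor resolves the local copy, which is downloaded at most once per executor.
{code}
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val sc = new SparkContext(new SparkConf().setAppName("dist-cache-example"))
sc.addFile("hdfs:///tmp/lookup.txt") // illustrative path

val withPaths = sc.parallelize(1 to 10).map { i =>
  // Resolved on the executor; the first call on each executor triggers the download.
  val localPath = SparkFiles.get("lookup.txt")
  (i, localPath)
}
{code}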
[jira] [Created] (SPARK-4337) Add ability to cancel pending requests to YARN
Sandy Ryza created SPARK-4337: - Summary: Add ability to cancel pending requests to YARN Key: SPARK-4337 URL: https://issues.apache.org/jira/browse/SPARK-4337 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 1.2.0 Reporter: Sandy Ryza This will be useful for things like SPARK-4136 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4267) Failing to launch jobs on Spark on YARN with Hadoop 2.5.0 or later
[ https://issues.apache.org/jira/browse/SPARK-4267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14202319#comment-14202319 ] Sandy Ryza commented on SPARK-4267: --- Strange. I checked the code, and it seems like this must mean the taskScheduler is null. Did you see any errors farther up in the shell before this happened? Does it work in local mode? Failing to launch jobs on Spark on YARN with Hadoop 2.5.0 or later -- Key: SPARK-4267 URL: https://issues.apache.org/jira/browse/SPARK-4267 Project: Spark Issue Type: Bug Reporter: Tsuyoshi OZAWA Currently we're trying Spark on the YARN included in Hadoop 2.5.1. Hadoop 2.5 uses protobuf 2.5.0, so I compiled with protobuf 2.5.0 like this:
{code}
./make-distribution.sh --name spark-1.1.1 --tgz -Pyarn -Dhadoop.version=2.5.1 -Dprotobuf.version=2.5.0
{code}
Then Spark on YARN fails to launch jobs with an NPE.
{code}
$ bin/spark-shell --master yarn-client
scala> sc.textFile("hdfs:///user/ozawa/wordcountInput20G").flatMap(line => line.split(" ")).map(word => (word, 1)).persist().reduceByKey((a, b) => a + b, 16).saveAsTextFile("hdfs:///user/ozawa/sparkWordcountOutNew2");
java.lang.NullPointerException
        at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:1284)
        at org.apache.spark.SparkContext.defaultMinPartitions(SparkContext.scala:1291)
        at org.apache.spark.SparkContext.textFile$default$2(SparkContext.scala:480)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:13)
        at $iwC$$iwC$$iwC.<init>(<console>:18)
        at $iwC$$iwC.<init>(<console>:20)
        at $iwC.<init>(<console>:22)
        at <init>(<console>:24)
        at .<init>(<console>:28)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
[jira] [Commented] (SPARK-4280) In dynamic allocation, add option to never kill executors with cached blocks
[ https://issues.apache.org/jira/browse/SPARK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14202382#comment-14202382 ] Sandy Ryza commented on SPARK-4280: --- So it looks like the block IDs of broadcast variables on each node are the same broadcast IDs used on the driver. Which means it wouldn't be too hard to do this filtering. Even without it, this would still be useful. What do you think? In dynamic allocation, add option to never kill executors with cached blocks Key: SPARK-4280 URL: https://issues.apache.org/jira/browse/SPARK-4280 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.2.0 Reporter: Sandy Ryza Even with the external shuffle service, this is useful in situations like Hive on Spark where a query might require caching some data. We want to be able to give back executors after the job ends, but not during the job if it would delete intermediate results. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
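For reference, a sketch of the filtering being discussed, relying on the fact that Spark's block IDs distinguish block types (e.g. RDD blocks vs. broadcast blocks); the helper itself is hypothetical:
{code}
import org.apache.spark.storage.{BlockId, RDDBlockId}

// Hypothetical helper: does this executor hold data unique to it?
// Broadcast blocks also live on the driver, so by themselves they
// shouldn't prevent an idle executor from being killed.
def hasUniqueCachedData(blockIds: Seq[BlockId]): Boolean =
  blockIds.exists {
    case _: RDDBlockId => true  // cached RDD partition, unique to this executor
    case _             => false // broadcast and other block types are recoverable
  }
{code}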
[jira] [Created] (SPARK-4280) In dynamic allocation, add option to never kill executors with cached blocks
Sandy Ryza created SPARK-4280: - Summary: In dynamic allocation, add option to never kill executors with cached blocks Key: SPARK-4280 URL: https://issues.apache.org/jira/browse/SPARK-4280 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.2.0 Reporter: Sandy Ryza Even with the external shuffle service, this is useful in situations like Hive on Spark where a query might require caching some data. We want to be able to give back executors after the job ends, but not during the job if it would delete intermediate results. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4280) In dynamic allocation, add option to never kill executors with cached blocks
[ https://issues.apache.org/jira/browse/SPARK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14200801#comment-14200801 ] Sandy Ryza commented on SPARK-4280: --- My thinking was that it would just be based on whether the node reports it's storing blocks. So, e.g., applications wouldn't need to explicitly uncache the RDDs in situations where Spark garbage collects them because their references go away. Ideally we would make a special case for broadcast variables, because they're not unique to any node. Will look into whether there's a good way to do this. In dynamic allocation, add option to never kill executors with cached blocks Key: SPARK-4280 URL: https://issues.apache.org/jira/browse/SPARK-4280 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.2.0 Reporter: Sandy Ryza Even with the external shuffle service, this is useful in situations like Hive on Spark where a query might require caching some data. We want to be able to give back executors after the job ends, but not during the job if it would delete intermediate results. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does
[ https://issues.apache.org/jira/browse/SPARK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201572#comment-14201572 ] Sandy Ryza commented on SPARK-4290: --- If you call SparkContext#addFile, the file will be pulled onto the local disks of the executors a single time. It's true that for large clusters, writing to HDFS with high replication could be more efficient than sending from the driver to every executor. It might be worth implementing or using a more sophisticated broadcast mechanism rather than adding a new API. Provide an equivalent functionality of distributed cache as MR does --- Key: SPARK-4290 URL: https://issues.apache.org/jira/browse/SPARK-4290 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Xuefu Zhang MapReduce allows a client to specify files to be put in the distributed cache for a job, and the framework guarantees that each file will be available on the local file system of any node where a task of the job runs, before the task actually starts. While this might be achieved with YARN via hacks, it's not available on other cluster managers. It would be nice to have equivalent functionality in Spark. It would also complement Spark's broadcast variables, which may not be suitable in certain scenarios. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
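As a rough illustration of the HDFS alternative mentioned in the comment: write the file once, then raise its replication factor so executors read from nearby datanodes instead of the driver (the path and replication factor are made up):
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val file = new Path("/tmp/shared/lookup.dat") // placeholder; assume already written

// Raise the replication factor so that most executors find a replica
// on or near their own node.
fs.setReplication(file, 10.toShort)
{code}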
[jira] [Comment Edited] (SPARK-4214) With dynamic allocation, avoid outstanding requests for more executors than pending tasks need
[ https://issues.apache.org/jira/browse/SPARK-4214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14196464#comment-14196464 ] Sandy Ryza edited comment on SPARK-4214 at 11/4/14 6:00 PM: We can implement this in either a weak way or a strong way. * Weak: whenever we would request additional executors, avoid requesting more than pending tasks need. * Strong: In addition to the above, when pending tasks go below the capacity of outstanding requests, cancel outstanding requests. My opinion is that the strong way is worthwhile. It allows us to behave well in scenarios like: * Due to contention on the cluster, an app can get no more than X executors from YARN * As a long job runs, we make lots of executor requests to YARN. At some point, YARN stops fulfilling additional requests. * The long job completes. With cancellation, we can avoid: * Surges of unneeded executors when contention on the cluster goes away * New executors popping back up as we kill our own for being idle [~pwendell] [~andrewor] do you have any thoughts? was (Author: sandyr): We can implement this in either a weak way or a strong way. * Weak: whenever we would request additional executors, avoid requesting more than pending tasks need. * Strong: In addition to the above, when pending tasks go below the capacity of outstanding requests, cancel outstanding requests. My opinion is that the strong way is worthwhile. It allows us to behave well in scenarios like: 1. Due to contention on the cluster, an app can get no more than X executors from YARN 2. As a long job runs, we make lots of executor requests to YARN. At some point, YARN stops fulfilling additional requests. 3. The long job completes. With cancellation, we can avoid: * Surges of unneeded executors when contention on the cluster goes away * New executors popping back up as we kill our own for being idle [~pwendell] [~andrewor] do you have any thoughts? With dynamic allocation, avoid outstanding requests for more executors than pending tasks need -- Key: SPARK-4214 URL: https://issues.apache.org/jira/browse/SPARK-4214 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.2.0 Reporter: Sandy Ryza Assignee: Sandy Ryza Dynamic allocation tries to allocate more executors while we have pending tasks remaining. Our current policy can end up with more outstanding executor requests than needed to fulfill all the pending tasks. Capping the executor requests to the number of cores needed to fulfill all pending tasks would make dynamic allocation behavior less sensitive to settings for maxExecutors. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4214) With dynamic allocation, avoid outstanding requests for more executors than pending tasks need
[ https://issues.apache.org/jira/browse/SPARK-4214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14196464#comment-14196464 ] Sandy Ryza commented on SPARK-4214: --- We can implement this in either a weak way or a strong way. * Weak: whenever we would request additional executors, avoid requesting more than pending tasks need. * Strong: In addition to the above, when pending tasks go below the capacity of outstanding requests, cancel outstanding requests. My opinion is that the strong way is worthwhile. It allows us to behave well in scenarios like: 1. Due to contention on the cluster, an app can get no more than X executors from YARN 2. As a long job runs, we make lots of executor requests to YARN. At some point, YARN stops fulfilling additional requests. 3. The long job completes. With cancellation, we can avoid: * Surges of unneeded executors when contention on the cluster goes away * New executors popping back up as we kill our own for being idle [~pwendell] [~andrewor] do you have any thoughts? With dynamic allocation, avoid outstanding requests for more executors than pending tasks need -- Key: SPARK-4214 URL: https://issues.apache.org/jira/browse/SPARK-4214 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.2.0 Reporter: Sandy Ryza Assignee: Sandy Ryza Dynamic allocation tries to allocate more executors while we have pending tasks remaining. Our current policy can end up with more outstanding executor requests than needed to fulfill all the pending tasks. Capping the executor requests to the number of cores needed to fulfill all pending tasks would make dynamic allocation behavior less sensitive to settings for maxExecutors. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-4214) With dynamic allocation, avoid outstanding requests for more executors than pending tasks need
[ https://issues.apache.org/jira/browse/SPARK-4214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14196464#comment-14196464 ] Sandy Ryza edited comment on SPARK-4214 at 11/4/14 6:00 PM: We can implement this in either a weak way or a strong way. * Weak: whenever we would request additional executors, avoid requesting more than pending tasks need. * Strong: In addition to the above, when pending tasks go below the capacity of outstanding requests, cancel outstanding requests. My opinion is that the strong way is worthwhile. It allows us to behave well in scenarios like: * Due to contention on the cluster, an app can get no more than X executors from YARN * As a long job runs, we make lots of executor requests to YARN. At some point, YARN stops fulfilling additional requests. * The long job completes. With cancellation, we can avoid: * Surges of unneeded executors when contention on the cluster goes away * New executors popping back up as we kill our own for being idle [~pwendell] [~andrewor] do you have any thoughts? was (Author: sandyr): We can implement this in either a weak way or a strong way. * Weak: whenever we would request additional executors, avoid requesting more than pending tasks need. * Strong: In addition to the above, when pending tasks go below the capacity of outstanding requests, cancel outstanding requests. My opinion is that the strong way is worthwhile. It allows us to behave well in scenarios like: * Due to contention on the cluster, an app can get no more than X executors from YARN * As a long job runs, we make lots of executor requests to YARN. At some point, YARN stops fulfilling additional requests. * The long job completes. With cancellation, we can avoid: * Surges of unneeded executors when contention on the cluster goes away * New executors popping back up as we kill our own for being idle [~pwendell] [~andrewor] do you have any thoughts? With dynamic allocation, avoid outstanding requests for more executors than pending tasks need -- Key: SPARK-4214 URL: https://issues.apache.org/jira/browse/SPARK-4214 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.2.0 Reporter: Sandy Ryza Assignee: Sandy Ryza Dynamic allocation tries to allocate more executors while we have pending tasks remaining. Our current policy can end up with more outstanding executor requests than needed to fulfill all the pending tasks. Capping the executor requests to the number of cores needed to fulfill all pending tasks would make dynamic allocation behavior less sensitive to settings for maxExecutors. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
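A back-of-the-envelope sketch of the "strong" policy described in the comments above; all names here are hypothetical, not Spark APIs:
{code}
// Cap outstanding requests at what the pending tasks can actually use,
// and cancel the excess. A positive result means "request this many
// more"; a negative result means "cancel this many outstanding requests".
def reconcile(pendingTasks: Int, tasksPerExecutor: Int,
              running: Int, outstanding: Int): Int = {
  val needed = math.ceil(pendingTasks.toDouble / tasksPerExecutor).toInt
  val target = math.max(needed - running, 0)
  target - outstanding
}
{code}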
[jira] [Created] (SPARK-4227) Document external shuffle service
Sandy Ryza created SPARK-4227: - Summary: Document external shuffle service Key: SPARK-4227 URL: https://issues.apache.org/jira/browse/SPARK-4227 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.2.0 Reporter: Sandy Ryza We should add spark.shuffle.service.enabled to the Configuration page and give instructions for launching the shuffle service as an auxiliary service on YARN. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
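For the YARN instructions, the docs will presumably need something along these lines in each NodeManager's yarn-site.xml (a sketch; the service name and class come from Spark's YARN shuffle module):
{code}
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
{code}
Applications would then opt in by setting spark.shuffle.service.enabled=true.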
[jira] [Created] (SPARK-4230) Doc for spark.default.parallelism is incorrect
Sandy Ryza created SPARK-4230: - Summary: Doc for spark.default.parallelism is incorrect Key: SPARK-4230 URL: https://issues.apache.org/jira/browse/SPARK-4230 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Sandy Ryza When spark.default.parallelism isn't set, the parallelism of shuffle transformations actually defaults to the maximum number of partitions among the parent RDDs. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-4230) Doc for spark.default.parallelism is incorrect
[ https://issues.apache.org/jira/browse/SPARK-4230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-4230: -- Description: When spark.default.parallelism isn't set, the parallelism of shuffle transformations actually defaults to the maximum number of partitions among the parent RDDs. We should also probably be clear about what SparkContext.defaultParallelism is used for. was: When spark.default.parallelism isn't set, the parallelism of shuffle transformations actually defaults to the maximum number of partitions among the parent RDDs. Doc for spark.default.parallelism is incorrect -- Key: SPARK-4230 URL: https://issues.apache.org/jira/browse/SPARK-4230 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Sandy Ryza When spark.default.parallelism isn't set, the parallelism of shuffle transformations actually defaults to the maximum number of partitions among the parent RDDs. We should also probably be clear about what SparkContext.defaultParallelism is used for. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
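A small example of the behavior the corrected doc should describe, assuming spark.default.parallelism is left unset and an existing SparkContext sc:
{code}
val a = sc.parallelize(Seq((1, "x"), (2, "y")), 4) // 4 partitions
val b = sc.parallelize(Seq((1, "u"), (3, "v")), 8) // 8 partitions

// With spark.default.parallelism unset, the shuffle takes the maximum
// partition count among the parent RDDs rather than a fixed default.
val joined = a.join(b)
println(joined.partitions.length) // expected: 8
{code}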
[jira] [Created] (SPARK-4214) With dynamic allocation, avoid outstanding requests for more executors than pending tasks need
Sandy Ryza created SPARK-4214: - Summary: With dynamic allocation, avoid outstanding requests for more executors than pending tasks need Key: SPARK-4214 URL: https://issues.apache.org/jira/browse/SPARK-4214 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.2.0 Reporter: Sandy Ryza Dynamic allocation tries to allocate more executors while we have pending tasks remaining. Our current policy can end up with more outstanding executor requests than needed to fulfill all the pending tasks. Capping the executor requests to the number of cores needed to fulfill all pending tasks would make dynamic allocation behavior less sensitive to settings for maxExecutors. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4175) Exception on stage page
Sandy Ryza created SPARK-4175: - Summary: Exception on stage page Key: SPARK-4175 URL: https://issues.apache.org/jira/browse/SPARK-4175 Project: Spark Issue Type: Bug Affects Versions: 1.2.0 Reporter: Sandy Ryza Priority: Critical {code} 14/10/31 14:52:58 WARN servlet.ServletHandler: /stages/stage/ java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:313) at scala.None$.get(Option.scala:311) at org.apache.spark.ui.jobs.StagePage.taskRow(StagePage.scala:331) at org.apache.spark.ui.jobs.StagePage$$anonfun$8.apply(StagePage.scala:173) at org.apache.spark.ui.jobs.StagePage$$anonfun$8.apply(StagePage.scala:173) at org.apache.spark.ui.UIUtils$$anonfun$listingTable$2.apply(UIUtils.scala:282) at org.apache.spark.ui.UIUtils$$anonfun$listingTable$2.apply(UIUtils.scala:282) at scala.collection.immutable.Stream.map(Stream.scala:376) at org.apache.spark.ui.UIUtils$.listingTable(UIUtils.scala:282) at org.apache.spark.ui.jobs.StagePage.render(StagePage.scala:171) at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:68) at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:68) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:68) at javax.servlet.http.HttpServlet.service(HttpServlet.java:735) at javax.servlet.http.HttpServlet.service(HttpServlet.java:848) at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684) at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1496) at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:164) at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1467) at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:499) at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086) at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428) at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020) at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135) at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255) at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116) at org.eclipse.jetty.server.Server.handle(Server.java:370) at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494) at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971) at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033) at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:644) at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235) at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82) at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667) at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608) at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543) at java.lang.Thread.run(Thread.java:722) {code} I'm guessing this was caused by SPARK-4016? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
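The failure is an unconditional Option#get on per-task data that can legitimately be absent; a self-contained sketch of the defensive pattern (the types here are stand-ins, not the real UI classes):
{code}
case class TaskMetrics(executorRunTime: Long) // stand-in for the real class

// None models a task whose metrics haven't arrived yet.
val taskMetricsOpt: Option[TaskMetrics] = None

// Instead of taskMetricsOpt.get, which throws NoSuchElementException,
// render a placeholder when the metrics aren't available.
val durationCell = taskMetricsOpt
  .map(_.executorRunTime.toString)
  .getOrElse("-")
{code}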
[jira] [Commented] (SPARK-4016) Allow user to optionally show additional, advanced metrics in the UI
[ https://issues.apache.org/jira/browse/SPARK-4016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14192604#comment-14192604 ] Sandy Ryza commented on SPARK-4016: --- It looks like after this change, stage-level summary metrics no longer include in-progress tasks. Is this on purpose? Allow user to optionally show additional, advanced metrics in the UI Key: SPARK-4016 URL: https://issues.apache.org/jira/browse/SPARK-4016 Project: Spark Issue Type: Improvement Components: Web UI Reporter: Kay Ousterhout Assignee: Kay Ousterhout Priority: Minor Fix For: 1.2.0 Allowing the user to show/hide additional metrics will allow us to both (1) add more advanced metrics without cluttering the UI for the average user and (2) hide, by default, some of the metrics currently shown that are not widely used. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4016) Allow user to optionally show additional, advanced metrics in the UI
[ https://issues.apache.org/jira/browse/SPARK-4016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14192609#comment-14192609 ] Sandy Ryza commented on SPARK-4016: --- Also, it looks like this can cause an exception: SPARK-4175 Allow user to optionally show additional, advanced metrics in the UI Key: SPARK-4016 URL: https://issues.apache.org/jira/browse/SPARK-4016 Project: Spark Issue Type: Improvement Components: Web UI Reporter: Kay Ousterhout Assignee: Kay Ousterhout Priority: Minor Fix For: 1.2.0 Allowing the user to show/hide additional metrics will allow us to both (1) add more advanced metrics without cluttering the UI for the average user and (2) hide, by default, some of the metrics currently shown that are not widely used. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4178) Hadoop input metrics ignore bytes read in RecordReader instantiation
Sandy Ryza created SPARK-4178: - Summary: Hadoop input metrics ignore bytes read in RecordReader instantiation Key: SPARK-4178 URL: https://issues.apache.org/jira/browse/SPARK-4178 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4178) Hadoop input metrics ignore bytes read in RecordReader instantiation
[ https://issues.apache.org/jira/browse/SPARK-4178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14192773#comment-14192773 ] Sandy Ryza commented on SPARK-4178: --- Thanks [~kostas] for noticing this. Hadoop input metrics ignore bytes read in RecordReader instantiation Key: SPARK-4178 URL: https://issues.apache.org/jira/browse/SPARK-4178 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4136) Under dynamic allocation, cancel outstanding executor requests when pending task queue is empty
Sandy Ryza created SPARK-4136: - Summary: Under dynamic allocation, cancel outstanding executor requests when pending task queue is empty Key: SPARK-4136 URL: https://issues.apache.org/jira/browse/SPARK-4136 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.1.0 Reporter: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3573) Dataset
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14183173#comment-14183173 ] Sandy Ryza commented on SPARK-3573: --- Is this still targeted for 1.2? Dataset --- Key: SPARK-3573 URL: https://issues.apache.org/jira/browse/SPARK-3573 Project: Spark Issue Type: Sub-task Components: MLlib Reporter: Xiangrui Meng Assignee: Xiangrui Meng Priority: Critical This JIRA is for discussion of ML dataset, essentially a SchemaRDD with extra ML-specific metadata embedded in its schema. Sample code: Suppose we have training events stored on HDFS and user/ad features in Hive, and we want to assemble features for training and then apply a decision tree. The proposed pipeline with dataset looks like the following (needs more refinement):
{code}
sqlContext.jsonFile("/path/to/training/events", 0.01).registerTempTable("event")
val training = sqlContext.sql("""
  SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId, event.action AS label,
         user.gender AS userGender, user.country AS userCountry, user.features AS userFeatures,
         ad.targetGender AS targetGender
  FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = ad.id""").cache()

val indexer = new Indexer()
val interactor = new Interactor()
val fvAssembler = new FeatureVectorAssembler()
val treeClassifier = new DecisionTreeClassifier()

val paramMap = new ParamMap()
  .put(indexer.features, Map("userCountryIndex" -> "userCountry"))
  .put(indexer.sortByFrequency, true)
  .put(interactor.features, Map("genderMatch" -> Array("userGender", "targetGender")))
  .put(fvAssembler.features, Map("features" -> Array("genderMatch", "userCountryIndex", "userFeatures")))
  .put(fvAssembler.dense, true)
  .put(treeClassifier.maxDepth, 4)

// By default, the classifier recognizes the "features" and "label" columns.
val pipeline = Pipeline.create(indexer, interactor, fvAssembler, treeClassifier)
val model = pipeline.fit(training, paramMap)

sqlContext.jsonFile("/path/to/events", 0.01).registerTempTable("event")
val test = sqlContext.sql("""
  SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
         user.gender AS userGender, user.country AS userCountry, user.features AS userFeatures,
         ad.targetGender AS targetGender
  FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = ad.id""")
val prediction = model.transform(test).select('eventId, 'prediction)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-1856) Standardize MLlib interfaces
[ https://issues.apache.org/jira/browse/SPARK-1856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14183174#comment-14183174 ] Sandy Ryza commented on SPARK-1856: --- Is this work still targeted for 1.2? Standardize MLlib interfaces Key: SPARK-1856 URL: https://issues.apache.org/jira/browse/SPARK-1856 Project: Spark Issue Type: New Feature Components: MLlib Reporter: Xiangrui Meng Assignee: Xiangrui Meng Priority: Blocker Instead of expanding MLlib based on the current class naming scheme (ProblemWithAlgorithm), we should standardize MLlib's interfaces that clearly separate datasets, formulations, algorithms, parameter sets, and models. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179631#comment-14179631 ] Sandy Ryza commented on SPARK-2926: --- [~rxin] did you ever get a chance to try this out? Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle -- Key: SPARK-2926 URL: https://issues.apache.org/jira/browse/SPARK-2926 Project: Spark Issue Type: Improvement Components: Shuffle Affects Versions: 1.1.0 Reporter: Saisai Shao Attachments: SortBasedShuffleRead.pdf, Spark Shuffle Test Report(contd).pdf, Spark Shuffle Test Report.pdf Spark has already integrated sort-based shuffle write, which greatly improves IO performance and reduces memory consumption when the number of reducers is very large. On the reduce side, though, it still uses the hash-based shuffle reader implementation, which neglects the ordering of map output data in some situations. Here we propose an MR-style sort-merge shuffle reader for sort-based shuffle, to further improve its performance. Work-in-progress code and a performance test report will be posted later, once some unit test bugs are fixed. Any comments would be greatly appreciated. Thanks a lot. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3174) Provide elastic scaling within a Spark application
[ https://issues.apache.org/jira/browse/SPARK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14171024#comment-14171024 ] Sandy Ryza commented on SPARK-3174: --- bq. If I understand correctly, your concern with requesting executors in rounds is that we will end up making many requests if we have many executors, when we could instead just batch them? My original claim was that a policy that could be aware of the number of tasks needed by a stage would be necessary to get enough executors quickly when going from idle to running a job. You claim was that the exponential policy can be tuned to give similar enough behavior to the type of policy I'm describing. My concern was that, even if that is true, tuning the exponential policy in this way would be detrimental at other points of the application lifecycle: e.g., starting the next stage within a job could lead to over-allocation. Provide elastic scaling within a Spark application -- Key: SPARK-3174 URL: https://issues.apache.org/jira/browse/SPARK-3174 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.0.2 Reporter: Sandy Ryza Assignee: Andrew Or Attachments: SPARK-3174design.pdf, SparkElasticScalingDesignB.pdf, dynamic-scaling-executors-10-6-14.pdf A common complaint with Spark in a multi-tenant environment is that applications have a fixed allocation that doesn't grow and shrink with their resource needs. We're blocked on YARN-1197 for dynamically changing the resources within executors, but we can still allocate and discard whole executors. It would be useful to have some heuristics that * Request more executors when many pending tasks are building up * Discard executors when they are idle See the latest design doc for more information. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-3174) Provide elastic scaling within a Spark application
[ https://issues.apache.org/jira/browse/SPARK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14171024#comment-14171024 ] Sandy Ryza edited comment on SPARK-3174 at 10/14/14 3:03 PM: - bq. If I understand correctly, your concern with requesting executors in rounds is that we will end up making many requests if we have many executors, when we could instead just batch them? My original claim was that a policy that could be aware of the number of tasks needed by a stage would be necessary to get enough executors quickly when going from idle to running a job. Your claim was that the exponential policy can be tuned to give similar enough behavior to the type of policy I'm describing. My concern was that, even if that is true, tuning the exponential policy in this way would be detrimental at other points of the application lifecycle: e.g., starting the next stage within a job could lead to over-allocation. was (Author: sandyr): bq. If I understand correctly, your concern with requesting executors in rounds is that we will end up making many requests if we have many executors, when we could instead just batch them? My original claim was that a policy that could be aware of the number of tasks needed by a stage would be necessary to get enough executors quickly when going from idle to running a job. You claim was that the exponential policy can be tuned to give similar enough behavior to the type of policy I'm describing. My concern was that, even if that is true, tuning the exponential policy in this way would be detrimental at other points of the application lifecycle: e.g., starting the next stage within a job could lead to over-allocation. Provide elastic scaling within a Spark application -- Key: SPARK-3174 URL: https://issues.apache.org/jira/browse/SPARK-3174 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.0.2 Reporter: Sandy Ryza Assignee: Andrew Or Attachments: SPARK-3174design.pdf, SparkElasticScalingDesignB.pdf, dynamic-scaling-executors-10-6-14.pdf A common complaint with Spark in a multi-tenant environment is that applications have a fixed allocation that doesn't grow and shrink with their resource needs. We're blocked on YARN-1197 for dynamically changing the resources within executors, but we can still allocate and discard whole executors. It would be useful to have some heuristics that * Request more executors when many pending tasks are building up * Discard executors when they are idle See the latest design doc for more information. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3360) Add RowMatrix.multiply(Vector)
[ https://issues.apache.org/jira/browse/SPARK-3360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14171238#comment-14171238 ] Sandy Ryza commented on SPARK-3360: --- bq. You don't need Vector.multiply(RowMatrix) really since it is either the same as RowMatrix.multiply(Vector) or the same with the RowMatrix transposed. Any reason not to offer it for convenience though? Add RowMatrix.multiply(Vector) -- Key: SPARK-3360 URL: https://issues.apache.org/jira/browse/SPARK-3360 Project: Spark Issue Type: Improvement Components: MLlib Reporter: Sandy Ryza RowMatrix currently has multiply(Matrix), but multiply(Vector) would be useful as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
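Until such a method exists, it can be sketched on top of the public API by broadcasting the local vector and dotting it with each distributed row (this assumes the vector length matches the matrix's column count):
{code}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD

// Sketch: RowMatrix times a local Vector, yielding one dot product per row.
def multiply(mat: RowMatrix, v: Vector): RDD[Double] = {
  val bv = mat.rows.sparkContext.broadcast(v.toArray)
  mat.rows.map { row =>
    val r = row.toArray
    val arr = bv.value
    var i = 0
    var sum = 0.0
    while (i < r.length) { sum += r(i) * arr(i); i += 1 }
    sum
  }
}
{code}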
[jira] [Commented] (SPARK-1209) SparkHadoopUtil should not use package org.apache.hadoop
[ https://issues.apache.org/jira/browse/SPARK-1209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169846#comment-14169846 ] Sandy Ryza commented on SPARK-1209: --- Definitely worth changing, in my opinion. This has been bothering me for a while SparkHadoopUtil should not use package org.apache.hadoop Key: SPARK-1209 URL: https://issues.apache.org/jira/browse/SPARK-1209 Project: Spark Issue Type: Bug Affects Versions: 0.9.0 Reporter: Sandy Pérez González Assignee: Mark Grover It's private, so the change won't break compatibility -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3884) Don't set SPARK_SUBMIT_DRIVER_MEMORY if deploy mode is cluster
Sandy Ryza created SPARK-3884: - Summary: Don't set SPARK_SUBMIT_DRIVER_MEMORY if deploy mode is cluster Key: SPARK-3884 URL: https://issues.apache.org/jira/browse/SPARK-3884 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3884) If deploy mode is cluster, --driver-memory shouldn't apply to client JVM
[ https://issues.apache.org/jira/browse/SPARK-3884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-3884: -- Summary: If deploy mode is cluster, --driver-memory shouldn't apply to client JVM (was: Don't set SPARK_SUBMIT_DRIVER_MEMORY if deploy mode is cluster) If deploy mode is cluster, --driver-memory shouldn't apply to client JVM Key: SPARK-3884 URL: https://issues.apache.org/jira/browse/SPARK-3884 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Sandy Ryza Assignee: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3884) If deploy mode is cluster, --driver-memory shouldn't apply to client JVM
[ https://issues.apache.org/jira/browse/SPARK-3884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14165776#comment-14165776 ] Sandy Ryza commented on SPARK-3884: --- Accidentally assigned this to myself, but others should feel free to pick it up If deploy mode is cluster, --driver-memory shouldn't apply to client JVM Key: SPARK-3884 URL: https://issues.apache.org/jira/browse/SPARK-3884 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Sandy Ryza Assignee: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3797) Run the shuffle service inside the YARN NodeManager as an AuxiliaryService
[ https://issues.apache.org/jira/browse/SPARK-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14161636#comment-14161636 ] Sandy Ryza commented on SPARK-3797: --- Not necessarily opposed to this, but I wanted to bring up some of the drawbacks of running a Spark shuffle service inside YARN NodeManagers, as well as an alternative. * *Dependencies*: We will need to avoid dependency conflicts between Spark's shuffle service and the rest of the NodeManager. It's worth keeping in mind that the NodeManager may include Hadoop server-side dependencies we haven't dealt with in the past through depending on hadoop-client, and that its dependencies will also need to coexist with those of other auxiliary services like MR's and Tez's. Unlike in Spark, where we place Spark jars in front of Hadoop jars and thus allow Spark versions to take precedence, NodeManagers presumably run with Hadoop jars in front. * *Resource management*: YARN will soon have support for some disk I/O isolation and scheduling (YARN-2139). Running inside the NodeManager means that we won't be able to account for serving shuffle data inside of this. * *Deployment*: Where currently installing Spark on YARN at most means placing a Spark assembly jar on HDFS, this would require deploying Spark bits on every node in the cluster. * *Rolling upgrades*: Some proposed YARN work will allow containers to continue running while NodeManagers restart. With Spark depending on the NodeManager to serve shuffle data, these upgrades would interfere with running Spark applications in situations where they otherwise might not. The other option worth considering is to run the shuffle service in containers that sit beside the executor(s) on each node. This avoids all the problems above, but brings a couple of its own: * Under many cluster configurations, YARN expects each container to take up at least a minimum amount of memory and CPU. The shuffle service, which would use little of either, would sit on these resources unnecessarily. * Scheduling becomes more difficult. Spark would require two different containers to be scheduled on any node it wants to be functional on. Once YARN has container resizing (YARN-1197), this could be mitigated by running two processes inside a single container: if Spark wanted to kill an executor, it could order the executor process to kill itself and then shrink the container to the size of the shuffle service. Run the shuffle service inside the YARN NodeManager as an AuxiliaryService -- Key: SPARK-3797 URL: https://issues.apache.org/jira/browse/SPARK-3797 Project: Spark Issue Type: Sub-task Components: YARN Reporter: Patrick Wendell Assignee: Andrew Or It's also worth considering running the shuffle service in a YARN container beside the executor(s) on each node. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3837) Warn when YARN is killing containers for exceeding memory limits
Sandy Ryza created SPARK-3837: - Summary: Warn when YARN is killing containers for exceeding memory limits Key: SPARK-3837 URL: https://issues.apache.org/jira/browse/SPARK-3837 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 1.1.0 Reporter: Sandy Ryza YARN now lets application masters know when it kills their containers for exceeding memory limits. Spark should log something when this happens. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
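The hook YARN exposes is the diagnostics string on completed containers; a hedged sketch of the check (the matched substring is approximate and varies by YARN version):
{code}
import org.apache.hadoop.yarn.api.records.ContainerStatus

// When the RM reports a completed container, surface a warning if the
// diagnostics suggest the NodeManager killed it for exceeding memory.
def warnIfMemoryKill(status: ContainerStatus, warn: String => Unit): Unit = {
  val diag = Option(status.getDiagnostics).getOrElse("")
  if (diag.contains("memory limits")) { // approximate match
    warn(s"Container ${status.getContainerId} was killed by YARN for exceeding " +
      "memory limits. Consider boosting spark.yarn.executor.memoryOverhead.")
  }
}
{code}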
[jira] [Updated] (SPARK-3682) Add helpful warnings to the UI
[ https://issues.apache.org/jira/browse/SPARK-3682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-3682: -- Attachment: SPARK-3682Design.pdf Posting an initial design Add helpful warnings to the UI -- Key: SPARK-3682 URL: https://issues.apache.org/jira/browse/SPARK-3682 Project: Spark Issue Type: New Feature Components: Web UI Affects Versions: 1.1.0 Reporter: Sandy Ryza Attachments: SPARK-3682Design.pdf Spark has a zillion configuration options and a zillion different things that can go wrong with a job. Improvements like incremental and better metrics and the proposed spark replay debugger provide more insight into what's going on under the covers. However, it's difficult for non-advanced users to synthesize this information and understand where to direct their attention. It would be helpful to have some sort of central location on the UI users could go to that would provide indications about why an app/job is failing or performing poorly. Some helpful messages that we could provide: * Warn that the tasks in a particular stage are spending a long time in GC. * Warn that spark.shuffle.memoryFraction does not fit inside the young generation. * Warn that tasks in a particular stage are very short, and that the number of partitions should probably be decreased. * Warn that tasks in a particular stage are spilling a lot, and that the number of partitions should probably be increased. * Warn that a cached RDD that gets a lot of use does not fit in memory, and a lot of time is being spent recomputing it. To start, probably two kinds of warnings would be most helpful. * Warnings at the app level that report on misconfigurations, issues with the general health of executors. * Warnings at the job level that indicate why a job might be performing slowly. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3174) Provide elastic scaling within a Spark application
[ https://issues.apache.org/jira/browse/SPARK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14162685#comment-14162685 ] Sandy Ryza commented on SPARK-3174: --- bq. Maybe it makes sense to just call it `spark.dynamicAllocation.*` That sounds good to me. bq. I think in general we should limit the number of things that will affect adding/removing executors. Otherwise an application might get/lose many executors all of a sudden without a good understanding of why. Also anticipating what's needed in a future stage is usually fairly difficult, because you don't know a priori how long each stage is running. I don't see a good metric to decide how far in the future to anticipate for. Consider the (common) case of a user keeping a Hive session open and setting a low number of minimum executors in order to not sit on cluster resources when idle. Goal number 1 should be making queries return as fast as possible. A policy that, upon receiving a job, simply requested executors with enough slots to handle all the tasks required by the first stage would be a vast latency and user experience improvement over the exponential increase policy. Given that resource managers like YARN will mediate fairness between users and that Spark will be able to give executors back, there's not much advantage to being conservative or ramping up slowly in this case. Accurately anticipating resource needs is difficult, but not necessary. Provide elastic scaling within a Spark application -- Key: SPARK-3174 URL: https://issues.apache.org/jira/browse/SPARK-3174 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.0.2 Reporter: Sandy Ryza Assignee: Andrew Or Attachments: SPARK-3174design.pdf, dynamic-scaling-executors-10-6-14.pdf A common complaint with Spark in a multi-tenant environment is that applications have a fixed allocation that doesn't grow and shrink with their resource needs. We're blocked on YARN-1197 for dynamically changing the resources within executors, but we can still allocate and discard whole executors. It would be useful to have some heuristics that * Request more executors when many pending tasks are building up * Discard executors when they are idle See the latest design doc for more information. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
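Concretely, the kind of policy being argued for here, expressed as a hypothetical sketch rather than the implemented heuristic:
{code}
// On a burst of pending tasks (e.g. a query arriving at an idle app),
// request enough executors to give every task a slot at once, instead
// of ramping up exponentially from the current count.
def initialRequest(pendingTasks: Int, coresPerExecutor: Int,
                   maxExecutors: Int): Int = {
  val wanted = math.ceil(pendingTasks.toDouble / coresPerExecutor).toInt
  math.min(wanted, maxExecutors)
}
{code}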
[jira] [Commented] (SPARK-3174) Provide elastic scaling within a Spark application
[ https://issues.apache.org/jira/browse/SPARK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14160937#comment-14160937 ] Sandy Ryza commented on SPARK-3174: --- Thanks for posting the detailed design, Andrew. A few comments. I would expect properties underneath spark.executor.* to pertain to what goes on inside of executors. This is really more of a driver/scheduler feature, so a different prefix might make more sense. Because it's a large task and there's still significant value without it, I assume we'll hold off on implementing the graceful decommission until we're done with the first parts? This is probably out of scope for the first cut, but in the future it might be useful to include addition/removal policies that use what Spark knows about upcoming stages to anticipate the number of executors needed. Can we structure the config property names in a way that will make sense if we choose to add more advanced functionality like this? When cluster resources are constrained, we may find ourselves in situations where YARN is unable to allocate the additional resources we requested before the next time interval. I haven't thought about it extremely deeply, but it seems like there may be some pathological situations in which we request an enormous number of additional executors while waiting. It might make sense to do something like avoid increasing the number requested until we've actually received some? Last, any thoughts on what reasonable intervals would be? For the add interval, I imagine that we want it to be at least the amount of time required between invoking requestExecutors and being able to schedule tasks on the executors requested. Provide elastic scaling within a Spark application -- Key: SPARK-3174 URL: https://issues.apache.org/jira/browse/SPARK-3174 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.0.2 Reporter: Sandy Ryza Assignee: Andrew Or Attachments: SPARK-3174design.pdf, dynamic-scaling-executors-10-6-14.pdf A common complaint with Spark in a multi-tenant environment is that applications have a fixed allocation that doesn't grow and shrink with their resource needs. We're blocked on YARN-1197 for dynamically changing the resources within executors, but we can still allocate and discard whole executors. It would be useful to have some heuristics that * Request more executors when many pending tasks are building up * Discard executors when they are idle See the latest design doc for more information. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
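One simple guard against the pathological over-requesting described above (again a hypothetical sketch, not an existing API):
{code}
// Don't escalate the request while YARN still owes us executors from
// the previous round; only ask again once something has been granted.
def maybeRequestMore(target: Int, granted: Int, outstanding: Int,
                     request: Int => Unit): Unit = {
  if (outstanding == 0 && granted < target) {
    request(target - granted)
  }
}
{code}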
[jira] [Updated] (SPARK-3797) Enable running shuffle service in separate process from executor
[ https://issues.apache.org/jira/browse/SPARK-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-3797: -- Description: This could either mean * Running the shuffle service inside the YARN NodeManager as an AuxiliaryService * Running the shuffle service in a YARN container on each node was: This could either mean * Running the shuffle service inside the YARN NodeManager as an auxiliary service * Running the shuffle service in a YARN container on each node Enable running shuffle service in separate process from executor Key: SPARK-3797 URL: https://issues.apache.org/jira/browse/SPARK-3797 Project: Spark Issue Type: Sub-task Components: YARN Reporter: Patrick Wendell Assignee: Andrew Or This could either mean * Running the shuffle service inside the YARN NodeManager as an AuxiliaryService * Running the shuffle service in a YARN container on each node -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3797) Enable running shuffle service in separate process from executor
[ https://issues.apache.org/jira/browse/SPARK-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-3797: -- Description: This could either mean * Running the shuffle service inside the YARN NodeManager as an auxiliary service * Running the shuffle service in a YARN container on each node Enable running shuffle service in separate process from executor Key: SPARK-3797 URL: https://issues.apache.org/jira/browse/SPARK-3797 Project: Spark Issue Type: Sub-task Components: YARN Reporter: Patrick Wendell Assignee: Andrew Or This could either mean * Running the shuffle service inside the YARN NodeManager as an auxiliary service * Running the shuffle service in a YARN container on each node -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3174) Provide elastic scaling within a Spark application
[ https://issues.apache.org/jira/browse/SPARK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14160960#comment-14160960 ] Sandy Ryza commented on SPARK-3174: --- bq. for instance, lets say I do some ETL stuff where I want it to do dynamic, but then I need to run an ML algorithm or do some heavy caching where I want to shut it off. IIUC, the proposal only gets rid of executors once they are idle / empty of cached data. If that's the case, do you still see issues with leaving dynamic allocation on during the ML algorithm phase? Provide elastic scaling within a Spark application -- Key: SPARK-3174 URL: https://issues.apache.org/jira/browse/SPARK-3174 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.0.2 Reporter: Sandy Ryza Assignee: Andrew Or Attachments: SPARK-3174design.pdf, dynamic-scaling-executors-10-6-14.pdf A common complaint with Spark in a multi-tenant environment is that applications have a fixed allocation that doesn't grow and shrink with their resource needs. We're blocked on YARN-1197 for dynamically changing the resources within executors, but we can still allocate and discard whole executors. It would be useful to have some heuristics that * Request more executors when many pending tasks are building up * Discard executors when they are idle See the latest design doc for more information. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3174) Provide elastic scaling within a Spark application
[ https://issues.apache.org/jira/browse/SPARK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14161048#comment-14161048 ] Sandy Ryza commented on SPARK-3174: --- Ah, misread. My opinion is that, for a first cut without the graceful decommission, we should go with option 2 and only remove executors that have no cached blocks. Provide elastic scaling within a Spark application -- Key: SPARK-3174 URL: https://issues.apache.org/jira/browse/SPARK-3174 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.0.2 Reporter: Sandy Ryza Assignee: Andrew Or Attachments: SPARK-3174design.pdf, dynamic-scaling-executors-10-6-14.pdf A common complaint with Spark in a multi-tenant environment is that applications have a fixed allocation that doesn't grow and shrink with their resource needs. We're blocked on YARN-1197 for dynamically changing the resources within executors, but we can still allocate and discard whole executors. It would be useful to have some heuristics that * Request more executors when many pending tasks are building up * Discard executors when they are idle See the latest design doc for more information. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3797) Run the shuffle service inside the YARN NodeManager as an AuxiliaryService
[ https://issues.apache.org/jira/browse/SPARK-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-3797: -- Summary: Run the shuffle service inside the YARN NodeManager as an AuxiliaryService (was: Enable running shuffle service in separate process from executor) Run the shuffle service inside the YARN NodeManager as an AuxiliaryService -- Key: SPARK-3797 URL: https://issues.apache.org/jira/browse/SPARK-3797 Project: Spark Issue Type: Sub-task Components: YARN Reporter: Patrick Wendell Assignee: Andrew Or This could either mean * Running the shuffle service inside the YARN NodeManager as an AuxiliaryService * Running the shuffle service in a YARN container on each node -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3797) Run the shuffle service inside the YARN NodeManager as an AuxiliaryService
[ https://issues.apache.org/jira/browse/SPARK-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-3797: -- Description: It's also worth considering running the shuffle service in a YARN container beside the executor(s) on each node. (was: This could either mean * Running the shuffle service inside the YARN NodeManager as an AuxiliaryService * Running the shuffle service in a YARN container on each node) Run the shuffle service inside the YARN NodeManager as an AuxiliaryService -- Key: SPARK-3797 URL: https://issues.apache.org/jira/browse/SPARK-3797 Project: Spark Issue Type: Sub-task Components: YARN Reporter: Patrick Wendell Assignee: Andrew Or It's also worth considering running the shuffle service in a YARN container beside the executor(s) on each node. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3464) Graceful decommission of executors
[ https://issues.apache.org/jira/browse/SPARK-3464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14159252#comment-14159252 ] Sandy Ryza commented on SPARK-3464: --- Did you mean to resolve this as Fixed? Graceful decommission of executors -- Key: SPARK-3464 URL: https://issues.apache.org/jira/browse/SPARK-3464 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.1.0 Reporter: Sandy Ryza Assignee: Andrew Or In most cases, even when an application is utilizing only a small fraction of its available resources, executors will still have tasks running or blocks cached. It would be useful to have a mechanism for waiting for running tasks on an executor to finish and migrating its cached blocks elsewhere before discarding it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-3464) Graceful decommission of executors
[ https://issues.apache.org/jira/browse/SPARK-3464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14159252#comment-14159252 ] Sandy Ryza edited comment on SPARK-3464 at 10/4/14 7:27 PM: [~pwendell] did you mean to resolve this as Fixed? was (Author: sandyr): Did you mean to resolve this as Fixed? Graceful decommission of executors -- Key: SPARK-3464 URL: https://issues.apache.org/jira/browse/SPARK-3464 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.1.0 Reporter: Sandy Ryza Assignee: Andrew Or In most cases, even when an application is utilizing only a small fraction of its available resources, executors will still have tasks running or blocks cached. It would be useful to have a mechanism for waiting for running tasks on an executor to finish and migrating its cached blocks elsewhere before discarding it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3561) Native Hadoop/YARN integration for batch/ETL workloads
[ https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14158571#comment-14158571 ] Sandy Ryza commented on SPARK-3561: --- I think there may be somewhat of a misunderstanding about the relationship between Spark and YARN. YARN is not an execution environment, but a cluster resource manager that has the ability to start processes on behalf of execution engines like Spark. Spark already supports YARN as a cluster resource manager, but YARN doesn't provide its own execution engine. YARN doesn't provide a stateless shuffle (although execution engines built atop it like MR and Tez do). If I understand, the broader intent is to decouple the Spark API from the execution engine it runs on top of. Changing the title to reflect this. That, the Spark API is currently very tightly integrated with its execution engine, and frankly, decoupling the two so that Spark would be able to run on top of execution engines with similar properties seems more trouble than it's worth. Native Hadoop/YARN integration for batch/ETL workloads -- Key: SPARK-3561 URL: https://issues.apache.org/jira/browse/SPARK-3561 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 1.1.0 Reporter: Oleg Zhurakousky Labels: features Fix For: 1.2.0 Attachments: SPARK-3561.pdf Currently Spark provides integration with external resource-managers such as Apache Hadoop YARN, Mesos etc. Specifically in the context of YARN, the current architecture of Spark-on-YARN can be enhanced to provide significantly better utilization of cluster resources for large scale, batch and/or ETL applications when run alongside other applications (Spark and others) and services in YARN. Proposal: The proposed approach would introduce a pluggable JobExecutionContext (trait) - a gateway and a delegate to the Hadoop execution environment - as a non-public API (@DeveloperAPI) not exposed to end users of Spark. The trait will define only 4 operations: * hadoopFile * newAPIHadoopFile * broadcast * runJob Each method directly maps to the corresponding method in the current version of SparkContext. The JobExecutionContext implementation will be accessed by SparkContext via a master URL such as execution-context:foo.bar.MyJobExecutionContext, with the default implementation containing the existing code from SparkContext, thus allowing the current (corresponding) methods of SparkContext to delegate to such an implementation. An integrator will now have the option to provide a custom implementation of DefaultExecutionContext by either implementing it from scratch or extending from DefaultExecutionContext. Please see the attached design doc for more details. Pull Request will be posted shortly as well -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3561) Decouple Spark's API from its execution engine
[ https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-3561: -- Description: Currently Spark's API is tightly coupled with its backend execution engine. It could be useful to provide a point of pluggability between the two to allow Spark to run on other DAG execution engines with similar distributed memory abstractions. Proposal: The proposed approach would introduce a pluggable JobExecutionContext (trait) - a gateway and a delegate to the Hadoop execution environment - as a non-public API (@DeveloperAPI) not exposed to end users of Spark. The trait will define only 4 operations: * hadoopFile * newAPIHadoopFile * broadcast * runJob Each method directly maps to the corresponding method in the current version of SparkContext. The JobExecutionContext implementation will be accessed by SparkContext via a master URL such as execution-context:foo.bar.MyJobExecutionContext, with the default implementation containing the existing code from SparkContext, thus allowing the current (corresponding) methods of SparkContext to delegate to such an implementation. An integrator will now have the option to provide a custom implementation of DefaultExecutionContext by either implementing it from scratch or extending from DefaultExecutionContext. Please see the attached design doc for more details. Pull Request will be posted shortly as well was: Currently Spark provides integration with external resource-managers such as Apache Hadoop YARN, Mesos etc. Specifically in the context of YARN, the current architecture of Spark-on-YARN can be enhanced to provide significantly better utilization of cluster resources for large scale, batch and/or ETL applications when run alongside other applications (Spark and others) and services in YARN. Proposal: The proposed approach would introduce a pluggable JobExecutionContext (trait) - a gateway and a delegate to the Hadoop execution environment - as a non-public API (@DeveloperAPI) not exposed to end users of Spark. The trait will define only 4 operations: * hadoopFile * newAPIHadoopFile * broadcast * runJob Each method directly maps to the corresponding method in the current version of SparkContext. The JobExecutionContext implementation will be accessed by SparkContext via a master URL such as execution-context:foo.bar.MyJobExecutionContext, with the default implementation containing the existing code from SparkContext, thus allowing the current (corresponding) methods of SparkContext to delegate to such an implementation. An integrator will now have the option to provide a custom implementation of DefaultExecutionContext by either implementing it from scratch or extending from DefaultExecutionContext. Please see the attached design doc for more details. Pull Request will be posted shortly as well Decouple Spark's API from its execution engine -- Key: SPARK-3561 URL: https://issues.apache.org/jira/browse/SPARK-3561 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 1.1.0 Reporter: Oleg Zhurakousky Labels: features Fix For: 1.2.0 Attachments: SPARK-3561.pdf Currently Spark's API is tightly coupled with its backend execution engine. It could be useful to provide a point of pluggability between the two to allow Spark to run on other DAG execution engines with similar distributed memory abstractions. Proposal: The proposed approach would introduce a pluggable JobExecutionContext (trait) - a gateway and a delegate to the Hadoop execution environment - as a non-public API (@DeveloperAPI) not exposed to end users of Spark. 
The trait will define only 4 operations: * hadoopFile * newAPIHadoopFile * broadcast * runJob Each method directly maps to the corresponding method in the current version of SparkContext. The JobExecutionContext implementation will be accessed by SparkContext via a master URL such as execution-context:foo.bar.MyJobExecutionContext, with the default implementation containing the existing code from SparkContext, thus allowing the current (corresponding) methods of SparkContext to delegate to such an implementation. An integrator will now have the option to provide a custom implementation of DefaultExecutionContext by either implementing it from scratch or extending from DefaultExecutionContext. Please see the attached design doc for more details. Pull Request will be posted shortly as well -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
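To make the proposal concrete, the following is a minimal sketch of what such a trait could look like. The signatures are my assumptions, modeled on the corresponding SparkContext methods of the 1.x era; the authoritative definition is in the attached SPARK-3561.pdf.
{code}
// Hypothetical sketch of the proposed JobExecutionContext trait; signatures
// are assumed to mirror SparkContext's, not taken from the design doc.
import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.InputFormat
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

trait JobExecutionContext {
  def hadoopFile[K, V](sc: SparkContext, path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K], valueClass: Class[V],
      minPartitions: Int): RDD[(K, V)]

  def newAPIHadoopFile[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K, V]](
      sc: SparkContext, path: String, fClass: Class[F],
      kClass: Class[K], vClass: Class[V], conf: Configuration): RDD[(K, V)]

  def broadcast[T: ClassTag](sc: SparkContext, value: T): Broadcast[T]

  def runJob[T, U: ClassTag](sc: SparkContext, rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int], allowLocal: Boolean,
      resultHandler: (Int, U) => Unit): Unit
}
{code}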
[jira] [Updated] (SPARK-3561) Decouple Spark's API from its execution engine
[ https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-3561: -- Description: Currently Spark's API is tightly coupled with its backend execution engine. It could be useful to provide a point of pluggability between the two to allow Spark to run on other DAG execution engines with similar distributed memory abstractions. Proposal: The proposed approach would introduce a pluggable JobExecutionContext (trait) - as a non-public API (@DeveloperAPI) not exposed to end users of Spark. The trait will define only 4 operations: * hadoopFile * newAPIHadoopFile * broadcast * runJob Each method directly maps to the corresponding method in the current version of SparkContext. The JobExecutionContext implementation will be accessed by SparkContext via a master URL such as execution-context:foo.bar.MyJobExecutionContext, with the default implementation containing the existing code from SparkContext, thus allowing the current (corresponding) methods of SparkContext to delegate to such an implementation. An integrator will now have the option to provide a custom implementation of DefaultExecutionContext by either implementing it from scratch or extending from DefaultExecutionContext. Please see the attached design doc for more details. Pull Request will be posted shortly as well was: Currently Spark's API is tightly coupled with its backend execution engine. It could be useful to provide a point of pluggability between the two to allow Spark to run on other DAG execution engines with similar distributed memory abstractions. Proposal: The proposed approach would introduce a pluggable JobExecutionContext (trait) - a gateway and a delegate to the Hadoop execution environment - as a non-public API (@DeveloperAPI) not exposed to end users of Spark. The trait will define only 4 operations: * hadoopFile * newAPIHadoopFile * broadcast * runJob Each method directly maps to the corresponding method in the current version of SparkContext. The JobExecutionContext implementation will be accessed by SparkContext via a master URL such as execution-context:foo.bar.MyJobExecutionContext, with the default implementation containing the existing code from SparkContext, thus allowing the current (corresponding) methods of SparkContext to delegate to such an implementation. An integrator will now have the option to provide a custom implementation of DefaultExecutionContext by either implementing it from scratch or extending from DefaultExecutionContext. Please see the attached design doc for more details. Pull Request will be posted shortly as well Decouple Spark's API from its execution engine -- Key: SPARK-3561 URL: https://issues.apache.org/jira/browse/SPARK-3561 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 1.1.0 Reporter: Oleg Zhurakousky Labels: features Fix For: 1.2.0 Attachments: SPARK-3561.pdf Currently Spark's API is tightly coupled with its backend execution engine. It could be useful to provide a point of pluggability between the two to allow Spark to run on other DAG execution engines with similar distributed memory abstractions. Proposal: The proposed approach would introduce a pluggable JobExecutionContext (trait) - as a non-public API (@DeveloperAPI) not exposed to end users of Spark. The trait will define only 4 operations: * hadoopFile * newAPIHadoopFile * broadcast * runJob Each method directly maps to the corresponding method in the current version of SparkContext. 
The JobExecutionContext implementation will be accessed by SparkContext via a master URL such as execution-context:foo.bar.MyJobExecutionContext, with the default implementation containing the existing code from SparkContext, thus allowing the current (corresponding) methods of SparkContext to delegate to such an implementation. An integrator will now have the option to provide a custom implementation of DefaultExecutionContext by either implementing it from scratch or extending from DefaultExecutionContext. Please see the attached design doc for more details. Pull Request will be posted shortly as well -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-3561) Decouple Spark's API from its execution engine
[ https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14158571#comment-14158571 ] Sandy Ryza edited comment on SPARK-3561 at 10/3/14 11:00 PM: - I think there may be somewhat of a misunderstanding about the relationship between Spark and YARN. YARN is not an execution environment, but a cluster resource manager that has the ability to start processes on behalf of execution engines like Spark. Spark already supports YARN as a cluster resource manager, but YARN doesn't provide its own execution engine. YARN doesn't provide a stateless shuffle (although execution engines built atop it like MR and Tez do). If I understand, the broader intent is to decouple the Spark API from the execution engine it runs on top of. Changing the title to reflect this. That said, the Spark API is currently very tightly integrated with its execution engine, and frankly, decoupling the two so that Spark would be able to run on top of execution engines with similar properties seems more trouble than it's worth. was (Author: sandyr): I think there may be somewhat of a misunderstanding about the relationship between Spark and YARN. YARN is not an execution environment, but a cluster resource manager that has the ability to start processes on behalf of execution engines like Spark. Spark already supports YARN as a cluster resource manager, but YARN doesn't provide its own execution engine. YARN doesn't provide a stateless shuffle (although execution engines built atop it like MR and Tez do). If I understand, the broader intent is to decouple the Spark API from the execution engine it runs on top of. Changing the title to reflect this. That, the Spark API is currently very tightly integrated with its execution engine, and frankly, decoupling the two so that Spark would be able to run on top of execution engines with similar properties seems more trouble than it's worth. Decouple Spark's API from its execution engine -- Key: SPARK-3561 URL: https://issues.apache.org/jira/browse/SPARK-3561 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 1.1.0 Reporter: Oleg Zhurakousky Labels: features Fix For: 1.2.0 Attachments: SPARK-3561.pdf Currently Spark's user-facing API is tightly coupled with its backend execution engine. It could be useful to provide a point of pluggability between the two to allow Spark to run on other DAG execution engines with similar distributed memory abstractions. Proposal: The proposed approach would introduce a pluggable JobExecutionContext (trait) - as a non-public API (@DeveloperAPI) not exposed to end users of Spark. The trait will define only 4 operations: * hadoopFile * newAPIHadoopFile * broadcast * runJob Each method directly maps to the corresponding method in the current version of SparkContext. The JobExecutionContext implementation will be accessed by SparkContext via a master URL such as execution-context:foo.bar.MyJobExecutionContext, with the default implementation containing the existing code from SparkContext, thus allowing the current (corresponding) methods of SparkContext to delegate to such an implementation. An integrator will now have the option to provide a custom implementation of DefaultExecutionContext by either implementing it from scratch or extending from DefaultExecutionContext. Please see the attached design doc for more details. 
Pull Request will be posted shortly as well -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-3422) JavaAPISuite.getHadoopInputSplits isn't used anywhere
[ https://issues.apache.org/jira/browse/SPARK-3422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza resolved SPARK-3422. --- Resolution: Fixed JavaAPISuite.getHadoopInputSplits isn't used anywhere - Key: SPARK-3422 URL: https://issues.apache.org/jira/browse/SPARK-3422 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.2 Reporter: Sandy Ryza Assignee: Sandy Ryza Priority: Trivial -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3693) Cached Hadoop RDD always returns rows with the same value
[ https://issues.apache.org/jira/browse/SPARK-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148303#comment-14148303 ] Sandy Ryza commented on SPARK-3693: --- Spark's documentation actually makes a note of this.
{code}
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD will create many references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should first copy them using
 * a `map` function.
{code}
Having MemoryStore make a copy would degrade performance in other situations where the objects aren't used. While it's not pretty, I don't see a better approach than the current behavior where the user needs to explicitly make a copy. Though we could possibly provide some utility to help if it requires a lot of user boilerplate? Cached Hadoop RDD always returns rows with the same value Key: SPARK-3693 URL: https://issues.apache.org/jira/browse/SPARK-3693 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Xuefu Zhang While trying RDD caching, it was found that caching a Hadoop RDD causes data correctness issues. The following code snippet demonstrates the usage:
{code}
public final class Test {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    // ...
    JavaPairRDD<BytesWritable, BytesWritable> input = ctx.hadoopRDD(jobConf,
        CombineHiveInputClass.class, WritableComparable.class, Writable.class);
    input = input.cache();
    input.foreach(new VoidFunction<Tuple2<BytesWritable, BytesWritable>>() {
      @Override
      public void call(Tuple2<BytesWritable, BytesWritable> row) throws Exception {
        if (row._1() != null) {
          System.out.println("Key: " + row._1());
        }
        if (row._2() != null) {
          System.out.println("Value: " + row._2());
        }
      }
    });
    ctx.stop();
  }
}
{code}
In this case, row._2() always gives the same value. If we disable caching by removing input.cache(), the program gives the expected rows. Further analysis shows that MemoryStore (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236) is storing the references to (key, value) pairs returned by HadoopRDD.getNext() (See https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220), but this method always returns the same (key, value) object references, except each getNext() call updates values inside these objects. When there are no more records, the (key, value) objects are filled with empty strings (no values) in CombineFileRecordReader. As all pairs in MemoryStore.vector refer to the same (key, value) object pair, all values become NULL. Probably MemoryStore should instead store a copy of the (key, value) pair rather than keeping a reference to it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
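For reference, the documented workaround looks roughly like the following in Scala, assuming an existing SparkContext `sc`; the path and the choice of copying the Writables out to a Long and a String are illustrative assumptions, not from the JIRA.
{code}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Copy out of the reused Writable objects before caching, so every cached
// record is a distinct, immutable object.
val raw = sc.hadoopFile("/path/to/data", classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text])
val cached = raw.map { case (k, v) => (k.get, v.toString) }.cache()
{code}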
[jira] [Updated] (SPARK-3682) Add helpful warnings to the UI
[ https://issues.apache.org/jira/browse/SPARK-3682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-3682: -- Description: Spark has a zillion configuration options and a zillion different things that can go wrong with a job. Improvements like incremental and better metrics and the proposed spark replay debugger provide more insight into what's going on under the covers. However, it's difficult for non-advanced users to synthesize this information and understand where to direct their attention. It would be helpful to have some sort of central location on the UI users could go to that would provide indications about why an app/job is failing or performing poorly. Some helpful messages that we could provide: * Warn that the tasks in a particular stage are spending a long time in GC. * Warn that spark.shuffle.memoryFraction does not fit inside the young generation. * Warn that tasks in a particular stage are very short, and that the number of partitions should probably be decreased. * Warn that tasks in a particular stage are spilling a lot, and that the number of partitions should probably be increased. * Warn that a cached RDD that gets a lot of use does not fit in memory, and a lot of time is being spent recomputing it. To start, probably two kinds of warnings would be most helpful. * Warnings at the app level that report on misconfigurations, issues with the general health of executors. * Warnings at the job level that indicate why a job might be performing slowly. was: Spark has a zillion configuration options and a zillion different things that can go wrong with a job. Improvements like incremental and better metrics and the proposed spark replay debugger provide more insight into what's going on under the covers. However, it's difficult for non-advanced users to synthesize this information and understand where to direct their attention. It would be helpful to have some sort of central location on the UI users could go to that would provide indications about why an app/job is failing or performing poorly. Some helpful messages that we could provide: * Warn that the tasks in a particular stage are spending a long time in GC. * Warn that spark.shuffle.memoryFraction does not fit inside the young generation. * Warn that tasks in a particular stage are very short, and that the number of partitions should probably be decreased. * Warn that tasks in a particular stage are spilling a lot, and that the number of partitions should probably be decreased. * Warn that a cached RDD that gets a lot of use does not fit in memory, and a lot of time is being spent recomputing it. To start, probably two kinds of warnings would be most helpful. * Warnings at the app level that report on misconfigurations, issues with the general health of executors. * Warnings at the job level that indicate why a job might be performing slowly. Add helpful warnings to the UI -- Key: SPARK-3682 URL: https://issues.apache.org/jira/browse/SPARK-3682 Project: Spark Issue Type: New Feature Components: Web UI Affects Versions: 1.1.0 Reporter: Sandy Ryza Spark has a zillion configuration options and a zillion different things that can go wrong with a job. Improvements like incremental and better metrics and the proposed spark replay debugger provide more insight into what's going on under the covers. However, it's difficult for non-advanced users to synthesize this information and understand where to direct their attention. 
It would be helpful to have some sort of central location on the UI users could go to that would provide indications about why an app/job is failing or performing poorly. Some helpful messages that we could provide: * Warn that the tasks in a particular stage are spending a long time in GC. * Warn that spark.shuffle.memoryFraction does not fit inside the young generation. * Warn that tasks in a particular stage are very short, and that the number of partitions should probably be decreased. * Warn that tasks in a particular stage are spilling a lot, and that the number of partitions should probably be increased. * Warn that a cached RDD that gets a lot of use does not fit in memory, and a lot of time is being spent recomputing it. To start, probably two kinds of warnings would be most helpful. * Warnings at the app level that report on misconfigurations, issues with the general health of executors. * Warnings at the job level that indicate why a job might be performing slowly. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
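As a flavor of what one such check could look like programmatically, here is a toy sketch of the shuffle-memory-versus-young-generation warning; the inputs, thresholds, and wording are my own illustrative assumptions, not part of the proposal.
{code}
// Toy sketch of one warning from the list above. How the heap and young
// generation sizes are obtained is left as an assumption.
def shuffleVsYoungGenWarning(executorHeapBytes: Long, youngGenBytes: Long,
    shuffleMemoryFraction: Double): Option[String] = {
  val shuffleBytes = (executorHeapBytes * shuffleMemoryFraction).toLong
  if (shuffleBytes > youngGenBytes)
    Some(s"spark.shuffle.memoryFraction reserves $shuffleBytes bytes, which does not " +
      s"fit in the young generation ($youngGenBytes bytes); expect GC pressure.")
  else None
}
{code}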
[jira] [Commented] (SPARK-3682) Add helpful warnings to the UI
[ https://issues.apache.org/jira/browse/SPARK-3682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148379#comment-14148379 ] Sandy Ryza commented on SPARK-3682: --- Oops, that should have read increased. When a task fetches more shuffle data from the previous stage than it can fit in memory, it needs to spill the extra data to disk. Increasing the number of partitions makes it so that each task will be responsible for dealing with less data and will need to spill less. Add helpful warnings to the UI -- Key: SPARK-3682 URL: https://issues.apache.org/jira/browse/SPARK-3682 Project: Spark Issue Type: New Feature Components: Web UI Affects Versions: 1.1.0 Reporter: Sandy Ryza Spark has a zillion configuration options and a zillion different things that can go wrong with a job. Improvements like incremental and better metrics and the proposed spark replay debugger provide more insight into what's going on under the covers. However, it's difficult for non-advanced users to synthesize this information and understand where to direct their attention. It would be helpful to have some sort of central location on the UI users could go to that would provide indications about why an app/job is failing or performing poorly. Some helpful messages that we could provide: * Warn that the tasks in a particular stage are spending a long time in GC. * Warn that spark.shuffle.memoryFraction does not fit inside the young generation. * Warn that tasks in a particular stage are very short, and that the number of partitions should probably be decreased. * Warn that tasks in a particular stage are spilling a lot, and that the number of partitions should probably be increased. * Warn that a cached RDD that gets a lot of use does not fit in memory, and a lot of time is being spent recomputing it. To start, probably two kinds of warnings would be most helpful. * Warnings at the app level that report on misconfigurations, issues with the general health of executors. * Warnings at the job level that indicate why a job might be performing slowly. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
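Concretely, the advice in that comment amounts to something like the following, assuming an existing SparkContext `sc`; the partition counts are arbitrary examples, not recommendations from the issue.
{code}
// With more partitions, each reduce task fetches less shuffle data and
// spills less. The counts here are arbitrary illustrations.
val pairs = sc.textFile("/logs").map(line => (line.split(" ")(0), 1))
val coarse = pairs.reduceByKey(_ + _, 200)   // fewer, larger reduce tasks
val fine   = pairs.reduceByKey(_ + _, 2000)  // more, smaller tasks; less spill
{code}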
[jira] [Created] (SPARK-3682) Add helpful warnings to the UI
Sandy Ryza created SPARK-3682: - Summary: Add helpful warnings to the UI Key: SPARK-3682 URL: https://issues.apache.org/jira/browse/SPARK-3682 Project: Spark Issue Type: New Feature Components: Web UI Reporter: Sandy Ryza Spark has a zillion configuration options and a zillion different things that can go wrong with a job. Improvements like incremental and better metrics and the proposed spark replay debugger provide more insight into what's going on under the covers. However, it's difficult for non-advanced users to synthesize this information and understand where to direct their attention. It would be helpful to have some sort of central location on the UI users could go to that would provide indications about why an app/job is failing or performing poorly. Some helpful messages that we could provide: * Warn that the tasks in a particular stage are spending a long time in GC. * Warn that spark.shuffle.memoryFraction does not fit inside the young generation. * Warn that tasks in a particular stage are very short, and that the number of partitions should probably be decreased. * Warn that tasks in a particular stage are spilling a lot, and that the number of partitions should probably be decreased. * Warn that a cached RDD that gets a lot of use does not fit in memory, and a lot of time is being spent recomputing it. To start, probably two kinds of warnings would be most helpful. * Warnings at the app level that report on misconfigurations, issues with the general health of executors. * Warnings at the job level that indicate why a job might be performing slowly. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-2131) Collect per-task filesystem-bytes-read/written metrics
[ https://issues.apache.org/jira/browse/SPARK-2131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza resolved SPARK-2131. --- Resolution: Duplicate Collect per-task filesystem-bytes-read/written metrics -- Key: SPARK-2131 URL: https://issues.apache.org/jira/browse/SPARK-2131 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-2142) Give better indicator of how GC cuts into task time
[ https://issues.apache.org/jira/browse/SPARK-2142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza resolved SPARK-2142. --- Resolution: Not a Problem I ran some tests that indicated that only stop-the-world GC time gets included in the GC Time metric. Give better indicator of how GC cuts into task time --- Key: SPARK-2142 URL: https://issues.apache.org/jira/browse/SPARK-2142 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Sandy Ryza The currently reported GC time is a somewhat misleading metric. If I see a task that has taken 10 minutes with a GC time of 9 minutes, the straightforward interpretation is that GC is making my job 90% slower. This is true if all that time is spent in a full GC. But if a GC thread is only running in the background, it might not be impacting my task at all. What a user really wants to know is whether GC is the cause of their task being slow. Counting the number of full GCs or measuring the amount of time spent in stop-the-world GCs could be a more useful metric. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
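For context, a per-task GC Time metric of this kind is typically derived by sampling the JVM's collector MXBeans before and after the task and taking the delta; the sketch below shows the idea, though the actual Spark code may differ.
{code}
import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._

// getCollectionTime reports accumulated collection time, which for
// stop-the-world collectors is pause time; -1 means "unsupported".
def totalGcTimeMs(): Long =
  ManagementFactory.getGarbageCollectorMXBeans.asScala
    .map(_.getCollectionTime.max(0L))
    .sum

val before = totalGcTimeMs()
// ... run the task ...
val gcTimeForTask = totalGcTimeMs() - before
{code}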
[jira] [Commented] (SPARK-3468) WebUI Timeline-View feature
[ https://issues.apache.org/jira/browse/SPARK-3468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14145829#comment-14145829 ] Sandy Ryza commented on SPARK-3468: --- This looks like a really cool addition. WebUI Timeline-View feature --- Key: SPARK-3468 URL: https://issues.apache.org/jira/browse/SPARK-3468 Project: Spark Issue Type: New Feature Components: Web UI Reporter: Kousuke Saruta Attachments: executors.png, stages.png, taskDetails.png, tasks.png I sometimes troubleshoot and analyse the causes of long-running jobs. When I do, I find the stages which take a long time or fail, then I find the tasks which take a long time or fail, and then I analyse the proportion of each phase within a task. In other cases, I find executors which take a long time to run a task and analyse the details of that task. In such situations, I think it's helpful to visualize a timeline view of stages / tasks / executors and to visualize the proportion of activity within each task. I'm now developing prototypes like the captures I attached. I'll integrate these viewers into the WebUI. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3642) Better document the nuances of shared variables
Sandy Ryza created SPARK-3642: - Summary: Better document the nuances of shared variables Key: SPARK-3642 URL: https://issues.apache.org/jira/browse/SPARK-3642 Project: Spark Issue Type: Improvement Components: Documentation Reporter: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3622) Provide a custom transformation that can output multiple RDDs
[ https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143908#comment-14143908 ] Sandy Ryza commented on SPARK-3622: --- Is this a duplicate of SPARK-2688? Provide a custom transformation that can output multiple RDDs - Key: SPARK-3622 URL: https://issues.apache.org/jira/browse/SPARK-3622 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.1.0 Reporter: Xuefu Zhang All existing transformations return just one RDD at most, even those which take user-supplied functions such as mapPartitions(). However, sometimes a user-provided function may need to output multiple RDDs. For instance, a filter function that divides the input RDD into several RDDs. While it's possible to get multiple RDDs by transforming the same RDD multiple times, it may be more efficient to do this concurrently in one shot, especially when the user's existing function already generates different data sets. This is the case in Hive on Spark, where Hive's map function and reduce function can output different data sets to be consumed by subsequent stages. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
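For contrast, the multi-pass approach the description mentions looks like this today, assuming an existing SparkContext `sc`; without caching, the parent is re-evaluated once per derived RDD, and the proposal is to produce both outputs in a single pass instead.
{code}
// "Transform the same RDD multiple times": two passes (or a cache) today,
// where a multi-output transformation could do it in one shot.
val parent = sc.textFile("/input").cache()
val errors   = parent.filter(_.startsWith("ERROR"))
val warnings = parent.filter(_.startsWith("WARN"))
{code}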
[jira] [Commented] (SPARK-3577) Add task metric to report spill time
[ https://issues.apache.org/jira/browse/SPARK-3577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14142524#comment-14142524 ] Sandy Ryza commented on SPARK-3577: --- No problem. Yeah, I agree that a spill time metric would be useful. Add task metric to report spill time Key: SPARK-3577 URL: https://issues.apache.org/jira/browse/SPARK-3577 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Kay Ousterhout Priority: Minor The ExternalSorter passes its own ShuffleWriteMetrics into ExternalSorter. The write time recorded in those metrics is never used. We should probably add task metrics to report this spill time, since for shuffles, this would have previously been reported as part of shuffle write time (with the original hash-based sorter). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3612) Executor shouldn't quit if heartbeat message fails to reach the driver
[ https://issues.apache.org/jira/browse/SPARK-3612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14142006#comment-14142006 ] Sandy Ryza commented on SPARK-3612: --- Yeah, we should catch this. Will post a patch. Executor shouldn't quit if heartbeat message fails to reach the driver -- Key: SPARK-3612 URL: https://issues.apache.org/jira/browse/SPARK-3612 Project: Spark Issue Type: Bug Components: Spark Core Reporter: Reynold Xin The thread started by Executor.startDriverHeartbeater can actually terminate the whole executor if AkkaUtils.askWithReply[HeartbeatResponse] throws an exception. I don't think we should quit the executor this way. At the very least, we would want to log a more meaningful exception than simply
{code}
14/09/20 06:38:12 WARN AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)
  at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
  at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:379)
14/09/20 06:38:45 WARN AkkaUtils: Error sending message in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)
  at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
  at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:379)
14/09/20 06:39:18 WARN AkkaUtils: Error sending message in 3 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)
  at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
  at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:379)
14/09/20 06:39:21 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Driver Heartbeater,5,main]
org.apache.spark.SparkException: Error sending message [message = Heartbeat(281,[Lscala.Tuple2;@4d9294db,BlockManagerId(281, ip-172-31-7-55.eu-west-1.compute.internal, 52303))]
  at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:190)
  at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:379)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)
  at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
  ... 1 more
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
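The shape of the fix Sandy describes is roughly the following; this is an assumption about the eventual patch, not its actual code: catch failures inside the heartbeater loop so they are logged rather than fatal.
{code}
import scala.util.control.NonFatal

// Sketch only: a heartbeater thread that logs send failures and keeps
// going instead of letting the exception escape and kill the executor.
def startHeartbeater(sendHeartbeat: () => Unit, intervalMs: Long): Thread = {
  val t = new Thread("driver-heartbeater") {
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        try {
          sendHeartbeat()  // may throw, e.g. a TimeoutException
        } catch {
          case NonFatal(e) =>
            System.err.println(s"Heartbeat to driver failed, will retry: $e")
        }
        Thread.sleep(intervalMs)
      }
    }
  }
  t.setDaemon(true)
  t.start()
  t
}
{code}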
[jira] [Created] (SPARK-3605) Typo in SchemaRDD JavaDoc
Sandy Ryza created SPARK-3605: - Summary: Typo in SchemaRDD JavaDoc Key: SPARK-3605 URL: https://issues.apache.org/jira/browse/SPARK-3605 Project: Spark Issue Type: Bug Components: SQL Reporter: Sandy Ryza Priority: Trivial Examples are loading data from Parquet files by using by using the -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3573) Dataset
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14141063#comment-14141063 ] Sandy Ryza commented on SPARK-3573: --- Currently SchemaRDD does depend on Catalyst. Are you thinking we'd take that out? I wasn't thinking about any specific drawbacks, unless SQL might need to depend on MLlib as well? I guess I'm thinking about it more from the perspective of what mental model we expect users to have when dealing with Datasets. SchemaRDD brings along baggage like LogicalPlans - do users need to understand what that is? SQL and ML types sometimes line up, sometimes have fuzzy relationships, and sometimes can't be translated. How does the mapping get defined? What stops someone from annotating a String column as numeric? Dataset --- Key: SPARK-3573 URL: https://issues.apache.org/jira/browse/SPARK-3573 Project: Spark Issue Type: Sub-task Components: MLlib Reporter: Xiangrui Meng Assignee: Xiangrui Meng Priority: Critical This JIRA is for discussion of ML dataset, essentially a SchemaRDD with extra ML-specific metadata embedded in its schema. Sample code: Suppose we have training events stored on HDFS and user/ad features in Hive, and we want to assemble features for training and then apply a decision tree. The proposed pipeline with dataset looks like the following (need more refinements):
{code}
sqlContext.jsonFile("/path/to/training/events", 0.01).registerTempTable("event")
val training = sqlContext.sql("""
  SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
         event.action AS label, user.gender AS userGender, user.country AS userCountry,
         user.features AS userFeatures, ad.targetGender AS targetGender
  FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = ad.id;""").cache()

val indexer = new Indexer()
val interactor = new Interactor()
val fvAssembler = new FeatureVectorAssembler()
val treeClassifier = new DecisionTreeClassifier()

val paramMap = new ParamMap()
  .put(indexer.features, Map("userCountryIndex" -> "userCountry"))
  .put(indexer.sortByFrequency, true)
  .put(interactor.features, Map("genderMatch" -> Array("userGender", "targetGender")))
  .put(fvAssembler.features, Map("features" -> Array("genderMatch", "userCountryIndex", "userFeatures")))
  .put(fvAssembler.dense, true)
  .put(treeClassifier.maxDepth, 4)

// By default, classifier recognizes features and label columns.
val pipeline = Pipeline.create(indexer, interactor, fvAssembler, treeClassifier)
val model = pipeline.fit(raw, paramMap)

sqlContext.jsonFile("/path/to/events", 0.01).registerTempTable("event")
val test = sqlContext.sql("""
  SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
         user.gender AS userGender, user.country AS userCountry,
         user.features AS userFeatures, ad.targetGender AS targetGender
  FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = ad.id;""")
val prediction = model.transform(test).select('eventId, 'prediction)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3560) In yarn-cluster mode, the same jars are distributed through multiple mechanisms.
[ https://issues.apache.org/jira/browse/SPARK-3560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandy Ryza updated SPARK-3560: -- Summary: In yarn-cluster mode, the same jars are distributed through multiple mechanisms. (was: In yarn-cluster mode, jars are distributed through multiple mechanisms.) In yarn-cluster mode, the same jars are distributed through multiple mechanisms. Key: SPARK-3560 URL: https://issues.apache.org/jira/browse/SPARK-3560 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.1.0 Reporter: Sandy Ryza Priority: Critical In yarn-cluster mode, jars given to spark-submit's --jars argument should be distributed to executors through the distributed cache, not through fetching. Currently, Spark tries to distribute the jars both ways, which can cause executor errors related to trying to overwrite symlinks without write permissions. It looks like this was introduced by SPARK-2260, which sets spark.jars in yarn-cluster mode. Setting spark.jars is necessary for standalone cluster deploy mode, but harmful for yarn cluster deploy mode. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3573) Dataset
[ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14139958#comment-14139958 ] Sandy Ryza commented on SPARK-3573: --- Currently SchemaRDD lives inside SQL. Would we move it to core if we plan to use it in components that aren't related to SQL? Dataset --- Key: SPARK-3573 URL: https://issues.apache.org/jira/browse/SPARK-3573 Project: Spark Issue Type: Sub-task Components: MLlib Reporter: Xiangrui Meng Assignee: Xiangrui Meng Priority: Critical This JIRA is for discussion of ML dataset, essentially a SchemaRDD with extra ML-specific metadata embedded in its schema. Sample code: Suppose we have training events stored on HDFS and user/ad features in Hive, and we want to assemble features for training and then apply a decision tree. The proposed pipeline with dataset looks like the following (need more refinements):
{code}
sqlContext.jsonFile("/path/to/training/events", 0.01).registerTempTable("event")
val training = sqlContext.sql("""
  SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
         event.action AS label, user.gender AS userGender, user.country AS userCountry,
         user.features AS userFeatures, ad.targetGender AS targetGender
  FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = ad.id;""").cache()

val indexer = new Indexer()
val interactor = new Interactor()
val fvAssembler = new FeatureVectorAssembler()
val treeClassifier = new DecisionTreeClassifier()

val paramMap = new ParamMap()
  .put(indexer.features, Map("userCountryIndex" -> "userCountry"))
  .put(indexer.sortByFrequency, true)
  .put(interactor.features, Map("genderMatch" -> Array("userGender", "targetGender")))
  .put(fvAssembler.features, Map("features" -> Array("genderMatch", "userCountryIndex", "userFeatures")))
  .put(fvAssembler.dense, true)
  .put(treeClassifier.maxDepth, 4)

// By default, classifier recognizes features and label columns.
val pipeline = Pipeline.create(indexer, interactor, fvAssembler, treeClassifier)
val model = pipeline.fit(raw, paramMap)

sqlContext.jsonFile("/path/to/events", 0.01).registerTempTable("event")
val test = sqlContext.sql("""
  SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
         user.gender AS userGender, user.country AS userCountry,
         user.features AS userFeatures, ad.targetGender AS targetGender
  FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = ad.id;""")
val prediction = model.transform(test).select('eventId, 'prediction)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3560) In yarn-cluster mode, jars are distributed through multiple mechanisms.
[ https://issues.apache.org/jira/browse/SPARK-3560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14136882#comment-14136882 ] Sandy Ryza commented on SPARK-3560: --- Right. I believe Min from LinkedIn who discovered the issue is planning to submit a patch. In yarn-cluster mode, jars are distributed through multiple mechanisms. --- Key: SPARK-3560 URL: https://issues.apache.org/jira/browse/SPARK-3560 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.1.0 Reporter: Sandy Ryza Priority: Critical In yarn-cluster mode, jars given to spark-submit's --jars argument should be distributed to executors through the distributed cache, not through fetching. Currently, Spark tries to distribute the jars both ways, which can cause executor errors related to trying to overwrite symlinks without write permissions. It looks like this was introduced by SPARK-2260, which sets spark.jars in yarn-cluster mode. Setting spark.jars is necessary for standalone cluster deploy mode, but harmful for yarn cluster deploy mode. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3574) Shuffle finish time always reported as -1
[ https://issues.apache.org/jira/browse/SPARK-3574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14138246#comment-14138246 ] Sandy Ryza commented on SPARK-3574: --- On it Shuffle finish time always reported as -1 - Key: SPARK-3574 URL: https://issues.apache.org/jira/browse/SPARK-3574 Project: Spark Issue Type: Bug Affects Versions: 1.1.0 Reporter: Kay Ousterhout shuffleFinishTime is always reported as -1. I think the way we fix this should be to set the shuffleFinishTime in each ShuffleWriteMetrics as the shuffles finish, but when aggregating the metrics, only report shuffleFinishTime as something other than -1 when *all* of the shuffles have completed. [~sandyr], it looks like this was introduced in your recent patch to incrementally report metrics. Any chance you can fix this? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3577) Shuffle write time incorrect for sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-3577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14138245#comment-14138245 ] Sandy Ryza commented on SPARK-3577: --- On it Shuffle write time incorrect for sort-based shuffle --- Key: SPARK-3577 URL: https://issues.apache.org/jira/browse/SPARK-3577 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Kay Ousterhout Priority: Blocker After this change https://github.com/apache/spark/commit/4e982364426c7d65032e8006c63ca4f9a0d40470 (cc [~sandyr] [~pwendell]) the ExternalSorter passes its own ShuffleWriteMetrics into ExternalSorter. The write time recorded in those metrics is never used -- meaning that when someone is using sort-based shuffle, the shuffle write time will be recorded as 0. [~sandyr] do you have time to take a look at this? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3530) Pipeline and Parameters
[ https://issues.apache.org/jira/browse/SPARK-3530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14138319#comment-14138319 ] Sandy Ryza commented on SPARK-3530: --- bq. Isn't the fit multiple models at once part a bit of an early optimization? I personally think this is a useful feature in nearly all situations and that parameter search is one of the most important problems for a machine learning framework to address. Avoiding premature optimization usually refers to getting bang for buck in terms of time spent. However, if this is something we think might eventually be useful, it's worth making API decisions that will accommodate it. Pipeline and Parameters --- Key: SPARK-3530 URL: https://issues.apache.org/jira/browse/SPARK-3530 Project: Spark Issue Type: Sub-task Components: ML, MLlib Reporter: Xiangrui Meng Assignee: Xiangrui Meng Priority: Critical This part of the design doc is for pipelines and parameters. I put the design doc at https://docs.google.com/document/d/1rVwXRjWKfIb-7PI6b86ipytwbUH7irSNLF1_6dLmh8o/edit?usp=sharing I will copy the proposed interfaces to this JIRA later. Some sample code can be viewed at: https://github.com/mengxr/spark-ml/ Please help review the design and post your comments here. Thanks! -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
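To illustrate why the API shape matters for fitting multiple models at once, here is a toy sketch with assumed shapes; these are not the interfaces proposed in the design doc.
{code}
// An estimator that accepts a grid of parameter settings can amortize
// shared work (e.g. feature preprocessing) across all fits, which is what
// makes parameter search worth accommodating in the API up front.
case class Params(maxDepth: Int, regularization: Double)

trait Estimator[M] {
  def fit(data: Seq[(Double, Seq[Double])], params: Params): M

  // Default multi-fit just loops; an implementation could instead share
  // per-dataset work across the whole grid.
  def fit(data: Seq[(Double, Seq[Double])], grid: Seq[Params]): Seq[M] =
    grid.map(fit(data, _))
}
{code}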