[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119367#comment-14119367 ] Reynold Xin commented on SPARK-3019:

Sounds good to me. As I said in the design proposal, the 2 GB limit is not a goal for this interface refactoring. We can build on ManagedBuffer (or something else similar to it) to support going over the 2 GB limit. Since the ManagedBuffer interface is so tiny, it is pretty easy to swap in/out or refactor.

> Pluggable block transfer (data plane communication) interface
> -------------------------------------------------------------
>
> Key: SPARK-3019
> URL: https://issues.apache.org/jira/browse/SPARK-3019
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle, Spark Core
> Reporter: Reynold Xin
> Assignee: Reynold Xin
> Attachments: PluggableBlockTransferServiceProposalforSpark - draft 1.pdf
>
> The attached design doc proposes a standard interface for block transferring, which will make future engineering of this functionality easier, allowing the Spark community to provide alternative implementations.
> Block transferring is a critical function in Spark. All of the following depend on it:
> * shuffle
> * torrent broadcast
> * block replication in BlockManager
> * remote block reads for tasks scheduled without locality

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
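The comment above leans on how small the ManagedBuffer surface is. As a rough illustration of that idea (a Python sketch, not Spark's actual Scala API — the class and method names here are assumptions), the key property is that callers get a stream rather than a single in-memory array, so a file-backed implementation can serve blocks of any size:

```python
import io
from abc import ABC, abstractmethod


class ManagedBuffer(ABC):
    """Minimal block-data abstraction: callers get a stream, not a raw array,
    so the backing storage can be swapped without touching callers."""

    @abstractmethod
    def size(self) -> int:
        """Length of the block in bytes."""

    @abstractmethod
    def input_stream(self):
        """A readable stream positioned at the start of the block."""


class BytesManagedBuffer(ManagedBuffer):
    """In-memory block (bounded by what fits in one byte array)."""

    def __init__(self, data: bytes):
        self._data = data

    def size(self) -> int:
        return len(self._data)

    def input_stream(self):
        return io.BytesIO(self._data)


class FileSegmentManagedBuffer(ManagedBuffer):
    """A segment of an on-disk file; not limited by any in-memory array size.
    Callers should read at most size() bytes from the returned stream."""

    def __init__(self, path: str, offset: int, length: int):
        self._path, self._offset, self._length = path, offset, length

    def size(self) -> int:
        return self._length

    def input_stream(self):
        f = open(self._path, "rb")
        f.seek(self._offset)
        return f
```

Because consumers only depend on `size()` and `input_stream()`, swapping in an implementation that serves blocks larger than 2 GB would not require changing the transfer interface itself, which seems to be the point of the comment.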
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119365#comment-14119365 ] Mridul Muralidharan commented on SPARK-3019:

I will try to push the version we last worked on for the 2G fix (a pre-1.1 fork) to git later today or this week, and we can take a look at it. It might require some effort to rebase it to 1.1 since it is slightly dated, but that can be done if required. The main reason for the push would be to illustrate why the interfaces in SPARK-1476 exist and how they are used, so that there is a better understanding of the required functional change.
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119362#comment-14119362 ] Mridul Muralidharan commented on SPARK-3019:

Just went over the proposal in some detail. [~rxin] did you take a look at the proposal in SPARK-1476? The ManagedBuffer detailed in this document does not satisfy most of the interface or functional requirements in 1476, which would require us to redesign this interface when we need to support blocks larger than 2 GB in Spark — unless I missed something.
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118849#comment-14118849 ] Apache Spark commented on SPARK-3019:

User 'rxin' has created a pull request for this issue: https://github.com/apache/spark/pull/2240
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14101767#comment-14101767 ] Reynold Xin commented on SPARK-3019:

Also, the rebalancing of blocks should be delegated to a level above the block transfer interface. This interface is very simple: it handles fetching/receiving of blocks. Everything else should be kept out of it for separation of concerns.
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14101431#comment-14101431 ] Reynold Xin commented on SPARK-3019:

I don't think my planned implementation of the interface supports this, but the interface itself should support what you suggested, since fetchBlocks is a non-blocking call that returns an Iterator.
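The shape of a non-blocking fetch that returns an iterator can be sketched as follows (an illustrative Python sketch, not Spark's actual fetchBlocks signature — `fetch_one` and the threading scheme are assumptions). The caller starts consuming results as soon as the first block lands, instead of waiting for the whole set:

```python
import queue
import threading


def fetch_blocks(block_ids, fetch_one):
    """Kick off all fetches in the background and return an iterator that
    yields (block_id, data) pairs in completion order, so the caller can
    start processing before every block has arrived."""
    results = queue.Queue()

    def worker(bid):
        # In a real transfer service this would be a remote read.
        results.put((bid, fetch_one(bid)))

    for bid in block_ids:
        threading.Thread(target=worker, args=(bid,), daemon=True).start()

    def iterator():
        for _ in block_ids:
            # Blocks only until the *next* completed fetch, not until all.
            yield results.get()

    return iterator()
```

Returning an iterator (rather than a fully materialized list) is what leaves room for the streaming behavior discussed later in the thread, even if a given implementation buffers each block whole.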
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14100043#comment-14100043 ] Mridul Muralidharan commented on SPARK-3019:

Unfortunately, I never went into how MR does shuffle - though I was supposed to dig into this with Tom in the Q1-Q2 timeframe; so hopefully I am not way off base here!

bq. It's true that we can't start on a function which requires a full view of the data coming in for a particular key, but we can start merging and combining.

In Spark, unlike MR, we cannot start merging/combining until all blocks are fetched. Well, technically we can - but we would end up repeating the merge/combine for each new map output fetched, which would be very suboptimal since we would be reading from disk many more times (hope I did not get this wrong /CC [~matei]).

bq. MapReduce makes this assessment. Each reducer has a pool of memory for fetching data into, and avoids fetching more data than can fit into this pool. I was under the impression that Spark does something similar.

In the case of hash based shuffle, obviously this is not possible. In the case of sort based shuffle, I can see this being possible, but it is not supported (iirc /CC [~matei]).
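The cost argument above — that re-running the merge/combine after each newly fetched map output touches the accumulated data over and over — can be made concrete with a toy comparison (an illustrative Python sketch; both functions are hypothetical, not Spark code):

```python
import heapq


def eager_remerge(runs):
    """Re-merge the accumulated result with every newly fetched sorted run,
    as a reducer would if it combined after each fetch. Returns the merged
    list and the number of element copies performed."""
    merged, copies = [], 0
    for run in runs:
        merged = list(heapq.merge(merged, run))
        copies += len(merged)  # every element so far is rewritten each time
    return merged, copies


def single_merge(runs):
    """One k-way merge after all runs have arrived: each element copied once."""
    merged = list(heapq.merge(*runs))
    return merged, len(merged)
```

With k runs of n elements each, the eager approach performs on the order of k^2 * n / 2 copies versus k * n for the single merge - and when the accumulated run lives on disk, each of those extra passes is an extra disk read, which is the suboptimality the comment describes.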
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099983#comment-14099983 ] Sandy Ryza commented on SPARK-3019:

Thanks for the info Mridul. A few extra clarifications.

bq. Until we read from all mappers, shuffle can't actually start.

It's true that we can't start on a function which requires a full view of the data coming in for a particular key, but we can start merging and combining.

bq. While reading data off the network, we cannot make an assessment of whether the read data can fit into memory or not.

MapReduce makes this assessment. Each reducer has a pool of memory for fetching data into, and avoids fetching more data than can fit into this pool. I was under the impression that Spark does something similar.
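The memory pool Sandy describes amounts to a byte budget that gates outstanding fetches (Spark does bound bytes in flight during shuffle fetches, but the code below is only an illustrative sketch — `FetchPool` and `schedule_fetches` are made-up names, not MapReduce's or Spark's actual classes):

```python
class FetchPool:
    """A byte budget for in-flight fetch data: a fetch is issued only when
    its size fits in the remaining budget."""

    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self.in_flight = 0

    def can_fetch(self, size):
        return self.in_flight + size <= self.max_bytes

    def acquire(self, size):
        self.in_flight += size

    def release(self, size):
        # Called once a fetched block has been consumed, freeing budget.
        self.in_flight -= size


def schedule_fetches(blocks, pool):
    """Issue fetches for as many (block_id, size) pairs as fit in the
    budget; defer the rest until release() frees room."""
    issued, deferred = [], []
    for block_id, size in blocks:
        if pool.can_fetch(size):
            pool.acquire(size)
            issued.append(block_id)
        else:
            deferred.append(block_id)
    return issued, deferred
```

The point of the scheme is that the reducer never commits to buffering more bytes than its pool allows, regardless of how many map outputs are pending.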
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099910#comment-14099910 ] Mridul Muralidharan commented on SPARK-3019:

Btw, can we do something about block replication when the replication factor is > 1? Currently we silently lose replicas, and the block placement strategy for replication is fairly nonexistent.
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099909#comment-14099909 ] Mridul Muralidharan commented on SPARK-3019:

I am yet to go through the proposal in detail, so I will defer comments on that for later; but to get some clarity on the discussion around Sandy's point:

- Until we read from all mappers, shuffle can't actually start. Even if a single mapper's output is small enough to fit into memory (which it need not be), num_mappers * avg_size_of_map_output_per_reducer could be larger than available memory by orders of magnitude. (This is fairly common for us, for example.) This was the reason we actually worked on the 2G fix, btw - individual blocks in a mapper, and also the data per reducer for a mapper, were larger than 2G :-)

- While reading data off the network, we cannot assess whether the read data can fit into memory or not (since there are other parallel read requests pending for this and other cores in the same executor). So spooling intermediate data to disk would become necessary both on the mapper side (which it already does) and on the reducer side (which we don't do currently - we assume that a block can fit into reducer memory as part of doing a remote fetch). This becomes more relevant when we want to target bigger blocks of data and tackle skew in the data (for shuffle).
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099888#comment-14099888 ] Sandy Ryza commented on SPARK-3019:

I agree that it's not typically a problem, but I imagine there are not-horribly-uncommon skew situations where the data coming from a single mapper to a single reducer won't fit in memory. In those cases we would need to send the block to disk when we might otherwise be able to stream over its records in memory.
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099571#comment-14099571 ] Reynold Xin commented on SPARK-3019:

Sandy, I don't know if that is a big problem in usual Spark applications. The majority of shuffle blocks are very small, so it is very fast to fetch them and Spark can immediately start processing those fetched blocks. Do you see some jobs where the shuffle fetch wait time is large?
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099537#comment-14099537 ] Sandy Ryza commented on SPARK-3019:

Just scanned this, so apologies if the answer is obvious from a closer reading: does the proposal allow for streaming over blocks before they're fully read? I believe this is useful for the MapReduce shuffle implementation, and the last time I looked at porting over some of its concepts, I noticed this was a limitation.
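"Streaming over blocks before they're fully read" means pulling records out of a block's byte stream as bytes arrive, instead of buffering the whole block first. A minimal sketch, assuming a hypothetical length-prefixed record encoding (not Spark's actual serialization format):

```python
import io
import struct


def stream_records(stream):
    """Yield length-prefixed records from a stream as they become
    available, without waiting for the whole block to be buffered."""
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return  # end of block
        (length,) = struct.unpack(">I", header)
        payload = stream.read(length)
        if len(payload) < length:
            return  # truncated record: stop rather than yield garbage
        yield payload


def encode_records(records):
    """Helper: pack records in the same length-prefixed format."""
    out = io.BytesIO()
    for r in records:
        out.write(struct.pack(">I", len(r)))
        out.write(r)
    return out.getvalue()
```

If the transfer interface hands callers a stream (as a ManagedBuffer-style abstraction could), a consumer like this can begin merging records while the tail of the block is still on the wire — which is the capability Sandy is asking about.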
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14098172#comment-14098172 ] Reynold Xin commented on SPARK-3019:

cc [~mridulm80]
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14096658#comment-14096658 ] Reynold Xin commented on SPARK-3019:

Possibly, although I think MapR FS is more optimized for this kind of workload, as they have done that previously for the MR shuffle. Don't quote me on this one.
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14096654#comment-14096654 ] Hari Shreedharan commented on SPARK-3019:

Why specifically MapR FS? You could use HDFS for this as well, right?