+1

-------- Original message --------
From: Ryan Blue <rb...@netflix.com>
Date: 3/30/18 2:28 PM (GMT-08:00)
To: Patrick Woody <patrick.woo...@gmail.com>
Cc: Russell Spitzer <russell.spit...@gmail.com>, Wenchen Fan <cloud0...@gmail.com>, Ted Yu <yuzhih...@gmail.com>, Spark Dev List <dev@spark.apache.org>
Subject: Re: DataSourceV2 write input requirements

You're right. A global sort would change the clustering if it had more fields than the clustering. Then what about this: if there is no RequiredClustering, then the sort is a global sort. If RequiredClustering is present, then the clustering is applied and the sort is a partition-level sort. That rule would mean that within a partition you always get the sort, but an explicit clustering overrides the partitioning a sort might try to introduce. Does that sound reasonable?

rb

On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:

> Does that methodology work in this specific case? I thought the ordering must be a subset of the clustering to guarantee the rows land in the same partition when doing a global sort. Though I get the gist that if it does satisfy the clustering, then there is no reason not to choose the global sort.
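Ryan's proposed rule above can be sketched in a few lines. This is an illustrative Java sketch under stated assumptions, not the actual Spark planner; the type names here (a clustering requirement modeled as an `Optional`, a `SortScope` result) are hypothetical stand-ins.

```java
import java.util.List;
import java.util.Optional;

// Sketch of the rule discussed above: no required clustering means the
// requested sort becomes a global sort; a required clustering controls the
// partitioning, so the sort is demoted to a per-partition sort.
public class WritePlanningRule {
    public enum SortScope { GLOBAL, PER_PARTITION }

    public static SortScope planSort(Optional<List<String>> requiredClustering,
                                     List<String> requiredSort) {
        if (requiredClustering.isEmpty()) {
            // No clustering requirement: satisfy the sort with a global sort
            // (range partitioning plus a sort within each partition).
            return SortScope.GLOBAL;
        }
        // Clustering present: it decides the partitioning, so the sort can
        // only be guaranteed within each partition.
        return SortScope.PER_PARTITION;
    }

    public static void main(String[] args) {
        System.out.println(planSort(Optional.empty(), List.of("a", "b")));            // GLOBAL
        System.out.println(planSort(Optional.of(List.of("a")), List.of("a", "b"))); // PER_PARTITION
    }
}
```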
On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue <rb...@netflix.com> wrote:

> Can you expand on how the ordering containing the clustering expressions would ensure the global sort?

The idea was to assume that if the clustering can be satisfied by a global sort, then do the global sort. For example, if the clustering is Set("b", "a") and the sort is Seq("a", "b", "c"), then do a global sort by columns a, b, and c. Technically, you could do this with a hash partitioner instead of a range partitioner and sort within each partition, but that doesn't make much sense: the partitioning would ensure that each partition has just one combination of the required clustering columns, so the in-partition sort would effectively ignore the first few values. It must be that the intent was a global sort.

On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:

> Right, you could use this to store a global ordering if there is only one write (e.g., CTAS). I don't think anything needs to change in that case, you would still have a clustering and an ordering, but the ordering would need to include all fields of the clustering. A way to pass in the partition ordinal for the source to store would be required.

Can you expand on how the ordering containing the clustering expressions would ensure the global sort? Having a RangePartitioning would certainly satisfy it, but it isn't required - is the suggestion that if Spark sees this overlap, then it plans a global sort?

On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:

@RyanBlue I'm hoping that through the CBO effort we will continue to get more detailed statistics. For example, on read we could use sketch data structures to get estimates of unique values and density for each column.
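Returning to Ryan's Set("b", "a") / Seq("a", "b", "c") example above: the "clustering can be satisfied by a global sort" condition amounts to a prefix check. A rough Java sketch (illustrative only, not Spark's actual satisfaction logic):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// A required clustering can be satisfied by a single global sort when the
// clustering columns form a prefix of the sort order (order within the
// clustering set does not matter).
public class ClusteringBySort {
    public static boolean satisfiedByGlobalSort(Set<String> clustering,
                                                List<String> sort) {
        if (clustering.size() > sort.size()) return false;
        // If the first clustering.size() sort columns are exactly the
        // clustering columns, then range-partitioning on the full sort order
        // also co-locates rows sharing the clustering key.
        Set<String> prefix = new HashSet<>(sort.subList(0, clustering.size()));
        return prefix.equals(clustering);
    }

    public static void main(String[] args) {
        // Ryan's example: clustering {b, a}, sort [a, b, c] -> true.
        System.out.println(satisfiedByGlobalSort(Set.of("b", "a"), List.of("a", "b", "c")));
        // Clustering {c} is not a prefix of [a, b, c] -> would need a shuffle.
        System.out.println(satisfiedByGlobalSort(Set.of("c"), List.of("a", "b", "c")));
    }
}
```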
You may be right that the real way to handle this would be giving a "cost" back to a higher-order optimizer, which can decide which method to use, rather than having the data source itself do it. That is probably in a far-future version of the API.

On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue <rb...@netflix.com> wrote:

> Cassandra can insert records with the same partition key faster if they arrive in the same payload. But this is only beneficial if the incoming dataset has multiple entries for the same partition key.

Thanks for the example, the recommended partitioning use case makes more sense now. I think we could have two interfaces, a RequiresClustering and a RecommendsClustering, if we want to support this. But I'm skeptical it will be useful, for two reasons:

1. Do we want to optimize the low-cardinality case? Shuffles are usually much cheaper at smaller sizes, so I'm not sure it is necessary to optimize this away.
2. How do we know there aren't just a few partition keys for all the records? It may look like a shuffle wouldn't help, but we don't know the partition keys until it is too late.

Then there's also the logic for avoiding the shuffle and how to calculate the cost, which sounds like something that needs details from CBO.

> I would assume that given the estimated data size from Spark and options passed in from the user, the data source could make a more intelligent requirement on the write format than Spark independently.

This is a good point. What would an implementation actually do here, and how would the information be passed? For my use cases, the store would produce the number of tasks based on the estimated incoming rows, because the source has the best idea of how the rows will compress. But that's just applying a multiplier most of the time. To be very useful, this would have to handle skew in the rows (think of a row with a type field where total size depends on the type), and that's a bit harder.
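Ryan's "just applying a multiplier" point can be made concrete with a small sketch. The numbers, names, and formula below are assumptions for illustration: a source that knows its on-disk compression ratio turns Spark's estimated input size into a task count targeting a desired output file size.

```java
// Illustrative only: estimate write tasks from Spark's input-size estimate
// and the source's own knowledge of how well its format compresses.
public class WriteTaskEstimate {
    public static int estimateTasks(long estimatedInputBytes,
                                    double compressionRatio,   // output/input, e.g. 0.25 for 4:1
                                    long targetFileBytes) {
        long estimatedOutputBytes = (long) (estimatedInputBytes * compressionRatio);
        // At least one task; otherwise one task per target-sized output file
        // (ceiling division). This is the simple multiplier case; it does not
        // handle per-row skew, which is the harder problem noted above.
        return (int) Math.max(1, (estimatedOutputBytes + targetFileBytes - 1) / targetFileBytes);
    }

    public static void main(String[] args) {
        // 100 GB of input, 4:1 compression, 128 MB target files -> 200 tasks.
        System.out.println(estimateTasks(100L << 30, 0.25, 128L << 20));
    }
}
```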
I think maybe an interface that can provide relative cost estimates based on partition keys would be helpful, but then keep the planning logic in Spark. This is probably something that we could add later as we find use cases that require it?

> I wouldn't assume that a data source requiring a certain write format would give any guarantees around reading the same data? In the cases where it is a complete overwrite it would, but for independent writes it could still be useful for statistics or compression.

Right, you could use this to store a global ordering if there is only one write (e.g., CTAS). I don't think anything needs to change in that case: you would still have a clustering and an ordering, but the ordering would need to include all fields of the clustering. A way to pass in the partition ordinal for the source to store would be required.

For the second point, that ordering is useful for statistics and compression, I completely agree. Our best-practices doc tells users to always add a global sort when writing because you get the benefit of a range partitioner to handle skew, plus the stats and compression you're talking about to optimize for reads. I think the proposed API can already request a global ordering from Spark. My only point is that there isn't much the source can do to guarantee ordering for reads when there is more than one write.

On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:

> Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn't properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with "Requires".
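A rough sketch of what interfaces that "start with Requires" might look like. This is purely hypothetical, not the actual DataSourceV2 proposal: the writer only declares what Spark must do to the data before handing it over, and Spark owns the shuffle and sort.

```java
import java.util.List;

// Hypothetical "Requires" interfaces: the source states its requirements
// declaratively; Spark, not the source, performs the clustering and sorting.
public class RequiresInterfaces {
    public interface RequiresClustering {
        List<String> requiredClustering();
    }

    public interface RequiresSort {
        List<String> requiredOrdering();
    }

    // An HTable-like sink from the example above: its files are only valid
    // if rows for a key are co-located and arrive in key order.
    public static class HTableLikeWriter implements RequiresClustering, RequiresSort {
        public List<String> requiredClustering() { return List.of("key"); }
        public List<String> requiredOrdering() { return List.of("key"); }
    }

    public static void main(String[] args) {
        HTableLikeWriter w = new HTableLikeWriter();
        System.out.println(w.requiredClustering() + " " + w.requiredOrdering());
    }
}
```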
This was in reference to Russell's suggestion that the data source could have a required sort but only a recommended partitioning. I can't think of an immediate use case for only recommending, though. I'm definitely in sync that the data source itself shouldn't do work outside of the writes themselves.

> Considering the second use case you mentioned first, I don't think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.
>
> For your first use case, an explicit global ordering, the problem is that there can't be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.

This is where I'm interested in learning about the separation of responsibilities for the data source and how "smart" it is supposed to be. For the first part, I would assume that given the estimated data size from Spark and options passed in from the user, the data source could make a more intelligent requirement on the write format than Spark independently. This is somewhat analogous to how the current FileSource does bin packing of small files on the read side, restricting parallelism for the sake of overhead.

For the second, I wouldn't assume that a data source requiring a certain write format would give any guarantees around reading the same data? In the cases where it is a complete overwrite it would, but for independent writes it could still be useful for statistics or compression.

Thanks
Pat

On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com> wrote:
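Pat's FileSource bin-packing analogy above can be sketched as a greedy first-fit pass over file sizes. This is illustrative only, not Spark's implementation; the cap and sizes are assumed values.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative bin packing of small files into read splits: coalesce files
// greedily until a split would exceed maxSplitBytes, limiting per-task overhead.
public class FileBinPacking {
    public static List<List<Long>> pack(List<Long> fileSizes, long maxSplitBytes) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
            // Close the current split if adding this file would overflow it.
            if (!current.isEmpty() && currentBytes + size > maxSplitBytes) {
                splits.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) splits.add(current);
        return splits;
    }

    public static void main(String[] args) {
        // Five small files packed into splits of at most 100 bytes each.
        System.out.println(pack(List.of(40L, 40L, 30L, 50L, 20L), 100L));
    }
}
```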