Since it sounds like there is consensus here, I've opened an issue for this: https://issues.apache.org/jira/browse/SPARK-23889
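For anyone picking this up from the JIRA, a rough sketch of the rule we converged on below, in simplified Scala. This is only a summary of the thread; "RequiredClustering" and the plain column-name signatures are this thread's working names, not a final API:

// Straw-man sketch of the proposed rule; clustering and ordering are
// simplified to column names rather than expressions.
def planWriteRequirements(
    requiredClustering: Option[Set[String]],
    requiredOrdering: Seq[String]): String =
  requiredClustering match {
    // No required clustering: satisfy the ordering with a global sort
    // (range partition, then sort), which also determines the clustering.
    case None =>
      s"globally sort by ${requiredOrdering.mkString(", ")}"
    // A required clustering is present: it overrides the partitioning a
    // sort would introduce, and the sort is applied within each partition.
    case Some(clustering) =>
      s"hash-cluster by ${clustering.mkString(", ")}, " +
        s"then sort within partitions by ${requiredOrdering.mkString(", ")}"
  }

planWriteRequirements(None, Seq("a", "b", "c"))            // global sort by a, b, c
planWriteRequirements(Some(Set("b", "a")), Seq("a", "b"))  // cluster, then partition-level sort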
On Sun, Apr 1, 2018 at 9:32 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:

> Yep, that sounds reasonable to me!
>
> On Fri, Mar 30, 2018 at 5:50 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> +1
>>
>> -------- Original message --------
>> From: Ryan Blue <rb...@netflix.com>
>> Date: 3/30/18 2:28 PM (GMT-08:00)
>> To: Patrick Woody <patrick.woo...@gmail.com>
>> Cc: Russell Spitzer <russell.spit...@gmail.com>, Wenchen Fan <cloud0...@gmail.com>, Ted Yu <yuzhih...@gmail.com>, Spark Dev List <dev@spark.apache.org>
>> Subject: Re: DataSourceV2 write input requirements
>>
>> You're right. A global sort would change the clustering if it had more fields than the clustering.
>>
>> Then what about this: if there is no RequiredClustering, the sort is a global sort. If RequiredClustering is present, the clustering is applied and the sort is a partition-level sort.
>>
>> That rule would mean that within a partition you always get the sort, but an explicit clustering overrides the partitioning a sort might try to introduce. Does that sound reasonable?
>>
>> rb
>>
>> On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>
>>> Does that methodology work in this specific case? I thought the ordering must be a subset of the clustering to guarantee the rows exist in the same partition when doing a global sort. Though I get the gist that if the clustering is satisfied, then there is no reason not to choose the global sort.
>>>
>>> On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> > Can you expand on how the ordering containing the clustering expressions would ensure the global sort?
>>>>
>>>> The idea was to assume that if the clustering can be satisfied by a global sort, then do the global sort. For example, if the clustering is Set("b", "a") and the sort is Seq("a", "b", "c"), then do a global sort by columns a, b, and c.
>>>>
>>>> Technically, you could do this with a hash partitioner instead of a range partitioner and sort within each partition, but that doesn't make much sense, because the partitioning would ensure that each partition has just one combination of the required clustering columns. A hash partitioner would make the in-partition sort effectively ignore the first few sort values, so the intent must have been a global sort.
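>>>>
>>>> A rough sketch of that check, to make it concrete (illustrative logic only, not an API; column names stand in for expressions):
>>>>
>>>> // A clustering can be satisfied by a global sort when its columns are
>>>> // exactly the leading columns of the sort order.
>>>> def satisfiedByGlobalSort(clustering: Set[String], sort: Seq[String]): Boolean =
>>>>   clustering.subsetOf(sort.take(clustering.size).toSet)
>>>>
>>>> satisfiedByGlobalSort(Set("b", "a"), Seq("a", "b", "c")) // true: plan the global sort
>>>> satisfiedByGlobalSort(Set("b", "d"), Seq("a", "b", "c")) // false: cluster, then sort per partition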
>>>>
>>>> On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>>>
>>>>>> Right, you could use this to store a global ordering if there is only one write (e.g., CTAS). I don’t think anything needs to change in that case, you would still have a clustering and an ordering, but the ordering would need to include all fields of the clustering. A way to pass in the partition ordinal for the source to store would be required.
>>>>>
>>>>> Can you expand on how the ordering containing the clustering expressions would ensure the global sort? Having a RangePartitioning would certainly satisfy it, but it isn't required - is the suggestion that if Spark sees this overlap, then it plans a global sort?
>>>>>
>>>>> On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:
>>>>>
>>>>>> @RyanBlue I'm hoping that through the CBO effort we will continue to get more detailed statistics. On read, for example, we could use sketch data structures to get estimates of unique values and density for each column. You may be right that the real way to handle this is to give a "cost" back to a higher-level optimizer that can decide which method to use, rather than having the data source itself decide. That is probably for a far-future version of the API.
>>>>>>
>>>>>> On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>
>>>>>>> Cassandra can insert records with the same partition key faster if they arrive in the same payload. But this is only beneficial if the incoming dataset has multiple entries for the same partition key.
>>>>>>>
>>>>>>> Thanks for the example; the recommended-partitioning use case makes more sense now. I think we could have two interfaces, a RequiresClustering and a RecommendsClustering, if we want to support this. But I’m skeptical it will be useful, for two reasons:
>>>>>>>
>>>>>>> - Do we want to optimize the low-cardinality case? Shuffles are usually much cheaper at smaller sizes, so I’m not sure it is necessary to optimize this away.
>>>>>>> - How do we know there aren’t just a few partition keys for all the records? It may look like a shuffle wouldn’t help, but we don’t know the partition keys until it is too late.
>>>>>>>
>>>>>>> Then there’s also the logic for avoiding the shuffle and how to calculate the cost, which sounds like something that needs details from CBO.
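>>>>>>>
>>>>>>> If we did go that route, a straw-man sketch of the two interfaces (names and shapes are illustrative only, not a concrete proposal):
>>>>>>>
>>>>>>> trait RequiresClustering {
>>>>>>>   // Spark must shuffle so that rows sharing these columns land in the
>>>>>>>   // same task; the source may reject data that is not clustered.
>>>>>>>   def requiredClustering: Set[String]
>>>>>>> }
>>>>>>>
>>>>>>> trait RecommendsClustering {
>>>>>>>   // Spark may skip the shuffle when its estimated cost outweighs the
>>>>>>>   // benefit, e.g. in the low-cardinality case above.
>>>>>>>   def recommendedClustering: Set[String]
>>>>>>> }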
>>>>>>>
>>>>>>> I would assume that given the estimated data size from Spark and options passed in from the user, the data source could make a more intelligent requirement on the write format than Spark independently.
>>>>>>>
>>>>>>> This is a good point.
>>>>>>>
>>>>>>> What would an implementation actually do here, and how would the information be passed? For my use cases, the store would produce the number of tasks based on the estimated incoming rows, because the source has the best idea of how the rows will compress. But that’s just applying a multiplier most of the time. To be very useful, this would have to handle skew in the rows (think of rows with a type column where the total size depends on the type), and that’s a bit harder. I think an interface that can provide relative cost estimates based on partition keys would be helpful, but then keep the planning logic in Spark.
>>>>>>>
>>>>>>> This is probably something that we could add later as we find use cases that require it?
>>>>>>>
>>>>>>> I wouldn’t assume that a data source requiring a certain write format would give any guarantees around reading the same data? In the cases where it is a complete overwrite it would, but for independent writes it could still be useful for statistics or compression.
>>>>>>>
>>>>>>> Right, you could use this to store a global ordering if there is only one write (e.g., CTAS). I don’t think anything needs to change in that case; you would still have a clustering and an ordering, but the ordering would need to include all fields of the clustering. A way to pass in the partition ordinal for the source to store would be required.
>>>>>>>
>>>>>>> For the second point, that ordering is useful for statistics and compression, I completely agree. Our best practices doc tells users to always add a global sort when writing, because you get the benefit of a range partitioner to handle skew, plus the stats and compression you’re talking about to optimize for reads. I think the proposed API can already request a global ordering from Spark. My only point is that there isn’t much the source can do to guarantee ordering for reads when there is more than one write.
>>>>>>>
>>>>>>> On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>>>>>>
>>>>>>>>> Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn’t properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with “Requires”.
>>>>>>>>
>>>>>>>> This was in reference to Russell's suggestion that the data source could have a required sort, but only a recommended partitioning. I don't have an immediate use case for recommending that comes to mind, though. I'm definitely in sync that the data source itself shouldn't do work outside of the writes themselves.
>>>>>>>>
>>>>>>>>> Considering the second use case you mentioned first, I don’t think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.
>>>>>>>>
>>>>>>>>> For your first use case, an explicit global ordering, the problem is that there can’t be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.
>>>>>>>>
>>>>>>>> This is where I'm interested in learning about the separation of responsibilities for the data source and how "smart" it is supposed to be.
>>>>>>>>
>>>>>>>> For the first part, I would assume that given the estimated data size from Spark and options passed in from the user, the data source could make a more intelligent requirement on the write format than Spark independently - somewhat analogous to how the current FileSource does bin packing of small files on the read side, restricting parallelism for the sake of overhead.
>>>>>>>>
>>>>>>>> For the second, I wouldn't assume that a data source requiring a certain write format would give any guarantees around reading the same data? In the cases where it is a complete overwrite it would, but for independent writes it could still be useful for statistics or compression.
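>>>>>>>>
>>>>>>>> To make that first part concrete, a hypothetical hook (invented here for illustration; nothing like it exists in the current API):
>>>>>>>>
>>>>>>>> trait SuggestsWriteParallelism {
>>>>>>>>   // Given Spark's size estimate and the user's options, suggest a task
>>>>>>>>   // count - analogous to the FileSource bin packing small files on reads.
>>>>>>>>   def suggestedNumPartitions(estimatedSizeInBytes: Long,
>>>>>>>>                              options: Map[String, String]): Int
>>>>>>>> }
>>>>>>>>
>>>>>>>> // e.g. a source that knows its format compresses rows roughly 4x:
>>>>>>>> class ExampleWriter extends SuggestsWriteParallelism {
>>>>>>>>   private val targetTaskBytes = 128L * 1024 * 1024 // aim for ~128 MB tasks
>>>>>>>>   override def suggestedNumPartitions(estimatedSizeInBytes: Long,
>>>>>>>>                                       options: Map[String, String]): Int =
>>>>>>>>     math.max(1, (estimatedSizeInBytes / 4 / targetTaskBytes).toInt)
>>>>>>>> }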
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Pat
>>>>>>>>
>>>>>>>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com> wrote:
>>>>>>>>
>>>>>>>>> >

--
Ryan Blue
Software Engineer
Netflix