Since it sounds like there is consensus here, I've opened an issue for
this: https://issues.apache.org/jira/browse/SPARK-23889

On Sun, Apr 1, 2018 at 9:32 AM, Patrick Woody <patrick.woo...@gmail.com>
wrote:

> Yep, that sounds reasonable to me!
>
> On Fri, Mar 30, 2018 at 5:50 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> +1
>>
>> -------- Original message --------
>> From: Ryan Blue <rb...@netflix.com>
>> Date: 3/30/18 2:28 PM (GMT-08:00)
>> To: Patrick Woody <patrick.woo...@gmail.com>
>> Cc: Russell Spitzer <russell.spit...@gmail.com>, Wenchen Fan <
>> cloud0...@gmail.com>, Ted Yu <yuzhih...@gmail.com>, Spark Dev List <
>> dev@spark.apache.org>
>> Subject: Re: DataSourceV2 write input requirements
>>
>> You're right. A global sort would change the clustering if it had more
>> fields than the clustering.
>>
>> Then what about this: if there is no RequiredClustering, then the sort is
>> a global sort. If RequiredClustering is present, then the clustering is
>> applied and the sort is a partition-level sort.
>>
>> That rule would mean that within a partition you always get the sort, but
>> an explicit clustering overrides the partitioning a sort might try to
>> introduce. Does that sound reasonable?
>>
>> rb
>>
>> On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody <patrick.woo...@gmail.com
>> > wrote:
>>
>>> Does that methodology work in this specific case? The ordering must be a
>>> subset of the clustering to guarantee they exist in the same partition when
>>> doing a global sort I thought. Though I get the gist that if it does
>>> satisfy, then there is no reason to not choose the global sort.
>>>
>>> On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> > Can you expand on how the ordering containing the clustering
>>>> expressions would ensure the global sort?
>>>>
>>>> The idea was to basically assume that if the clustering can be
>>>> satisfied by a global sort, then do the global sort. For example, if the
>>>> clustering is Set("b", "a") and the sort is Seq("a", "b", "c") then do a
>>>> global sort by columns a, b, and c.
>>>>
>>>> Technically, you could do this with a hash partitioner instead of a
>>>> range partitioner and sort within each partition, but that doesn't make
>>>> much sense because the partitioning would ensure that each partition has
>>>> just one combination of the required clustering columns. Using a hash
>>>> partitioner would make it so that the in-partition sort basically ignores
>>>> the first few values, so it must be that the intent was a global sort.
>>>>
>>>> On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody <
>>>> patrick.woo...@gmail.com> wrote:
>>>>
>>>>> Right, you could use this to store a global ordering if there is only
>>>>>> one write (e.g., CTAS). I don’t think anything needs to change in that
>>>>>> case, you would still have a clustering and an ordering, but the ordering
>>>>>> would need to include all fields of the clustering. A way to pass in the
>>>>>> partition ordinal for the source to store would be required.
>>>>>
>>>>>
>>>>> Can you expand on how the ordering containing the clustering
>>>>> expressions would ensure the global sort? Having an RangePartitioning 
>>>>> would
>>>>> certainly satisfy, but it isn't required - is the suggestion that if Spark
>>>>> sees this overlap, then it plans a global sort?
>>>>>
>>>>> On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <
>>>>> russell.spit...@gmail.com> wrote:
>>>>>
>>>>>> @RyanBlue I'm hoping that through the CBO effort we will continue to
>>>>>> get more detailed statistics. Like on read we could be using sketch data
>>>>>> structures to get estimates on unique values and density for each column.
>>>>>> You may be right that the real way for this to be handled would be 
>>>>>> giving a
>>>>>> "cost" back to a higher order optimizer which can decide which method to
>>>>>> use rather than having the data source itself do it. This is probably in 
>>>>>> a
>>>>>> far future version of the api.
>>>>>>
>>>>>> On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>
>>>>>>> Cassandra can insert records with the same partition-key faster if
>>>>>>> they arrive in the same payload. But this is only beneficial if the
>>>>>>> incoming dataset has multiple entries for the same partition key.
>>>>>>>
>>>>>>> Thanks for the example, the recommended partitioning use case makes
>>>>>>> more sense now. I think we could have two interfaces, a
>>>>>>> RequiresClustering and a RecommendsClustering if we want to support
>>>>>>> this. But I’m skeptical it will be useful for two reasons:
>>>>>>>
>>>>>>>    - Do we want to optimize the low cardinality case? Shuffles are
>>>>>>>    usually much cheaper at smaller sizes, so I’m not sure it is 
>>>>>>> necessary to
>>>>>>>    optimize this away.
>>>>>>>    - How do we know there isn’t just a few partition keys for all
>>>>>>>    the records? It may look like a shuffle wouldn’t help, but we don’t 
>>>>>>> know
>>>>>>>    the partition keys until it is too late.
>>>>>>>
>>>>>>> Then there’s also the logic for avoiding the shuffle and how to
>>>>>>> calculate the cost, which sounds like something that needs some details
>>>>>>> from CBO.
>>>>>>>
>>>>>>> I would assume that given the estimated data size from Spark and
>>>>>>> options passed in from the user, the data source could make a more
>>>>>>> intelligent requirement on the write format than Spark independently.
>>>>>>>
>>>>>>> This is a good point.
>>>>>>>
>>>>>>> What would an implementation actually do here and how would
>>>>>>> information be passed? For my use cases, the store would produce the 
>>>>>>> number
>>>>>>> of tasks based on the estimated incoming rows, because the source has 
>>>>>>> the
>>>>>>> best idea of how the rows will compress. But, that’s just applying a
>>>>>>> multiplier most of the time. To be very useful, this would have to 
>>>>>>> handle
>>>>>>> skew in the rows (think row with a type where total size depends on 
>>>>>>> type)
>>>>>>> and that’s a bit harder. I think maybe an interface that can provide
>>>>>>> relative cost estimates based on partition keys would be helpful, but 
>>>>>>> then
>>>>>>> keep the planning logic in Spark.
>>>>>>>
>>>>>>> This is probably something that we could add later as we find use
>>>>>>> cases that require it?
>>>>>>>
>>>>>>> I wouldn’t assume that a data source requiring a certain write
>>>>>>> format would give any guarantees around reading the same data? In the 
>>>>>>> cases
>>>>>>> where it is a complete overwrite it would, but for independent writes it
>>>>>>> could still be useful for statistics or compression.
>>>>>>>
>>>>>>> Right, you could use this to store a global ordering if there is
>>>>>>> only one write (e.g., CTAS). I don’t think anything needs to change in 
>>>>>>> that
>>>>>>> case, you would still have a clustering and an ordering, but the 
>>>>>>> ordering
>>>>>>> would need to include all fields of the clustering. A way to pass in the
>>>>>>> partition ordinal for the source to store would be required.
>>>>>>>
>>>>>>> For the second point that ordering is useful for statistics and
>>>>>>> compression, I completely agree. Our best practices doc tells users to
>>>>>>> always add a global sort when writing because you get the benefit of a
>>>>>>> range partitioner to handle skew, plus the stats and compression you’re
>>>>>>> talking about to optimize for reads. I think the proposed API can 
>>>>>>> request a
>>>>>>> global ordering from Spark already. My only point is that there isn’t 
>>>>>>> much
>>>>>>> the source can do to guarantee ordering for reads when there is more 
>>>>>>> than
>>>>>>> one write.
>>>>>>> ​
>>>>>>>
>>>>>>> On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody <
>>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Spark would always apply the required clustering and sort order
>>>>>>>>> because they are required by the data source. It is reasonable for a 
>>>>>>>>> source
>>>>>>>>> to reject data that isn’t properly prepared. For example, data must be
>>>>>>>>> written to HTable files with keys in order or else the files are 
>>>>>>>>> invalid.
>>>>>>>>> Sorting should not be implemented in the sources themselves because 
>>>>>>>>> Spark
>>>>>>>>> handles concerns like spilling to disk. Spark must prepare data 
>>>>>>>>> correctly,
>>>>>>>>> which is why the interfaces start with “Requires”.
>>>>>>>>
>>>>>>>>
>>>>>>>> This was in reference to Russell's suggestion that the data source
>>>>>>>> could have a required sort, but only a recommended partitioning. I 
>>>>>>>> don't
>>>>>>>> have an immediate recommending use case that would come to mind 
>>>>>>>> though. I'm
>>>>>>>> definitely in sync that the data source itself shouldn't do work 
>>>>>>>> outside of
>>>>>>>> the writes themselves.
>>>>>>>>
>>>>>>>> Considering the second use case you mentioned first, I don’t think
>>>>>>>>> it is a good idea for a table to put requirements on the number of 
>>>>>>>>> tasks
>>>>>>>>> used for a write. The parallelism should be set appropriately for the 
>>>>>>>>> data
>>>>>>>>> volume, which is for Spark or the user to determine. A minimum or 
>>>>>>>>> maximum
>>>>>>>>> number of tasks could cause bad behavior.
>>>>>>>>
>>>>>>>>
>>>>>>>> For your first use case, an explicit global ordering, the problem
>>>>>>>>> is that there can’t be an explicit global ordering for a table when 
>>>>>>>>> it is
>>>>>>>>> populated by a series of independent writes. Each write could have a 
>>>>>>>>> global
>>>>>>>>> order, but once those files are written, you have to deal with 
>>>>>>>>> multiple
>>>>>>>>> sorted data sets. I think it makes sense to focus on order within data
>>>>>>>>> files, not order between data files.
>>>>>>>>
>>>>>>>>
>>>>>>>> This is where I'm interested in learning about the separation of
>>>>>>>> responsibilities for the data source and how "smart" it is supposed to 
>>>>>>>> be.
>>>>>>>>
>>>>>>>> For the first part, I would assume that given the estimated data
>>>>>>>> size from Spark and options passed in from the user, the data source 
>>>>>>>> could
>>>>>>>> make a more intelligent requirement on the write format than Spark
>>>>>>>> independently. Somewhat analogous to how the current FileSource does 
>>>>>>>> bin
>>>>>>>> packing of small files on the read side, restricting parallelism for 
>>>>>>>> the
>>>>>>>> sake of overhead.
>>>>>>>>
>>>>>>>> For the second, I wouldn't assume that a data source requiring a
>>>>>>>> certain write format would give any guarantees around reading the same
>>>>>>>> data? In the cases where it is a complete overwrite it would, but for
>>>>>>>> independent writes it could still be useful for statistics or 
>>>>>>>> compression.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Pat
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>


-- 
Ryan Blue
Software Engineer
Netflix

Reply via email to