Re: DataSourceV2 write input requirements

2018-04-06 Thread Ryan Blue
Since it sounds like there is consensus here, I've opened an issue for
this: https://issues.apache.org/jira/browse/SPARK-23889

On Sun, Apr 1, 2018 at 9:32 AM, Patrick Woody 
wrote:

> Yep, that sounds reasonable to me!
>
> On Fri, Mar 30, 2018 at 5:50 PM, Ted Yu  wrote:
>
>> +1
>>
>>  Original message 
>> From: Ryan Blue 
>> Date: 3/30/18 2:28 PM (GMT-08:00)
>> To: Patrick Woody 
>> Cc: Russell Spitzer , Wenchen Fan <
>> cloud0...@gmail.com>, Ted Yu , Spark Dev List <
>> dev@spark.apache.org>
>> Subject: Re: DataSourceV2 write input requirements
>>
>> You're right. A global sort would change the clustering if it had more
>> fields than the clustering.
>>
>> Then what about this: if there is no RequiredClustering, then the sort is
>> a global sort. If RequiredClustering is present, then the clustering is
>> applied and the sort is a partition-level sort.
>>
>> That rule would mean that within a partition you always get the sort, but
>> an explicit clustering overrides the partitioning a sort might try to
>> introduce. Does that sound reasonable?
>>
>> rb
>>
>> On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody wrote:
>>
>>> Does that methodology work in this specific case? The ordering must be a
>>> subset of the clustering to guarantee they exist in the same partition when
>>> doing a global sort I thought. Though I get the gist that if it does
>>> satisfy, then there is no reason to not choose the global sort.
>>>
>>> On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue  wrote:
>>>
>>>> > Can you expand on how the ordering containing the clustering
>>>> expressions would ensure the global sort?
>>>>
>>>> The idea was to basically assume that if the clustering can be
>>>> satisfied by a global sort, then do the global sort. For example, if the
>>>> clustering is Set("b", "a") and the sort is Seq("a", "b", "c") then do a
>>>> global sort by columns a, b, and c.
>>>>
>>>> Technically, you could do this with a hash partitioner instead of a
>>>> range partitioner and sort within each partition, but that doesn't make
>>>> much sense because the partitioning would ensure that each partition has
>>>> just one combination of the required clustering columns. Using a hash
>>>> partitioner would make it so that the in-partition sort basically ignores
>>>> the first few values, so it must be that the intent was a global sort.
>>>>
>>>> On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody <
>>>> patrick.woo...@gmail.com> wrote:
>>>>
>>>>> Right, you could use this to store a global ordering if there is only
>>>>>> one write (e.g., CTAS). I don’t think anything needs to change in that
>>>>>> case, you would still have a clustering and an ordering, but the ordering
>>>>>> would need to include all fields of the clustering. A way to pass in the
>>>>>> partition ordinal for the source to store would be required.
>>>>>
>>>>>
>>>>> Can you expand on how the ordering containing the clustering
>>>>> expressions would ensure the global sort? Having a RangePartitioning would
>>>>> certainly satisfy, but it isn't required - is the suggestion that if Spark
>>>>> sees this overlap, then it plans a global sort?
>>>>>
>>>>> On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <
>>>>> russell.spit...@gmail.com> wrote:
>>>>>
>>>>>> @RyanBlue I'm hoping that through the CBO effort we will continue to
>>>>>> get more detailed statistics. Like on read we could be using sketch data
>>>>>> structures to get estimates on unique values and density for each column.
>>>>>> You may be right that the real way for this to be handled would be giving a
>>>>>> "cost" back to a higher order optimizer which can decide which method to
>>>>>> use rather than having the data source itself do it. This is probably in a
>>>>>> far future version of the api.
>>>>>>
>>>>>> On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue  wrote:
>>>>>>
>>>>>>> Cassandra can insert records with the same partition-key 

Re: DataSourceV2 write input requirements

2018-04-01 Thread Patrick Woody
Yep, that sounds reasonable to me!

On Fri, Mar 30, 2018 at 5:50 PM, Ted Yu  wrote:

> +1
>
>  Original message 
> From: Ryan Blue 
> Date: 3/30/18 2:28 PM (GMT-08:00)
> To: Patrick Woody 
> Cc: Russell Spitzer , Wenchen Fan <
> cloud0...@gmail.com>, Ted Yu , Spark Dev List <
> dev@spark.apache.org>
> Subject: Re: DataSourceV2 write input requirements
>
> You're right. A global sort would change the clustering if it had more
> fields than the clustering.
>
> Then what about this: if there is no RequiredClustering, then the sort is
> a global sort. If RequiredClustering is present, then the clustering is
> applied and the sort is a partition-level sort.
>
> That rule would mean that within a partition you always get the sort, but
> an explicit clustering overrides the partitioning a sort might try to
> introduce. Does that sound reasonable?
>
> rb
>
> On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody 
> wrote:
>
>> Does that methodology work in this specific case? The ordering must be a
>> subset of the clustering to guarantee they exist in the same partition when
>> doing a global sort I thought. Though I get the gist that if it does
>> satisfy, then there is no reason to not choose the global sort.
>>
>> On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue  wrote:
>>
>>> > Can you expand on how the ordering containing the clustering
>>> expressions would ensure the global sort?
>>>
>>> The idea was to basically assume that if the clustering can be satisfied
>>> by a global sort, then do the global sort. For example, if the clustering
>>> is Set("b", "a") and the sort is Seq("a", "b", "c") then do a global sort
>>> by columns a, b, and c.
>>>
>>> Technically, you could do this with a hash partitioner instead of a
>>> range partitioner and sort within each partition, but that doesn't make
>>> much sense because the partitioning would ensure that each partition has
>>> just one combination of the required clustering columns. Using a hash
>>> partitioner would make it so that the in-partition sort basically ignores
>>> the first few values, so it must be that the intent was a global sort.
>>>
>>> On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody wrote:
>>>
>>>> Right, you could use this to store a global ordering if there is only
>>>>> one write (e.g., CTAS). I don’t think anything needs to change in that
>>>>> case, you would still have a clustering and an ordering, but the ordering
>>>>> would need to include all fields of the clustering. A way to pass in the
>>>>> partition ordinal for the source to store would be required.
>>>>
>>>>
>>>> Can you expand on how the ordering containing the clustering
>>>> expressions would ensure the global sort? Having a RangePartitioning would
>>>> certainly satisfy, but it isn't required - is the suggestion that if Spark
>>>> sees this overlap, then it plans a global sort?
>>>>
>>>> On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <
>>>> russell.spit...@gmail.com> wrote:
>>>>
>>>>> @RyanBlue I'm hoping that through the CBO effort we will continue to
>>>>> get more detailed statistics. Like on read we could be using sketch data
>>>>> structures to get estimates on unique values and density for each column.
>>>>> You may be right that the real way for this to be handled would be giving a
>>>>> "cost" back to a higher order optimizer which can decide which method to
>>>>> use rather than having the data source itself do it. This is probably in a
>>>>> far future version of the api.
>>>>>
>>>>> On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue  wrote:
>>>>>
>>>>>> Cassandra can insert records with the same partition-key faster if
>>>>>> they arrive in the same payload. But this is only beneficial if the
>>>>>> incoming dataset has multiple entries for the same partition key.
>>>>>>
>>>>>> Thanks for the example, the recommended partitioning use case makes
>>>>>> more sense now. I think we could have two interfaces, a
>>>>>> RequiresClustering and a RecommendsClustering if we want to support
>>>>>> this. But I’m skeptical it will be useful for two reasons:
>>>>>>

Re: DataSourceV2 write input requirements

2018-03-30 Thread Ted Yu
+1
 Original message 
From: Ryan Blue
Date: 3/30/18 2:28 PM (GMT-08:00)
To: Patrick Woody
Cc: Russell Spitzer, Wenchen Fan, Ted Yu, Spark Dev List
Subject: Re: DataSourceV2 write input requirements
You're right. A global sort would change the clustering if it had more fields 
than the clustering.
Then what about this: if there is no RequiredClustering, then the sort is a 
global sort. If RequiredClustering is present, then the clustering is applied 
and the sort is a partition-level sort.
That rule would mean that within a partition you always get the sort, but an 
explicit clustering overrides the partitioning a sort might try to introduce. 
Does that sound reasonable?
rb
On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody  
wrote:
Does that methodology work in this specific case? The ordering must be a subset 
of the clustering to guarantee they exist in the same partition when doing a 
global sort I thought. Though I get the gist that if it does satisfy, then 
there is no reason to not choose the global sort.

On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue  wrote:
> Can you expand on how the ordering containing the clustering expressions 
>would ensure the global sort?
The idea was to basically assume that if the clustering can be satisfied by a 
global sort, then do the global sort. For example, if the clustering is 
Set("b", "a") and the sort is Seq("a", "b", "c") then do a global sort by 
columns a, b, and c.
Technically, you could do this with a hash partitioner instead of a range 
partitioner and sort within each partition, but that doesn't make much sense 
because the partitioning would ensure that each partition has just one 
combination of the required clustering columns. Using a hash partitioner would 
make it so that the in-partition sort basically ignores the first few values, 
so it must be that the intent was a global sort.
On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody  wrote:
Right, you could use this to store a global ordering if there is only 
one write (e.g., CTAS). I don’t think anything needs to change in that 
case, you would still have a clustering and an ordering, but the 
ordering would need to include all fields of the clustering. A way to 
pass in the partition ordinal for the source to store would be required.
Can you expand on how the ordering containing the clustering expressions would 
ensure the global sort? Having a RangePartitioning would certainly satisfy,
but it isn't required - is the suggestion that if Spark sees this overlap, then 
it plans a global sort?

On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer  
wrote:
@RyanBlue I'm hoping that through the CBO effort we will continue to get more 
detailed statistics. Like on read we could be using sketch data structures to 
get estimates on unique values and density for each column. You may be right 
that the real way for this to be handled would be giving a "cost" back to a 
higher order optimizer which can decide which method to use rather than having 
the data source itself do it. This is probably in a far future version of the 
api.

On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue  wrote:

Cassandra can insert records with the same partition-key faster if they arrive 
in the same payload. But this is only beneficial if the incoming dataset has 
multiple entries for the same partition key.

Thanks for the example, the recommended partitioning use case makes more sense 
now. I think we could have two interfaces, a RequiresClustering and a 
RecommendsClustering if we want to support this. But I’m skeptical it will be 
useful for two reasons:

Do we want to optimize the low cardinality case? Shuffles are usually much 
cheaper at smaller sizes, so I’m not sure it is necessary to optimize this away.
How do we know there isn’t just a few partition keys for all the records? It 
may look like a shuffle wouldn’t help, but we don’t know the partition keys 
until it is too late.

Then there’s also the logic for avoiding the shuffle and how to calculate the 
cost, which sounds like something that needs some details from CBO.

I would assume that given the estimated data size from Spark and options passed 
in from the user, the data source could make a more intelligent requirement on 
the write format than Spark independently. 

This is a good point.
What would an implementation actually do here and how would information be 
passed? For my use cases, the store would produce the number of tasks based on 
the estimated incoming rows, because the source has the best idea of how the 
rows will compress. But, that’s just applying a multiplier most of the time. To 
be very useful, this would have to handle skew in the rows (think row with a 
type where total size depends on type) and that’s a bit harder. I think maybe 
an interface that can provide relative cost estimates based on partition keys 
would be helpful, but then keep the 

Re: DataSourceV2 write input requirements

2018-03-30 Thread Ryan Blue
You're right. A global sort would change the clustering if it had more
fields than the clustering.

Then what about this: if there is no RequiredClustering, then the sort is a
global sort. If RequiredClustering is present, then the clustering is
applied and the sort is a partition-level sort.

That rule would mean that within a partition you always get the sort, but
an explicit clustering overrides the partitioning a sort might try to
introduce. Does that sound reasonable?

rb
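
A minimal Scala sketch of that rule, assuming hypothetical RequiredClustering and RequiredOrdering traits (the names and shapes here are illustrative, not the final DataSourceV2 API):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // Hypothetical writer mix-ins; names and shapes are illustrative only.
    trait RequiredClustering { def requiredClustering: Set[String] }
    trait RequiredOrdering   { def requiredOrdering: Seq[String] }

    // Proposed rule: with a RequiredClustering, cluster first and make the sort
    // a partition-level sort; with only a RequiredOrdering, plan a global sort.
    def prepareForWrite(df: DataFrame, writer: Any): DataFrame = writer match {
      case w: RequiredClustering with RequiredOrdering =>
        df.repartition(w.requiredClustering.toSeq.map(col): _*)
          .sortWithinPartitions(w.requiredOrdering.map(col): _*)
      case w: RequiredClustering =>
        df.repartition(w.requiredClustering.toSeq.map(col): _*)
      case w: RequiredOrdering =>
        df.orderBy(w.requiredOrdering.map(col): _*) // global, range-partitioned sort
      case _ =>
        df
    }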

On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody 
wrote:

> Does that methodology work in this specific case? The ordering must be a
> subset of the clustering to guarantee they exist in the same partition when
> doing a global sort I thought. Though I get the gist that if it does
> satisfy, then there is no reason to not choose the global sort.
>
> On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue  wrote:
>
>> > Can you expand on how the ordering containing the clustering
>> expressions would ensure the global sort?
>>
>> The idea was to basically assume that if the clustering can be satisfied
>> by a global sort, then do the global sort. For example, if the clustering
>> is Set("b", "a") and the sort is Seq("a", "b", "c") then do a global sort
>> by columns a, b, and c.
>>
>> Technically, you could do this with a hash partitioner instead of a range
>> partitioner and sort within each partition, but that doesn't make much
>> sense because the partitioning would ensure that each partition has just
>> one combination of the required clustering columns. Using a hash
>> partitioner would make it so that the in-partition sort basically ignores
>> the first few values, so it must be that the intent was a global sort.
>>
>> On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody 
>> wrote:
>>
>>> Right, you could use this to store a global ordering if there is only
 one write (e.g., CTAS). I don’t think anything needs to change in that
 case, you would still have a clustering and an ordering, but the ordering
 would need to include all fields of the clustering. A way to pass in the
 partition ordinal for the source to store would be required.
>>>
>>>
>>> Can you expand on how the ordering containing the clustering expressions
>>> would ensure the global sort? Having a RangePartitioning would certainly
>>> satisfy, but it isn't required - is the suggestion that if Spark sees this
>>> overlap, then it plans a global sort?
>>>
>>> On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <
>>> russell.spit...@gmail.com> wrote:
>>>
 @RyanBlue I'm hoping that through the CBO effort we will continue to
 get more detailed statistics. Like on read we could be using sketch data
 structures to get estimates on unique values and density for each column.
 You may be right that the real way for this to be handled would be giving a
 "cost" back to a higher order optimizer which can decide which method to
 use rather than having the data source itself do it. This is probably in a
 far future version of the api.

 On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue  wrote:

> Cassandra can insert records with the same partition-key faster if
> they arrive in the same payload. But this is only beneficial if the
> incoming dataset has multiple entries for the same partition key.
>
> Thanks for the example, the recommended partitioning use case makes
> more sense now. I think we could have two interfaces, a
> RequiresClustering and a RecommendsClustering if we want to support
> this. But I’m skeptical it will be useful for two reasons:
>
>    - Do we want to optimize the low cardinality case? Shuffles are
>    usually much cheaper at smaller sizes, so I’m not sure it is necessary to
>    optimize this away.
>    - How do we know there isn’t just a few partition keys for all the
>    records? It may look like a shuffle wouldn’t help, but we don’t know the
>    partition keys until it is too late.
>
> Then there’s also the logic for avoiding the shuffle and how to
> calculate the cost, which sounds like something that needs some details
> from CBO.
>
> I would assume that given the estimated data size from Spark and
> options passed in from the user, the data source could make a more
> intelligent requirement on the write format than Spark independently.
>
> This is a good point.
>
> What would an implementation actually do here and how would
> information be passed? For my use cases, the store would produce the number
> of tasks based on the estimated incoming rows, because the source has the
> best idea of how the rows will compress. But, that’s just applying a
> multiplier most of the time. To be very useful, this would have to handle
> skew in the rows (think row with a type where total size depends on type)
> and that’s a bit harder. I think maybe an interface that can provide
>

Re: DataSourceV2 write input requirements

2018-03-30 Thread Patrick Woody
Does that methodology work in this specific case? The ordering must be a
subset of the clustering to guarantee they exist in the same partition when
doing a global sort I thought. Though I get the gist that if it does
satisfy, then there is no reason to not choose the global sort.

On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue  wrote:

> > Can you expand on how the ordering containing the clustering expressions
> would ensure the global sort?
>
> The idea was to basically assume that if the clustering can be satisfied
> by a global sort, then do the global sort. For example, if the clustering
> is Set("b", "a") and the sort is Seq("a", "b", "c") then do a global sort
> by columns a, b, and c.
>
> Technically, you could do this with a hash partitioner instead of a range
> partitioner and sort within each partition, but that doesn't make much
> sense because the partitioning would ensure that each partition has just
> one combination of the required clustering columns. Using a hash
> partitioner would make it so that the in-partition sort basically ignores
> the first few values, so it must be that the intent was a global sort.
>
> On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody 
> wrote:
>
>> Right, you could use this to store a global ordering if there is only one
>>> write (e.g., CTAS). I don’t think anything needs to change in that case,
>>> you would still have a clustering and an ordering, but the ordering would
>>> need to include all fields of the clustering. A way to pass in the
>>> partition ordinal for the source to store would be required.
>>
>>
>> Can you expand on how the ordering containing the clustering expressions
>> would ensure the global sort? Having a RangePartitioning would certainly
>> satisfy, but it isn't required - is the suggestion that if Spark sees this
>> overlap, then it plans a global sort?
>>
>> On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <
>> russell.spit...@gmail.com> wrote:
>>
>>> @RyanBlue I'm hoping that through the CBO effort we will continue to get
>>> more detailed statistics. Like on read we could be using sketch data
>>> structures to get estimates on unique values and density for each column.
>>> You may be right that the real way for this to be handled would be giving a
>>> "cost" back to a higher order optimizer which can decide which method to
>>> use rather than having the data source itself do it. This is probably in a
>>> far future version of the api.
>>>
>>> On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue  wrote:
>>>
 Cassandra can insert records with the same partition-key faster if they
 arrive in the same payload. But this is only beneficial if the incoming
 dataset has multiple entries for the same partition key.

 Thanks for the example, the recommended partitioning use case makes
 more sense now. I think we could have two interfaces, a
 RequiresClustering and a RecommendsClustering if we want to support
 this. But I’m skeptical it will be useful for two reasons:

- Do we want to optimize the low cardinality case? Shuffles are
usually much cheaper at smaller sizes, so I’m not sure it is necessary to
optimize this away.
- How do we know there isn’t just a few partition keys for all the
records? It may look like a shuffle wouldn’t help, but we don’t know the
partition keys until it is too late.

 Then there’s also the logic for avoiding the shuffle and how to
 calculate the cost, which sounds like something that needs some details
 from CBO.

 I would assume that given the estimated data size from Spark and
 options passed in from the user, the data source could make a more
 intelligent requirement on the write format than Spark independently.

 This is a good point.

 What would an implementation actually do here and how would information
 be passed? For my use cases, the store would produce the number of tasks
 based on the estimated incoming rows, because the source has the best idea
 of how the rows will compress. But, that’s just applying a multiplier most
 of the time. To be very useful, this would have to handle skew in the rows
 (think row with a type where total size depends on type) and that’s a bit
 harder. I think maybe an interface that can provide relative cost estimates
 based on partition keys would be helpful, but then keep the planning logic
 in Spark.

 This is probably something that we could add later as we find use cases
 that require it?

 I wouldn’t assume that a data source requiring a certain write format
 would give any guarantees around reading the same data? In the cases where
 it is a complete overwrite it would, but for independent writes it could
 still be useful for statistics or compression.

 Right, you could use this to store a global ordering if there is only
 one write (e.g., CTAS). I don’t think anything nee

Re: DataSourceV2 write input requirements

2018-03-30 Thread Ryan Blue
> Can you expand on how the ordering containing the clustering expressions
would ensure the global sort?

The idea was to basically assume that if the clustering can be satisfied by
a global sort, then do the global sort. For example, if the clustering is
Set("b", "a") and the sort is Seq("a", "b", "c") then do a global sort by
columns a, b, and c.

Technically, you could do this with a hash partitioner instead of a range
partitioner and sort within each partition, but that doesn't make much
sense because the partitioning would ensure that each partition has just
one combination of the required clustering columns. Using a hash
partitioner would make it so that the in-partition sort basically ignores
the first few values, so it must be that the intent was a global sort.
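
Concretely, for clustering Set("b", "a") and sort Seq("a", "b", "c"), the two plans being compared look roughly like this in DataFrame terms (a sketch, not the actual write-path code):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // Option 1: a single global sort. Range-partitions by (a, b, c) and sorts
    // within each partition, satisfying both the ordering and the clustering.
    // (The replies note that range boundaries can still split rows sharing
    // (a, b), which is what leads to the refined rule later in the thread.)
    def globalSort(df: DataFrame): DataFrame =
      df.orderBy(col("a"), col("b"), col("c"))

    // Option 2: hash-cluster on (b, a), then sort each partition by (a, b, c).
    // Whole (a, b) groups land in one partition, so the per-partition sort is
    // mostly just ordering c within each group -- hence the argument that a
    // request like this really intends a global sort.
    def hashThenSort(df: DataFrame): DataFrame =
      df.repartition(col("b"), col("a"))
        .sortWithinPartitions(col("a"), col("b"), col("c"))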

On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody 
wrote:

> Right, you could use this to store a global ordering if there is only one
>> write (e.g., CTAS). I don’t think anything needs to change in that case,
>> you would still have a clustering and an ordering, but the ordering would
>> need to include all fields of the clustering. A way to pass in the
>> partition ordinal for the source to store would be required.
>
>
> Can you expand on how the ordering containing the clustering expressions
> would ensure the global sort? Having a RangePartitioning would certainly
> satisfy, but it isn't required - is the suggestion that if Spark sees this
> overlap, then it plans a global sort?
>
> On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <
> russell.spit...@gmail.com> wrote:
>
>> @RyanBlue I'm hoping that through the CBO effort we will continue to get
>> more detailed statistics. Like on read we could be using sketch data
>> structures to get estimates on unique values and density for each column.
>> You may be right that the real way for this to be handled would be giving a
>> "cost" back to a higher order optimizer which can decide which method to
>> use rather than having the data source itself do it. This is probably in a
>> far future version of the api.
>>
>> On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue  wrote:
>>
>>> Cassandra can insert records with the same partition-key faster if they
>>> arrive in the same payload. But this is only beneficial if the incoming
>>> dataset has multiple entries for the same partition key.
>>>
>>> Thanks for the example, the recommended partitioning use case makes more
>>> sense now. I think we could have two interfaces, a RequiresClustering
>>> and a RecommendsClustering if we want to support this. But I’m
>>> skeptical it will be useful for two reasons:
>>>
>>>- Do we want to optimize the low cardinality case? Shuffles are
>>>usually much cheaper at smaller sizes, so I’m not sure it is necessary to
>>>optimize this away.
>>>- How do we know there isn’t just a few partition keys for all the
>>>records? It may look like a shuffle wouldn’t help, but we don’t know the
>>>partition keys until it is too late.
>>>
>>> Then there’s also the logic for avoiding the shuffle and how to
>>> calculate the cost, which sounds like something that needs some details
>>> from CBO.
>>>
>>> I would assume that given the estimated data size from Spark and options
>>> passed in from the user, the data source could make a more intelligent
>>> requirement on the write format than Spark independently.
>>>
>>> This is a good point.
>>>
>>> What would an implementation actually do here and how would information
>>> be passed? For my use cases, the store would produce the number of tasks
>>> based on the estimated incoming rows, because the source has the best idea
>>> of how the rows will compress. But, that’s just applying a multiplier most
>>> of the time. To be very useful, this would have to handle skew in the rows
>>> (think row with a type where total size depends on type) and that’s a bit
>>> harder. I think maybe an interface that can provide relative cost estimates
>>> based on partition keys would be helpful, but then keep the planning logic
>>> in Spark.
>>>
>>> This is probably something that we could add later as we find use cases
>>> that require it?
>>>
>>> I wouldn’t assume that a data source requiring a certain write format
>>> would give any guarantees around reading the same data? In the cases where
>>> it is a complete overwrite it would, but for independent writes it could
>>> still be useful for statistics or compression.
>>>
>>> Right, you could use this to store a global ordering if there is only
>>> one write (e.g., CTAS). I don’t think anything needs to change in that
>>> case, you would still have a clustering and an ordering, but the ordering
>>> would need to include all fields of the clustering. A way to pass in the
>>> partition ordinal for the source to store would be required.
>>>
>>> For the second point that ordering is useful for statistics and
>>> compression, I completely agree. Our best practices doc tells users to
>>> always add a global sort when writing because you ge

Re: DataSourceV2 write input requirements

2018-03-30 Thread Patrick Woody
>
> Right, you could use this to store a global ordering if there is only one
> write (e.g., CTAS). I don’t think anything needs to change in that case,
> you would still have a clustering and an ordering, but the ordering would
> need to include all fields of the clustering. A way to pass in the
> partition ordinal for the source to store would be required.


Can you expand on how the ordering containing the clustering expressions
would ensure the global sort? Having a RangePartitioning would certainly
satisfy, but it isn't required - is the suggestion that if Spark sees this
overlap, then it plans a global sort?

On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer  wrote:

> @RyanBlue I'm hoping that through the CBO effort we will continue to get
> more detailed statistics. Like on read we could be using sketch data
> structures to get estimates on unique values and density for each column.
> You may be right that the real way for this to be handled would be giving a
> "cost" back to a higher order optimizer which can decide which method to
> use rather than having the data source itself do it. This is probably in a
> far future version of the api.
>
> On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue  wrote:
>
>> Cassandra can insert records with the same partition-key faster if they
>> arrive in the same payload. But this is only beneficial if the incoming
>> dataset has multiple entries for the same partition key.
>>
>> Thanks for the example, the recommended partitioning use case makes more
>> sense now. I think we could have two interfaces, a RequiresClustering
>> and a RecommendsClustering if we want to support this. But I’m skeptical
>> it will be useful for two reasons:
>>
>>- Do we want to optimize the low cardinality case? Shuffles are
>>usually much cheaper at smaller sizes, so I’m not sure it is necessary to
>>optimize this away.
>>- How do we know there isn’t just a few partition keys for all the
>>records? It may look like a shuffle wouldn’t help, but we don’t know the
>>partition keys until it is too late.
>>
>> Then there’s also the logic for avoiding the shuffle and how to calculate
>> the cost, which sounds like something that needs some details from CBO.
>>
>> I would assume that given the estimated data size from Spark and options
>> passed in from the user, the data source could make a more intelligent
>> requirement on the write format than Spark independently.
>>
>> This is a good point.
>>
>> What would an implementation actually do here and how would information
>> be passed? For my use cases, the store would produce the number of tasks
>> based on the estimated incoming rows, because the source has the best idea
>> of how the rows will compress. But, that’s just applying a multiplier most
>> of the time. To be very useful, this would have to handle skew in the rows
>> (think row with a type where total size depends on type) and that’s a bit
>> harder. I think maybe an interface that can provide relative cost estimates
>> based on partition keys would be helpful, but then keep the planning logic
>> in Spark.
>>
>> This is probably something that we could add later as we find use cases
>> that require it?
>>
>> I wouldn’t assume that a data source requiring a certain write format
>> would give any guarantees around reading the same data? In the cases where
>> it is a complete overwrite it would, but for independent writes it could
>> still be useful for statistics or compression.
>>
>> Right, you could use this to store a global ordering if there is only one
>> write (e.g., CTAS). I don’t think anything needs to change in that case,
>> you would still have a clustering and an ordering, but the ordering would
>> need to include all fields of the clustering. A way to pass in the
>> partition ordinal for the source to store would be required.
>>
>> For the second point that ordering is useful for statistics and
>> compression, I completely agree. Our best practices doc tells users to
>> always add a global sort when writing because you get the benefit of a
>> range partitioner to handle skew, plus the stats and compression you’re
>> talking about to optimize for reads. I think the proposed API can request a
>> global ordering from Spark already. My only point is that there isn’t much
>> the source can do to guarantee ordering for reads when there is more than
>> one write.
>> ​
>>
>> On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody 
>> wrote:
>>
>>> Spark would always apply the required clustering and sort order because
 they are required by the data source. It is reasonable for a source to
 reject data that isn’t properly prepared. For example, data must be written
 to HTable files with keys in order or else the files are invalid. Sorting
 should not be implemented in the sources themselves because Spark handles
 concerns like spilling to disk. Spark must prepare data correctly, which is
 why the interfaces start with “Requires”.
>>>
>>>
>>> This

Re: DataSourceV2 write input requirements

2018-03-29 Thread Russell Spitzer
@RyanBlue I'm hoping that through the CBO effort we will continue to get
more detailed statistics. Like on read we could be using sketch data
structures to get estimates on unique values and density for each column.
You may be right that the real way for this to be handled would be giving a
"cost" back to a higher order optimizer which can decide which method to
use rather than having the data source itself do it. This is probably in a
far future version of the api.
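
For reference, Spark already exposes sketch-based estimates of this kind on the read side; a small illustration of the statistics such an optimizer could consume (the table path and column names below are placeholders):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.approx_count_distinct

    val spark = SparkSession.builder().appName("sketch-stats").getOrCreate()
    val df = spark.read.parquet("/path/to/table") // placeholder input

    // HyperLogLog++-based estimate of distinct values per column: much cheaper
    // than an exact count, with a bounded relative standard deviation.
    df.select(
      approx_count_distinct(df("partition_key")).as("approx_distinct_keys"),
      approx_count_distinct(df("value"), 0.01).as("approx_distinct_values")
    ).show()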

On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue  wrote:

> Cassandra can insert records with the same partition-key faster if they
> arrive in the same payload. But this is only beneficial if the incoming
> dataset has multiple entries for the same partition key.
>
> Thanks for the example, the recommended partitioning use case makes more
> sense now. I think we could have two interfaces, a RequiresClustering and
> a RecommendsClustering if we want to support this. But I’m skeptical it
> will be useful for two reasons:
>
>- Do we want to optimize the low cardinality case? Shuffles are
>usually much cheaper at smaller sizes, so I’m not sure it is necessary to
>optimize this away.
>- How do we know there isn’t just a few partition keys for all the
>records? It may look like a shuffle wouldn’t help, but we don’t know the
>partition keys until it is too late.
>
> Then there’s also the logic for avoiding the shuffle and how to calculate
> the cost, which sounds like something that needs some details from CBO.
>
> I would assume that given the estimated data size from Spark and options
> passed in from the user, the data source could make a more intelligent
> requirement on the write format than Spark independently.
>
> This is a good point.
>
> What would an implementation actually do here and how would information be
> passed? For my use cases, the store would produce the number of tasks based
> on the estimated incoming rows, because the source has the best idea of how
> the rows will compress. But, that’s just applying a multiplier most of the
> time. To be very useful, this would have to handle skew in the rows (think
> row with a type where total size depends on type) and that’s a bit harder.
> I think maybe an interface that can provide relative cost estimates based
> on partition keys would be helpful, but then keep the planning logic in
> Spark.
>
> This is probably something that we could add later as we find use cases
> that require it?
>
> I wouldn’t assume that a data source requiring a certain write format
> would give any guarantees around reading the same data? In the cases where
> it is a complete overwrite it would, but for independent writes it could
> still be useful for statistics or compression.
>
> Right, you could use this to store a global ordering if there is only one
> write (e.g., CTAS). I don’t think anything needs to change in that case,
> you would still have a clustering and an ordering, but the ordering would
> need to include all fields of the clustering. A way to pass in the
> partition ordinal for the source to store would be required.
>
> For the second point that ordering is useful for statistics and
> compression, I completely agree. Our best practices doc tells users to
> always add a global sort when writing because you get the benefit of a
> range partitioner to handle skew, plus the stats and compression you’re
> talking about to optimize for reads. I think the proposed API can request a
> global ordering from Spark already. My only point is that there isn’t much
> the source can do to guarantee ordering for reads when there is more than
> one write.
> ​
>
> On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody 
> wrote:
>
>> Spark would always apply the required clustering and sort order because
>>> they are required by the data source. It is reasonable for a source to
>>> reject data that isn’t properly prepared. For example, data must be written
>>> to HTable files with keys in order or else the files are invalid. Sorting
>>> should not be implemented in the sources themselves because Spark handles
>>> concerns like spilling to disk. Spark must prepare data correctly, which is
>>> why the interfaces start with “Requires”.
>>
>>
>> This was in reference to Russell's suggestion that the data source could
>> have a required sort, but only a recommended partitioning. I don't have an
>> immediate recommending use case that would come to mind though. I'm
>> definitely in sync that the data source itself shouldn't do work outside of
>> the writes themselves.
>>
>> Considering the second use case you mentioned first, I don’t think it is
>>> a good idea for a table to put requirements on the number of tasks used for
>>> a write. The parallelism should be set appropriately for the data volume,
>>> which is for Spark or the user to determine. A minimum or maximum number of
>>> tasks could cause bad behavior.
>>
>>
>> For your first use case, an explicit global ordering, the problem is that
>>> there can’

Re: DataSourceV2 write input requirements

2018-03-29 Thread Ryan Blue
Cassandra can insert records with the same partition-key faster if they
arrive in the same payload. But this is only beneficial if the incoming
dataset has multiple entries for the same partition key.

Thanks for the example, the recommended partitioning use case makes more
sense now. I think we could have two interfaces, a RequiresClustering and a
RecommendsClustering if we want to support this. But I’m skeptical it will
be useful for two reasons:

   - Do we want to optimize the low cardinality case? Shuffles are usually
   much cheaper at smaller sizes, so I’m not sure it is necessary to optimize
   this away.
   - How do we know there isn’t just a few partition keys for all the
   records? It may look like a shuffle wouldn’t help, but we don’t know the
   partition keys until it is too late.

Then there’s also the logic for avoiding the shuffle and how to calculate
the cost, which sounds like something that needs some details from CBO.
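
If that split were pursued, the two interfaces might look something like the sketch below (the trait names come from the paragraph above; everything else is an assumption, and the thread itself is skeptical the second one is worth adding):

    // Hypothetical writer mix-ins. A "Requires" clustering must be honored by
    // Spark before the write; a "Recommends" clustering may be skipped if the
    // planner decides the extra shuffle is not worth it.
    trait RequiresClustering {
      def requiredClustering: Set[String]
    }

    trait RecommendsClustering {
      def recommendedClustering: Set[String]
      // A rough signal the planner could weigh against shuffle cost, e.g. the
      // expected number of rows per clustering key (purely illustrative).
      def expectedRowsPerKey: Double
    }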

I would assume that given the estimated data size from Spark and options
passed in from the user, the data source could make a more intelligent
requirement on the write format than Spark independently.

This is a good point.

What would an implementation actually do here and how would information be
passed? For my use cases, the store would produce the number of tasks based
on the estimated incoming rows, because the source has the best idea of how
the rows will compress. But, that’s just applying a multiplier most of the
time. To be very useful, this would have to handle skew in the rows (think
row with a type where total size depends on type) and that’s a bit harder.
I think maybe an interface that can provide relative cost estimates based
on partition keys would be helpful, but then keep the planning logic in
Spark.
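
One possible shape for that kind of hook, with all of the planning decisions left in Spark (every name here is an assumption, not an existing API):

    // The source reports only relative write costs; Spark decides what to do
    // with them when planning the shuffle and sort for the write.
    trait WriteCostEstimator {
      // Relative cost of writing `estimatedRows` rows that share the given
      // clustering-key values, e.g. reflecting per-key batching or compression.
      def relativeWriteCost(clusteringKey: Map[String, Any], estimatedRows: Long): Double
    }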

This is probably something that we could add later as we find use cases
that require it?

I wouldn’t assume that a data source requiring a certain write format would
give any guarantees around reading the same data? In the cases where it is
a complete overwrite it would, but for independent writes it could still be
useful for statistics or compression.

Right, you could use this to store a global ordering if there is only one
write (e.g., CTAS). I don’t think anything needs to change in that case,
you would still have a clustering and an ordering, but the ordering would
need to include all fields of the clustering. A way to pass in the
partition ordinal for the source to store would be required.

For the second point that ordering is useful for statistics and
compression, I completely agree. Our best practices doc tells users to
always add a global sort when writing because you get the benefit of a
range partitioner to handle skew, plus the stats and compression you’re
talking about to optimize for reads. I think the proposed API can request a
global ordering from Spark already. My only point is that there isn’t much
the source can do to guarantee ordering for reads when there is more than
one write.
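
The best-practice pattern referred to here is just a global sort immediately before the write, for example (column and path names are placeholders):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().getOrCreate()
    val df = spark.read.parquet("/path/to/input") // placeholder input

    // Global sort before writing: the range partitioner handles skew, and the
    // within-file ordering improves column statistics and compression on read.
    df.orderBy(col("event_date"), col("customer_id")) // placeholder sort keys
      .write
      .mode("overwrite")
      .parquet("/path/to/output")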
​

On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody 
wrote:

> Spark would always apply the required clustering and sort order because
>> they are required by the data source. It is reasonable for a source to
>> reject data that isn’t properly prepared. For example, data must be written
>> to HTable files with keys in order or else the files are invalid. Sorting
>> should not be implemented in the sources themselves because Spark handles
>> concerns like spilling to disk. Spark must prepare data correctly, which is
>> why the interfaces start with “Requires”.
>
>
> This was in reference to Russell's suggestion that the data source could
> have a required sort, but only a recommended partitioning. I don't have an
> immediate recommending use case that would come to mind though. I'm
> definitely in sync that the data source itself shouldn't do work outside of
> the writes themselves.
>
> Considering the second use case you mentioned first, I don’t think it is a
>> good idea for a table to put requirements on the number of tasks used for a
>> write. The parallelism should be set appropriately for the data volume,
>> which is for Spark or the user to determine. A minimum or maximum number of
>> tasks could cause bad behavior.
>
>
> For your first use case, an explicit global ordering, the problem is that
>> there can’t be an explicit global ordering for a table when it is populated
>> by a series of independent writes. Each write could have a global order,
>> but once those files are written, you have to deal with multiple sorted
>> data sets. I think it makes sense to focus on order within data files, not
>> order between data files.
>
>
> This is where I'm interested in learning about the separation of
> responsibilities for the data source and how "smart" it is supposed to be.
>
> For the first part, I would assume that given the estimated data size from
> Spark and options passed in from the user, the data source could make a
> more intelligent requirement on 

Re: DataSourceV2 write input requirements

2018-03-28 Thread Russell Spitzer
Ah yeah sorry I got a bit mixed up.

On Wed, Mar 28, 2018 at 7:54 PM Ted Yu  wrote:

> bq. this shuffle could outweigh the benefits of the organized data if the
> cardinality is lower.
>
> I wonder if you meant higher in place of the last word above.
>
> Cheers
>
> On Wed, Mar 28, 2018 at 7:50 PM, Russell Spitzer <
> russell.spit...@gmail.com> wrote:
>
>> For added color, one thing that I may want to consider as a data source
>> implementer is the cost / benefit of applying a particular clustering. For
>> example, a dataset with low cardinality in the clustering key could benefit
>> greatly from clustering on that key before writing to Cassandra since
>> Cassandra can benefit from these sorts of batching. But the cost of
>> performing this shuffle could outweigh the benefits of the organized data
>> if the cardinality is lower.
>>
>> I imagine other sources might have similar benefit calculations. Doing a
>> particular sort or clustering can provide increased throughput but only in
>> certain situations based on some facts about the data.
>>
>>
>> For a concrete example here.
>>
>> Cassandra can insert records with the same partition-key faster if they
>> arrive in the same payload. But this is only beneficial if the incoming
>> dataset has multiple entries for the same partition key. If the incoming
>> source does not have any duplicates then there is no benefit to requiring a
>> sort or partitioning.
>>
>> On Wed, Mar 28, 2018 at 7:14 PM Patrick Woody 
>> wrote:
>>
>>> Spark would always apply the required clustering and sort order because
 they are required by the data source. It is reasonable for a source to
 reject data that isn’t properly prepared. For example, data must be written
 to HTable files with keys in order or else the files are invalid. Sorting
 should not be implemented in the sources themselves because Spark handles
 concerns like spilling to disk. Spark must prepare data correctly, which is
 why the interfaces start with “Requires”.
>>>
>>>
>>> This was in reference to Russell's suggestion that the data source could
>>> have a required sort, but only a recommended partitioning. I don't have an
>>> immediate recommending use case that would come to mind though. I'm
>>> definitely in sync that the data source itself shouldn't do work outside of
>>> the writes themselves.
>>>
>>>
>>> Considering the second use case you mentioned first, I don’t think it is
 a good idea for a table to put requirements on the number of tasks used for
 a write. The parallelism should be set appropriately for the data volume,
 which is for Spark or the user to determine. A minimum or maximum number of
 tasks could cause bad behavior.
>>>
>>> For your first use case, an explicit global ordering, the problem is
 that there can’t be an explicit global ordering for a table when it is
 populated by a series of independent writes. Each write could have a global
 order, but once those files are written, you have to deal with multiple
 sorted data sets. I think it makes sense to focus on order within data
 files, not order between data files.
>>>
>>>
>>> This is where I'm interested in learning about the separation of
>>> responsibilities for the data source and how "smart" it is supposed to be.
>>>
>>> For the first part, I would assume that given the estimated data size
>>> from Spark and options passed in from the user, the data source could make
>>> a more intelligent requirement on the write format than Spark
>>> independently. Somewhat analogous to how the current FileSource does bin
>>> packing of small files on the read side, restricting parallelism for the
>>> sake of overhead.
>>>
>>> For the second, I wouldn't assume that a data source requiring a certain
>>> write format would give any guarantees around reading the same data? In the
>>> cases where it is a complete overwrite it would, but for independent writes
>>> it could still be useful for statistics or compression.
>>>
>>> Thanks
>>> Pat
>>>
>>>
>>>
>>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue  wrote:
>>>
 How would Spark determine whether or not to apply a recommendation - a
 cost threshold?

 Spark would always apply the required clustering and sort order because
 they are required by the data source. It is reasonable for a source to
 reject data that isn’t properly prepared. For example, data must be written
 to HTable files with keys in order or else the files are invalid. Sorting
 should not be implemented in the sources themselves because Spark handles
 concerns like spilling to disk. Spark must prepare data correctly, which is
 why the interfaces start with “Requires”.

 I’m not sure what the second half of your question means. What does
 Spark need to pass into the data source?

 Should a datasource be able to provide a Distribution proper rather
 than just the clustering expressions? Two use cases would be for explicit

Re: DataSourceV2 write input requirements

2018-03-28 Thread Ted Yu
bq. this shuffle could outweigh the benefits of the organized data if the
cardinality is lower.

I wonder if you meant higher in place of the last word above.

Cheers

On Wed, Mar 28, 2018 at 7:50 PM, Russell Spitzer 
wrote:

> For added color, one thing that I may want to consider as a data source
> implementer is the cost / benefit of applying a particular clustering. For
> example, a dataset with low cardinality in the clustering key could benefit
> greatly from clustering on that key before writing to Cassandra since
> Cassandra can benefit from these sorts of batching. But the cost of
> performing this shuffle could outweigh the benefits of the organized data
> if the cardinality is lower.
>
> I imagine other sources might have similar benefit calculations. Doing a
> particular sort or clustering can provide increased throughput but only in
> certain situations based on some facts about the data.
>
>
> For a concrete example here.
>
> Cassandra can insert records with the same partition-key faster if they
> arrive in the same payload. But this is only beneficial if the incoming
> dataset has multiple entries for the same partition key. If the incoming
> source does not have any duplicates then there is no benefit to requiring a
> sort or partitioning.
>
> On Wed, Mar 28, 2018 at 7:14 PM Patrick Woody 
> wrote:
>
>> Spark would always apply the required clustering and sort order because
>>> they are required by the data source. It is reasonable for a source to
>>> reject data that isn’t properly prepared. For example, data must be written
>>> to HTable files with keys in order or else the files are invalid. Sorting
>>> should not be implemented in the sources themselves because Spark handles
>>> concerns like spilling to disk. Spark must prepare data correctly, which is
>>> why the interfaces start with “Requires”.
>>
>>
>> This was in reference to Russell's suggestion that the data source could
>> have a required sort, but only a recommended partitioning. I don't have an
>> immediate recommending use case that would come to mind though. I'm
>> definitely in sync that the data source itself shouldn't do work outside of
>> the writes themselves.
>>
>>
>> Considering the second use case you mentioned first, I don’t think it is
>>> a good idea for a table to put requirements on the number of tasks used for
>>> a write. The parallelism should be set appropriately for the data volume,
>>> which is for Spark or the user to determine. A minimum or maximum number of
>>> tasks could cause bad behavior.
>>
>> For your first use case, an explicit global ordering, the problem is that
>>> there can’t be an explicit global ordering for a table when it is populated
>>> by a series of independent writes. Each write could have a global order,
>>> but once those files are written, you have to deal with multiple sorted
>>> data sets. I think it makes sense to focus on order within data files, not
>>> order between data files.
>>
>>
>> This is where I'm interested in learning about the separation of
>> responsibilities for the data source and how "smart" it is supposed to be.
>>
>> For the first part, I would assume that given the estimated data size
>> from Spark and options passed in from the user, the data source could make
>> a more intelligent requirement on the write format than Spark
>> independently. Somewhat analogous to how the current FileSource does bin
>> packing of small files on the read side, restricting parallelism for the
>> sake of overhead.
>>
>> For the second, I wouldn't assume that a data source requiring a certain
>> write format would give any guarantees around reading the same data? In the
>> cases where it is a complete overwrite it would, but for independent writes
>> it could still be useful for statistics or compression.
>>
>> Thanks
>> Pat
>>
>>
>>
>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue  wrote:
>>
>>> How would Spark determine whether or not to apply a recommendation - a
>>> cost threshold?
>>>
>>> Spark would always apply the required clustering and sort order because
>>> they are required by the data source. It is reasonable for a source to
>>> reject data that isn’t properly prepared. For example, data must be written
>>> to HTable files with keys in order or else the files are invalid. Sorting
>>> should not be implemented in the sources themselves because Spark handles
>>> concerns like spilling to disk. Spark must prepare data correctly, which is
>>> why the interfaces start with “Requires”.
>>>
>>> I’m not sure what the second half of your question means. What does
>>> Spark need to pass into the data source?
>>>
>>> Should a datasource be able to provide a Distribution proper rather than
>>> just the clustering expressions? Two use cases would be for explicit global
>>> sorting of the dataset and attempting to ensure a minimum write task
>>> size/number of write tasks.
>>>
>>> Considering the second use case you mentioned first, I don’t think it is
>>> a good idea for a table 

Re: DataSourceV2 write input requirements

2018-03-28 Thread Russell Spitzer
For added color, one thing that I may want to consider as a data source
implementer is the cost / benefit of applying a particular clustering. For
example, a dataset with low cardinality in the clustering key could benefit
greatly from clustering on that key before writing to Cassandra since
Cassandra can benefit from these sorts of batching. But the cost of
performing this shuffle could outweigh the benefits of the organized data
if the cardinality is lower.

I imagine other sources might have similar benefit calculations. Doing a
particular sort or clustering can provide increased throughput but only in
certain situations based on some facts about the data.


For a concrete example here.

Cassandra can insert records with the same partition-key faster if they
arrive in the same payload. But this is only beneficial if the incoming
dataset has multiple entries for the same partition key. If the incoming
source does not have any duplicates then there is no benefit to requiring a
sort or partitioning.
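
In DataFrame terms, the preparation being described is roughly the following (a sketch only; the Cassandra connector's actual write path and batching options are not shown):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // Cluster rows on the Cassandra partition key so that rows sharing a key
    // end up in the same Spark task and can be grouped into one write payload.
    // Only worthwhile when the incoming data has many rows per key; otherwise
    // the shuffle buys nothing.
    def clusterByPartitionKey(df: DataFrame, partitionKeyCol: String): DataFrame =
      df.repartition(col(partitionKeyCol))
        .sortWithinPartitions(col(partitionKeyCol))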

On Wed, Mar 28, 2018 at 7:14 PM Patrick Woody 
wrote:

> Spark would always apply the required clustering and sort order because
>> they are required by the data source. It is reasonable for a source to
>> reject data that isn’t properly prepared. For example, data must be written
>> to HTable files with keys in order or else the files are invalid. Sorting
>> should not be implemented in the sources themselves because Spark handles
>> concerns like spilling to disk. Spark must prepare data correctly, which is
>> why the interfaces start with “Requires”.
>
>
> This was in reference to Russell's suggestion that the data source could
> have a required sort, but only a recommended partitioning. I don't have an
> immediate recommending use case that would come to mind though. I'm
> definitely in sync that the data source itself shouldn't do work outside of
> the writes themselves.
>
>
> Considering the second use case you mentioned first, I don’t think it is a
>> good idea for a table to put requirements on the number of tasks used for a
>> write. The parallelism should be set appropriately for the data volume,
>> which is for Spark or the user to determine. A minimum or maximum number of
>> tasks could cause bad behavior.
>
> For your first use case, an explicit global ordering, the problem is that
>> there can’t be an explicit global ordering for a table when it is populated
>> by a series of independent writes. Each write could have a global order,
>> but once those files are written, you have to deal with multiple sorted
>> data sets. I think it makes sense to focus on order within data files, not
>> order between data files.
>
>
> This is where I'm interested in learning about the separation of
> responsibilities for the data source and how "smart" it is supposed to be.
>
> For the first part, I would assume that given the estimated data size from
> Spark and options passed in from the user, the data source could make a
> more intelligent requirement on the write format than Spark independently.
> Somewhat analogous to how the current FileSource does bin packing of small
> files on the read side, restricting parallelism for the sake of overhead.
>
> For the second, I wouldn't assume that a data source requiring a certain
> write format would give any guarantees around reading the same data? In the
> cases where it is a complete overwrite it would, but for independent writes
> it could still be useful for statistics or compression.
>
> Thanks
> Pat
>
>
>
> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue  wrote:
>
>> How would Spark determine whether or not to apply a recommendation - a
>> cost threshold?
>>
>> Spark would always apply the required clustering and sort order because
>> they are required by the data source. It is reasonable for a source to
>> reject data that isn’t properly prepared. For example, data must be written
>> to HTable files with keys in order or else the files are invalid. Sorting
>> should not be implemented in the sources themselves because Spark handles
>> concerns like spilling to disk. Spark must prepare data correctly, which is
>> why the interfaces start with “Requires”.
>>
>> I’m not sure what the second half of your question means. What does Spark
>> need to pass into the data source?
>>
>> Should a datasource be able to provide a Distribution proper rather than
>> just the clustering expressions? Two use cases would be for explicit global
>> sorting of the dataset and attempting to ensure a minimum write task
>> size/number of write tasks.
>>
>> Considering the second use case you mentioned first, I don’t think it is
>> a good idea for a table to put requirements on the number of tasks used for
>> a write. The parallelism should be set appropriately for the data volume,
>> which is for Spark or the user to determine. A minimum or maximum number of
>> tasks could cause bad behavior.
>>
>> That said, I think there is a related use case for sharding. But that’s
>> really just a clust

Re: DataSourceV2 write input requirements

2018-03-28 Thread Patrick Woody
>
> Spark would always apply the required clustering and sort order because
> they are required by the data source. It is reasonable for a source to
> reject data that isn’t properly prepared. For example, data must be written
> to HTable files with keys in order or else the files are invalid. Sorting
> should not be implemented in the sources themselves because Spark handles
> concerns like spilling to disk. Spark must prepare data correctly, which is
> why the interfaces start with “Requires”.


This was in reference to Russell's suggestion that the data source could
have a required sort, but only a recommended partitioning. I don't have an
immediate use case for a recommendation that comes to mind, though. I'm
definitely in sync that the data source itself shouldn't do work outside of
the writes themselves.

Considering the second use case you mentioned first, I don’t think it is a
> good idea for a table to put requirements on the number of tasks used for a
> write. The parallelism should be set appropriately for the data volume,
> which is for Spark or the user to determine. A minimum or maximum number of
> tasks could cause bad behavior.


For your first use case, an explicit global ordering, the problem is that
> there can’t be an explicit global ordering for a table when it is populated
> by a series of independent writes. Each write could have a global order,
> but once those files are written, you have to deal with multiple sorted
> data sets. I think it makes sense to focus on order within data files, not
> order between data files.


This is where I'm interested in learning about the separation of
responsibilities for the data source and how "smart" it is supposed to be.

For the first part, I would assume that, given the estimated data size from
Spark and options passed in from the user, the data source could make a more
intelligent requirement on the write format than Spark could on its own.
That's somewhat analogous to how the current FileSource does bin packing of
small files on the read side, restricting parallelism to reduce overhead.

For the second, I wouldn't assume that a data source requiring a certain
write format gives any guarantees when reading the same data back. In the
cases where it is a complete overwrite it would, but for independent writes
the layout could still be useful for statistics or compression.
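
As a purely hypothetical sketch of that first part: RequiresClustering is only
the interface proposed earlier in the thread, and the estimatedSizeInBytes
value is an invented hook that Spark would have to pass to the source; the
threshold and column name are placeholders.

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.functions;

// Proposed write-side interface from earlier in the thread (not in Spark yet).
interface RequiresClustering {
  List<Expression> requiredClustering();
}

class SizeAwareWriter implements RequiresClustering {
  private final long estimatedSizeInBytes;  // invented: would be supplied by Spark
  private final long clusteringThreshold;   // invented: e.g. from a user option

  SizeAwareWriter(long estimatedSizeInBytes, long clusteringThreshold) {
    this.estimatedSizeInBytes = estimatedSizeInBytes;
    this.clusteringThreshold = clusteringThreshold;
  }

  @Override
  public List<Expression> requiredClustering() {
    // For small inputs the shuffle overhead outweighs the nicer layout, so
    // require nothing; for larger inputs, ask for clustering on the write key.
    if (estimatedSizeInBytes < clusteringThreshold) {
      return Collections.emptyList();
    }
    return Collections.singletonList(functions.col("write_key").expr());
  }
}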

Thanks
Pat



On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue  wrote:

> How would Spark determine whether or not to apply a recommendation - a
> cost threshold?
>
> Spark would always apply the required clustering and sort order because
> they are required by the data source. It is reasonable for a source to
> reject data that isn’t properly prepared. For example, data must be written
> to HTable files with keys in order or else the files are invalid. Sorting
> should not be implemented in the sources themselves because Spark handles
> concerns like spilling to disk. Spark must prepare data correctly, which is
> why the interfaces start with “Requires”.
>
> I’m not sure what the second half of your question means. What does Spark
> need to pass into the data source?
>
> Should a datasource be able to provide a Distribution proper rather than
> just the clustering expressions? Two use cases would be for explicit global
> sorting of the dataset and attempting to ensure a minimum write task
> size/number of write tasks.
>
> Considering the second use case you mentioned first, I don’t think it is a
> good idea for a table to put requirements on the number of tasks used for a
> write. The parallelism should be set appropriately for the data volume,
> which is for Spark or the user to determine. A minimum or maximum number of
> tasks could cause bad behavior.
>
> That said, I think there is a related use case for sharding. But that’s
> really just a clustering by an expression with the shard calculation, e.g., 
> hash(id_col,
> 64). The shards should be handled as a cluster, but it doesn’t matter how
> many tasks are used for it.
>
> For your first use case, an explicit global ordering, the problem is that
> there can’t be an explicit global ordering for a table when it is populated
> by a series of independent writes. Each write could have a global order,
> but once those files are written, you have to deal with multiple sorted
> data sets. I think it makes sense to focus on order within data files, not
> order between data files.
> ​
>
> On Wed, Mar 28, 2018 at 7:26 AM, Patrick Woody 
> wrote:
>
>> How would Spark determine whether or not to apply a recommendation - a
>> cost threshold? And yes, it would be good to flesh out what information we
>> get from Spark in the datasource when providing these
>> recommendations/requirements - I could see statistics and the existing
>> outputPartitioning/Ordering of the child plan being used for providing the
>> requirement.
>>
>> Should a datasource be able to provide a Distribution proper rather than
>> just the clustering expressions? Two use cases would

Re: DataSourceV2 write input requirements

2018-03-28 Thread Ryan Blue
How would Spark determine whether or not to apply a recommendation - a cost
threshold?

Spark would always apply the required clustering and sort order because
they are required by the data source. It is reasonable for a source to
reject data that isn’t properly prepared. For example, data must be written
to HTable files with keys in order or else the files are invalid. Sorting
should not be implemented in the sources themselves because Spark handles
concerns like spilling to disk. Spark must prepare data correctly, which is
why the interfaces start with “Requires”.

I’m not sure what the second half of your question means. What does Spark
need to pass into the data source?

Should a datasource be able to provide a Distribution proper rather than
just the clustering expressions? Two use cases would be for explicit global
sorting of the dataset and attempting to ensure a minimum write task
size/number of write tasks.

Considering the second use case you mentioned first, I don’t think it is a
good idea for a table to put requirements on the number of tasks used for a
write. The parallelism should be set appropriately for the data volume,
which is for Spark or the user to determine. A minimum or maximum number of
tasks could cause bad behavior.

That said, I think there is a related use case for sharding. But that’s
really just a clustering by an expression with the shard calculation, e.g.,
hash(id_col, 64). The shards should be handled as a cluster, but it doesn’t
matter how many tasks are used for it.
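
As a sketch of that sharding case, reusing the proposed RequiresClustering
interface (restated below, since it is not in Spark yet); hash and pmod are
Spark's built-ins standing in for whatever hash the source actually uses, and
id_col and 64 come from the example above.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.hash;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.pmod;

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.catalyst.expressions.Expression;

// Proposed write-side interface from earlier in the thread (not in Spark yet).
interface RequiresClustering {
  List<Expression> requiredClustering();
}

class ShardedTableWriter implements RequiresClustering {
  private static final int NUM_SHARDS = 64;

  @Override
  public List<Expression> requiredClustering() {
    // Cluster incoming rows by their shard; how many tasks end up handling
    // those shards is left entirely to Spark.
    return Collections.singletonList(
        pmod(hash(col("id_col")), lit(NUM_SHARDS)).expr());
  }
}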

For your first use case, an explicit global ordering, the problem is that
there can’t be an explicit global ordering for a table when it is populated
by a series of independent writes. Each write could have a global order,
but once those files are written, you have to deal with multiple sorted
data sets. I think it makes sense to focus on order within data files, not
order between data files.
​

On Wed, Mar 28, 2018 at 7:26 AM, Patrick Woody 
wrote:

> How would Spark determine whether or not to apply a recommendation - a
> cost threshold? And yes, it would be good to flesh out what information we
> get from Spark in the datasource when providing these
> recommendations/requirements - I could see statistics and the existing
> outputPartitioning/Ordering of the child plan being used for providing the
> requirement.
>
> Should a datasource be able to provide a Distribution proper rather than
> just the clustering expressions? Two use cases would be for explicit global
> sorting of the dataset and attempting to ensure a minimum write task
> size/number of write tasks.
>
>
>
> On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer <
> russell.spit...@gmail.com> wrote:
>
>> Thanks for the clarification, definitely would want to require Sort but
>> only recommend partitioning ...  I think that would be useful to request
>> based on details about the incoming dataset.
>>
>> On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue  wrote:
>>
>>> A required clustering would not, but a required sort would. Clustering
>>> is asking for the input dataframe's partitioning, and sorting would be how
>>> each partition is sorted.
>>>
>>> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <
>>> russell.spit...@gmail.com> wrote:
>>>
 I forgot since it's been a while, but does Clustering support allow
 requesting that partitions contain elements in order as well? That would be
 a useful trick for me. IE
 Request/Require(SortedOn(Col1))
 Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))

 On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue 
 wrote:

> Thanks, it makes sense that the existing interface is for aggregation
> and not joins. Why are there requirements for the number of partitions 
> that
> are returned then?
>
> Does it make sense to design the write-side `Requirement` classes and
> the read-side reporting separately?
>
> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan 
> wrote:
>
>> Hi Ryan, yea you are right that SupportsReportPartitioning doesn't
>> expose hash function, so Join can't benefit from this interface, as Join
>> doesn't require a general ClusteredDistribution, but a more specific one
>> called HashClusteredDistribution.
>>
>> So currently only Aggregate can benefit from
>> SupportsReportPartitioning and save shuffle. We can add a new interface 
>> to
>> expose the hash function to make it work for Join.
>>
>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue  wrote:
>>
>>> I just took a look at SupportsReportPartitioning and I'm not sure
>>> that it will work for real use cases. It doesn't specify, as far as I 
>>> can
>>> tell, a hash function for combining clusters into tasks or a way to 
>>> provide
>>> Spark a hash function for the other side of a join. It seems unlikely 
>>> to me
>>> that many data sources would have partitioning that happens to match the
>>> other

Re: DataSourceV2 write input requirements

2018-03-28 Thread Patrick Woody
How would Spark determine whether or not to apply a recommendation - a cost
threshold? And yes, it would be good to flesh out what information we get
from Spark in the datasource when providing these
recommendations/requirements - I could see statistics and the existing
outputPartitioning/Ordering of the child plan being used for providing the
requirement.

Should a datasource be able to provide a Distribution proper rather than
just the clustering expressions? Two use cases would be for explicit global
sorting of the dataset and attempting to ensure a minimum write task
size/number of write tasks.



On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer 
wrote:

> Thanks for the clarification, definitely would want to require Sort but
> only recommend partitioning ...  I think that would be useful to request
> based on details about the incoming dataset.
>
> On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue  wrote:
>
>> A required clustering would not, but a required sort would. Clustering is
>> asking for the input dataframe's partitioning, and sorting would be how
>> each partition is sorted.
>>
>> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <
>> russell.spit...@gmail.com> wrote:
>>
>>> I forgot since it's been a while, but does Clustering support allow
>>> requesting that partitions contain elements in order as well? That would be
>>> a useful trick for me. IE
>>> Request/Require(SortedOn(Col1))
>>> Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))
>>>
>>> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue 
>>> wrote:
>>>
 Thanks, it makes sense that the existing interface is for aggregation
 and not joins. Why are there requirements for the number of partitions that
 are returned then?

 Does it make sense to design the write-side `Requirement` classes and
 the read-side reporting separately?

 On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan 
 wrote:

> Hi Ryan, yea you are right that SupportsReportPartitioning doesn't
> expose hash function, so Join can't benefit from this interface, as Join
> doesn't require a general ClusteredDistribution, but a more specific one
> called HashClusteredDistribution.
>
> So currently only Aggregate can benefit from
> SupportsReportPartitioning and save shuffle. We can add a new interface to
> expose the hash function to make it work for Join.
>
> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue  wrote:
>
>> I just took a look at SupportsReportPartitioning and I'm not sure
>> that it will work for real use cases. It doesn't specify, as far as I can
>> tell, a hash function for combining clusters into tasks or a way to 
>> provide
>> Spark a hash function for the other side of a join. It seems unlikely to 
>> me
>> that many data sources would have partitioning that happens to match the
>> other side of a join. And, it looks like task order matters? Maybe I'm
>> missing something?
>>
>> I think that we should design the write side independently based on
>> what data stores actually need, and take a look at the read side based on
>> what data stores can actually provide. Wenchen, was there a design doc 
>> for
>> partitioning on the read path?
>>
>> I completely agree with your point about a global sort. We recommend
>> to all of our data engineers to add a sort to most tables because it
>> introduces the range partitioner and does a skew calculation, in addition
>> to making data filtering much better when it is read. It's really common
>> for tables to be skewed by partition values.
>>
>> rb
>>
>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <
>> patrick.woo...@gmail.com> wrote:
>>
>>> Hey Ryan, Ted, Wenchen
>>>
>>> Thanks for the quick replies.
>>>
>>> @Ryan - the sorting portion makes sense, but I think we'd have to
>>> ensure something similar to requiredChildDistribution in SparkPlan 
>>> where we
>>> have the number of partitions as well if we'd want to further report to
>>> SupportsReportPartitioning, yeah?
>>>
>>> Specifying an explicit global sort can also be useful for filtering
>>> purposes on Parquet row group stats if we have a time based/high
>>> cardinality ID field. If my datasource or catalog knows about previous
>>> queries on a table, it could be really useful to recommend more 
>>> appropriate
>>> formatting for consumers on the next materialization. The same would be
>>> true of clustering on commonly joined fields.
>>>
>>> Thanks again
>>> Pat
>>>
>>>
>>>
>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu 
>>> wrote:
>>>
 Hmm. Ryan seems to be right.

 Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/
 reader/SupportsReportPartitioning.java :

 import org.apache.spark.sql.sources.v2.reader.partitioning.
>>

Re: DataSourceV2 write input requirements

2018-03-27 Thread Russell Spitzer
Thanks for the clarification, definitely would want to require Sort but
only recommend partitioning ...  I think that would be useful to request
based on details about the incoming dataset.

On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue  wrote:

> A required clustering would not, but a required sort would. Clustering is
> asking for the input dataframe's partitioning, and sorting would be how
> each partition is sorted.
>
> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <
> russell.spit...@gmail.com> wrote:
>
>> I forgot since it's been a while, but does Clustering support allow
>> requesting that partitions contain elements in order as well? That would be
>> a useful trick for me. IE
>> Request/Require(SortedOn(Col1))
>> Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))
>>
>> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue 
>> wrote:
>>
>>> Thanks, it makes sense that the existing interface is for aggregation
>>> and not joins. Why are there requirements for the number of partitions that
>>> are returned then?
>>>
>>> Does it make sense to design the write-side `Requirement` classes and
>>> the read-side reporting separately?
>>>
>>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan 
>>> wrote:
>>>
 Hi Ryan, yea you are right that SupportsReportPartitioning doesn't
 expose hash function, so Join can't benefit from this interface, as Join
 doesn't require a general ClusteredDistribution, but a more specific one
 called HashClusteredDistribution.

 So currently only Aggregate can benefit from SupportsReportPartitioning
 and save shuffle. We can add a new interface to expose the hash function to
 make it work for Join.

 On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue  wrote:

> I just took a look at SupportsReportPartitioning and I'm not sure that
> it will work for real use cases. It doesn't specify, as far as I can tell,
> a hash function for combining clusters into tasks or a way to provide 
> Spark
> a hash function for the other side of a join. It seems unlikely to me that
> many data sources would have partitioning that happens to match the other
> side of a join. And, it looks like task order matters? Maybe I'm missing
> something?
>
> I think that we should design the write side independently based on
> what data stores actually need, and take a look at the read side based on
> what data stores can actually provide. Wenchen, was there a design doc for
> partitioning on the read path?
>
> I completely agree with your point about a global sort. We recommend
> to all of our data engineers to add a sort to most tables because it
> introduces the range partitioner and does a skew calculation, in addition
> to making data filtering much better when it is read. It's really common
> for tables to be skewed by partition values.
>
> rb
>
> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <
> patrick.woo...@gmail.com> wrote:
>
>> Hey Ryan, Ted, Wenchen
>>
>> Thanks for the quick replies.
>>
>> @Ryan - the sorting portion makes sense, but I think we'd have to
>> ensure something similar to requiredChildDistribution in SparkPlan where 
>> we
>> have the number of partitions as well if we'd want to further report to
>> SupportsReportPartitioning, yeah?
>>
>> Specifying an explicit global sort can also be useful for filtering
>> purposes on Parquet row group stats if we have a time based/high
>> cardinality ID field. If my datasource or catalog knows about previous
>> queries on a table, it could be really useful to recommend more 
>> appropriate
>> formatting for consumers on the next materialization. The same would be
>> true of clustering on commonly joined fields.
>>
>> Thanks again
>> Pat
>>
>>
>>
>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu  wrote:
>>
>>> Hmm. Ryan seems to be right.
>>>
>>> Looking
>>> at 
>>> sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
>>> :
>>>
>>> import
>>> org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>> ...
>>>   Partitioning outputPartitioning();
>>>
>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan 
>>> wrote:
>>>
 Actually clustering is already supported, please take a look at
 SupportsReportPartitioning

 Ordering is not proposed yet, might be similar to what Ryan
 proposed.

 On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu 
 wrote:

> Interesting.
>
> Should requiredClustering return a Set of Expression's ?
> This way, we can determine the order of Expression's by looking at
> what requiredOrdering() returns.
>
> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <
> rb...@netflix.com.invalid> wrote:
>>

Re: DataSourceV2 write input requirements

2018-03-27 Thread Ryan Blue
A required clustering would not, but a required sort would. Clustering is
asking for the input dataframe's partitioning, and sorting would be how
each partition is sorted.

On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer 
wrote:

> I forgot since it's been a while, but does Clustering support allow
> requesting that partitions contain elements in order as well? That would be
> a useful trick for me. IE
> Request/Require(SortedOn(Col1))
> Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))
>
> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue 
> wrote:
>
>> Thanks, it makes sense that the existing interface is for aggregation and
>> not joins. Why are there requirements for the number of partitions that are
>> returned then?
>>
>> Does it make sense to design the write-side `Requirement` classes and
>> the read-side reporting separately?
>>
>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan  wrote:
>>
>>> Hi Ryan, yea you are right that SupportsReportPartitioning doesn't
>>> expose hash function, so Join can't benefit from this interface, as Join
>>> doesn't require a general ClusteredDistribution, but a more specific one
>>> called HashClusteredDistribution.
>>>
>>> So currently only Aggregate can benefit from SupportsReportPartitioning
>>> and save shuffle. We can add a new interface to expose the hash function to
>>> make it work for Join.
>>>
>>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue  wrote:
>>>
 I just took a look at SupportsReportPartitioning and I'm not sure that
 it will work for real use cases. It doesn't specify, as far as I can tell,
 a hash function for combining clusters into tasks or a way to provide Spark
 a hash function for the other side of a join. It seems unlikely to me that
 many data sources would have partitioning that happens to match the other
 side of a join. And, it looks like task order matters? Maybe I'm missing
 something?

 I think that we should design the write side independently based on
 what data stores actually need, and take a look at the read side based on
 what data stores can actually provide. Wenchen, was there a design doc for
 partitioning on the read path?

 I completely agree with your point about a global sort. We recommend to
 all of our data engineers to add a sort to most tables because it
 introduces the range partitioner and does a skew calculation, in addition
 to making data filtering much better when it is read. It's really common
 for tables to be skewed by partition values.

 rb

 On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <
 patrick.woo...@gmail.com> wrote:

> Hey Ryan, Ted, Wenchen
>
> Thanks for the quick replies.
>
> @Ryan - the sorting portion makes sense, but I think we'd have to
> ensure something similar to requiredChildDistribution in SparkPlan where 
> we
> have the number of partitions as well if we'd want to further report to
> SupportsReportPartitioning, yeah?
>
> Specifying an explicit global sort can also be useful for filtering
> purposes on Parquet row group stats if we have a time based/high
> cardinality ID field. If my datasource or catalog knows about previous
> queries on a table, it could be really useful to recommend more 
> appropriate
> formatting for consumers on the next materialization. The same would be
> true of clustering on commonly joined fields.
>
> Thanks again
> Pat
>
>
>
> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu  wrote:
>
>> Hmm. Ryan seems to be right.
>>
>> Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/
>> reader/SupportsReportPartitioning.java :
>>
>> import org.apache.spark.sql.sources.v2.reader.partitioning.
>> Partitioning;
>> ...
>>   Partitioning outputPartitioning();
>>
>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan 
>> wrote:
>>
>>> Actually clustering is already supported, please take a look at
>>> SupportsReportPartitioning
>>>
>>> Ordering is not proposed yet, might be similar to what Ryan proposed.
>>>
>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu  wrote:
>>>
 Interesting.

 Should requiredClustering return a Set of Expression's ?
 This way, we can determine the order of Expression's by looking at
 what requiredOrdering() returns.

 On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <
 rb...@netflix.com.invalid> wrote:

> Hi Pat,
>
> Thanks for starting the discussion on this, we’re really
> interested in it as well. I don’t think there is a proposed API yet, 
> but I
> was thinking something like this:
>
> interface RequiresClustering {
>   List<Expression> requiredClustering();
> }
>
> interface RequiresSort {
>   List<SortOrder> requiredOrdering();
>>

Re: DataSourceV2 write input requirements

2018-03-27 Thread Russell Spitzer
I forgot since it's been a while, but does Clustering support allow
requesting that partitions contain elements in order as well? That would be
a useful trick for me. IE
Request/Require(SortedOn(Col1))
Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))

On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue  wrote:

> Thanks, it makes sense that the existing interface is for aggregation and
> not joins. Why are there requirements for the number of partitions that are
> returned then?
>
> Does it make sense to design the write-side `Requirement` classes and the
> read-side reporting separately?
>
> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan  wrote:
>
>> Hi Ryan, yea you are right that SupportsReportPartitioning doesn't expose
>> hash function, so Join can't benefit from this interface, as Join doesn't
>> require a general ClusteredDistribution, but a more specific one
>> called HashClusteredDistribution.
>>
>> So currently only Aggregate can benefit from SupportsReportPartitioning
>> and save shuffle. We can add a new interface to expose the hash function to
>> make it work for Join.
>>
>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue  wrote:
>>
>>> I just took a look at SupportsReportPartitioning and I'm not sure that
>>> it will work for real use cases. It doesn't specify, as far as I can tell,
>>> a hash function for combining clusters into tasks or a way to provide Spark
>>> a hash function for the other side of a join. It seems unlikely to me that
>>> many data sources would have partitioning that happens to match the other
>>> side of a join. And, it looks like task order matters? Maybe I'm missing
>>> something?
>>>
>>> I think that we should design the write side independently based on what
>>> data stores actually need, and take a look at the read side based on what
>>> data stores can actually provide. Wenchen, was there a design doc for
>>> partitioning on the read path?
>>>
>>> I completely agree with your point about a global sort. We recommend to
>>> all of our data engineers to add a sort to most tables because it
>>> introduces the range partitioner and does a skew calculation, in addition
>>> to making data filtering much better when it is read. It's really common
>>> for tables to be skewed by partition values.
>>>
>>> rb
>>>
>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody >> > wrote:
>>>
 Hey Ryan, Ted, Wenchen

 Thanks for the quick replies.

 @Ryan - the sorting portion makes sense, but I think we'd have to
 ensure something similar to requiredChildDistribution in SparkPlan where we
 have the number of partitions as well if we'd want to further report to
 SupportsReportPartitioning, yeah?

 Specifying an explicit global sort can also be useful for filtering
 purposes on Parquet row group stats if we have a time based/high
 cardinality ID field. If my datasource or catalog knows about previous
 queries on a table, it could be really useful to recommend more appropriate
 formatting for consumers on the next materialization. The same would be
 true of clustering on commonly joined fields.

 Thanks again
 Pat



 On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu  wrote:

> Hmm. Ryan seems to be right.
>
> Looking
> at 
> sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
> :
>
> import
> org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
> ...
>   Partitioning outputPartitioning();
>
> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan 
> wrote:
>
>> Actually clustering is already supported, please take a look at
>> SupportsReportPartitioning
>>
>> Ordering is not proposed yet, might be similar to what Ryan proposed.
>>
>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu  wrote:
>>
>>> Interesting.
>>>
>>> Should requiredClustering return a Set of Expression's ?
>>> This way, we can determine the order of Expression's by looking at
>>> what requiredOrdering() returns.
>>>
>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <
>>> rb...@netflix.com.invalid> wrote:
>>>
 Hi Pat,

 Thanks for starting the discussion on this, we’re really interested
 in it as well. I don’t think there is a proposed API yet, but I was
 thinking something like this:

 interface RequiresClustering {
   List<Expression> requiredClustering();
 }

 interface RequiresSort {
   List<SortOrder> requiredOrdering();
 }

 The reason why RequiresClustering should provide Expression is
 that it needs to be able to customize the implementation. For example,
 writing to HTable would require building a key (or the data for a key) 
 and
 that might use a hash function that differs from Spark’s built-ins.
 RequiresSort is fairly straightforward, but the in

Re: DataSourceV2 write input requirements

2018-03-27 Thread Ryan Blue
Thanks, it makes sense that the existing interface is for aggregation and
not joins. Why are there requirements for the number of partitions that are
returned then?

Does it make sense to design the write-side `Requirement` classes and the
read-side reporting separately?

On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan  wrote:

> Hi Ryan, yea you are right that SupportsReportPartitioning doesn't expose
> hash function, so Join can't benefit from this interface, as Join doesn't
> require a general ClusteredDistribution, but a more specific one called
> HashClusteredDistribution.
>
> So currently only Aggregate can benefit from SupportsReportPartitioning
> and save shuffle. We can add a new interface to expose the hash function to
> make it work for Join.
>
> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue  wrote:
>
>> I just took a look at SupportsReportPartitioning and I'm not sure that it
>> will work for real use cases. It doesn't specify, as far as I can tell, a
>> hash function for combining clusters into tasks or a way to provide Spark a
>> hash function for the other side of a join. It seems unlikely to me that
>> many data sources would have partitioning that happens to match the other
>> side of a join. And, it looks like task order matters? Maybe I'm missing
>> something?
>>
>> I think that we should design the write side independently based on what
>> data stores actually need, and take a look at the read side based on what
>> data stores can actually provide. Wenchen, was there a design doc for
>> partitioning on the read path?
>>
>> I completely agree with your point about a global sort. We recommend to
>> all of our data engineers to add a sort to most tables because it
>> introduces the range partitioner and does a skew calculation, in addition
>> to making data filtering much better when it is read. It's really common
>> for tables to be skewed by partition values.
>>
>> rb
>>
>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody 
>> wrote:
>>
>>> Hey Ryan, Ted, Wenchen
>>>
>>> Thanks for the quick replies.
>>>
>>> @Ryan - the sorting portion makes sense, but I think we'd have to ensure
>>> something similar to requiredChildDistribution in SparkPlan where we have
>>> the number of partitions as well if we'd want to further report to
>>> SupportsReportPartitioning, yeah?
>>>
>>> Specifying an explicit global sort can also be useful for filtering
>>> purposes on Parquet row group stats if we have a time based/high
>>> cardinality ID field. If my datasource or catalog knows about previous
>>> queries on a table, it could be really useful to recommend more appropriate
>>> formatting for consumers on the next materialization. The same would be
>>> true of clustering on commonly joined fields.
>>>
>>> Thanks again
>>> Pat
>>>
>>>
>>>
>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu  wrote:
>>>
 Hmm. Ryan seems to be right.

 Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/re
 ader/SupportsReportPartitioning.java :

 import org.apache.spark.sql.sources.v2.reader.partitioning.Partitio
 ning;
 ...
   Partitioning outputPartitioning();

 On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan 
 wrote:

> Actually clustering is already supported, please take a look at
> SupportsReportPartitioning
>
> Ordering is not proposed yet, might be similar to what Ryan proposed.
>
> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu  wrote:
>
>> Interesting.
>>
>> Should requiredClustering return a Set of Expression's ?
>> This way, we can determine the order of Expression's by looking at
>> what requiredOrdering() returns.
>>
>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue > > wrote:
>>
>>> Hi Pat,
>>>
>>> Thanks for starting the discussion on this, we’re really interested
>>> in it as well. I don’t think there is a proposed API yet, but I was
>>> thinking something like this:
>>>
>>> interface RequiresClustering {
>>>   List<Expression> requiredClustering();
>>> }
>>>
>>> interface RequiresSort {
>>>   List<SortOrder> requiredOrdering();
>>> }
>>>
>>> The reason why RequiresClustering should provide Expression is that
>>> it needs to be able to customize the implementation. For example, 
>>> writing
>>> to HTable would require building a key (or the data for a key) and that
>>> might use a hash function that differs from Spark’s built-ins.
>>> RequiresSort is fairly straightforward, but the interaction between
>>> the two requirements deserves some consideration. To make the two
>>> compatible, I think that RequiresSort must be interpreted as a sort
>>> within each partition of the clustering, but could possibly be used for 
>>> a
>>> global sort when the two overlap.
>>>
>>> For example, if I have a table partitioned by “day” and “category”
>>> then the RequiredClustering would be by day, category. A required
>>> sort might 

Re: DataSourceV2 write input requirements

2018-03-27 Thread Wenchen Fan
Hi Ryan, yea, you are right that SupportsReportPartitioning doesn't expose a
hash function, so Join can't benefit from this interface: Join doesn't require
just a general ClusteredDistribution, but a more specific one called
HashClusteredDistribution.

So currently only Aggregate can benefit from SupportsReportPartitioning and
save a shuffle. We can add a new interface that exposes the hash function to
make it work for Join.
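
For reference, here is a rough sketch of the read-side reporting we're talking
about, written from memory against the 2.3 v2 reader API (the exact method
names are worth double-checking). A source that already groups all rows with
the same "id" into one input partition can satisfy an aggregate's clustered
distribution, but it has no way to satisfy a join's hash-clustered requirement
because its hash function is never exposed.

import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;

// Partitioning reported by a hypothetical reader that is already clustered by "id".
class IdClusteredPartitioning implements Partitioning {
  private final int numPartitions;

  IdClusteredPartitioning(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public boolean satisfy(Distribution distribution) {
    // A general clustered distribution on "id" (what Aggregate asks for) is
    // satisfied; a join's hash-clustered requirement cannot be, since Spark
    // never learns the source's hash function.
    if (distribution instanceof ClusteredDistribution) {
      for (String column : ((ClusteredDistribution) distribution).clusteredColumns) {
        if (!column.equals("id")) {
          return false;
        }
      }
      return true;
    }
    return false;
  }
}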

On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue  wrote:

> I just took a look at SupportsReportPartitioning and I'm not sure that it
> will work for real use cases. It doesn't specify, as far as I can tell, a
> hash function for combining clusters into tasks or a way to provide Spark a
> hash function for the other side of a join. It seems unlikely to me that
> many data sources would have partitioning that happens to match the other
> side of a join. And, it looks like task order matters? Maybe I'm missing
> something?
>
> I think that we should design the write side independently based on what
> data stores actually need, and take a look at the read side based on what
> data stores can actually provide. Wenchen, was there a design doc for
> partitioning on the read path?
>
> I completely agree with your point about a global sort. We recommend to
> all of our data engineers to add a sort to most tables because it
> introduces the range partitioner and does a skew calculation, in addition
> to making data filtering much better when it is read. It's really common
> for tables to be skewed by partition values.
>
> rb
>
> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody 
> wrote:
>
>> Hey Ryan, Ted, Wenchen
>>
>> Thanks for the quick replies.
>>
>> @Ryan - the sorting portion makes sense, but I think we'd have to ensure
>> something similar to requiredChildDistribution in SparkPlan where we have
>> the number of partitions as well if we'd want to further report to
>> SupportsReportPartitioning, yeah?
>>
>> Specifying an explicit global sort can also be useful for filtering
>> purposes on Parquet row group stats if we have a time based/high
>> cardinality ID field. If my datasource or catalog knows about previous
>> queries on a table, it could be really useful to recommend more appropriate
>> formatting for consumers on the next materialization. The same would be
>> true of clustering on commonly joined fields.
>>
>> Thanks again
>> Pat
>>
>>
>>
>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu  wrote:
>>
>>> Hmm. Ryan seems to be right.
>>>
>>> Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/re
>>> ader/SupportsReportPartitioning.java :
>>>
>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>> ...
>>>   Partitioning outputPartitioning();
>>>
>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan 
>>> wrote:
>>>
 Actually clustering is already supported, please take a look at
 SupportsReportPartitioning

 Ordering is not proposed yet, might be similar to what Ryan proposed.

 On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu  wrote:

> Interesting.
>
> Should requiredClustering return a Set of Expression's ?
> This way, we can determine the order of Expression's by looking at
> what requiredOrdering() returns.
>
> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue 
> wrote:
>
>> Hi Pat,
>>
>> Thanks for starting the discussion on this, we’re really interested
>> in it as well. I don’t think there is a proposed API yet, but I was
>> thinking something like this:
>>
>> interface RequiresClustering {
>>   List<Expression> requiredClustering();
>> }
>>
>> interface RequiresSort {
>>   List<SortOrder> requiredOrdering();
>> }
>>
>> The reason why RequiresClustering should provide Expression is that
>> it needs to be able to customize the implementation. For example, writing
>> to HTable would require building a key (or the data for a key) and that
>> might use a hash function that differs from Spark’s built-ins.
>> RequiresSort is fairly straightforward, but the interaction between
>> the two requirements deserves some consideration. To make the two
>> compatible, I think that RequiresSort must be interpreted as a sort
>> within each partition of the clustering, but could possibly be used for a
>> global sort when the two overlap.
>>
>> For example, if I have a table partitioned by “day” and “category”
>> then the RequiredClustering would be by day, category. A required
>> sort might be day ASC, category DESC, name ASC. Because that sort
>> satisfies the required clustering, it could be used for a global 
>> ordering.
>> But, is that useful? How would the global ordering matter beyond a sort
>> within each partition, i.e., how would the partition’s place in the 
>> global
>> ordering be passed?
>>
>> To your other questions, you might want to have a look at the recent
>> SPIP I’m working on to consolidate and cle

Re: DataSourceV2 write input requirements

2018-03-27 Thread Ryan Blue
I just took a look at SupportsReportPartitioning and I'm not sure that it
will work for real use cases. It doesn't specify, as far as I can tell, a
hash function for combining clusters into tasks or a way to provide Spark a
hash function for the other side of a join. It seems unlikely to me that
many data sources would have partitioning that happens to match the other
side of a join. And, it looks like task order matters? Maybe I'm missing
something?

I think that we should design the write side independently based on what
data stores actually need, and take a look at the read side based on what
data stores can actually provide. Wenchen, was there a design doc for
partitioning on the read path?

I completely agree with your point about a global sort. We recommend that all
of our data engineers add a sort to most tables because it introduces the
range partitioner and does a skew calculation, in addition to making data
filtering much better when the table is read. It's really common for tables
to be skewed by partition values.
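
Concretely, that recommendation is just an explicit sort before the write,
using only the public Dataset API (table and column names below are made up):

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SortedTableWrite {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("sorted-write").getOrCreate();
    Dataset<Row> events = spark.table("raw_events");

    // orderBy plans a global sort: Spark samples the input to build a range
    // partitioner (which spreads out skewed partition values) and then sorts
    // within each resulting partition before writing.
    events.orderBy(col("day"), col("category"), col("event_id"))
        .write()
        .mode("overwrite")
        .saveAsTable("events_sorted");

    spark.stop();
  }
}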

rb

On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody 
wrote:

> Hey Ryan, Ted, Wenchen
>
> Thanks for the quick replies.
>
> @Ryan - the sorting portion makes sense, but I think we'd have to ensure
> something similar to requiredChildDistribution in SparkPlan where we have
> the number of partitions as well if we'd want to further report to
> SupportsReportPartitioning, yeah?
>
> Specifying an explicit global sort can also be useful for filtering
> purposes on Parquet row group stats if we have a time based/high
> cardinality ID field. If my datasource or catalog knows about previous
> queries on a table, it could be really useful to recommend more appropriate
> formatting for consumers on the next materialization. The same would be
> true of clustering on commonly joined fields.
>
> Thanks again
> Pat
>
>
>
> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu  wrote:
>
>> Hmm. Ryan seems to be right.
>>
>> Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/re
>> ader/SupportsReportPartitioning.java :
>>
>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>> ...
>>   Partitioning outputPartitioning();
>>
>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan  wrote:
>>
>>> Actually clustering is already supported, please take a look at
>>> SupportsReportPartitioning
>>>
>>> Ordering is not proposed yet, might be similar to what Ryan proposed.
>>>
>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu  wrote:
>>>
 Interesting.

 Should requiredClustering return a Set of Expression's ?
 This way, we can determine the order of Expression's by looking at what 
 requiredOrdering()
 returns.

 On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue 
 wrote:

> Hi Pat,
>
> Thanks for starting the discussion on this, we’re really interested in
> it as well. I don’t think there is a proposed API yet, but I was thinking
> something like this:
>
> interface RequiresClustering {
>   List<Expression> requiredClustering();
> }
>
> interface RequiresSort {
>   List<SortOrder> requiredOrdering();
> }
>
> The reason why RequiresClustering should provide Expression is that
> it needs to be able to customize the implementation. For example, writing
> to HTable would require building a key (or the data for a key) and that
> might use a hash function that differs from Spark’s built-ins.
> RequiresSort is fairly straightforward, but the interaction between
> the two requirements deserves some consideration. To make the two
> compatible, I think that RequiresSort must be interpreted as a sort
> within each partition of the clustering, but could possibly be used for a
> global sort when the two overlap.
>
> For example, if I have a table partitioned by “day” and “category”
> then the RequiredClustering would be by day, category. A required
> sort might be day ASC, category DESC, name ASC. Because that sort
> satisfies the required clustering, it could be used for a global ordering.
> But, is that useful? How would the global ordering matter beyond a sort
> within each partition, i.e., how would the partition’s place in the global
> ordering be passed?
>
> To your other questions, you might want to have a look at the recent
> SPIP I’m working on to consolidate and clean up logical plans
> .
> That proposes more specific uses for the DataSourceV2 API that should help
> clarify what validation needs to take place. As for custom catalyst rules,
> I’d like to hear about the use cases to see if we can build it into these
> improvements.
>
> rb
> ​
>
> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <
> patrick.woo...@gmail.com> wrote:
>
>> Hey all,
>>
>> I saw in some of the discussions around Dat

Re: DataSourceV2 write input requirements

2018-03-26 Thread Wenchen Fan
Yea, it is for the read side only. For the write side, I think implementations
can provide options that allow users to set the partitioning/ordering, or the
data source may have a natural partitioning/ordering that doesn't require any
interface.
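
For example, with nothing new in the write API, the user can prepare the
layout themselves and pass source-specific options; the format name and
option key below are made up for illustration.

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class WriteWithLayoutOptions {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("layout-options").getOrCreate();
    Dataset<Row> df = spark.table("events");

    df.repartition(col("partition_key"))             // user-chosen clustering
        .sortWithinPartitions(col("clustering_key")) // user-chosen per-partition order
        .write()
        .format("com.example.keyvalue")              // hypothetical v2 source
        .option("batch.group.by", "partition_key")   // hypothetical source option
        .mode("append")
        .save();

    spark.stop();
  }
}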

On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody 
wrote:

> Hey Ryan, Ted, Wenchen
>
> Thanks for the quick replies.
>
> @Ryan - the sorting portion makes sense, but I think we'd have to ensure
> something similar to requiredChildDistribution in SparkPlan where we have
> the number of partitions as well if we'd want to further report to
> SupportsReportPartitioning, yeah?
>
> Specifying an explicit global sort can also be useful for filtering
> purposes on Parquet row group stats if we have a time based/high
> cardinality ID field. If my datasource or catalog knows about previous
> queries on a table, it could be really useful to recommend more appropriate
> formatting for consumers on the next materialization. The same would be
> true of clustering on commonly joined fields.
>
> Thanks again
> Pat
>
>
>
> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu  wrote:
>
>> Hmm. Ryan seems to be right.
>>
>> Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/re
>> ader/SupportsReportPartitioning.java :
>>
>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>> ...
>>   Partitioning outputPartitioning();
>>
>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan  wrote:
>>
>>> Actually clustering is already supported, please take a look at
>>> SupportsReportPartitioning
>>>
>>> Ordering is not proposed yet, might be similar to what Ryan proposed.
>>>
>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu  wrote:
>>>
 Interesting.

 Should requiredClustering return a Set of Expression's ?
 This way, we can determine the order of Expression's by looking at what 
 requiredOrdering()
 returns.

 On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue 
 wrote:

> Hi Pat,
>
> Thanks for starting the discussion on this, we’re really interested in
> it as well. I don’t think there is a proposed API yet, but I was thinking
> something like this:
>
> interface RequiresClustering {
>   List<Expression> requiredClustering();
> }
>
> interface RequiresSort {
>   List<SortOrder> requiredOrdering();
> }
>
> The reason why RequiresClustering should provide Expression is that
> it needs to be able to customize the implementation. For example, writing
> to HTable would require building a key (or the data for a key) and that
> might use a hash function that differs from Spark’s built-ins.
> RequiresSort is fairly straightforward, but the interaction between
> the two requirements deserves some consideration. To make the two
> compatible, I think that RequiresSort must be interpreted as a sort
> within each partition of the clustering, but could possibly be used for a
> global sort when the two overlap.
>
> For example, if I have a table partitioned by “day” and “category”
> then the RequiredClustering would be by day, category. A required
> sort might be day ASC, category DESC, name ASC. Because that sort
> satisfies the required clustering, it could be used for a global ordering.
> But, is that useful? How would the global ordering matter beyond a sort
> within each partition, i.e., how would the partition’s place in the global
> ordering be passed?
>
> To your other questions, you might want to have a look at the recent
> SPIP I’m working on to consolidate and clean up logical plans
> .
> That proposes more specific uses for the DataSourceV2 API that should help
> clarify what validation needs to take place. As for custom catalyst rules,
> I’d like to hear about the use cases to see if we can build it into these
> improvements.
>
> rb
> ​
>
> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <
> patrick.woo...@gmail.com> wrote:
>
>> Hey all,
>>
>> I saw in some of the discussions around DataSourceV2 writes that we
>> might have the data source inform Spark of requirements for the input
>> data's ordering and partitioning. Has there been a proposed API for that
>> yet?
>>
>> Even one level up it would be helpful to understand how I should be
>> thinking about the responsibility of the data source writer, when I 
>> should
>> be inserting a custom catalyst rule, and how I should handle
>> validation/assumptions of the table before attempting the write.
>>
>> Thanks!
>> Pat
>>
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


>>>
>>
>


Re: DataSourceV2 write input requirements

2018-03-26 Thread Patrick Woody
Hey Ryan, Ted, Wenchen

Thanks for the quick replies.

@Ryan - the sorting portion makes sense, but I think we'd have to ensure
something similar to requiredChildDistribution in SparkPlan, where we also
have the number of partitions, if we want to further report to
SupportsReportPartitioning, yeah?

Specifying an explicit global sort can also be useful for filtering against
Parquet row group stats if we have a time-based or high-cardinality ID field.
If my datasource or catalog knows about previous queries on a table, it could
be really useful to recommend more appropriate formatting for consumers on the
next materialization. The same would be true of clustering on commonly joined
fields.
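
For example (paths and column names made up), sorting on the high-cardinality
time column before writing keeps each Parquet row group's min/max stats tight,
so a later time-range filter can skip most row groups:

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SortForRowGroupPruning {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("row-group-pruning").getOrCreate();

    // Global sort by event_time, then write; each row group now covers a
    // narrow, mostly non-overlapping time range.
    spark.table("raw_events")
        .orderBy(col("event_time"))
        .write()
        .mode("overwrite")
        .parquet("/warehouse/events_by_time");

    // The pushed-down predicate lets Parquet skip row groups whose min/max
    // stats fall outside the requested range.
    Dataset<Row> recent = spark.read()
        .parquet("/warehouse/events_by_time")
        .filter(col("event_time").gt("2018-03-26 00:00:00"));
    recent.show();
  }
}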

Thanks again
Pat



On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu  wrote:

> Hmm. Ryan seems to be right.
>
> Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/
> SupportsReportPartitioning.java :
>
> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
> ...
>   Partitioning outputPartitioning();
>
> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan  wrote:
>
>> Actually clustering is already supported, please take a look at
>> SupportsReportPartitioning
>>
>> Ordering is not proposed yet, might be similar to what Ryan proposed.
>>
>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu  wrote:
>>
>>> Interesting.
>>>
>>> Should requiredClustering return a Set of Expression's ?
>>> This way, we can determine the order of Expression's by looking at what 
>>> requiredOrdering()
>>> returns.
>>>
>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue 
>>> wrote:
>>>
 Hi Pat,

 Thanks for starting the discussion on this, we’re really interested in
 it as well. I don’t think there is a proposed API yet, but I was thinking
 something like this:

 interface RequiresClustering {
   List<Expression> requiredClustering();
 }

 interface RequiresSort {
   List<SortOrder> requiredOrdering();
 }

 The reason why RequiresClustering should provide Expression is that it
 needs to be able to customize the implementation. For example, writing to
 HTable would require building a key (or the data for a key) and that might
 use a hash function that differs from Spark’s built-ins. RequiresSort
 is fairly straightforward, but the interaction between the two requirements
 deserves some consideration. To make the two compatible, I think that
 RequiresSort must be interpreted as a sort within each partition of
 the clustering, but could possibly be used for a global sort when the two
 overlap.

 For example, if I have a table partitioned by “day” and “category” then
 the RequiredClustering would be by day, category. A required sort
 might be day ASC, category DESC, name ASC. Because that sort satisfies
 the required clustering, it could be used for a global ordering. But, is
 that useful? How would the global ordering matter beyond a sort within each
 partition, i.e., how would the partition’s place in the global ordering be
 passed?

 To your other questions, you might want to have a look at the recent
 SPIP I’m working on to consolidate and clean up logical plans
 .
 That proposes more specific uses for the DataSourceV2 API that should help
 clarify what validation needs to take place. As for custom catalyst rules,
 I’d like to hear about the use cases to see if we can build it into these
 improvements.

 rb
 ​

 On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <
 patrick.woo...@gmail.com> wrote:

> Hey all,
>
> I saw in some of the discussions around DataSourceV2 writes that we
> might have the data source inform Spark of requirements for the input
> data's ordering and partitioning. Has there been a proposed API for that
> yet?
>
> Even one level up it would be helpful to understand how I should be
> thinking about the responsibility of the data source writer, when I should
> be inserting a custom catalyst rule, and how I should handle
> validation/assumptions of the table before attempting the write.
>
> Thanks!
> Pat
>



 --
 Ryan Blue
 Software Engineer
 Netflix

>>>
>>>
>>
>


Re: DataSourceV2 write input requirements

2018-03-26 Thread Ted Yu
Hmm. Ryan seems to be right.

Looking
at 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
:

import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
...
  Partitioning outputPartitioning();

On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan  wrote:

> Actually clustering is already supported, please take a look at
> SupportsReportPartitioning
>
> Ordering is not proposed yet, might be similar to what Ryan proposed.
>
> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu  wrote:
>
>> Interesting.
>>
>> Should requiredClustering return a Set of Expression's ?
>> This way, we can determine the order of Expression's by looking at what 
>> requiredOrdering()
>> returns.
>>
>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue 
>> wrote:
>>
>>> Hi Pat,
>>>
>>> Thanks for starting the discussion on this, we’re really interested in
>>> it as well. I don’t think there is a proposed API yet, but I was thinking
>>> something like this:
>>>
>>> interface RequiresClustering {
>>>   List<Expression> requiredClustering();
>>> }
>>>
>>> interface RequiresSort {
>>>   List<SortOrder> requiredOrdering();
>>> }
>>>
>>> The reason why RequiresClustering should provide Expression is that it
>>> needs to be able to customize the implementation. For example, writing to
>>> HTable would require building a key (or the data for a key) and that might
>>> use a hash function that differs from Spark’s built-ins. RequiresSort
>>> is fairly straightforward, but the interaction between the two requirements
>>> deserves some consideration. To make the two compatible, I think that
>>> RequiresSort must be interpreted as a sort within each partition of the
>>> clustering, but could possibly be used for a global sort when the two
>>> overlap.
>>>
>>> For example, if I have a table partitioned by “day” and “category” then
>>> the RequiredClustering would be by day, category. A required sort might
>>> be day ASC, category DESC, name ASC. Because that sort satisfies the
>>> required clustering, it could be used for a global ordering. But, is that
>>> useful? How would the global ordering matter beyond a sort within each
>>> partition, i.e., how would the partition’s place in the global ordering be
>>> passed?
>>>
>>> To your other questions, you might want to have a look at the recent
>>> SPIP I’m working on to consolidate and clean up logical plans
>>> .
>>> That proposes more specific uses for the DataSourceV2 API that should help
>>> clarify what validation needs to take place. As for custom catalyst rules,
>>> I’d like to hear about the use cases to see if we can build it into these
>>> improvements.
>>>
>>> rb
>>> ​
>>>
>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody >> > wrote:
>>>
 Hey all,

 I saw in some of the discussions around DataSourceV2 writes that we
 might have the data source inform Spark of requirements for the input
 data's ordering and partitioning. Has there been a proposed API for that
 yet?

 Even one level up it would be helpful to understand how I should be
 thinking about the responsibility of the data source writer, when I should
 be inserting a custom catalyst rule, and how I should handle
 validation/assumptions of the table before attempting the write.

 Thanks!
 Pat

>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>


Re: DataSourceV2 write input requirements

2018-03-26 Thread Ryan Blue
Wenchen, I thought SupportsReportPartitioning was for the read side. Does it
work with the write side as well?

On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan  wrote:

> Actually clustering is already supported, please take a look at
> SupportsReportPartitioning
>
> Ordering is not proposed yet, might be similar to what Ryan proposed.
>
> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu  wrote:
>
>> Interesting.
>>
>> Should requiredClustering return a Set of Expression's ?
>> This way, we can determine the order of Expression's by looking at what 
>> requiredOrdering()
>> returns.
>>
>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue 
>> wrote:
>>
>>> Hi Pat,
>>>
>>> Thanks for starting the discussion on this, we’re really interested in
>>> it as well. I don’t think there is a proposed API yet, but I was thinking
>>> something like this:
>>>
>>> interface RequiresClustering {
>>>   List<Expression> requiredClustering();
>>> }
>>>
>>> interface RequiresSort {
>>>   List<SortOrder> requiredOrdering();
>>> }
>>>
>>> The reason why RequiresClustering should provide Expression is that it
>>> needs to be able to customize the implementation. For example, writing to
>>> HTable would require building a key (or the data for a key) and that might
>>> use a hash function that differs from Spark’s built-ins. RequiresSort
>>> is fairly straightforward, but the interaction between the two requirements
>>> deserves some consideration. To make the two compatible, I think that
>>> RequiresSort must be interpreted as a sort within each partition of the
>>> clustering, but could possibly be used for a global sort when the two
>>> overlap.
>>>
>>> For example, if I have a table partitioned by “day” and “category” then
>>> the RequiredClustering would be by day, category. A required sort might
>>> be day ASC, category DESC, name ASC. Because that sort satisfies the
>>> required clustering, it could be used for a global ordering. But, is that
>>> useful? How would the global ordering matter beyond a sort within each
>>> partition, i.e., how would the partition’s place in the global ordering be
>>> passed?
>>>
>>> To your other questions, you might want to have a look at the recent
>>> SPIP I’m working on to consolidate and clean up logical plans
>>> .
>>> That proposes more specific uses for the DataSourceV2 API that should help
>>> clarify what validation needs to take place. As for custom catalyst rules,
>>> I’d like to hear about the use cases to see if we can build it into these
>>> improvements.
>>>
>>> rb
>>>
>>>
>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody  wrote:
>>>
 Hey all,

 I saw in some of the discussions around DataSourceV2 writes that we
 might have the data source inform Spark of requirements for the input
 data's ordering and partitioning. Has there been a proposed API for that
 yet?

 Even one level up it would be helpful to understand how I should be
 thinking about the responsibility of the data source writer, when I should
 be inserting a custom catalyst rule, and how I should handle
 validation/assumptions of the table before attempting the write.

 Thanks!
 Pat

>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 write input requirements

2018-03-26 Thread Wenchen Fan
Actually, clustering is already supported; please take a look at
SupportsReportPartitioning.

Ordering is not proposed yet; it might be similar to what Ryan proposed.
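
For context, the read-side mixin being referenced looks roughly like this (a
minimal sketch paraphrased from the Spark 2.3 sources; treat the exact names,
packages, and signatures as assumptions to verify against the code):

// Read-side only: reports how the scanned data is already partitioned, so
// Spark can skip an unnecessary shuffle when the query needs a compatible
// distribution.
public interface SupportsReportPartitioning extends DataSourceReader {
  Partitioning outputPartitioning();
}

// The reported Partitioning declares whether it satisfies a required
// Distribution, e.g. a clustering over some columns.
public interface Partitioning {
  int numPartitions();
  boolean satisfy(Distribution distribution);
}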

On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu  wrote:

> Interesting.
>
> Should requiredClustering return a Set of Expressions?
> This way, we can determine the order of Expressions by looking at what
> requiredOrdering() returns.
>
> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue 
> wrote:
>
>> Hi Pat,
>>
>> Thanks for starting the discussion on this, we’re really interested in it
>> as well. I don’t think there is a proposed API yet, but I was thinking
>> something like this:
>>
>> interface RequiresClustering {
>>   List<Expression> requiredClustering();
>> }
>>
>> interface RequiresSort {
>>   List<SortOrder> requiredOrdering();
>> }
>>
>> The reason why RequiresClustering should provide Expression is that it
>> needs to be able to customize the implementation. For example, writing to
>> HTable would require building a key (or the data for a key) and that might
>> use a hash function that differs from Spark’s built-ins. RequiresSort is
>> fairly straightforward, but the interaction between the two requirements
>> deserves some consideration. To make the two compatible, I think that
>> RequiresSort must be interpreted as a sort within each partition of the
>> clustering, but could possibly be used for a global sort when the two
>> overlap.
>>
>> For example, if I have a table partitioned by “day” and “category” then
>> the RequiredClustering would be by day, category. A required sort might
>> be day ASC, category DESC, name ASC. Because that sort satisfies the
>> required clustering, it could be used for a global ordering. But, is that
>> useful? How would the global ordering matter beyond a sort within each
>> partition, i.e., how would the partition’s place in the global ordering be
>> passed?
>>
>> To your other questions, you might want to have a look at the recent SPIP
>> I’m working on to consolidate and clean up logical plans
>> .
>> That proposes more specific uses for the DataSourceV2 API that should help
>> clarify what validation needs to take place. As for custom catalyst rules,
>> I’d like to hear about the use cases to see if we can build it into these
>> improvements.
>>
>> rb
>>
>>
>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody 
>> wrote:
>>
>>> Hey all,
>>>
>>> I saw in some of the discussions around DataSourceV2 writes that we
>>> might have the data source inform Spark of requirements for the input
>>> data's ordering and partitioning. Has there been a proposed API for that
>>> yet?
>>>
>>> Even one level up it would be helpful to understand how I should be
>>> thinking about the responsibility of the data source writer, when I should
>>> be inserting a custom catalyst rule, and how I should handle
>>> validation/assumptions of the table before attempting the write.
>>>
>>> Thanks!
>>> Pat
>>>
>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>


Re: DataSourceV2 write input requirements

2018-03-26 Thread Ted Yu
Interesting.

Should requiredClustering return a Set of Expressions?
This way, we can determine the order of Expressions by looking at
what requiredOrdering() returns.
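
In code, the alternative shape would be roughly this (just a sketch of the
signature change, not a concrete proposal):

interface RequiresClustering {
  // Using a Set instead of a List: the relative order of the clustering
  // expressions would then be implied by what requiredOrdering() returns,
  // rather than by list position.
  Set<Expression> requiredClustering();
}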

On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue 
wrote:

> Hi Pat,
>
> Thanks for starting the discussion on this, we’re really interested in it
> as well. I don’t think there is a proposed API yet, but I was thinking
> something like this:
>
> interface RequiresClustering {
>   List<Expression> requiredClustering();
> }
>
> interface RequiresSort {
>   List<SortOrder> requiredOrdering();
> }
>
> The reason why RequiresClustering should provide Expression is that it
> needs to be able to customize the implementation. For example, writing to
> HTable would require building a key (or the data for a key) and that might
> use a hash function that differs from Spark’s built-ins. RequiresSort is
> fairly straightforward, but the interaction between the two requirements
> deserves some consideration. To make the two compatible, I think that
> RequiresSort must be interpreted as a sort within each partition of the
> clustering, but could possibly be used for a global sort when the two
> overlap.
>
> For example, if I have a table partitioned by “day” and “category” then
> the RequiredClustering would be by day, category. A required sort might
> be day ASC, category DESC, name ASC. Because that sort satisfies the
> required clustering, it could be used for a global ordering. But, is that
> useful? How would the global ordering matter beyond a sort within each
> partition, i.e., how would the partition’s place in the global ordering be
> passed?
>
> To your other questions, you might want to have a look at the recent SPIP
> I’m working on to consolidate and clean up logical plans
> .
> That proposes more specific uses for the DataSourceV2 API that should help
> clarify what validation needs to take place. As for custom catalyst rules,
> I’d like to hear about the use cases to see if we can build it into these
> improvements.
>
> rb
>
>
> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody 
> wrote:
>
>> Hey all,
>>
>> I saw in some of the discussions around DataSourceV2 writes that we might
>> have the data source inform Spark of requirements for the input data's
>> ordering and partitioning. Has there been a proposed API for that yet?
>>
>> Even one level up it would be helpful to understand how I should be
>> thinking about the responsibility of the data source writer, when I should
>> be inserting a custom catalyst rule, and how I should handle
>> validation/assumptions of the table before attempting the write.
>>
>> Thanks!
>> Pat
>>
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: DataSourceV2 write input requirements

2018-03-26 Thread Ryan Blue
Hi Pat,

Thanks for starting the discussion on this, we’re really interested in it
as well. I don’t think there is a proposed API yet, but I was thinking
something like this:

interface RequiresClustering {
  List<Expression> requiredClustering();
}

interface RequiresSort {
  List<SortOrder> requiredOrdering();
}

The reason why RequiresClustering should provide Expression is that it
needs to be able to customize the implementation. For example, writing to
HTable would require building a key (or the data for a key) and that might
use a hash function that differs from Spark’s built-ins. RequiresSort is
fairly straightforward, but the interaction between the two requirements
deserves some consideration. To make the two compatible, I think that
RequiresSort must be interpreted as a sort within each partition of the
clustering, but could possibly be used for a global sort when the two
overlap.

For example, if I have a table partitioned by “day” and “category” then the
RequiredClustering would be by day, category. A required sort might be day
ASC, category DESC, name ASC. Because that sort satisfies the required
clustering, it could be used for a global ordering. But, is that useful?
How would the global ordering matter beyond a sort within each partition,
i.e., how would the partition’s place in the global ordering be passed?
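
To make that concrete, a sink for that example might implement the proposed
interfaces along these lines (a minimal sketch: HTableWriter, the col/asc/desc
helpers, and the choice of Catalyst's Expression and SortOrder are assumptions
for illustration, not a settled API):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.SortOrder;

// Hypothetical sink for a table partitioned by "day" and "category".
abstract class HTableWriter implements RequiresClustering, RequiresSort {

  // Hypothetical helpers: a real source would build Catalyst Expression and
  // SortOrder instances for its columns; left abstract in this sketch.
  abstract Expression col(String name);
  abstract SortOrder asc(Expression child);
  abstract SortOrder desc(Expression child);

  @Override
  public List<Expression> requiredClustering() {
    // All rows for one (day, category) combination should end up in the same
    // task, however the source chooses to hash or key them.
    return Arrays.asList(col("day"), col("category"));
  }

  @Override
  public List<SortOrder> requiredOrdering() {
    // day ASC, category DESC, name ASC; because this covers the clustering
    // columns, it could also be satisfied by a single global sort.
    return Arrays.asList(asc(col("day")), desc(col("category")), asc(col("name")));
  }
}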

To your other questions, you might want to have a look at the recent SPIP
I’m working on to consolidate and clean up logical plans
.
That proposes more specific uses for the DataSourceV2 API that should help
clarify what validation needs to take place. As for custom catalyst rules,
I’d like to hear about the use cases to see if we can build it into these
improvements.

rb

On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody 
wrote:

> Hey all,
>
> I saw in some of the discussions around DataSourceV2 writes that we might
> have the data source inform Spark of requirements for the input data's
> ordering and partitioning. Has there been a proposed API for that yet?
>
> Even one level up it would be helpful to understand how I should be
> thinking about the responsibility of the data source writer, when I should
> be inserting a custom catalyst rule, and how I should handle
> validation/assumptions of the table before attempting the write.
>
> Thanks!
> Pat
>



-- 
Ryan Blue
Software Engineer
Netflix


DataSourceV2 write input requirements

2018-03-26 Thread Patrick Woody
Hey all,

I saw in some of the discussions around DataSourceV2 writes that we might
have the data source inform Spark of requirements for the input data's
ordering and partitioning. Has there been a proposed API for that yet?

Even one level up it would be helpful to understand how I should be
thinking about the responsibility of the data source writer, when I should
be inserting a custom catalyst rule, and how I should handle
validation/assumptions of the table before attempting the write.

Thanks!
Pat