+1
-------- Original message --------
From: Ryan Blue <rb...@netflix.com>
Date: 3/30/18 2:28 PM (GMT-08:00)
To: Patrick Woody <patrick.woo...@gmail.com>
Cc: Russell Spitzer <russell.spit...@gmail.com>, Wenchen Fan <cloud0...@gmail.com>, Ted Yu <yuzhih...@gmail.com>, Spark Dev List <dev@spark.apache.org>
Subject: Re: DataSourceV2 write input requirements

You're right. A global sort would change the clustering if it had more fields 
than the clustering.
Then what about this: if there is no RequiredClustering, then the sort is a 
global sort. If RequiredClustering is present, then the clustering is applied 
and the sort is a partition-level sort.
That rule would mean that within a partition you always get the sort, but an 
explicit clustering overrides the partitioning a sort might try to introduce. 
Does that sound reasonable?
rb
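
The rule proposed above can be sketched as a small planning function. This is only an illustration in plain Python, not Spark code: `WritePlan`, `plan_write`, and the "hash"/"range" labels are hypothetical names chosen for the example, and using a hash partitioner for the clustering case is an assumption.

```python
from typing import Iterable, NamedTuple, Optional, Sequence, Tuple


class WritePlan(NamedTuple):
    partitioning: str                  # "range", "hash", or "none" (labels are illustrative)
    partition_columns: Tuple[str, ...]
    sort_scope: str                    # "global", "partition", or "none"
    sort_columns: Tuple[str, ...]


def plan_write(clustering: Optional[Iterable[str]],
               ordering: Optional[Sequence[str]]) -> WritePlan:
    """Apply the proposed rule: an explicit clustering overrides the
    partitioning that a sort would otherwise introduce."""
    cluster_cols = tuple(sorted(set(clustering or ())))
    sort_cols = tuple(ordering or ())

    if cluster_cols:
        # Clustering is present: cluster first, then sort within each partition.
        scope = "partition" if sort_cols else "none"
        return WritePlan("hash", cluster_cols, scope, sort_cols)
    if sort_cols:
        # No clustering: the sort is a global sort (range partition, then sort).
        return WritePlan("range", sort_cols, "global", sort_cols)
    return WritePlan("none", (), "none", ())
```

With this sketch, `plan_write(None, ["a", "b"])` yields a global sort, while `plan_write({"a"}, ["a", "b"])` yields a clustering on `a` with a partition-level sort. In both cases the rows inside a partition are sorted.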
On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody <patrick.woo...@gmail.com> 
wrote:
Does that methodology work in this specific case? I thought the ordering must 
be a subset of the clustering to guarantee that matching rows end up in the 
same partition when doing a global sort. Though I get the gist: if it does 
satisfy the clustering, then there is no reason not to choose the global sort.

On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue <rb...@netflix.com> wrote:
> Can you expand on how the ordering containing the clustering expressions 
>would ensure the global sort?
The idea was to basically assume that if the clustering can be satisfied by a 
global sort, then do the global sort. For example, if the clustering is 
Set("b", "a") and the sort is Seq("a", "b", "c") then do a global sort by 
columns a, b, and c.
Technically, you could do this with a hash partitioner instead of a range 
partitioner and sort within each partition, but that doesn't make much sense 
because the partitioning would ensure that each partition has just one 
combination of the required clustering columns. Using a hash partitioner would 
make it so that the in-partition sort basically ignores the first few values, 
so it must be that the intent was a global sort.
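
The check described in this message can be sketched as follows. It is a minimal illustration in plain Python, and `clustering_satisfied_by_sort` is a hypothetical helper name, not part of any Spark API. It treats the clustering as satisfied when the leading sort columns are exactly the clustering columns, in any order.

```python
from typing import Iterable, Sequence


def clustering_satisfied_by_sort(clustering: Iterable[str],
                                 ordering: Sequence[str]) -> bool:
    """Return True when the first len(clustering) sort columns are exactly
    the clustering columns, regardless of their order."""
    cluster_cols = set(clustering)
    if not cluster_cols:
        return False  # nothing to satisfy; the caller decides what to do
    prefix = ordering[:len(cluster_cols)]
    return set(prefix) == cluster_cols


# The example from the message: clustering Set("b", "a"), sort Seq("a", "b", "c").
use_global_sort = clustering_satisfied_by_sort({"b", "a"}, ["a", "b", "c"])
```

Note that a range partitioner over a, b, and c may still split rows with equal (a, b) across two adjacent partitions, which is the concern raised in the reply to this message.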
On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:
> Right, you could use this to store a global ordering if there is only one
> write (e.g., CTAS). I don’t think anything needs to change in that case, you
> would still have a clustering and an ordering, but the ordering would need to
> include all fields of the clustering. A way to pass in the partition ordinal
> for the source to store would be required.

Can you expand on how the ordering containing the clustering expressions would 
ensure the global sort? Having a RangePartitioning would certainly satisfy it, 
but it isn't required. Is the suggestion that if Spark sees this overlap, then 
it plans a global sort?

On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <russell.spit...@gmail.com> 
wrote:
@RyanBlue I'm hoping that through the CBO effort we will continue to get more 
detailed statistics. For example, on read we could use sketch data structures 
to get estimates of unique values and density for each column. You may be right 
that the real way to handle this would be to give a "cost" back to a 
higher-order optimizer, which can decide which method to use rather than having 
the data source itself do it. This is probably in a far future version of the 
API.

On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue <rb...@netflix.com> wrote:

> Cassandra can insert records with the same partition-key faster if they arrive
> in the same payload. But this is only beneficial if the incoming dataset has
> multiple entries for the same partition key.

Thanks for the example, the recommended partitioning use case makes more sense 
now. I think we could have two interfaces, a RequiresClustering and a 
RecommendsClustering if we want to support this. But I’m skeptical it will be 
useful for two reasons:

1. Do we want to optimize the low cardinality case? Shuffles are usually much
   cheaper at smaller sizes, so I’m not sure it is necessary to optimize this
   away.
2. How do we know there aren’t just a few partition keys for all the records? It
   may look like a shuffle wouldn’t help, but we don’t know the partition keys
   until it is too late.

Then there’s also the logic for avoiding the shuffle and how to calculate the 
cost, which sounds like something that needs some details from CBO.

> I would assume that given the estimated data size from Spark and options passed
> in from the user, the data source could make a more intelligent requirement on
> the write format than Spark independently.

This is a good point.
What would an implementation actually do here and how would information be 
passed? For my use cases, the store would produce the number of tasks based on 
the estimated incoming rows, because the source has the best idea of how the 
rows will compress. But that’s just applying a multiplier most of the time. To 
be very useful, this would have to handle skew in the rows (think of rows with a 
type, where total size depends on the type), and that’s a bit harder. I think maybe 
an interface that can provide relative cost estimates based on partition keys 
would be helpful, but then keep the planning logic in Spark.
This is probably something that we could add later as we find use cases that 
require it?
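
The "multiplier" approach mentioned above can be sketched as below. This is a minimal illustration in plain Python: `estimate_write_tasks` and all of its parameters are hypothetical, and it deliberately ignores the row skew that the message calls out as the harder part.

```python
import math


def estimate_write_tasks(estimated_rows: int,
                         avg_row_bytes: float,
                         compression_ratio: float,
                         target_file_bytes: int) -> int:
    """Estimate a task count from the incoming row estimate.

    compression_ratio is the source's guess at compressed size divided by
    raw size (for example 0.25); it plays the role of the multiplier.
    """
    if target_file_bytes <= 0:
        raise ValueError("target_file_bytes must be positive")
    compressed_bytes = estimated_rows * avg_row_bytes * compression_ratio
    # Always use at least one task, even for tiny writes.
    return max(1, math.ceil(compressed_bytes / target_file_bytes))
```

A source that knows how its rows compress could supply `compression_ratio`, while the decision about how to use the resulting number would stay in Spark's planner, as suggested above.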

> I wouldn’t assume that a data source requiring a certain write format would
> give any guarantees around reading the same data? In the cases where it is a
> complete overwrite it would, but for independent writes it could still be
> useful for statistics or compression.

Right, you could use this to store a global ordering if there is only one write 
(e.g., CTAS). I don’t think anything needs to change in that case, you would 
still have a clustering and an ordering, but the ordering would need to include 
all fields of the clustering. A way to pass in the partition ordinal for the 
source to store would be required.
For the second point that ordering is useful for statistics and compression, I 
completely agree. Our best practices doc tells users to always add a global 
sort when writing because you get the benefit of a range partitioner to handle 
skew, plus the stats and compression you’re talking about to optimize for 
reads. I think the proposed API can request a global ordering from Spark 
already. My only point is that there isn’t much the source can do to guarantee 
ordering for reads when there is more than one write.
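
A toy example of the last point, in plain Python with made-up keys: each write is globally sorted on its own, yet the table as stored is not, so a reader that wants ordered output has to merge the sorted runs itself.

```python
import heapq
from typing import List


def is_sorted(values: List[int]) -> bool:
    """True when the values are in non-decreasing order."""
    return all(a <= b for a, b in zip(values, values[1:]))


# Two independent writes, each globally sorted by key (illustrative data).
first_write = [1, 4, 9]
second_write = [2, 3, 10]

# The table simply holds the files from both writes, one after the other.
table_files = first_write + second_write

# Ordered output on read requires merging the sorted runs.
merged_on_read = list(heapq.merge(first_write, second_write))
```

Here `table_files` is not sorted even though both inputs are, while `merged_on_read` is. The sort within each write is still useful for statistics and compression, as noted above.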
On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:
> Spark would always apply the required clustering and sort order because they
> are required by the data source. It is reasonable for a source to reject data
> that isn’t properly prepared. For example, data must be written to HTable
> files with keys in order or else the files are invalid. Sorting should not be
> implemented in the sources themselves because Spark handles concerns like
> spilling to disk. Spark must prepare data correctly, which is why the
> interfaces start with “Requires”.

This was in reference to Russell's suggestion that the data source could have a 
required sort, but only a recommended partitioning. I don't have an immediate 
use case for a recommended partitioning in mind, though. I'm definitely in sync 
that the data source itself shouldn't do work outside of the writes themselves.

> Considering the second use case you mentioned first, I don’t think it is a
> good idea for a table to put requirements on the number of tasks used for a
> write. The parallelism should be set appropriately for the data volume, which
> is for Spark or the user to determine. A minimum or maximum number of tasks
> could cause bad behavior.
>
> For your first use case, an explicit global ordering, the problem is that
> there can’t be an explicit global ordering for a table when it is populated by
> a series of independent writes. Each write could have a global order, but once
> those files are written, you have to deal with multiple sorted data sets. I
> think it makes sense to focus on order within data files, not order between
> data files.

This is where I'm interested in learning about the separation of 
responsibilities for the data source and how "smart" it is supposed to be.

For the first part, I would assume that given the estimated data size from 
Spark and options passed in from the user, the data source could make a more 
intelligent requirement on the write format than Spark independently. Somewhat 
analogous to how the current FileSource does bin packing of small files on the 
read side, restricting parallelism for the sake of overhead.
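
For readers unfamiliar with the bin packing idea, here is a simplified greedy sketch in plain Python. It is not Spark's actual FileSource code: `pack_files`, `max_bin_bytes`, and `open_cost_bytes` are hypothetical names, and the strategy (largest files first, close a bin when the next file would overflow it) is an assumption made for illustration.

```python
from typing import Iterable, List


def pack_files(file_sizes: Iterable[int],
               max_bin_bytes: int,
               open_cost_bytes: int = 0) -> List[List[int]]:
    """Greedily group file sizes into bins of roughly max_bin_bytes.

    open_cost_bytes is a per-file padding that discourages packing a very
    large number of tiny files into a single bin.
    """
    bins: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for size in sorted(file_sizes, reverse=True):
        # Close the current bin if adding this file would overflow it.
        if current and current_size + size > max_bin_bytes:
            bins.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size + open_cost_bytes
    if current:
        bins.append(current)
    return bins
```

Each bin would correspond to one task, so many small files end up sharing a task instead of each paying its own scheduling overhead. The write-side analogue discussed here would be the source limiting parallelism in a similar way.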

For the second, I wouldn't assume that a data source requiring a certain write 
format would give any guarantees around reading the same data? In the cases 
where it is a complete overwrite it would, but for independent writes it could 
still be useful for statistics or compression.

Thanks
Pat

 
On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com> wrote:
