bq. this shuffle could outweigh the benefits of the organized data if the
cardinality is lower.

I wonder if you meant higher in place of the last word above.

Cheers

On Wed, Mar 28, 2018 at 7:50 PM, Russell Spitzer <russell.spit...@gmail.com>
wrote:

> For added color, one thing that I may want to consider as a data source
> implementer is the cost / benefit of applying a particular clustering. For
> example, a dataset with low cardinality in the clustering key could benefit
> greatly from clustering on that key before writing to Cassandra since
> Cassandra can benefit from these sorts of batching. But the cost of
> performing this shuffle could outweigh the benefits of the organized data
> if the cardinality is lower.
>
> I imagine other sources might have similar benefit calculations. Doing a
> particular sort or clustering can provide increased throughput but only in
> certain situations based on some facts about the data.
>
>
> For a concrete example here.
>
> Cassandra can insert records with the same partition-key faster if they
> arrive in the same payload. But this is only beneficial if the incoming
> dataset has multiple entries for the same partition key. If the incoming
> source does not have any duplicates then there is no benefit to requiring a
> sort or partitioning.
>
> On Wed, Mar 28, 2018 at 7:14 PM Patrick Woody <patrick.woo...@gmail.com>
> wrote:
>
>> Spark would always apply the required clustering and sort order because
>>> they are required by the data source. It is reasonable for a source to
>>> reject data that isn’t properly prepared. For example, data must be written
>>> to HTable files with keys in order or else the files are invalid. Sorting
>>> should not be implemented in the sources themselves because Spark handles
>>> concerns like spilling to disk. Spark must prepare data correctly, which is
>>> why the interfaces start with “Requires”.
>>
>>
>> This was in reference to Russell's suggestion that the data source could
>> have a required sort, but only a recommended partitioning. I don't have an
>> immediate recommending use case that would come to mind though. I'm
>> definitely in sync that the data source itself shouldn't do work outside of
>> the writes themselves.
>>
>>
>> Considering the second use case you mentioned first, I don’t think it is
>>> a good idea for a table to put requirements on the number of tasks used for
>>> a write. The parallelism should be set appropriately for the data volume,
>>> which is for Spark or the user to determine. A minimum or maximum number of
>>> tasks could cause bad behavior.
>>
>> For your first use case, an explicit global ordering, the problem is that
>>> there can’t be an explicit global ordering for a table when it is populated
>>> by a series of independent writes. Each write could have a global order,
>>> but once those files are written, you have to deal with multiple sorted
>>> data sets. I think it makes sense to focus on order within data files, not
>>> order between data files.
>>
>>
>> This is where I'm interested in learning about the separation of
>> responsibilities for the data source and how "smart" it is supposed to be.
>>
>> For the first part, I would assume that given the estimated data size
>> from Spark and options passed in from the user, the data source could make
>> a more intelligent requirement on the write format than Spark
>> independently. Somewhat analogous to how the current FileSource does bin
>> packing of small files on the read side, restricting parallelism for the
>> sake of overhead.
>>
>> For the second, I wouldn't assume that a data source requiring a certain
>> write format would give any guarantees around reading the same data? In the
>> cases where it is a complete overwrite it would, but for independent writes
>> it could still be useful for statistics or compression.
>>
>> Thanks
>> Pat
>>
>>
>>
>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> How would Spark determine whether or not to apply a recommendation - a
>>> cost threshold?
>>>
>>> Spark would always apply the required clustering and sort order because
>>> they are required by the data source. It is reasonable for a source to
>>> reject data that isn’t properly prepared. For example, data must be written
>>> to HTable files with keys in order or else the files are invalid. Sorting
>>> should not be implemented in the sources themselves because Spark handles
>>> concerns like spilling to disk. Spark must prepare data correctly, which is
>>> why the interfaces start with “Requires”.
>>>
>>> I’m not sure what the second half of your question means. What does
>>> Spark need to pass into the data source?
>>>
>>> Should a datasource be able to provide a Distribution proper rather than
>>> just the clustering expressions? Two use cases would be for explicit global
>>> sorting of the dataset and attempting to ensure a minimum write task
>>> size/number of write tasks.
>>>
>>> Considering the second use case you mentioned first, I don’t think it is
>>> a good idea for a table to put requirements on the number of tasks used for
>>> a write. The parallelism should be set appropriately for the data volume,
>>> which is for Spark or the user to determine. A minimum or maximum number of
>>> tasks could cause bad behavior.
>>>
>>> That said, I think there is a related use case for sharding. But that’s
>>> really just a clustering by an expression with the shard calculation, e.g., 
>>> hash(id_col,
>>> 64). The shards should be handled as a cluster, but it doesn’t matter
>>> how many tasks are used for it.
>>>
>>> For your first use case, an explicit global ordering, the problem is
>>> that there can’t be an explicit global ordering for a table when it is
>>> populated by a series of independent writes. Each write could have a global
>>> order, but once those files are written, you have to deal with multiple
>>> sorted data sets. I think it makes sense to focus on order within data
>>> files, not order between data files.
>>> ​
>>>
>>> On Wed, Mar 28, 2018 at 7:26 AM, Patrick Woody <patrick.woo...@gmail.com
>>> > wrote:
>>>
>>>> How would Spark determine whether or not to apply a recommendation - a
>>>> cost threshold? And yes, it would be good to flesh out what information we
>>>> get from Spark in the datasource when providing these
>>>> recommendations/requirements - I could see statistics and the existing
>>>> outputPartitioning/Ordering of the child plan being used for providing the
>>>> requirement.
>>>>
>>>> Should a datasource be able to provide a Distribution proper rather
>>>> than just the clustering expressions? Two use cases would be for explicit
>>>> global sorting of the dataset and attempting to ensure a minimum write task
>>>> size/number of write tasks.
>>>>
>>>>
>>>>
>>>> On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer <
>>>> russell.spit...@gmail.com> wrote:
>>>>
>>>>> Thanks for the clarification, definitely would want to require Sort
>>>>> but only recommend partitioning ...  I think that would be useful to
>>>>> request based on details about the incoming dataset.
>>>>>
>>>>> On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> A required clustering would not, but a required sort would.
>>>>>> Clustering is asking for the input dataframe's partitioning, and sorting
>>>>>> would be how each partition is sorted.
>>>>>>
>>>>>> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <
>>>>>> russell.spit...@gmail.com> wrote:
>>>>>>
>>>>>>> I forgot since it's been a while, but does Clustering support allow
>>>>>>> requesting that partitions contain elements in order as well? That 
>>>>>>> would be
>>>>>>> a useful trick for me. IE
>>>>>>> Request/Require(SortedOn(Col1))
>>>>>>> Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))
>>>>>>>
>>>>>>> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks, it makes sense that the existing interface is for
>>>>>>>> aggregation and not joins. Why are there requirements for the number of
>>>>>>>> partitions that are returned then?
>>>>>>>>
>>>>>>>> Does it makes sense to design the write-side `Requirement` classes
>>>>>>>> and the read-side reporting separately?
>>>>>>>>
>>>>>>>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Ryan, yea you are right that SupportsReportPartitioning doesn't
>>>>>>>>> expose hash function, so Join can't benefit from this interface, as 
>>>>>>>>> Join
>>>>>>>>> doesn't require a general ClusteredDistribution, but a more specific 
>>>>>>>>> one
>>>>>>>>> called HashClusteredDistribution.
>>>>>>>>>
>>>>>>>>> So currently only Aggregate can benefit from
>>>>>>>>> SupportsReportPartitioning and save shuffle. We can add a new 
>>>>>>>>> interface to
>>>>>>>>> expose the hash function to make it work for Join.
>>>>>>>>>
>>>>>>>>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I just took a look at SupportsReportPartitioning and I'm not sure
>>>>>>>>>> that it will work for real use cases. It doesn't specify, as far as 
>>>>>>>>>> I can
>>>>>>>>>> tell, a hash function for combining clusters into tasks or a way to 
>>>>>>>>>> provide
>>>>>>>>>> Spark a hash function for the other side of a join. It seems 
>>>>>>>>>> unlikely to me
>>>>>>>>>> that many data sources would have partitioning that happens to match 
>>>>>>>>>> the
>>>>>>>>>> other side of a join. And, it looks like task order matters? Maybe 
>>>>>>>>>> I'm
>>>>>>>>>> missing something?
>>>>>>>>>>
>>>>>>>>>> I think that we should design the write side independently based
>>>>>>>>>> on what data stores actually need, and take a look at the read side 
>>>>>>>>>> based
>>>>>>>>>> on what data stores can actually provide. Wenchen, was there a 
>>>>>>>>>> design doc
>>>>>>>>>> for partitioning on the read path?
>>>>>>>>>>
>>>>>>>>>> I completely agree with your point about a global sort. We
>>>>>>>>>> recommend to all of our data engineers to add a sort to most tables 
>>>>>>>>>> because
>>>>>>>>>> it introduces the range partitioner and does a skew calculation, in
>>>>>>>>>> addition to making data filtering much better when it is read. It's 
>>>>>>>>>> really
>>>>>>>>>> common for tables to be skewed by partition values.
>>>>>>>>>>
>>>>>>>>>> rb
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <
>>>>>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Ryan, Ted, Wenchen
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the quick replies.
>>>>>>>>>>>
>>>>>>>>>>> @Ryan - the sorting portion makes sense, but I think we'd have
>>>>>>>>>>> to ensure something similar to requiredChildDistribution in 
>>>>>>>>>>> SparkPlan where
>>>>>>>>>>> we have the number of partitions as well if we'd want to further 
>>>>>>>>>>> report to
>>>>>>>>>>> SupportsReportPartitioning, yeah?
>>>>>>>>>>>
>>>>>>>>>>> Specifying an explicit global sort can also be useful for
>>>>>>>>>>> filtering purposes on Parquet row group stats if we have a time 
>>>>>>>>>>> based/high
>>>>>>>>>>> cardinality ID field. If my datasource or catalog knows about 
>>>>>>>>>>> previous
>>>>>>>>>>> queries on a table, it could be really useful to recommend more 
>>>>>>>>>>> appropriate
>>>>>>>>>>> formatting for consumers on the next materialization. The same 
>>>>>>>>>>> would be
>>>>>>>>>>> true of clustering on commonly joined fields.
>>>>>>>>>>>
>>>>>>>>>>> Thanks again
>>>>>>>>>>> Pat
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hmm. Ryan seems to be right.
>>>>>>>>>>>>
>>>>>>>>>>>> Looking at sql/core/src/main/java/org/
>>>>>>>>>>>> apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
>>>>>>>>>>>> :
>>>>>>>>>>>>
>>>>>>>>>>>> import org.apache.spark.sql.sources.v2.reader.partitioning.
>>>>>>>>>>>> Partitioning;
>>>>>>>>>>>> ...
>>>>>>>>>>>>   Partitioning outputPartitioning();
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <
>>>>>>>>>>>> cloud0...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Actually clustering is already supported, please take a look
>>>>>>>>>>>>> at SupportsReportPartitioning
>>>>>>>>>>>>>
>>>>>>>>>>>>> Ordering is not proposed yet, might be similar to what Ryan
>>>>>>>>>>>>> proposed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Interesting.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Should requiredClustering return a Set of Expression's ?
>>>>>>>>>>>>>> This way, we can determine the order of Expression's by
>>>>>>>>>>>>>> looking at what requiredOrdering() returns.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <
>>>>>>>>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Pat,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for starting the discussion on this, we’re really
>>>>>>>>>>>>>>> interested in it as well. I don’t think there is a proposed API 
>>>>>>>>>>>>>>> yet, but I
>>>>>>>>>>>>>>> was thinking something like this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> interface RequiresClustering {
>>>>>>>>>>>>>>>   List<Expression> requiredClustering();
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> interface RequiresSort {
>>>>>>>>>>>>>>>   List<SortOrder> requiredOrdering();
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The reason why RequiresClustering should provide Expression
>>>>>>>>>>>>>>> is that it needs to be able to customize the implementation. 
>>>>>>>>>>>>>>> For example,
>>>>>>>>>>>>>>> writing to HTable would require building a key (or the data for 
>>>>>>>>>>>>>>> a key) and
>>>>>>>>>>>>>>> that might use a hash function that differs from Spark’s 
>>>>>>>>>>>>>>> built-ins.
>>>>>>>>>>>>>>> RequiresSort is fairly straightforward, but the interaction
>>>>>>>>>>>>>>> between the two requirements deserves some consideration. To 
>>>>>>>>>>>>>>> make the two
>>>>>>>>>>>>>>> compatible, I think that RequiresSort must be interpreted
>>>>>>>>>>>>>>> as a sort within each partition of the clustering, but could 
>>>>>>>>>>>>>>> possibly be
>>>>>>>>>>>>>>> used for a global sort when the two overlap.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For example, if I have a table partitioned by “day” and
>>>>>>>>>>>>>>> “category” then the RequiredClustering would be by day,
>>>>>>>>>>>>>>> category. A required sort might be day ASC, category DESC,
>>>>>>>>>>>>>>> name ASC. Because that sort satisfies the required
>>>>>>>>>>>>>>> clustering, it could be used for a global ordering. But, is 
>>>>>>>>>>>>>>> that useful?
>>>>>>>>>>>>>>> How would the global ordering matter beyond a sort within each 
>>>>>>>>>>>>>>> partition,
>>>>>>>>>>>>>>> i.e., how would the partition’s place in the global ordering be 
>>>>>>>>>>>>>>> passed?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To your other questions, you might want to have a look at
>>>>>>>>>>>>>>> the recent SPIP I’m working on to consolidate and clean up
>>>>>>>>>>>>>>> logical plans
>>>>>>>>>>>>>>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
>>>>>>>>>>>>>>> That proposes more specific uses for the DataSourceV2 API that 
>>>>>>>>>>>>>>> should help
>>>>>>>>>>>>>>> clarify what validation needs to take place. As for custom 
>>>>>>>>>>>>>>> catalyst rules,
>>>>>>>>>>>>>>> I’d like to hear about the use cases to see if we can build it 
>>>>>>>>>>>>>>> into these
>>>>>>>>>>>>>>> improvements.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> rb
>>>>>>>>>>>>>>> ​
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <
>>>>>>>>>>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hey all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I saw in some of the discussions around DataSourceV2 writes
>>>>>>>>>>>>>>>> that we might have the data source inform Spark of 
>>>>>>>>>>>>>>>> requirements for the
>>>>>>>>>>>>>>>> input data's ordering and partitioning. Has there been a 
>>>>>>>>>>>>>>>> proposed API for
>>>>>>>>>>>>>>>> that yet?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Even one level up it would be helpful to understand how I
>>>>>>>>>>>>>>>> should be thinking about the responsibility of the data source 
>>>>>>>>>>>>>>>> writer, when
>>>>>>>>>>>>>>>> I should be inserting a custom catalyst rule, and how I should 
>>>>>>>>>>>>>>>> handle
>>>>>>>>>>>>>>>> validation/assumptions of the table before attempting the 
>>>>>>>>>>>>>>>> write.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>> Pat
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>>>>> Netflix
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Ryan Blue
>>>>>>>>>> Software Engineer
>>>>>>>>>> Netflix
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ryan Blue
>>>>>>>> Software Engineer
>>>>>>>> Netflix
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>

Reply via email to