I forget, since it's been a while, but does the clustering support also allow
requesting that the elements within each partition be in sorted order? That
would be a useful trick for me, e.g.:
Request/Require(SortedOn(Col1))
Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))
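
A rough sketch of the semantics being asked about, in plain Java rather than
against any Spark API. Everything here (Row, clusterAndSort, placing rows by
the key's hashCode) is hypothetical and only illustrates "cluster by Col1,
then sort within each partition"; it is not how Spark or DataSourceV2
implements it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only: none of these names are Spark APIs.
final class SortedPartitionsSketch {

    // A (key, value) pair such as (A,1); "key" plays the role of Col1.
    static final class Row {
        final String key;
        final int value;

        Row(String key, int value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + key + "," + value + ")";
        }
    }

    // Clusters rows by key (all rows with the same key land in the same
    // partition), then sorts each partition by (key, value).
    static List<List<Row>> clusterAndSort(List<Row> rows, int numPartitions) {
        List<List<Row>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Row row : rows) {
            // Assumption: placement by hashCode; a real source may need its own function.
            int p = Math.floorMod(row.key.hashCode(), numPartitions);
            partitions.get(p).add(row);
        }
        Comparator<Row> order =
            Comparator.comparing((Row r) -> r.key).thenComparingInt(r -> r.value);
        for (List<Row> partition : partitions) {
            partition.sort(order);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
            new Row("C", 2), new Row("A", 2), new Row("B", 1),
            new Row("A", 1), new Row("C", 1), new Row("B", 2));
        // With a single partition, the rows come out in the order shown above.
        System.out.println(clusterAndSort(rows, 1));
    }
}
```

With more than one partition, each key still lands in exactly one partition
and each partition is sorted on its own; nothing is implied about ordering
across partitions.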

On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

> Thanks, it makes sense that the existing interface is for aggregation and
> not joins. Why are there requirements for the number of partitions that are
> returned then?
>
> Does it make sense to design the write-side `Requirement` classes and the
> read-side reporting separately?
>
> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>
>> Hi Ryan, yeah, you are right that SupportsReportPartitioning doesn't expose
>> a hash function, so Join can't benefit from this interface, as Join doesn't
>> require a general ClusteredDistribution, but a more specific one
>> called HashClusteredDistribution.
>>
>> So currently only Aggregate can benefit from SupportsReportPartitioning
>> and save shuffle. We can add a new interface to expose the hash function to
>> make it work for Join.
>>
>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> I just took a look at SupportsReportPartitioning and I'm not sure that
>>> it will work for real use cases. It doesn't specify, as far as I can tell,
>>> a hash function for combining clusters into tasks or a way to provide Spark
>>> a hash function for the other side of a join. It seems unlikely to me that
>>> many data sources would have partitioning that happens to match the other
>>> side of a join. And, it looks like task order matters? Maybe I'm missing
>>> something?
>>>
>>> I think that we should design the write side independently based on what
>>> data stores actually need, and take a look at the read side based on what
>>> data stores can actually provide. Wenchen, was there a design doc for
>>> partitioning on the read path?
>>>
>>> I completely agree with your point about a global sort. We recommend that
>>> all of our data engineers add a sort to most tables because it
>>> introduces the range partitioner and does a skew calculation, in addition
>>> to making data filtering much better when it is read. It's really common
>>> for tables to be skewed by partition values.
>>>
>>> rb
>>>
>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>>
>>>> Hey Ryan, Ted, Wenchen
>>>>
>>>> Thanks for the quick replies.
>>>>
>>>> @Ryan - the sorting portion makes sense, but I think we'd have to
>>>> ensure something similar to requiredChildDistribution in SparkPlan where we
>>>> have the number of partitions as well if we'd want to further report to
>>>> SupportsReportPartitioning, yeah?
>>>>
>>>> Specifying an explicit global sort can also be useful for filtering
>>>> purposes on Parquet row group stats if we have a time based/high
>>>> cardinality ID field. If my datasource or catalog knows about previous
>>>> queries on a table, it could be really useful to recommend more appropriate
>>>> formatting for consumers on the next materialization. The same would be
>>>> true of clustering on commonly joined fields.
>>>>
>>>> Thanks again
>>>> Pat
>>>>
>>>>
>>>>
>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Hmm. Ryan seems to be right.
>>>>>
>>>>> Looking at
>>>>> sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>>>>
>>>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>>>> ...
>>>>>   Partitioning outputPartitioning();
>>>>>
>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Actually clustering is already supported, please take a look at
>>>>>> SupportsReportPartitioning
>>>>>>
>>>>>> Ordering is not proposed yet, might be similar to what Ryan proposed.
>>>>>>
>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> Interesting.
>>>>>>>
>>>>>>> Should requiredClustering return a Set of Expressions?
>>>>>>> This way, we can determine the order of the Expressions by looking at
>>>>>>> what requiredOrdering() returns.
>>>>>>>
>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <
>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>
>>>>>>>> Hi Pat,
>>>>>>>>
>>>>>>>> Thanks for starting the discussion on this, we’re really interested
>>>>>>>> in it as well. I don’t think there is a proposed API yet, but I was
>>>>>>>> thinking something like this:
>>>>>>>>
>>>>>>>> interface RequiresClustering {
>>>>>>>>   List<Expression> requiredClustering();
>>>>>>>> }
>>>>>>>>
>>>>>>>> interface RequiresSort {
>>>>>>>>   List<SortOrder> requiredOrdering();
>>>>>>>> }
>>>>>>>>
>>>>>>>> The reason why RequiresClustering should provide Expression is
>>>>>>>> that it needs to be able to customize the implementation. For example,
>>>>>>>> writing to HTable would require building a key (or the data for a key)
>>>>>>>> and that might use a hash function that differs from Spark’s built-ins.
>>>>>>>> RequiresSort is fairly straightforward, but the interaction
>>>>>>>> between the two requirements deserves some consideration. To make the
>>>>>>>> two compatible, I think that RequiresSort must be interpreted as a
>>>>>>>> sort within each partition of the clustering, but could possibly be
>>>>>>>> used for a global sort when the two overlap.
>>>>>>>>
>>>>>>>> For example, if I have a table partitioned by “day” and “category”
>>>>>>>> then the requiredClustering would be by day, category. A required
>>>>>>>> sort might be day ASC, category DESC, name ASC. Because that sort
>>>>>>>> satisfies the required clustering, it could be used for a global
>>>>>>>> ordering. But, is that useful? How would the global ordering matter
>>>>>>>> beyond a sort within each partition, i.e., how would the partition’s
>>>>>>>> place in the global ordering be passed?
>>>>>>>>
>>>>>>>> To your other questions, you might want to have a look at the
>>>>>>>> recent SPIP I’m working on to consolidate and clean up logical plans
>>>>>>>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
>>>>>>>> That proposes more specific uses for the DataSourceV2 API that should
>>>>>>>> help clarify what validation needs to take place. As for custom catalyst
>>>>>>>> rules, I’d like to hear about the use cases to see if we can build it
>>>>>>>> into these improvements.
>>>>>>>>
>>>>>>>> rb
>>>>>>>>
>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <
>>>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hey all,
>>>>>>>>>
>>>>>>>>> I saw in some of the discussions around DataSourceV2 writes that
>>>>>>>>> we might have the data source inform Spark of requirements for the
>>>>>>>>> input data's ordering and partitioning. Has there been a proposed API
>>>>>>>>> for that yet?
>>>>>>>>>
>>>>>>>>> Even one level up it would be helpful to understand how I should
>>>>>>>>> be thinking about the responsibility of the data source writer, when I
>>>>>>>>> should be inserting a custom catalyst rule, and how I should handle
>>>>>>>>> validation/assumptions of the table before attempting the write.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Pat
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ryan Blue
>>>>>>>> Software Engineer
>>>>>>>> Netflix
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
