A required clustering would not, but a required sort would. Clustering is
asking for the input dataframe's partitioning, and sorting would be how
each partition is sorted.

On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <russell.spit...@gmail.com>
wrote:

> I forgot since it's been a while, but does Clustering support allow
> requesting that partitions contain elements in order as well? That would be
> a useful trick for me. IE
> Request/Require(SortedOn(Col1))
> Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))
>
> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> Thanks, it makes sense that the existing interface is for aggregation and
>> not joins. Why are there requirements for the number of partitions that are
>> returned then?
>>
>> Does it makes sense to design the write-side `Requirement` classes and
>> the read-side reporting separately?
>>
>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>>> Hi Ryan, yea you are right that SupportsReportPartitioning doesn't
>>> expose hash function, so Join can't benefit from this interface, as Join
>>> doesn't require a general ClusteredDistribution, but a more specific one
>>> called HashClusteredDistribution.
>>>
>>> So currently only Aggregate can benefit from SupportsReportPartitioning
>>> and save shuffle. We can add a new interface to expose the hash function to
>>> make it work for Join.
>>>
>>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> I just took a look at SupportsReportPartitioning and I'm not sure that
>>>> it will work for real use cases. It doesn't specify, as far as I can tell,
>>>> a hash function for combining clusters into tasks or a way to provide Spark
>>>> a hash function for the other side of a join. It seems unlikely to me that
>>>> many data sources would have partitioning that happens to match the other
>>>> side of a join. And, it looks like task order matters? Maybe I'm missing
>>>> something?
>>>>
>>>> I think that we should design the write side independently based on
>>>> what data stores actually need, and take a look at the read side based on
>>>> what data stores can actually provide. Wenchen, was there a design doc for
>>>> partitioning on the read path?
>>>>
>>>> I completely agree with your point about a global sort. We recommend to
>>>> all of our data engineers to add a sort to most tables because it
>>>> introduces the range partitioner and does a skew calculation, in addition
>>>> to making data filtering much better when it is read. It's really common
>>>> for tables to be skewed by partition values.
>>>>
>>>> rb
>>>>
>>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <
>>>> patrick.woo...@gmail.com> wrote:
>>>>
>>>>> Hey Ryan, Ted, Wenchen
>>>>>
>>>>> Thanks for the quick replies.
>>>>>
>>>>> @Ryan - the sorting portion makes sense, but I think we'd have to
>>>>> ensure something similar to requiredChildDistribution in SparkPlan where 
>>>>> we
>>>>> have the number of partitions as well if we'd want to further report to
>>>>> SupportsReportPartitioning, yeah?
>>>>>
>>>>> Specifying an explicit global sort can also be useful for filtering
>>>>> purposes on Parquet row group stats if we have a time based/high
>>>>> cardinality ID field. If my datasource or catalog knows about previous
>>>>> queries on a table, it could be really useful to recommend more 
>>>>> appropriate
>>>>> formatting for consumers on the next materialization. The same would be
>>>>> true of clustering on commonly joined fields.
>>>>>
>>>>> Thanks again
>>>>> Pat
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Hmm. Ryan seems to be right.
>>>>>>
>>>>>> Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/
>>>>>> reader/SupportsReportPartitioning.java :
>>>>>>
>>>>>> import org.apache.spark.sql.sources.v2.reader.partitioning.
>>>>>> Partitioning;
>>>>>> ...
>>>>>>   Partitioning outputPartitioning();
>>>>>>
>>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Actually clustering is already supported, please take a look at
>>>>>>> SupportsReportPartitioning
>>>>>>>
>>>>>>> Ordering is not proposed yet, might be similar to what Ryan proposed.
>>>>>>>
>>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Interesting.
>>>>>>>>
>>>>>>>> Should requiredClustering return a Set of Expression's ?
>>>>>>>> This way, we can determine the order of Expression's by looking at
>>>>>>>> what requiredOrdering() returns.
>>>>>>>>
>>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <
>>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> Hi Pat,
>>>>>>>>>
>>>>>>>>> Thanks for starting the discussion on this, we’re really
>>>>>>>>> interested in it as well. I don’t think there is a proposed API yet, 
>>>>>>>>> but I
>>>>>>>>> was thinking something like this:
>>>>>>>>>
>>>>>>>>> interface RequiresClustering {
>>>>>>>>>   List<Expression> requiredClustering();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> interface RequiresSort {
>>>>>>>>>   List<SortOrder> requiredOrdering();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> The reason why RequiresClustering should provide Expression is
>>>>>>>>> that it needs to be able to customize the implementation. For example,
>>>>>>>>> writing to HTable would require building a key (or the data for a 
>>>>>>>>> key) and
>>>>>>>>> that might use a hash function that differs from Spark’s built-ins.
>>>>>>>>> RequiresSort is fairly straightforward, but the interaction
>>>>>>>>> between the two requirements deserves some consideration. To make the 
>>>>>>>>> two
>>>>>>>>> compatible, I think that RequiresSort must be interpreted as a
>>>>>>>>> sort within each partition of the clustering, but could possibly be 
>>>>>>>>> used
>>>>>>>>> for a global sort when the two overlap.
>>>>>>>>>
>>>>>>>>> For example, if I have a table partitioned by “day” and “category”
>>>>>>>>> then the RequiredClustering would be by day, category. A required
>>>>>>>>> sort might be day ASC, category DESC, name ASC. Because that sort
>>>>>>>>> satisfies the required clustering, it could be used for a global 
>>>>>>>>> ordering.
>>>>>>>>> But, is that useful? How would the global ordering matter beyond a 
>>>>>>>>> sort
>>>>>>>>> within each partition, i.e., how would the partition’s place in the 
>>>>>>>>> global
>>>>>>>>> ordering be passed?
>>>>>>>>>
>>>>>>>>> To your other questions, you might want to have a look at the
>>>>>>>>> recent SPIP I’m working on to consolidate and clean up logical
>>>>>>>>> plans
>>>>>>>>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
>>>>>>>>> That proposes more specific uses for the DataSourceV2 API that should 
>>>>>>>>> help
>>>>>>>>> clarify what validation needs to take place. As for custom catalyst 
>>>>>>>>> rules,
>>>>>>>>> I’d like to hear about the use cases to see if we can build it into 
>>>>>>>>> these
>>>>>>>>> improvements.
>>>>>>>>>
>>>>>>>>> rb
>>>>>>>>> ​
>>>>>>>>>
>>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <
>>>>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey all,
>>>>>>>>>>
>>>>>>>>>> I saw in some of the discussions around DataSourceV2 writes that
>>>>>>>>>> we might have the data source inform Spark of requirements for the 
>>>>>>>>>> input
>>>>>>>>>> data's ordering and partitioning. Has there been a proposed API for 
>>>>>>>>>> that
>>>>>>>>>> yet?
>>>>>>>>>>
>>>>>>>>>> Even one level up it would be helpful to understand how I should
>>>>>>>>>> be thinking about the responsibility of the data source writer, when 
>>>>>>>>>> I
>>>>>>>>>> should be inserting a custom catalyst rule, and how I should handle
>>>>>>>>>> validation/assumptions of the table before attempting the write.
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>> Pat
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Software Engineer
>>>>>>>>> Netflix
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>


-- 
Ryan Blue
Software Engineer
Netflix

Reply via email to