A required clustering would not, but a required sort would. Clustering asks
only for the input dataframe's partitioning; sorting controls how the rows
within each partition are ordered.
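To make the distinction concrete, here is a rough sketch against the
RequiresClustering/RequiresSort interfaces proposed further down this thread.
Nothing here is a committed API; the HTable-style writer and its fields are
hypothetical, and the interface definitions are repeated from the proposal
below so the sketch is self-contained:

  import java.util.Collections;
  import java.util.List;

  import org.apache.spark.sql.catalyst.expressions.Expression;
  import org.apache.spark.sql.catalyst.expressions.SortOrder;

  // Proposed interfaces from the sketch below -- not a committed API.
  interface RequiresClustering {
    List<Expression> requiredClustering();
  }

  interface RequiresSort {
    List<SortOrder> requiredOrdering();
  }

  // Hypothetical HTable-style sink: the clustering determines which
  // partition each input row lands in; the sort only orders rows *within*
  // each of those partitions. Neither implies a global order on its own.
  class HTableWriteSupport implements RequiresClustering, RequiresSort {
    private final Expression keyExpr;     // expression that builds the key
    private final SortOrder keyAscending; // per-partition sort on that key

    HTableWriteSupport(Expression keyExpr, SortOrder keyAscending) {
      this.keyExpr = keyExpr;
      this.keyAscending = keyAscending;
    }

    @Override
    public List<Expression> requiredClustering() {
      return Collections.singletonList(keyExpr);
    }

    @Override
    public List<SortOrder> requiredOrdering() {
      return Collections.singletonList(keyAscending);
    }
  }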
On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:

> I forgot since it's been a while, but does Clustering support allow
> requesting that partitions contain elements in order as well? That would
> be a useful trick for me, i.e.
>
> Request/Require(SortedOn(Col1))
> Partition 1 -> ((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))
>
> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>
>> Thanks, it makes sense that the existing interface is for aggregation
>> and not joins. Why are there requirements for the number of partitions
>> that are returned, then?
>>
>> Does it make sense to design the write-side `Requirement` classes and
>> the read-side reporting separately?
>>
>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>>> Hi Ryan, yea, you are right that SupportsReportPartitioning doesn't
>>> expose a hash function, so Join can't benefit from this interface: Join
>>> doesn't require a general ClusteredDistribution, but a more specific
>>> one called HashClusteredDistribution.
>>>
>>> So currently only Aggregate can benefit from SupportsReportPartitioning
>>> and save a shuffle. We can add a new interface to expose the hash
>>> function to make it work for Join.
>>>
>>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> I just took a look at SupportsReportPartitioning and I'm not sure that
>>>> it will work for real use cases. It doesn't specify, as far as I can
>>>> tell, a hash function for combining clusters into tasks or a way to
>>>> provide Spark a hash function for the other side of a join. It seems
>>>> unlikely to me that many data sources would have partitioning that
>>>> happens to match the other side of a join. And, it looks like task
>>>> order matters? Maybe I'm missing something?
>>>>
>>>> I think that we should design the write side independently, based on
>>>> what data stores actually need, and take a look at the read side based
>>>> on what data stores can actually provide. Wenchen, was there a design
>>>> doc for partitioning on the read path?
>>>>
>>>> I completely agree with your point about a global sort. We recommend
>>>> that all of our data engineers add a sort to most tables because it
>>>> introduces the range partitioner and does a skew calculation, in
>>>> addition to making data filtering much better when it is read. It's
>>>> really common for tables to be skewed by partition values.
>>>>
>>>> rb
>>>>
>>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>>>
>>>>> Hey Ryan, Ted, Wenchen,
>>>>>
>>>>> Thanks for the quick replies.
>>>>>
>>>>> @Ryan - the sorting portion makes sense, but I think we'd have to
>>>>> ensure something similar to requiredChildDistribution in SparkPlan,
>>>>> where we have the number of partitions as well, if we'd want to
>>>>> further report to SupportsReportPartitioning, yeah?
>>>>>
>>>>> Specifying an explicit global sort can also be useful for filtering
>>>>> purposes on Parquet row group stats if we have a time-based or
>>>>> high-cardinality ID field. If my datasource or catalog knows about
>>>>> previous queries on a table, it could be really useful to recommend
>>>>> more appropriate formatting for consumers on the next
>>>>> materialization. The same would be true of clustering on commonly
>>>>> joined fields.
>>>>>
>>>>> Thanks again,
>>>>> Pat
>>>>>
>>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Hmm. Ryan seems to be right.
>>>>>>
>>>>>> Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>>>>>
>>>>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>>>>> ...
>>>>>> Partitioning outputPartitioning();
>>>>>>
>>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>>
>>>>>>> Actually clustering is already supported; please take a look at
>>>>>>> SupportsReportPartitioning.
>>>>>>>
>>>>>>> Ordering is not proposed yet; it might be similar to what Ryan
>>>>>>> proposed.
>>>>>>>
>>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Interesting.
>>>>>>>>
>>>>>>>> Should requiredClustering return a Set of Expression's? This way,
>>>>>>>> we can determine the order of Expression's by looking at what
>>>>>>>> requiredOrdering() returns.
>>>>>>>>
>>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <rb...@netflix.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> Hi Pat,
>>>>>>>>>
>>>>>>>>> Thanks for starting the discussion on this; we're really
>>>>>>>>> interested in it as well. I don't think there is a proposed API
>>>>>>>>> yet, but I was thinking something like this:
>>>>>>>>>
>>>>>>>>> interface RequiresClustering {
>>>>>>>>>   List<Expression> requiredClustering();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> interface RequiresSort {
>>>>>>>>>   List<SortOrder> requiredOrdering();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> The reason why RequiresClustering should provide Expression is
>>>>>>>>> that it needs to be able to customize the implementation. For
>>>>>>>>> example, writing to HTable would require building a key (or the
>>>>>>>>> data for a key), and that might use a hash function that differs
>>>>>>>>> from Spark's built-ins.
>>>>>>>>>
>>>>>>>>> RequiresSort is fairly straightforward, but the interaction
>>>>>>>>> between the two requirements deserves some consideration. To make
>>>>>>>>> the two compatible, I think that RequiresSort must be interpreted
>>>>>>>>> as a sort within each partition of the clustering, but could
>>>>>>>>> possibly be used for a global sort when the two overlap.
>>>>>>>>>
>>>>>>>>> For example, if I have a table partitioned by "day" and
>>>>>>>>> "category", then the RequiresClustering would be by day, category.
>>>>>>>>> A required sort might be day ASC, category DESC, name ASC. Because
>>>>>>>>> that sort satisfies the required clustering, it could be used for
>>>>>>>>> a global ordering. But is that useful? How would the global
>>>>>>>>> ordering matter beyond a sort within each partition, i.e., how
>>>>>>>>> would the partition's place in the global ordering be passed?
>>>>>>>>>
>>>>>>>>> To your other questions, you might want to have a look at the
>>>>>>>>> recent SPIP I'm working on to consolidate and clean up logical
>>>>>>>>> plans
>>>>>>>>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
>>>>>>>>> That proposes more specific uses for the DataSourceV2 API that
>>>>>>>>> should help clarify what validation needs to take place. As for
>>>>>>>>> custom catalyst rules, I'd like to hear about the use cases to
>>>>>>>>> see if we can build them into these improvements.
>>>>>>>>>
>>>>>>>>> rb
>>>>>>>>>
>>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey all,
>>>>>>>>>>
>>>>>>>>>> I saw in some of the discussions around DataSourceV2 writes that
>>>>>>>>>> we might have the data source inform Spark of requirements for
>>>>>>>>>> the input data's ordering and partitioning. Has there been a
>>>>>>>>>> proposed API for that yet?
>>>>>>>>>>
>>>>>>>>>> Even one level up, it would be helpful to understand how I
>>>>>>>>>> should be thinking about the responsibility of the data source
>>>>>>>>>> writer, when I should be inserting a custom catalyst rule, and
>>>>>>>>>> how I should handle validation/assumptions of the table before
>>>>>>>>>> attempting the write.
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>> Pat
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Software Engineer
>>>>>>>>> Netflix
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix

--
Ryan Blue
Software Engineer
Netflix
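For reference, the read-side reporting Ted quoted above comes from the Spark
2.3-era org.apache.spark.sql.sources.v2.reader.partitioning package, and a
source might report its layout with a sketch like the one below. The class
name and the "key" column are made up for illustration; only the
Partitioning, Distribution, and ClusteredDistribution shapes come from Spark:

  import java.util.Arrays;

  import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
  import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
  import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;

  // Hypothetical: a source whose data is already clustered on "key". A
  // DataSourceReader that implements SupportsReportPartitioning would
  // return this from outputPartitioning().
  class KeyClusteredPartitioning implements Partitioning {
    private final int numPartitions;

    KeyClusteredPartitioning(int numPartitions) {
      this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
      return numPartitions;
    }

    // Claim to satisfy a clustering on exactly the "key" column. Note what
    // is missing: nothing here exposes the source's hash function, which is
    // why this helps Aggregate (ClusteredDistribution) but not Join, which
    // needs the more specific HashClusteredDistribution discussed above.
    @Override
    public boolean satisfy(Distribution distribution) {
      if (distribution instanceof ClusteredDistribution) {
        String[] cols = ((ClusteredDistribution) distribution).clusteredColumns;
        return Arrays.equals(cols, new String[] {"key"});
      }
      return false;
    }
  }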