I forgot since it's been a while, but does the clustering support allow requesting that partitions contain elements in order as well? That would be a useful trick for me, e.g.:

Request/Require(SortedOn(Col1))
Partition 1 -> ((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))
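For illustration, the kind of request being asked about might look something like the sketch below. `RequiresSortedClustering`, `SortSpec`, and every method on them are invented names for this hypothetical, not an existing Spark API:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch only: a combined requirement that each partition be
// clustered on col1 and sorted within the partition, so rows arrive in the
// order shown above, e.g. partition 1 -> (A,1), (A,2), (B,1), (B,2), (C,1), (C,2).
interface RequiresSortedClustering {
  List<String> clusteringColumns();   // which columns define the partitions, e.g. ["col1"]
  List<SortSpec> requiredOrdering();  // in-partition sort, e.g. col1 ASC, col2 ASC
}

// Invented helper type; catalyst's SortOrder would likely be used in practice.
final class SortSpec {
  final String column;
  final boolean ascending;

  SortSpec(String column, boolean ascending) {
    this.column = column;
    this.ascending = ascending;
  }

  static List<SortSpec> of(SortSpec... specs) {
    return Arrays.asList(specs);
  }
}
```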
On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

> Thanks, it makes sense that the existing interface is for aggregation and not joins. Why are there requirements for the number of partitions that are returned, then?
>
> Does it make sense to design the write-side `Requirement` classes and the read-side reporting separately?
>
> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>
>> Hi Ryan, yeah, you are right that SupportsReportPartitioning doesn't expose a hash function, so Join can't benefit from this interface: Join doesn't require a general ClusteredDistribution, but a more specific one called HashClusteredDistribution.
>>
>> So currently only Aggregate can benefit from SupportsReportPartitioning and save a shuffle. We can add a new interface that exposes the hash function to make it work for Join.
>>
>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> I just took a look at SupportsReportPartitioning and I'm not sure that it will work for real use cases. As far as I can tell, it doesn't specify a hash function for combining clusters into tasks, or a way to provide Spark a hash function for the other side of a join. It seems unlikely to me that many data sources would have partitioning that happens to match the other side of a join. And, it looks like task order matters? Maybe I'm missing something?
>>>
>>> I think that we should design the write side independently based on what data stores actually need, and take a look at the read side based on what data stores can actually provide. Wenchen, was there a design doc for partitioning on the read path?
>>>
>>> I completely agree with your point about a global sort. We recommend that all of our data engineers add a sort to most tables because it introduces the range partitioner and does a skew calculation, in addition to making data filtering much better when it is read. It's really common for tables to be skewed by partition values.
>>>
>>> rb
>>>
>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>>
>>>> Hey Ryan, Ted, Wenchen,
>>>>
>>>> Thanks for the quick replies.
>>>>
>>>> @Ryan - the sorting portion makes sense, but I think we'd need something similar to requiredChildDistribution in SparkPlan, where we also have the number of partitions, if we want to further report to SupportsReportPartitioning, yeah?
>>>>
>>>> Specifying an explicit global sort can also be useful for filtering purposes on Parquet row group stats if we have a time-based/high-cardinality ID field. If my data source or catalog knows about previous queries on a table, it could be really useful to recommend more appropriate formatting for consumers on the next materialization. The same would be true of clustering on commonly joined fields.
>>>>
>>>> Thanks again,
>>>> Pat
>>>>
>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Hmm. Ryan seems to be right.
>>>>>
>>>>> Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>>>>
>>>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>>>> ...
>>>>> Partitioning outputPartitioning();
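Pulling that file together, here is a minimal sketch of how a source clustered on a single column might implement the interface. The `Partitioning`, `Distribution`, and `ClusteredDistribution` types are the real Spark 2.3 ones Ted is quoting; the class name and the "day" column are made up for illustration:

```java
import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;

// Hypothetical: a source clustered on a "day" column reports its layout so
// Spark can skip the shuffle for an aggregation grouped on that column.
class DayClusteredPartitioning implements Partitioning {
  private final int numPartitions;

  DayClusteredPartitioning(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public boolean satisfy(Distribution distribution) {
    // Only ClusteredDistribution exists in the 2.3 API; a join would need
    // the stricter hash-based distribution discussed above, which is why
    // only Aggregate currently benefits.
    if (distribution instanceof ClusteredDistribution) {
      String[] cols = ((ClusteredDistribution) distribution).clusteredColumns;
      return cols.length == 1 && cols[0].equals("day");
    }
    return false;
  }
}
```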
>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>
>>>>>> Actually, clustering is already supported; please take a look at SupportsReportPartitioning.
>>>>>>
>>>>>> Ordering is not proposed yet; it might be similar to what Ryan proposed.
>>>>>>
>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> Interesting.
>>>>>>>
>>>>>>> Should requiredClustering return a Set of Expressions? This way, we can determine the order of Expressions by looking at what requiredOrdering() returns.
>>>>>>>
>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <rb...@netflix.com.invalid> wrote:
>>>>>>>
>>>>>>>> Hi Pat,
>>>>>>>>
>>>>>>>> Thanks for starting the discussion on this; we're really interested in it as well. I don't think there is a proposed API yet, but I was thinking something like this:
>>>>>>>>
>>>>>>>> interface RequiresClustering {
>>>>>>>>   List<Expression> requiredClustering();
>>>>>>>> }
>>>>>>>>
>>>>>>>> interface RequiresSort {
>>>>>>>>   List<SortOrder> requiredOrdering();
>>>>>>>> }
>>>>>>>>
>>>>>>>> The reason why RequiresClustering should provide Expression is that it needs to be able to customize the implementation. For example, writing to HTable would require building a key (or the data for a key), and that might use a hash function that differs from Spark's built-ins. RequiresSort is fairly straightforward, but the interaction between the two requirements deserves some consideration. To make the two compatible, I think that RequiresSort must be interpreted as a sort within each partition of the clustering, but could possibly be used for a global sort when the two overlap.
>>>>>>>>
>>>>>>>> For example, if I have a table partitioned by "day" and "category", then the RequiresClustering would be by day, category. A required sort might be day ASC, category DESC, name ASC. Because that sort satisfies the required clustering, it could be used for a global ordering. But is that useful? How would the global ordering matter beyond a sort within each partition, i.e., how would the partition's place in the global ordering be passed?
>>>>>>>>
>>>>>>>> To your other questions, you might want to have a look at the recent SPIP I'm working on to consolidate and clean up logical plans <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>. That proposes more specific uses for the DataSourceV2 API that should help clarify what validation needs to take place. As for custom catalyst rules, I'd like to hear about the use cases to see if we can build them into these improvements.
>>>>>>>>
>>>>>>>> rb
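To make the day/category example concrete, here is a sketch of a sink implementing the two interfaces proposed above. It assumes catalyst's `Expression` and `SortOrder` as the element types; `expr(...)`, `asc(...)`, and `desc(...)` are invented stand-ins for however those would actually be constructed:

```java
import java.util.Arrays;
import java.util.List;

// Sketch against the RequiresClustering/RequiresSort proposal above, for a
// table partitioned by "day" and "category". expr/asc/desc are hypothetical
// factory methods, not real Spark APIs.
class DayCategorySink implements RequiresClustering, RequiresSort {
  @Override
  public List<Expression> requiredClustering() {
    return Arrays.asList(expr("day"), expr("category"));
  }

  @Override
  public List<SortOrder> requiredOrdering() {
    // day ASC, category DESC, name ASC satisfies the clustering above,
    // so Spark could choose to implement it as a global sort.
    return Arrays.asList(asc("day"), desc("category"), asc("name"));
  }
}
```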
>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hey all,
>>>>>>>>>
>>>>>>>>> I saw in some of the discussions around DataSourceV2 writes that we might have the data source inform Spark of requirements for the input data's ordering and partitioning. Has there been a proposed API for that yet?
>>>>>>>>>
>>>>>>>>> Even one level up, it would be helpful to understand how I should be thinking about the responsibility of the data source writer, when I should be inserting a custom catalyst rule, and how I should handle validation/assumptions of the table before attempting the write.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Pat
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ryan Blue
>>>>>>>> Software Engineer
>>>>>>>> Netflix
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>
> --
> Ryan Blue
> Software Engineer
> Netflix