Ah yeah, sorry, I got a bit mixed up.

On Wed, Mar 28, 2018 at 7:54 PM Ted Yu <yuzhih...@gmail.com> wrote:
bq. this shuffle could outweigh the benefits of the organized data if the cardinality is lower.

I wonder if you meant higher in place of the last word above.

Cheers

On Wed, Mar 28, 2018 at 7:50 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:

For added color, one thing that I may want to consider as a data source implementer is the cost/benefit of applying a particular clustering. For example, a dataset with low cardinality in the clustering key could benefit greatly from clustering on that key before writing to Cassandra, since Cassandra can benefit from this sort of batching. But the cost of performing this shuffle could outweigh the benefits of the organized data if the cardinality is lower.

I imagine other sources might have similar benefit calculations. Doing a particular sort or clustering can provide increased throughput, but only in certain situations based on some facts about the data.

For a concrete example: Cassandra can insert records with the same partition key faster if they arrive in the same payload. But this is only beneficial if the incoming dataset has multiple entries for the same partition key. If the incoming source does not have any duplicates, then there is no benefit to requiring a sort or partitioning.
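A rough sketch of that kind of decision, assuming the RequiresClustering interface sketched further down this thread and a made-up duplicate-key estimate; none of these names are real Spark or connector APIs:

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.functions.col

// Stand-in for the proposed interface; not a real Spark API.
trait RequiresClustering {
  def requiredClustering(): Seq[Expression]
}

// Hypothetical Cassandra-like writer: it only asks Spark to cluster the input
// by partition key when enough rows are expected to share a key for the
// shuffle to pay off as larger write batches.
class CassandraLikeWriter(estimatedRowsPerKey: Double) extends RequiresClustering {
  override def requiredClustering(): Seq[Expression] =
    if (estimatedRowsPerKey > 1.5) Seq(col("partition_key").expr) // batching helps
    else Seq.empty // mostly unique keys: the shuffle costs more than it saves
}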
On Wed, Mar 28, 2018 at 7:14 PM Patrick Woody <patrick.woo...@gmail.com> wrote:

> Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn't properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with "Requires".

This was in reference to Russell's suggestion that the data source could have a required sort, but only a recommended partitioning. I don't have an immediate recommending use case that comes to mind, though. I'm definitely in sync that the data source itself shouldn't do work outside of the writes themselves.

> Considering the second use case you mentioned first, I don't think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.

> For your first use case, an explicit global ordering, the problem is that there can't be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.

This is where I'm interested in learning about the separation of responsibilities for the data source and how "smart" it is supposed to be.

For the first part, I would assume that given the estimated data size from Spark and options passed in from the user, the data source could make a more intelligent requirement on the write format than Spark could independently. This is somewhat analogous to how the current FileSource does bin packing of small files on the read side, restricting parallelism for the sake of overhead.

For the second, I wouldn't assume that a data source requiring a certain write format would give any guarantees around reading the same data. In the cases where it is a complete overwrite it would, but for independent writes it could still be useful for statistics or compression.

Thanks
Pat

On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com> wrote:

> How would Spark determine whether or not to apply a recommendation - a cost threshold?

Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn't properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with "Requires".

I'm not sure what the second half of your question means. What does Spark need to pass into the data source?

> Should a datasource be able to provide a Distribution proper rather than just the clustering expressions? Two use cases would be for explicit global sorting of the dataset and attempting to ensure a minimum write task size/number of write tasks.

Considering the second use case you mentioned first, I don't think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.

That said, I think there is a related use case for sharding. But that's really just clustering by an expression with the shard calculation, e.g., hash(id_col, 64). The shards should be handled as a cluster, but it doesn't matter how many tasks are used for it.

For your first use case, an explicit global ordering, the problem is that there can't be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.
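To make the sharding case above concrete, the clustering expression could be built from Spark's public functions, roughly like this sketch (RequiresClustering is still only the proposal quoted further down, and the column name and shard count are placeholders):

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// Hypothetical shard clustering expression: rows that hash to the same shard
// form one cluster, regardless of how many tasks Spark uses to write them.
def shardClustering(numShards: Int): Seq[Expression] =
  Seq(pmod(hash(col("id_col")), lit(numShards)).expr)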
On Wed, Mar 28, 2018 at 7:26 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:

How would Spark determine whether or not to apply a recommendation - a cost threshold? And yes, it would be good to flesh out what information we get from Spark in the datasource when providing these recommendations/requirements - I could see statistics and the existing outputPartitioning/Ordering of the child plan being used for providing the requirement.

Should a datasource be able to provide a Distribution proper rather than just the clustering expressions? Two use cases would be explicit global sorting of the dataset and attempting to ensure a minimum write task size/number of write tasks.

On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:

Thanks for the clarification. I definitely would want to require Sort but only recommend partitioning ... I think that would be useful to request based on details about the incoming dataset.

On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue <rb...@netflix.com> wrote:

A required clustering would not, but a required sort would. Clustering is asking for the input dataframe's partitioning, and sorting would be how each partition is sorted.

On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:

I forgot since it's been a while, but does Clustering support allow requesting that partitions contain elements in order as well? That would be a useful trick for me. I.e.,

Request/Require(SortedOn(Col1))
Partition 1 -> ((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))
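In today's DataFrame API, that combination is roughly what repartition followed by sortWithinPartitions produces; a small illustration with placeholder data:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cluster-and-sort").getOrCreate()
import spark.implicits._

// Placeholder data standing in for the incoming write.
val df = Seq(("A", 2), ("B", 1), ("A", 1), ("C", 2), ("B", 2), ("C", 1)).toDF("col1", "col2")

// Cluster by col1 and sort within each partition: every partition holds its
// keys contiguously and in order, e.g. ((A,1), (A,2), (B,1), (B,2), ...),
// which is what a required clustering plus a required sort would have Spark
// prepare before handing rows to the writer.
val prepared = df.repartition($"col1").sortWithinPartitions($"col1", $"col2")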
On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

Thanks, it makes sense that the existing interface is for aggregation and not joins. Why are there requirements for the number of partitions that are returned, then?

Does it make sense to design the write-side `Requirement` classes and the read-side reporting separately?

On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com> wrote:

Hi Ryan, yeah, you are right that SupportsReportPartitioning doesn't expose the hash function, so Join can't benefit from this interface, as Join doesn't require a general ClusteredDistribution but a more specific one called HashClusteredDistribution.

So currently only Aggregate can benefit from SupportsReportPartitioning and save a shuffle. We can add a new interface to expose the hash function to make it work for Join.

On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com> wrote:

I just took a look at SupportsReportPartitioning and I'm not sure that it will work for real use cases. It doesn't specify, as far as I can tell, a hash function for combining clusters into tasks or a way to provide Spark a hash function for the other side of a join. It seems unlikely to me that many data sources would have partitioning that happens to match the other side of a join. And it looks like task order matters? Maybe I'm missing something?

I think that we should design the write side independently, based on what data stores actually need, and take a look at the read side based on what data stores can actually provide. Wenchen, was there a design doc for partitioning on the read path?

I completely agree with your point about a global sort. We recommend that all of our data engineers add a sort to most tables, because it introduces the range partitioner and does a skew calculation, in addition to making data filtering much better when it is read. It's really common for tables to be skewed by partition values.

rb

On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:

Hey Ryan, Ted, Wenchen,

Thanks for the quick replies.

@Ryan - the sorting portion makes sense, but I think we'd have to ensure something similar to requiredChildDistribution in SparkPlan, where we have the number of partitions as well, if we'd want to further report to SupportsReportPartitioning, yeah?

Specifying an explicit global sort can also be useful for filtering purposes on Parquet row group stats if we have a time-based/high-cardinality ID field. If my datasource or catalog knows about previous queries on a table, it could be really useful to recommend more appropriate formatting for consumers on the next materialization. The same would be true of clustering on commonly joined fields.

Thanks again
Pat
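For reference, the explicit global sort being discussed is just an orderBy on the write path; a minimal sketch with placeholder column and path names:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("sorted-write").getOrCreate()
val events = spark.read.parquet("/warehouse/events")   // placeholder input path

// A global sort adds a range-partitioning shuffle (sampling the data, which
// also spreads out skew) and leaves each output file with tight min/max ranges
// on event_time, so Parquet row-group stats can prune effectively on read.
events.orderBy("event_time", "id")
  .write
  .mode("overwrite")
  .parquet("/warehouse/events_sorted")                  // placeholder output path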
On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Hmm. Ryan seems to be right.

Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:

import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
...
Partitioning outputPartitioning();

On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0...@gmail.com> wrote:

Actually, clustering is already supported; please take a look at SupportsReportPartitioning.

Ordering is not proposed yet; it might be similar to what Ryan proposed.

On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Interesting.

Should requiredClustering return a Set of Expression's? This way, we can determine the order of Expression's by looking at what requiredOrdering() returns.

On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <rb...@netflix.com.invalid> wrote:

Hi Pat,

Thanks for starting the discussion on this; we're really interested in it as well. I don't think there is a proposed API yet, but I was thinking something like this:

interface RequiresClustering {
  List<Expression> requiredClustering();
}

interface RequiresSort {
  List<SortOrder> requiredOrdering();
}

The reason why RequiresClustering should provide Expression is that it needs to be able to customize the implementation. For example, writing to HTable would require building a key (or the data for a key), and that might use a hash function that differs from Spark's built-ins.

RequiresSort is fairly straightforward, but the interaction between the two requirements deserves some consideration. To make the two compatible, I think that RequiresSort must be interpreted as a sort within each partition of the clustering, but it could possibly be used for a global sort when the two overlap.

For example, if I have a table partitioned by "day" and "category", then the RequiredClustering would be by day, category. A required sort might be day ASC, category DESC, name ASC. Because that sort satisfies the required clustering, it could be used for a global ordering. But is that useful? How would the global ordering matter beyond a sort within each partition, i.e., how would the partition's place in the global ordering be passed?
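Putting that day/category example against the sketched interfaces, a hypothetical writer might look like the following (the traits are re-declared stand-ins so the sketch is self-contained, not real Spark APIs):

import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
import org.apache.spark.sql.functions.col

// Stand-ins for the proposed interfaces; not real Spark APIs.
trait RequiresClustering { def requiredClustering(): Seq[Expression] }
trait RequiresSort { def requiredOrdering(): Seq[SortOrder] }

// Hypothetical writer for a table partitioned by day and category. The sort
// leads with the clustering columns, so it satisfies the clustering and could
// double as a global ordering if Spark chose to interpret it that way.
class DayCategoryWriter extends RequiresClustering with RequiresSort {
  override def requiredClustering(): Seq[Expression] =
    Seq(col("day").expr, col("category").expr)

  // Column.asc/.desc wrap a catalyst SortOrder, hence the casts below.
  override def requiredOrdering(): Seq[SortOrder] = Seq(
    col("day").asc.expr.asInstanceOf[SortOrder],
    col("category").desc.expr.asInstanceOf[SortOrder],
    col("name").asc.expr.asInstanceOf[SortOrder])
}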
To your other questions, you might want to have a look at the recent SPIP I'm working on to consolidate and clean up logical plans <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>. That proposes more specific uses for the DataSourceV2 API that should help clarify what validation needs to take place. As for custom catalyst rules, I'd like to hear about the use cases to see if we can build it into these improvements.

rb

On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:

Hey all,

I saw in some of the discussions around DataSourceV2 writes that we might have the data source inform Spark of requirements for the input data's ordering and partitioning. Has there been a proposed API for that yet?

Even one level up, it would be helpful to understand how I should be thinking about the responsibility of the data source writer, when I should be inserting a custom catalyst rule, and how I should handle validation/assumptions of the table before attempting the write.

Thanks!
Pat

--
Ryan Blue
Software Engineer
Netflix