bq. this shuffle could outweigh the benefits of the organized data if the cardinality is lower.
I wonder if you meant higher in place of the last word above.

Cheers

On Wed, Mar 28, 2018 at 7:50 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:

> For added color, one thing that I may want to consider as a data source implementer is the cost/benefit of applying a particular clustering. For example, a dataset with low cardinality in the clustering key could benefit greatly from clustering on that key before writing to Cassandra, since Cassandra can benefit from that sort of batching. But the cost of performing this shuffle could outweigh the benefits of the organized data if the cardinality is lower.

> I imagine other sources might have similar benefit calculations. Doing a particular sort or clustering can provide increased throughput, but only in certain situations based on some facts about the data.

> For a concrete example: Cassandra can insert records with the same partition key faster if they arrive in the same payload. But this is only beneficial if the incoming dataset has multiple entries for the same partition key. If the incoming source does not have any duplicates, then there is no benefit to requiring a sort or partitioning.
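What Russell describes is roughly the preparation a user has to do by hand today before handing a DataFrame to the Cassandra connector; a required clustering and sort on the write path would let the source ask Spark for the same thing. A minimal sketch, assuming the Spark Cassandra Connector and hypothetical keyspace, table, and column names (`events` is an existing DataFrame):

    // Hand-rolled version of the clustering/sort Russell describes: group rows by
    // the Cassandra partition key so duplicate keys land in the same Spark task
    // and can be batched into a single payload. Keyspace, table, and column names
    // are hypothetical.
    import org.apache.spark.sql.functions.col

    val prepared = events
      .repartition(col("partition_key"))           // cluster by the C* partition key
      .sortWithinPartitions(col("partition_key"))  // make duplicate keys adjacent

    prepared.write
      .format("org.apache.spark.sql.cassandra")    // Spark Cassandra Connector
      .options(Map("keyspace" -> "ks", "table" -> "events"))
      .mode("append")
      .save()

With the required clustering and sort interfaces discussed further down in the thread, the connector could declare this instead of documenting it; the cost/benefit question Russell raises is that when the incoming data has no duplicate keys, the shuffle buys nothing.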
> On Wed, Mar 28, 2018 at 7:14 PM Patrick Woody <patrick.woo...@gmail.com> wrote:

>>> Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn't properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with "Requires".

>> This was in reference to Russell's suggestion that the data source could have a required sort, but only a recommended partitioning. I don't have an immediate use case for a recommendation that comes to mind, though. I'm definitely in sync that the data source itself shouldn't do work outside of the writes themselves.

>>> Considering the second use case you mentioned first, I don't think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.

>>> For your first use case, an explicit global ordering, the problem is that there can't be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.

>> This is where I'm interested in learning about the separation of responsibilities for the data source and how "smart" it is supposed to be.

>> For the first part, I would assume that, given the estimated data size from Spark and options passed in from the user, the data source could make a more intelligent requirement on the write format than Spark could independently. This is somewhat analogous to how the current FileSource does bin packing of small files on the read side, restricting parallelism for the sake of overhead.

>> For the second, I wouldn't assume that a data source requiring a certain write format would give any guarantees around reading the same data? In the cases where it is a complete overwrite it would, but for independent writes it could still be useful for statistics or compression.

>> Thanks
>> Pat

>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com> wrote:

>>> How would Spark determine whether or not to apply a recommendation - a cost threshold?

>>> Spark would always apply the required clustering and sort order because they are required by the data source. It is reasonable for a source to reject data that isn't properly prepared. For example, data must be written to HTable files with keys in order or else the files are invalid. Sorting should not be implemented in the sources themselves because Spark handles concerns like spilling to disk. Spark must prepare data correctly, which is why the interfaces start with "Requires".

>>> I'm not sure what the second half of your question means. What does Spark need to pass into the data source?

>>> Should a datasource be able to provide a Distribution proper rather than just the clustering expressions? Two use cases would be for explicit global sorting of the dataset and attempting to ensure a minimum write task size/number of write tasks.

>>> Considering the second use case you mentioned first, I don't think it is a good idea for a table to put requirements on the number of tasks used for a write. The parallelism should be set appropriately for the data volume, which is for Spark or the user to determine. A minimum or maximum number of tasks could cause bad behavior.

>>> That said, I think there is a related use case for sharding. But that's really just a clustering by an expression with the shard calculation, e.g., hash(id_col, 64). The shards should be handled as a cluster, but it doesn't matter how many tasks are used for it.

>>> For your first use case, an explicit global ordering, the problem is that there can't be an explicit global ordering for a table when it is populated by a series of independent writes. Each write could have a global order, but once those files are written, you have to deal with multiple sorted data sets. I think it makes sense to focus on order within data files, not order between data files.

>>> On Wed, Mar 28, 2018 at 7:26 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:

>>>> How would Spark determine whether or not to apply a recommendation - a cost threshold? And yes, it would be good to flesh out what information we get from Spark in the datasource when providing these recommendations/requirements - I could see statistics and the existing outputPartitioning/Ordering of the child plan being used for providing the requirement.

>>>> Should a datasource be able to provide a Distribution proper rather than just the clustering expressions? Two use cases would be for explicit global sorting of the dataset and attempting to ensure a minimum write task size/number of write tasks.

>>>> On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:

>>>>> Thanks for the clarification, definitely would want to require Sort but only recommend partitioning ... I think that would be useful to request based on details about the incoming dataset.
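Ryan's sharding aside above (hash(id_col, 64) as a clustering expression rather than a task-count requirement) can be written as an ordinary Catalyst expression. A sketch against the RequiresClustering interface proposed further down in this thread; the trait is transcribed here only so the snippet stands alone, and ShardedSink and the id column are hypothetical:

    import java.util.{Collections, List => JList}
    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.functions.{col, hash}

    // Transcription of the interface proposed later in this thread; not a shipped Spark API.
    trait RequiresClustering { def requiredClustering(): JList[Expression] }

    class ShardedSink extends RequiresClustering {
      // Cluster by hash(id) mod 64: each of the 64 shard values is handled as one
      // cluster, but how many tasks carry those clusters stays Spark's decision.
      // A real implementation would use a positive modulus (pmod) and bind the
      // column against the input schema instead of an unresolved col("id").
      override def requiredClustering(): JList[Expression] =
        Collections.singletonList((hash(col("id")) % 64).expr)
    }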
>>>>> On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue <rb...@netflix.com> wrote:

>>>>>> A required clustering would not, but a required sort would. Clustering is asking for the input dataframe's partitioning, and sorting would be how each partition is sorted.

>>>>>> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:

>>>>>>> I forgot since it's been a while, but does Clustering support allow requesting that partitions contain elements in order as well? That would be a useful trick for me. I.e.,
>>>>>>> Request/Require(SortedOn(Col1))
>>>>>>> Partition 1 -> ((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))

>>>>>>> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

>>>>>>>> Thanks, it makes sense that the existing interface is for aggregation and not joins. Why are there requirements for the number of partitions that are returned, then?

>>>>>>>> Does it make sense to design the write-side `Requirement` classes and the read-side reporting separately?

>>>>>>>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com> wrote:

>>>>>>>>> Hi Ryan, yeah, you are right that SupportsReportPartitioning doesn't expose a hash function, so Join can't benefit from this interface, as Join doesn't require a general ClusteredDistribution but a more specific one called HashClusteredDistribution.

>>>>>>>>> So currently only Aggregate can benefit from SupportsReportPartitioning and save a shuffle. We can add a new interface to expose the hash function to make it work for Join.

>>>>>>>>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com> wrote:

>>>>>>>>>> I just took a look at SupportsReportPartitioning and I'm not sure that it will work for real use cases. It doesn't specify, as far as I can tell, a hash function for combining clusters into tasks or a way to provide Spark a hash function for the other side of a join. It seems unlikely to me that many data sources would have partitioning that happens to match the other side of a join. And, it looks like task order matters? Maybe I'm missing something?

>>>>>>>>>> I think that we should design the write side independently based on what data stores actually need, and take a look at the read side based on what data stores can actually provide. Wenchen, was there a design doc for partitioning on the read path?

>>>>>>>>>> I completely agree with your point about a global sort. We recommend to all of our data engineers to add a sort to most tables because it introduces the range partitioner and does a skew calculation, in addition to making data filtering much better when it is read. It's really common for tables to be skewed by partition values.

>>>>>>>>>> rb
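To make the read-side part of this concrete: a sketch of the behavior Wenchen describes, using the Spark 2.3 SupportsReportPartitioning contract as I understand it. The reader and the userId column are hypothetical.

    import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Distribution, Partitioning}

    // A Partitioning that reports "rows are clustered by userId". It can satisfy a
    // ClusteredDistribution, which is enough for Aggregate to skip its shuffle, but
    // it exposes no hash function -- the gap Ryan points out for joins.
    class UserIdClusteredPartitioning(parts: Int) extends Partitioning {
      override def numPartitions(): Int = parts

      override def satisfy(distribution: Distribution): Boolean = distribution match {
        // Every row with the same userId comes from exactly one input partition, so
        // any required clustering that includes userId is satisfied.
        case c: ClusteredDistribution => c.clusteredColumns.contains("userId")
        case _ => false
      }
    }

A reader that mixes in SupportsReportPartitioning would return this from outputPartitioning(); nothing in it tells Spark which hash produced the layout, which is why only ClusteredDistribution-style requirements can take advantage of it today.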
>>>>>>>>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <patrick.woo...@gmail.com> wrote:

>>>>>>>>>>> Hey Ryan, Ted, Wenchen,

>>>>>>>>>>> Thanks for the quick replies.

>>>>>>>>>>> @Ryan - the sorting portion makes sense, but I think we'd have to ensure something similar to requiredChildDistribution in SparkPlan, where we have the number of partitions as well, if we'd want to further report to SupportsReportPartitioning, yeah?

>>>>>>>>>>> Specifying an explicit global sort can also be useful for filtering purposes on Parquet row group stats if we have a time-based/high-cardinality ID field. If my datasource or catalog knows about previous queries on a table, it could be really useful to recommend more appropriate formatting for consumers on the next materialization. The same would be true of clustering on commonly joined fields.

>>>>>>>>>>> Thanks again
>>>>>>>>>>> Pat

>>>>>>>>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:

>>>>>>>>>>>> Hmm. Ryan seems to be right.

>>>>>>>>>>>> Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:

>>>>>>>>>>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>>>>>>>>>>> ...
>>>>>>>>>>>> Partitioning outputPartitioning();

>>>>>>>>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0...@gmail.com> wrote:

>>>>>>>>>>>>> Actually clustering is already supported, please take a look at SupportsReportPartitioning.

>>>>>>>>>>>>> Ordering is not proposed yet; it might be similar to what Ryan proposed.

>>>>>>>>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com> wrote:

>>>>>>>>>>>>>> Interesting.

>>>>>>>>>>>>>> Should requiredClustering return a Set of Expressions? This way, we can determine the order of Expressions by looking at what requiredOrdering() returns.

>>>>>>>>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <rb...@netflix.com.invalid> wrote:

>>>>>>>>>>>>>>> Hi Pat,

>>>>>>>>>>>>>>> Thanks for starting the discussion on this, we're really interested in it as well. I don't think there is a proposed API yet, but I was thinking something like this:

>>>>>>>>>>>>>>> interface RequiresClustering {
>>>>>>>>>>>>>>>   List<Expression> requiredClustering();
>>>>>>>>>>>>>>> }

>>>>>>>>>>>>>>> interface RequiresSort {
>>>>>>>>>>>>>>>   List<SortOrder> requiredOrdering();
>>>>>>>>>>>>>>> }

>>>>>>>>>>>>>>> The reason why RequiresClustering should provide Expression is that it needs to be able to customize the implementation. For example, writing to HTable would require building a key (or the data for a key) and that might use a hash function that differs from Spark's built-ins. RequiresSort is fairly straightforward, but the interaction between the two requirements deserves some consideration.
>>>>>>>>>>>>>>> To make the two compatible, I think that RequiresSort must be interpreted as a sort within each partition of the clustering, but could possibly be used for a global sort when the two overlap.

>>>>>>>>>>>>>>> For example, if I have a table partitioned by “day” and “category”, then the RequiredClustering would be by day, category. A required sort might be day ASC, category DESC, name ASC. Because that sort satisfies the required clustering, it could be used for a global ordering. But, is that useful? How would the global ordering matter beyond a sort within each partition, i.e., how would the partition's place in the global ordering be passed?

>>>>>>>>>>>>>>> To your other questions, you might want to have a look at the recent SPIP I'm working on to consolidate and clean up logical plans <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>. That proposes more specific uses for the DataSourceV2 API that should help clarify what validation needs to take place. As for custom catalyst rules, I'd like to hear about the use cases to see if we can build it into these improvements.

>>>>>>>>>>>>>>> rb

>>>>>>>>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <patrick.woo...@gmail.com> wrote:

>>>>>>>>>>>>>>>> Hey all,

>>>>>>>>>>>>>>>> I saw in some of the discussions around DataSourceV2 writes that we might have the data source inform Spark of requirements for the input data's ordering and partitioning. Has there been a proposed API for that yet?

>>>>>>>>>>>>>>>> Even one level up it would be helpful to understand how I should be thinking about the responsibility of the data source writer, when I should be inserting a custom catalyst rule, and how I should handle validation/assumptions of the table before attempting the write.

>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>> Pat

>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>>>>> Netflix
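For completeness, Ryan's day/category example expressed against the interfaces he sketches above. The Requires* traits are only the proposal from this thread, transcribed so the example stands alone; DayCategoryTableWriter and the column names are hypothetical, and the SortOrder construction is Catalyst's companion apply as I recall it from Spark 2.3:

    import java.util.{Arrays => JArrays, List => JList}
    import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, SortOrder}
    import org.apache.spark.sql.functions.col

    // Transcriptions of the proposed interfaces; not a shipped Spark API.
    trait RequiresClustering { def requiredClustering(): JList[Expression] }
    trait RequiresSort { def requiredOrdering(): JList[SortOrder] }

    class DayCategoryTableWriter extends RequiresClustering with RequiresSort {
      // Required clustering: all rows for one (day, category) combination should be
      // produced by a single task. col(...).expr yields unresolved attributes; a real
      // implementation would bind them against the input schema.
      override def requiredClustering(): JList[Expression] =
        JArrays.asList(col("day").expr, col("category").expr)

      // Required sort within each task: day ASC, category DESC, name ASC. Because it
      // leads with the clustering columns, Spark could in principle satisfy it with a
      // global sort -- the open question in Ryan's message above.
      override def requiredOrdering(): JList[SortOrder] =
        JArrays.asList(
          SortOrder(col("day").expr, Ascending),
          SortOrder(col("category").expr, Descending),
          SortOrder(col("name").expr, Ascending))
    }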