> How would Spark determine whether or not to apply a recommendation - a cost
> threshold?

Spark would always apply the required clustering and sort order because
they are required by the data source. It is reasonable for a source to
reject data that isn’t properly prepared. For example, data must be written
to HTable files with keys in order or else the files are invalid. Sorting
should not be implemented in the sources themselves because Spark handles
concerns like spilling to disk. Spark must prepare data correctly, which is
why the interfaces start with “Requires”.
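
To make that concrete, here is a minimal sketch of a planning step that
applies both requirements unconditionally, with no cost threshold. It assumes
the RequiresClustering and RequiresSort interfaces proposed earlier in this
thread. Expression and SortOrder are simplified stand-ins for Spark's classes,
and WritePreparation and KeyOrderedWriter are hypothetical names used only for
illustration; this is not Spark's planner code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Spark's Expression (assumption: only a column name).
final class Expression {
  final String name;
  Expression(String name) { this.name = name; }
}

// Simplified stand-in for Spark's SortOrder.
final class SortOrder {
  final Expression child;
  final boolean ascending;
  SortOrder(Expression child, boolean ascending) {
    this.child = child;
    this.ascending = ascending;
  }
}

// The two interfaces as proposed in this thread.
interface RequiresClustering {
  List<Expression> requiredClustering();
}

interface RequiresSort {
  List<SortOrder> requiredOrdering();
}

// Hypothetical writer that needs rows clustered and sorted by "key".
final class KeyOrderedWriter implements RequiresClustering, RequiresSort {
  public List<Expression> requiredClustering() {
    List<Expression> exprs = new ArrayList<>();
    exprs.add(new Expression("key"));
    return exprs;
  }

  public List<SortOrder> requiredOrdering() {
    List<SortOrder> order = new ArrayList<>();
    order.add(new SortOrder(new Expression("key"), true));
    return order;
  }
}

// Hypothetical planning step: every declared requirement becomes a step.
final class WritePreparation {
  static List<String> plan(Object writer) {
    List<String> steps = new ArrayList<>();
    if (writer instanceof RequiresClustering) {
      List<String> names = new ArrayList<>();
      for (Expression e : ((RequiresClustering) writer).requiredClustering()) {
        names.add(e.name);
      }
      steps.add("cluster by " + String.join(", ", names));
    }
    if (writer instanceof RequiresSort) {
      List<String> terms = new ArrayList<>();
      for (SortOrder o : ((RequiresSort) writer).requiredOrdering()) {
        terms.add(o.child.name + (o.ascending ? " ASC" : " DESC"));
      }
      steps.add("sort within partitions by " + String.join(", ", terms));
    }
    steps.add("write");
    return steps;
  }
}
```

The sort is expressed as a step in the plan rather than done inside the
writer, so Spark remains responsible for concerns like spilling to disk.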

I’m not sure what the second half of your question means. What does Spark
need to pass into the data source?

> Should a datasource be able to provide a Distribution proper rather than
> just the clustering expressions? Two use cases would be for explicit global
> sorting of the dataset and attempting to ensure a minimum write task
> size/number of write tasks.

Considering the second use case you mentioned first, I don’t think it is a
good idea for a table to put requirements on the number of tasks used for a
write. The parallelism should be set appropriately for the data volume,
which is for Spark or the user to determine. A minimum or maximum number of
tasks could cause bad behavior.

That said, I think there is a related use case for sharding. But that’s
really just a clustering by an expression with the shard calculation,
e.g., hash(id_col, 64). The shards should be handled as a cluster, but it
doesn’t matter how many tasks are used for it.
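
As a rough sketch of the sharding case: the shard is just a value computed
from the row, and clustering on it keeps each shard together regardless of how
many tasks run. The shard function below is a hypothetical stand-in for
hash(id_col, 64) (a real source would supply its own hash), and
ShardClustering and its methods are names made up for this example.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class ShardClustering {
  static final int NUM_SHARDS = 64;

  // Hypothetical stand-in for hash(id_col, 64): maps an id to a shard in [0, 64).
  static int shard(long id) {
    return Math.floorMod(Long.hashCode(id), NUM_SHARDS);
  }

  // Assigns rows to tasks by shard and records which shards each task receives.
  // The task count is chosen by the caller; the table does not dictate it.
  static Map<Integer, Set<Integer>> shardsByTask(long[] ids, int numTasks) {
    Map<Integer, Set<Integer>> result = new TreeMap<>();
    for (long id : ids) {
      int s = shard(id);
      int task = s % numTasks;
      result.computeIfAbsent(task, t -> new TreeSet<>()).add(s);
    }
    return result;
  }

  // True when no shard appears in more than one task.
  static boolean eachShardInOneTask(Map<Integer, Set<Integer>> shardsByTask) {
    Set<Integer> seen = new HashSet<>();
    for (Set<Integer> shards : shardsByTask.values()) {
      for (int s : shards) {
        if (!seen.add(s)) {
          return false;
        }
      }
    }
    return true;
  }
}
```

Whether the caller picks 3 tasks or 8, every shard lands in exactly one task,
which is all the clustering requirement asks for.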

For your first use case, an explicit global ordering, the problem is that
there can’t be an explicit global ordering for a table when it is populated
by a series of independent writes. Each write could have a global order,
but once those files are written, you have to deal with multiple sorted
data sets. I think it makes sense to focus on order within data files, not
order between data files.
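
A small sketch of why order between files does not survive independent
writes. Each write below sorts its own batch, but the two resulting files
cover overlapping key ranges. IndependentWrites and its methods are
hypothetical names for illustration, and plain arrays stand in for data files.

```java
import java.util.Arrays;

final class IndependentWrites {
  // Simulates one write job: the output "file" is the batch in sorted order.
  static long[] writeSorted(long[] batch) {
    long[] file = batch.clone();
    Arrays.sort(file);
    return file;
  }

  // True when the values are in non-decreasing order.
  static boolean isSorted(long[] values) {
    for (int i = 1; i < values.length; i++) {
      if (values[i - 1] > values[i]) {
        return false;
      }
    }
    return true;
  }

  // Reads the table as the files laid end to end.
  static long[] concat(long[] a, long[] b) {
    long[] out = Arrays.copyOf(a, a.length + b.length);
    System.arraycopy(b, 0, out, a.length, b.length);
    return out;
  }
}
```

Writing the batches {5, 1, 9} and {4, 2, 8} gives two files that are each
sorted, but reading one after the other is not in order, so a reader can rely
only on the order within each file.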

On Wed, Mar 28, 2018 at 7:26 AM, Patrick Woody <patrick.woo...@gmail.com>
wrote:

> How would Spark determine whether or not to apply a recommendation - a
> cost threshold? And yes, it would be good to flesh out what information we
> get from Spark in the datasource when providing these
> recommendations/requirements - I could see statistics and the existing
> outputPartitioning/Ordering of the child plan being used for providing the
> requirement.
>
> Should a datasource be able to provide a Distribution proper rather than
> just the clustering expressions? Two use cases would be for explicit global
> sorting of the dataset and attempting to ensure a minimum write task
> size/number of write tasks.
>
>
>
> On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer <
> russell.spit...@gmail.com> wrote:
>
>> Thanks for the clarification, definitely would want to require Sort but
>> only recommend partitioning ...  I think that would be useful to request
>> based on details about the incoming dataset.
>>
>> On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> A required clustering would not, but a required sort would. Clustering
>>> is asking for the input dataframe's partitioning, and sorting would be how
>>> each partition is sorted.
>>>
>>> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <
>>> russell.spit...@gmail.com> wrote:
>>>
>>>> I forgot since it's been a while, but does Clustering support allow
>>>> requesting that partitions contain elements in order as well? That would be
>>>> a useful trick for me. IE
>>>> Request/Require(SortedOn(Col1))
>>>> Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))
>>>>
>>>> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> Thanks, it makes sense that the existing interface is for aggregation
>>>>> and not joins. Why are there requirements for the number of partitions
>>>>> that are returned then?
>>>>>
>>>>> Does it make sense to design the write-side `Requirement` classes and
>>>>> the read-side reporting separately?
>>>>>
>>>>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Ryan, yeah, you are right that SupportsReportPartitioning doesn't
>>>>>> expose a hash function, so Join can't benefit from this interface: Join
>>>>>> doesn't require a general ClusteredDistribution, but a more specific one
>>>>>> called HashClusteredDistribution.
>>>>>>
>>>>>> So currently only Aggregate can benefit from
>>>>>> SupportsReportPartitioning and save a shuffle. We can add a new interface
>>>>>> to expose the hash function to make it work for Join.
>>>>>>
>>>>>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>>>>
>>>>>>> I just took a look at SupportsReportPartitioning and I'm not sure
>>>>>>> that it will work for real use cases. It doesn't specify, as far as I
>>>>>>> can tell, a hash function for combining clusters into tasks or a way to
>>>>>>> provide Spark a hash function for the other side of a join. It seems
>>>>>>> unlikely to me that many data sources would have partitioning that
>>>>>>> happens to match the other side of a join. And, it looks like task order
>>>>>>> matters? Maybe I'm missing something?
>>>>>>>
>>>>>>> I think that we should design the write side independently based on
>>>>>>> what data stores actually need, and take a look at the read side based
>>>>>>> on what data stores can actually provide. Wenchen, was there a design
>>>>>>> doc for partitioning on the read path?
>>>>>>>
>>>>>>> I completely agree with your point about a global sort. We recommend
>>>>>>> to all of our data engineers to add a sort to most tables because it
>>>>>>> introduces the range partitioner and does a skew calculation, in
>>>>>>> addition to making data filtering much better when it is read. It's
>>>>>>> really common for tables to be skewed by partition values.
>>>>>>>
>>>>>>> rb
>>>>>>>
>>>>>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <
>>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hey Ryan, Ted, Wenchen
>>>>>>>>
>>>>>>>> Thanks for the quick replies.
>>>>>>>>
>>>>>>>> @Ryan - the sorting portion makes sense, but I think we'd have to
>>>>>>>> ensure something similar to requiredChildDistribution in SparkPlan,
>>>>>>>> where we have the number of partitions as well, if we'd want to further
>>>>>>>> report to SupportsReportPartitioning, yeah?
>>>>>>>>
>>>>>>>> Specifying an explicit global sort can also be useful for filtering
>>>>>>>> purposes on Parquet row group stats if we have a time based/high
>>>>>>>> cardinality ID field. If my datasource or catalog knows about previous
>>>>>>>> queries on a table, it could be really useful to recommend more
>>>>>>>> appropriate formatting for consumers on the next materialization. The
>>>>>>>> same would be true of clustering on commonly joined fields.
>>>>>>>>
>>>>>>>> Thanks again
>>>>>>>> Pat
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hmm. Ryan seems to be right.
>>>>>>>>>
>>>>>>>>> Looking at
>>>>>>>>> sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>>>>>>>>
>>>>>>>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>>>>>>>> ...
>>>>>>>>>   Partitioning outputPartitioning();
>>>>>>>>>
>>>>>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Actually, clustering is already supported; please take a look at
>>>>>>>>>> SupportsReportPartitioning.
>>>>>>>>>>
>>>>>>>>>> Ordering is not proposed yet, but it might be similar to what Ryan
>>>>>>>>>> proposed.
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Interesting.
>>>>>>>>>>>
>>>>>>>>>>> Should requiredClustering return a Set of Expressions?
>>>>>>>>>>> This way, we can determine the order of the Expressions by looking
>>>>>>>>>>> at what requiredOrdering() returns.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <
>>>>>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Pat,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for starting the discussion on this, we’re really
>>>>>>>>>>>> interested in it as well. I don’t think there is a proposed API
>>>>>>>>>>>> yet, but I was thinking something like this:
>>>>>>>>>>>>
>>>>>>>>>>>> interface RequiresClustering {
>>>>>>>>>>>>   List<Expression> requiredClustering();
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> interface RequiresSort {
>>>>>>>>>>>>   List<SortOrder> requiredOrdering();
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> The reason why RequiresClustering should provide Expression is
>>>>>>>>>>>> that it needs to be able to customize the implementation. For
>>>>>>>>>>>> example, writing to HTable would require building a key (or the data
>>>>>>>>>>>> for a key) and that might use a hash function that differs from
>>>>>>>>>>>> Spark’s built-ins.
>>>>>>>>>>>>
>>>>>>>>>>>> RequiresSort is fairly straightforward, but the interaction
>>>>>>>>>>>> between the two requirements deserves some consideration. To make
>>>>>>>>>>>> the two compatible, I think that RequiresSort must be interpreted as
>>>>>>>>>>>> a sort within each partition of the clustering, but could possibly
>>>>>>>>>>>> be used for a global sort when the two overlap.
>>>>>>>>>>>>
>>>>>>>>>>>> For example, if I have a table partitioned by “day” and
>>>>>>>>>>>> “category” then the requiredClustering would be by day,
>>>>>>>>>>>> category. A required sort might be day ASC, category DESC,
>>>>>>>>>>>> name ASC. Because that sort satisfies the required clustering,
>>>>>>>>>>>> it could be used for a global ordering. But, is that useful? How
>>>>>>>>>>>> would the global ordering matter beyond a sort within each
>>>>>>>>>>>> partition, i.e., how would the partition’s place in the global
>>>>>>>>>>>> ordering be passed?
>>>>>>>>>>>>
>>>>>>>>>>>> To your other questions, you might want to have a look at the
>>>>>>>>>>>> recent SPIP I’m working on to consolidate and clean up logical
>>>>>>>>>>>> plans
>>>>>>>>>>>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
>>>>>>>>>>>> That proposes more specific uses for the DataSourceV2 API that
>>>>>>>>>>>> should help clarify what validation needs to take place. As for
>>>>>>>>>>>> custom catalyst rules, I’d like to hear about the use cases to see
>>>>>>>>>>>> if we can build it into these improvements.
>>>>>>>>>>>>
>>>>>>>>>>>> rb
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <
>>>>>>>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hey all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I saw in some of the discussions around DataSourceV2 writes
>>>>>>>>>>>>> that we might have the data source inform Spark of requirements
>>>>>>>>>>>>> for the input data's ordering and partitioning. Has there been a
>>>>>>>>>>>>> proposed API for that yet?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Even one level up it would be helpful to understand how I
>>>>>>>>>>>>> should be thinking about the responsibility of the data source
>>>>>>>>>>>>> writer, when I should be inserting a custom catalyst rule, and how
>>>>>>>>>>>>> I should handle validation/assumptions of the table before
>>>>>>>>>>>>> attempting the write.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>> Pat
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>> Netflix
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>


-- 
Ryan Blue
Software Engineer
Netflix
