>
> sure, but then my values are not sorted per key, right?

It does do a partition-local sort. Look at the query plan in my example
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1828840559545742/2840265927289860/latest.html>.
The code here will also take care of finding the boundaries and is pretty
careful to spill / avoid materializing unnecessarily.

I think you are correct though that we are not pushing any of the sort into
the shuffle.  I'm not sure how much that buys you.  If it's a lot, we could
extend the planner to look for Exchange->Sort pairs and change the exchange.
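
For concreteness, here is a rough, illustrative sketch of the repartition +
sortWithinPartitions + mapPartitions pattern being discussed (names and data
are made up; it assumes a SparkSession named spark with spark.implicits._ in
scope):

    import org.apache.spark.sql.functions.col
    import spark.implicits._

    // illustrative input, not from the thread
    val ds = Seq(("a", "2"), ("a", "1"), ("b", "3")).toDF("key", "value")

    val result = ds
      .repartition(2, col("key"))            // hash-partition so all rows for a key land together
      .sortWithinPartitions("key", "value")  // partition-local sort; no range boundaries computed
      .as[(String, String)]
      .mapPartitions { rows =>
        // within a partition, rows for the same key are now contiguous and
        // value-sorted, so a secondary-sort style pass can be written here
        rows.map { case (k, v) => (k, v) }
      }

    result.explain()  // expect *Sort (global = false) above Exchange hashpartitioning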

On Fri, Nov 4, 2016 at 7:06 AM, Koert Kuipers <ko...@tresata.com> wrote:

> i just noticed Sort for Dataset has a global flag. and Dataset also has
> sortWithinPartitions.
>
> how about:
> repartition + sortWithinPartitions + mapPartitions?
>
> the plan looks ok, but it is not clear to me if the sort is done as part
> of the shuffle (which is the important optimization).
>
> scala> val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
>
> scala> df.repartition(2, col("key")).sortWithinPartitions("value").as[(String, String)].mapPartitions{ (x: Iterator[(String, String)]) => x }.explain
> == Physical Plan ==
> *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1, true) AS _1#39, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2, true) AS _2#40]
> +- MapPartitions <function1>, obj#38: scala.Tuple2
>    +- DeserializeToObject newInstance(class scala.Tuple2), obj#37: scala.Tuple2
>       +- *Sort [value#6 ASC], false, 0
>          +- Exchange hashpartitioning(key#5, 2)
>             +- LocalTableScan [key#5, value#6]
>
>
>
>
> On Fri, Nov 4, 2016 at 9:18 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> sure, but then my values are not sorted per key, right?
>>
>> so a group by key with values sorted according to some ordering is an
>> operation that can be done efficiently in a single shuffle without first
>> figuring out range boundaries. and it is needed for quite a few algos,
>> including Window and lots of timeseries stuff. but it seems there is no way
>> to express that i want to do this yet (at least not in an efficient way).
>>
>> which makes me wonder, what does Window do?
>>
>>
>> On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> Thinking out loud is good :)
>>>
>>> You are right in that anytime you ask for a global ordering from Spark
>>> you will pay the cost of figuring out the range boundaries for partitions.
>>> If you say orderBy, though, we aren't sure that you aren't expecting a
>>> global order.
>>>
>>> If you only want to make sure that items are colocated, it is cheaper to
>>> do a groupByKey followed by a flatMapGroups
>>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1828840559545742/2840265927289860/latest.html>
>>> .
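
A minimal sketch of that groupByKey + flatMapGroups approach (illustrative
names; assumes spark.implicits._ in scope; note that, as discussed elsewhere in
this thread, the values handed to flatMapGroups are not sorted):

    case class Record(key: String, value: String)
    val ds = Seq(Record("a", "2"), Record("a", "1"), Record("b", "3")).toDS()

    val grouped = ds
      .groupByKey(_.key)                   // single shuffle to co-locate each key
      .flatMapGroups { (key, records) =>
        // the iterator is not sorted; sort in memory here if a per-key order is needed
        records.toSeq.sortBy(_.value).map(r => (key, r.value))
      }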
>>>
>>>
>>>
>>> On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> i guess i could sort by (hashcode(key), key, secondarySortColumn) and
>>>> then do mapPartitions?
>>>>
>>>> sorry thinking out loud a bit here. ok i think that could work. thanks
>>>>
>>>> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
>>>>> that's an interesting thought about orderBy and mapPartitions. i guess
>>>>> i could emulate a groupBy with secondary sort using those two. however,
>>>>> isn't using an orderBy expensive since it is a total sort? i mean a groupBy
>>>>> with secondary sort is also a total sort under the hood, but it's on
>>>>> (hashCode(key), secondarySortColumn) which is easier to distribute and
>>>>> therefore can be implemented more efficiently.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>>>> It is still unclear to me why we should remember all these tricks (or
>>>>>>> add lots of extra little functions) when this elegantly can be expressed in
>>>>>>> a reduce operation with a simple one-line lambda function.
>>>>>>>
>>>>>> I think you can do that too.  KeyValueGroupedDataset has a
>>>>>> reduceGroups function.  This probably won't be as fast though, because you
>>>>>> end up creating objects, whereas the version I gave will get codegened to
>>>>>> operate on binary data the whole way through.
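
A small sketch of that reduceGroups alternative (illustrative case class and
names, assuming spark.implicits._ in scope):

    case class Event(key: String, amount: Long)
    val events = Seq(Event("a", 1L), Event("a", 2L), Event("b", 5L)).toDS()

    // reduceGroups takes a one-line (V, V) => V lambda and returns a Dataset[(K, V)],
    // but it deserializes each row into an Event object rather than staying on binary data
    val maxPerKey = events
      .groupByKey(_.key)
      .reduceGroups((a, b) => if (a.amount >= b.amount) a else b)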
>>>>>>
>>>>>>> The same applies to these Window functions. I had to read it 3 times
>>>>>>> to understand what it all means. Maybe it makes sense for someone who has
>>>>>>> been forced to use such limited tools in SQL for many years, but that's not
>>>>>>> necessarily what we should aim for. Why can I not just have the sortBy and
>>>>>>> then an Iterator[X] => Iterator[Y] to express what I want to do?
>>>>>>>
>>>>>> We also have orderBy and mapPartitions.
>>>>>>
>>>>>>> All these functions (rank etc.) can be trivially expressed in this,
>>>>>>> plus I can add other operations if needed, instead of being locked into
>>>>>>> this Window framework.
>>>>>>>
>>>>>> I agree that window functions would probably not be my first choice
>>>>>> for many problems, but for people coming from SQL they are a very popular
>>>>>> feature.  My real goal is to give as many paradigms as possible in a single
>>>>>> unified framework.  Let people pick the right mode of expression for any
>>>>>> given job :)
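
For comparison, a short sketch of the window-function style (illustrative
column names, on a DataFrame df with key and value columns):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.rank

    // partition by key, order by value within each key; rank() numbers rows per key
    // without the caller writing an Iterator[X] => Iterator[Y] function
    val w = Window.partitionBy("key").orderBy("value")
    val ranked = df.withColumn("rank", rank().over(w))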
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
