okay i see the partition local sort. got it. i would expect that pushing the partition local sort into the shuffle would give a significant boost. but that's just a guess.
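For reference, the repartition + sortWithinPartitions + mapPartitions pattern from the quoted thread below could be sketched roughly like this. It is only a sketch under assumptions: the object name, the sample data, and the choice to sort on (key, value) rather than on value alone are mine, and keeping the first (smallest) value per key is just a stand-in for whatever per-group logic is actually needed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical object name, used only for this illustration.
object SecondarySortSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("secondary-sort-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data: two keys, values out of order.
    val df = Seq((1, "b"), (2, "d"), (1, "a"), (2, "c")).toDF("key", "value")

    // Hash-partition by key, then sort each partition locally.
    // Assumption: sorting on key first keeps each key's rows contiguous
    // within a partition, with values ordered inside each key.
    val sorted = df
      .repartition(2, col("key"))
      .sortWithinPartitions("key", "value")
      .as[(Int, String)]

    // Single pass over each sorted partition: keep only the first row
    // seen for every key, which is that key's smallest value.
    val firstPerKey = sorted.mapPartitions { (rows: Iterator[(Int, String)]) =>
      var last: Option[Int] = None
      rows.filter { case (k, _) =>
        val isNewKey = !last.contains(k)
        last = Some(k)
        isNewKey
      }
    }

    firstPerKey.explain() // inspect where Sort sits relative to Exchange
    firstPerKey.show()

    spark.stop()
  }
}
```

Going by the thread, I would expect the plan to still show a separate Sort on top of the Exchange, i.e. the sort is not folded into the shuffle.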
On Fri, Nov 4, 2016 at 2:39 PM, Michael Armbrust <mich...@databricks.com> wrote:

> sure, but then my values are not sorted per key, right?
>
> It does do a partition local sort. Look at the query plan in my example
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1828840559545742/2840265927289860/latest.html>.
> The code here will also take care of finding the boundaries and is pretty
> careful to spill / avoid materializing unnecessarily.
>
> I think you are correct though that we are not pushing any of the sort
> into the shuffle. I'm not sure how much that buys you. If it's a lot we
> could extend the planner to look for Exchange->Sort pairs and change the
> exchange.
>
> On Fri, Nov 4, 2016 at 7:06 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> i just noticed Sort for Dataset has a global flag. and Dataset also has
>> sortWithinPartitions.
>>
>> how about:
>> repartition + sortWithinPartitions + mapPartitions?
>>
>> the plan looks ok, but it is not clear to me if the sort is done as part
>> of the shuffle (which is the important optimization).
>>
>> scala> val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
>>
>> scala> df.repartition(2, col("key")).sortWithinPartitions("value").as[(String, String)].mapPartitions{ (x: Iterator[(String, String)]) => x }.explain
>> == Physical Plan ==
>> *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1, true) AS _1#39, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2, true) AS _2#40]
>> +- MapPartitions <function1>, obj#38: scala.Tuple2
>>    +- DeserializeToObject newInstance(class scala.Tuple2), obj#37: scala.Tuple2
>>       +- *Sort [value#6 ASC], false, 0
>>          +- Exchange hashpartitioning(key#5, 2)
>>             +- LocalTableScan [key#5, value#6]
>>
>> On Fri, Nov 4, 2016 at 9:18 AM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> sure, but then my values are not sorted per key, right?
>>>
>>> so a group by key with values sorted according to some ordering is an
>>> operation that can be done efficiently in a single shuffle without first
>>> figuring out range boundaries. and it is needed for quite a few algos,
>>> including Window and lots of timeseries stuff. but it seems there is no
>>> way to express i want to do this yet (at least not in an efficient way).
>>>
>>> which makes me wonder, what does Window do?
>>>
>>> On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>>
>>>> Thinking out loud is good :)
>>>>
>>>> You are right in that anytime you ask for a global ordering from Spark
>>>> you will pay the cost of figuring out the range boundaries for partitions.
>>>> If you say orderBy, though, we aren't sure that you aren't expecting a
>>>> global order.
>>>>
>>>> If you only want to make sure that items are colocated, it is cheaper
>>>> to do a groupByKey followed by a flatMapGroups
>>>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1828840559545742/2840265927289860/latest.html>.
>>>>
>>>> On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>>
>>>>> i guess i could sort by (hashcode(key), key, secondarySortColumn) and
>>>>> then do mapPartitions?
>>>>>
>>>>> sorry thinking out loud a bit here. ok i think that could work. thanks
>>>>>
>>>>> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>>>
>>>>>> that's an interesting thought about orderBy and mapPartitions. i guess
>>>>>> i could emulate a groupBy with secondary sort using those two. however
>>>>>> isn't using an orderBy expensive since it is a total sort? i mean a
>>>>>> groupBy with secondary sort is also a total sort under the hood, but
>>>>>> it's on (hashCode(key), secondarySortColumn) which is easier to
>>>>>> distribute and therefore can be implemented more efficiently.
>>>>>>
>>>>>> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>>
>>>>>>>> It is still unclear to me why we should remember all these tricks
>>>>>>>> (or add lots of extra little functions) when this elegantly can be
>>>>>>>> expressed in a reduce operation with a simple one line lambda function.
>>>>>>>>
>>>>>>> I think you can do that too. KeyValueGroupedDataset has a
>>>>>>> reduceGroups function. This probably won't be as fast though because
>>>>>>> you end up creating objects whereas the version I gave will get
>>>>>>> codegened to operate on binary data the whole way through.
>>>>>>>
>>>>>>>> The same applies to these Window functions. I had to read it 3
>>>>>>>> times to understand what it all means.
>>>>>>>> Maybe it makes sense for someone who has been forced to use such
>>>>>>>> limited tools in sql for many years but that's not necessarily what
>>>>>>>> we should aim for. Why can I not just have the sortBy and then an
>>>>>>>> Iterator[X] => Iterator[Y] to express what I want to do?
>>>>>>>>
>>>>>>> We also have orderBy and mapPartitions.
>>>>>>>
>>>>>>>> All these functions (rank etc.) can be trivially expressed in this,
>>>>>>>> plus I can add other operations if needed, instead of being locked in
>>>>>>>> like this Window framework.
>>>>>>>>
>>>>>>> I agree that window functions would probably not be my first choice
>>>>>>> for many problems, but for people coming from SQL it was a very popular
>>>>>>> feature. My real goal is to give as many paradigms as possible in a
>>>>>>> single unified framework. Let people pick the right mode of expression
>>>>>>> for any given job :)