Aggregating over sorted data

2016-11-23 Thread assaf.mendelson
Hi,
An issue I have encountered frequently is the need to look at data in an 
ordered manner per key.
A common way of doing this can be seen in classic MapReduce: the shuffle 
stage provides data sorted per key, so one can do a lot with that.
It is of course relatively easy to achieve this with the RDD API, but that 
would mean converting to RDDs and back, which carries a non-negligible 
performance penalty (beyond the fact that we lose any Catalyst optimization).
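(For reference, a rough sketch of the RDD route; the column layout, types and 
partition count below are made up for illustration.)

import org.apache.spark.{HashPartitioner, Partitioner}

// Assumed layout: column 0 = key (Int), column 1 = ts (Long), column 2 = value (Double).
// Classic secondary sort: partition by the key alone, but sort by (key, ts) within
// partitions, so each key's rows arrive contiguously and in ts order. The price is
// leaving the DataFrame API (and Catalyst) and converting back afterwards.
class KeyOnlyPartitioner(n: Int) extends Partitioner {
  private val hp = new HashPartitioner(n)
  def numPartitions: Int = n
  def getPartition(key: Any): Int =
    hp.getPartition(key.asInstanceOf[(Int, Long)]._1)
}

val pairs  = df.rdd.map(r => ((r.getInt(0), r.getLong(1)), r.getDouble(2)))
val sorted = pairs.repartitionAndSortWithinPartitions(new KeyOnlyPartitioner(200))
// sorted.mapPartitions(...) can now walk each key's ordered run of values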
We can use SQL window functions, but that is not an ideal solution either. 
Beyond the fact that window functions are much slower than aggregate functions 
(since they must generate a result for every record), we also can't combine 
them into one pass (i.e. if we have two window functions over the same window, 
it is still two separate scans).

Ideally, I would have liked to see something like 
df.groupBy(col1).sortBy(col2).agg(...) and have the aggregations work on the 
sorted data. That would make it possible to use both the existing functions and 
UDAFs that can assume the order (and do any windowing we need as part of the 
function itself, which is relatively easy in many cases).

I have looked for this and seen many questions on the subject, but no 
answers.

I was hoping I had missed something (I have seen that the SQL CLUSTER BY 
command repartitions and sorts accordingly, but from my understanding it does 
not promise that this remains true if we do a groupBy afterwards). If I 
didn't, I believe this should be a feature to add (I can open a JIRA if people 
think it is a good idea).
Assaf.






Re: Aggregating over sorted data

2016-12-12 Thread nsyca
Hi,

SPARK-18591 <https://issues.apache.org/jira/browse/SPARK-18591> might be a
solution to your problem, but making assumptions in your UDAF logic about how
Spark will process the aggregation is really a risky thing. Is there a way to
do it using a window function with an ORDER BY clause to enforce the processing
order in this case?
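
For illustration, a minimal sketch of that approach (the DataFrame and column
names are hypothetical):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, sum}

// Hypothetical df with columns "key", "ts" and "value": the ORDER BY in the
// window spec enforces per-key processing order, at the cost of producing a
// result for every input row.
val w = Window.partitionBy("key").orderBy("ts")
val withOrdered = df
  .withColumn("running_sum", sum("value").over(w))
  .withColumn("prev_value", lag("value", 1).over(w))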




-
Nattavut Sutyanyong | @nsyca
Spark Technology Center
http://www.spark.tc/



Re: Aggregating over sorted data

2016-12-18 Thread Liang-Chi Hsieh

Hi,

As far as I know, Spark SQL doesn't provide native support for this feature
now. After searching, I found that only a few database systems support it,
e.g., PostgreSQL.

Actually, based on Spark SQL's aggregation system, I think it is not very
difficult to add support for this feature. The question is how frequently
Spark SQL users need it and whether it is worth adding, because as far as I
can see, this feature is not very common.

A possible alternative in current Spark SQL is to use an Aggregator with the
Dataset API. You can write a custom Aggregator that uses a user-defined JVM
object as its buffer to hold the input rows of your aggregate function. You
may, however, need to write the necessary encoder for the buffer object.
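
For illustration, a minimal sketch of such an Aggregator (the Event type, the
column meanings and the kryo buffer encoder are assumptions, not a tested
implementation):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical input type; the buffer is a plain JVM object (a Seq of (ts, value)).
case class Event(key: Int, ts: Long, value: Double)

// Collects a key's rows into the buffer and imposes the order only in finish(),
// since merge() gives no guarantee about the order in which partial buffers arrive.
object FirstValueByTime extends Aggregator[Event, Seq[(Long, Double)], Double] {
  def zero: Seq[(Long, Double)] = Seq.empty
  def reduce(buf: Seq[(Long, Double)], e: Event): Seq[(Long, Double)] =
    buf :+ ((e.ts, e.value))
  def merge(b1: Seq[(Long, Double)], b2: Seq[(Long, Double)]): Seq[(Long, Double)] =
    b1 ++ b2
  def finish(buf: Seq[(Long, Double)]): Double =
    buf.sortBy(_._1).headOption.map(_._2).getOrElse(Double.NaN)
  def bufferEncoder: Encoder[Seq[(Long, Double)]] = Encoders.kryo[Seq[(Long, Double)]]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// usage with the typed API: ds.groupByKey(_.key).agg(FirstValueByTime.toColumn)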

If you really need this feature, you may open a JIRA to ask for others'
opinions about it.






-
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 



Re: Aggregating over sorted data

2016-12-19 Thread Robin East
This is also a feature we need for our time-series processing 






Re: Aggregating over sorted data

2016-12-19 Thread Koert Kuipers
take a look at:
https://issues.apache.org/jira/browse/SPARK-15798




Re: Aggregating over sorted data

2016-12-20 Thread Liang-Chi Hsieh
Hi,

Can you try the combination of `repartition` + `sortWithinPartitions` on the
dataset?

E.g.,

val df = Seq((2, "b c a"), (1, "c a b"), (3, "a c b")).toDF("number", "letters")
val df2 =
  df.explode('letters) {
    case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
  }

df2
  .select('number, '_1 as 'letter)
  .repartition('number)
  .sortWithinPartitions('number, 'letter)
  .groupBy('number)
  .agg(collect_list('letter))
  .show()

+------+--------------------+
|number|collect_list(letter)|
+------+--------------------+
|     3|           [a, b, c]|
|     1|           [a, b, c]|
|     2|           [a, b, c]|
+------+--------------------+

I think it should let you do aggregation on sorted data per key.




-
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 



Re: Aggregating over sorted data

2016-12-21 Thread Koert Kuipers
i think this works but it relies on groupBy and agg respecting the sorting.
the api provides no such guarantee, so this could break in future versions.
i would not rely on this i think...


Re: Aggregating over sorted data

2016-12-21 Thread Liang-Chi Hsieh

I agree that to make sure this works, you might need to know the Spark
internal implementation of APIs such as `groupBy`.

But without any further changes to the current Spark implementation, I think
this is one possible way to achieve aggregation on sorted data per key.





-
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 



Re: Aggregating over sorted data

2016-12-21 Thread Koert Kuipers
it can also be done with repartition + sortWithinPartitions + mapPartitions.
perhaps not as convenient but it does not rely on undocumented behavior.
i used this approach in spark-sorted. see here:
https://github.com/tresata/spark-sorted/blob/master/src/main/scala/com/tresata/spark/sorted/sql/GroupSortedDataset.scala
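
roughly, the pattern looks like this (a sketch only, not the spark-sorted code;
the column layout and the string-concatenation fold are made up for illustration):

import spark.implicits._  // assuming `spark` is the active SparkSession

// Hypothetical df with column 0 = key (Int) and column 1 = value (String).
// After repartition + sortWithinPartitions every key's rows sit contiguously
// and sorted inside one partition, so a buffered iterator can fold each run.
val perKey = df
  .repartition('key)
  .sortWithinPartitions('key, 'value)
  .mapPartitions { rows =>
    val it = rows.buffered
    new Iterator[(Int, String)] {
      def hasNext: Boolean = it.hasNext
      def next(): (Int, String) = {
        val key = it.head.getInt(0)
        val sb = new StringBuilder
        // consume this key's contiguous, sorted run of rows
        while (it.hasNext && it.head.getInt(0) == key) sb.append(it.next().getString(1))
        (key, sb.toString)
      }
    }
  }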


Re: Aggregating over sorted data

2016-12-22 Thread Liang-Chi Hsieh

You can't use existing aggregation functions with that. Besides, the
execution plan of `mapPartitions` doesn't support whole-stage codegen. Without
that and some of the optimizations around aggregation, there might be a
performance degradation. Also, when there is more than one key in a partition,
you will need to take care of that in the function you apply to each
partition.






-
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 



RE: Aggregating over sorted data

2016-12-22 Thread assaf.mendelson
It seems that this aggregation is for Dataset operations only. I would have 
hoped to be able to do DataFrame aggregation, something along the lines of: 
sort_df(df).agg(my_agg_func)

In any case, note that this kind of sorting is less efficient than the sorting 
done in window functions, for example. Specifically, what happens here is that 
the data is first shuffled and then the entire partition is sorted. It is 
possible to do it another way (although I have no idea how to do it in Spark 
without writing a UDAF, which is probably very inefficient): collect everything 
by key in each partition, sort within the key (which would be a lot faster 
since there are fewer elements) and then merge the results.

I was hoping to find something like: Efficient sortByKey to work with…
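
For what it's worth, the closest approximation I can see with the existing API 
is to sort only each key's collected list (column names are hypothetical; a UDF 
over the collected list could replace sort_array for custom ordered logic):

import org.apache.spark.sql.functions.{collect_list, sort_array}

// Hypothetical columns "key" and "value": collect_list gathers each key's values
// (partially per partition, then merged across partitions) and sort_array sorts
// only that per-key list, not the whole partition.
val perKeySorted = df
  .groupBy("key")
  .agg(sort_array(collect_list("value")).as("sorted_values"))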


Re: Aggregating over sorted data

2016-12-22 Thread trsell
I would love this feature


Re: Aggregating over sorted data

2016-12-22 Thread Koert Kuipers
yes it's less optimal because an abstraction is missing, and with
mapPartitions it is done without optimizations. but aggregator is not the
right abstraction to begin with: it assumes a monoid, which means no
ordering guarantees. you need a fold operation.
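
for illustration only, a tiny made-up example of a per-key computation that
only works as a fold over ordered values (nothing spark-specific):

// Made-up metric: given one key's timestamps already sorted ascending, find the
// largest gap between consecutive events. Each step depends on the previous
// element, so it needs the encounter order; a commutative merge of unordered
// partial buffers cannot express it directly.
def maxGap(sortedTs: Iterator[Long]): Long =
  sortedTs.foldLeft((Long.MinValue, 0L)) { case ((prev, gap), t) =>
    (t, if (prev == Long.MinValue) gap else math.max(gap, t - prev))
  }._2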
