Hi,

I wanted to try to implement https://issues.apache.org/jira/browse/SPARK-17691.
So I started by looking at the implementation of collect_list. My idea was to 
do the same as it does, but when adding a new element, if the collection 
already holds the threshold number of elements, remove one instead.
The problem with this is that since collect_list has no partial aggregation, 
we would end up shuffling all the data anyway. So while the resulting column 
might be smaller, the whole process would be as expensive as collect_list.
So I thought of adding partial aggregation. The problem is that the merge 
function receives a buffer in row format, whereas collect_list doesn't use 
that buffer and instead collects the data in its own data structure.
I could change the implementation to use a Spark ArrayType instead; however, 
since ArrayType is immutable, I would need to copy it whenever I do anything.
Consider the simplest implementation of the update function:
If there are few elements => add an element to the array (with a regular 
Array this means copying as it grows, which is fine for this stage).
If there are enough elements => we do not grow the array. Instead we need to 
decide what to replace. If we want the top 10, for example, and there are 
already 10 elements, we need to drop the lowest and put in the new one.
This means that if we simply loop across the array we create a new copy and 
pay for the copy plus the loop. If we keep it sorted, then adding, sorting, 
and removing the lowest element means three copies.
If I were able to use a plain Scala Array, I would only pay for copies while 
growing; once we had grown to the maximum size, all I would need to do is 
REPLACE the relevant element, which is much cheaper.
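Here is a minimal sketch of that update step, using a plain Scala Array of Int 
to keep the top N largest values (purely illustrative, not the Spark internal 
buffer API):

// Copy-and-grow while below the limit; once full, just REPLACE the smallest
// element in place -- no further copies or allocations.
object TopNUpdateSketch {
  // `values` is assumed to hold `count` valid elements; returns the (possibly
  // new) backing array plus the new count.
  def update(values: Array[Int], count: Int, limit: Int, v: Int): (Array[Int], Int) = {
    if (count < limit) {
      val grown = java.util.Arrays.copyOf(values, count + 1)  // growth phase: one copy
      grown(count) = v
      (grown, count + 1)
    } else {
      // Full: find the smallest element and overwrite it in place if v is larger.
      var minIdx = 0
      var i = 1
      while (i < count) {
        if (values(i) < values(minIdx)) minIdx = i
        i += 1
      }
      if (v > values(minIdx)) values(minIdx) = v               // in-place replace
      (values, count)
    }
  }
}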

The only other solution I see is to simply provide a "take first N" aggregate 
function and have the user sort beforehand, but this seems like a bad solution 
to me, both because the sort is expensive and because if we do multiple 
aggregations we can't sort in two different ways.
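To illustrate what that workaround would look like from the user's side 
(take_first_n is hypothetical and does not exist; collect_list stands in for 
it here):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Sketch of the "sort beforehand + take first N" workaround. The single
// global sort is expensive and allows only one ordering per query.
object FirstNWorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("firstN").getOrCreate()
    import spark.implicits._

    val df = Seq(("a", 3), ("a", 1), ("a", 2), ("b", 5)).toDF("key", "value")

    df.sort($"value".desc)              // user must pre-sort the whole dataset
      .groupBy($"key")
      .agg(collect_list($"value"))      // imagine take_first_n($"value", 10) here
      .show()

    spark.stop()
  }
}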


I can't find a way to convert a buffer built the way collect_list builds it 
into an internal-row buffer before the merge.
I also can't find any way to use an array in the internal buffer as a mutable 
array. If I look at the GenericMutableRow implementation, updating an array 
means creating a new one. I thought maybe of adding a function 
update_array_element which would change the relevant element (and similarly 
get_array_element to get an array element), which would make it easy to treat 
the array as mutable, but the documentation states this is not allowed.
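For what it's worth, this is roughly the shape I had in mind for those helpers 
(entirely hypothetical; nothing like this exists in the buffer API today, and 
the documentation currently forbids mutating an array in place):

// Hypothetical helpers following the update_array_element /
// get_array_element idea above: if the buffer exposed the backing Array[Any]
// of an ArrayType slot mutably, a replace would be a single assignment.
object HypotheticalArrayAccess {
  def getArrayElement(backing: Array[Any], ordinal: Int): Any =
    backing(ordinal)

  def updateArrayElement(backing: Array[Any], ordinal: Int, value: Any): Unit =
    backing(ordinal) = value            // replace in place instead of rebuilding
}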

Can anyone give me a tip on where to go from here?



