Hi,

I wanted to try to implement https://issues.apache.org/jira/browse/SPARK-17691, so I started by looking at the implementation of collect_list. My idea was to do the same as it does, but when adding a new element, if we are already at the threshold, remove one instead. The problem with this is that since collect_list has no partial aggregation, we would end up shuffling all the data anyway. So while the resulting column might be smaller, the whole process would be as expensive as collect_list.

So I thought of adding partial aggregation. The problem is that the merge function receives a buffer in a row format. collect_list doesn't use the buffer; it uses its own data structure for collecting the data. I could change the implementation to use a Spark ArrayType instead, but since ArrayType is immutable, that would mean copying it whenever I do anything. Consider the simplest implementation of the update function:

- If there are fewer than N elements => add the element to the array (with a regular array this means a copy as it grows, which is fine at this stage).
- If there are already N elements => we do not grow the array. Instead we need to decide what to replace. If we want the top 10, for example, and there are already 10 elements, we need to drop the lowest and put in the new one. If we simply loop over the array, we create a new copy and pay for both the copy and the loop. If we keep the array sorted, then adding, sorting, and removing the lowest element means 3 copies.

If I could use a regular Scala array, I would only copy while growing, and once we had grown to the max, all I would need to do is REPLACE the relevant element, which is much cheaper.
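To make the update step concrete, here is a minimal sketch of the bounded top-N update described above, using a plain (mutable) Scala Array of Ints. The object and method names are illustrative only, not Spark internals, and this ignores the row-buffer problem entirely:

```scala
// Sketch of a bounded "top N" update step (names are hypothetical).
object TopNBuffer {
  // Observe `value`; returns the (possibly grown) buffer, holding at most `n` elements.
  def update(buf: Array[Int], n: Int, value: Int): Array[Int] = {
    if (buf.length < n) {
      // Still growing: copy-and-append, acceptable while the buffer is small.
      buf :+ value
    } else {
      // At capacity: find the smallest element and overwrite it in place
      // if the new value is larger -- no copy needed once we are full.
      var minIdx = 0
      var i = 1
      while (i < buf.length) {
        if (buf(i) < buf(minIdx)) minIdx = i
        i += 1
      }
      if (value > buf(minIdx)) buf(minIdx) = value
      buf
    }
  }
}
```

This is exactly the cheap in-place replacement that an immutable ArrayType buffer would not allow: with ArrayType, the `buf(minIdx) = value` line would instead force a full copy of the array on every update.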
The only other solution I see is to provide a "take first N" aggregate function and have the user sort beforehand, but this seems like a bad solution to me, both because sorting is expensive and because if we do multiple aggregations we can't sort in two different ways.

I can't find a way to convert an internal buffer the way collect_list does into an internal buffer before the merge. I also can't find any way to use an array in the internal buffer as a mutable array: if I look at the GenericMutableRow implementation, updating an array means creating a new one. I thought of adding a function update_array_element, which would change the relevant element (and similarly get_array_element to get an array element), which would make it easy to treat the array as mutable, but the documentation states this is not allowed.

Can anyone give me a tip on where to go from here?

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/https-issues-apache-org-jira-browse-SPARK-17691-tp19107.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.