Yeah, I think approximate percentile is good enough most of the time. I don't have a specific need for a precise median. I was interested in implementing it more as a Catalyst learning exercise, but it turns out I picked a bad one. :)
On Mon, Dec 13, 2021 at 9:46 PM Reynold Xin <r...@databricks.com> wrote:

> tl;dr: there's no easy way to implement aggregate expressions that
> require multiple passes over the data. It is simply not something that's
> supported, and doing so would come at a very high cost.
>
> Would you be OK using approximate percentile? That's relatively cheap.
>
> On Mon, Dec 13, 2021 at 6:43 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>
>> No takers here? :)
>>
>> I can see now why a median function is not available in most data
>> processing systems. It's pretty annoying to implement!
>>
>> On Thu, Dec 9, 2021 at 9:25 PM Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>
>>> I'm trying to create a new aggregate function. It's my first time
>>> working with Catalyst, so it's exciting---but I'm also a bit in over
>>> my head.
>>>
>>> My goal is to create a function to calculate the median
>>> <https://issues.apache.org/jira/browse/SPARK-26589>.
>>>
>>> As a very simple solution, I could just define median to be an alias
>>> of `Percentile(col, 0.5)`. However, the leading comment on the
>>> Percentile expression
>>> <https://github.com/apache/spark/blob/08123a3795683238352e5bf55452de381349fdd9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala#L37-L39>
>>> highlights that it's very memory-intensive and can easily lead to
>>> OutOfMemory errors.
>>>
>>> So instead of using Percentile, I'm trying to create an Expression
>>> that calculates the median without needing to hold everything in
>>> memory at once. I'm considering two different approaches:
>>>
>>> 1. Define Median as a combination of existing expressions: The median
>>> can perhaps be built out of the existing expressions for Count
>>> <https://github.com/apache/spark/blob/9af338cd685bce26abbc2dd4d077bde5068157b1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala#L48>
>>> and NthValue
>>> <https://github.com/apache/spark/blob/568ad6aa4435ce76ca3b5d9966e64259ea1f9b38/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala#L675>.
>>>
>>> I don't see a template I can follow for building a new expression out
>>> of existing expressions (i.e. without having to implement a bunch of
>>> methods for DeclarativeAggregate or ImperativeAggregate). I also
>>> don't know how I would wrap NthValue to make it usable as a regular
>>> aggregate function. The wrapped NthValue would need an implicit
>>> window that provides the necessary ordering.
>>>
>>> Is there any potential to this idea? Any pointers on how to
>>> implement it?
>>>
>>> 2. Another memory-light approach to calculating the median requires
>>> multiple passes over the data to converge on the answer. The approach
>>> is described here
>>> <https://www.quora.com/Distributed-Algorithms/What-is-the-distributed-algorithm-to-determine-the-median-of-arrays-of-integers-located-on-different-computers>.
>>> (I posted a sketch implementation of this approach using Spark's
>>> user-level API here
>>> <https://issues.apache.org/jira/browse/SPARK-26589?focusedCommentId=17452081&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17452081>.)
>>>
>>> I am also struggling to understand how I would build an aggregate
>>> function like this, since it requires multiple passes over the data.
>>> From what I can see, Catalyst's aggregate functions are designed to
>>> work with a single pass over the data. We don't seem to have an
>>> interface for AggregateFunction that supports multiple passes over
>>> the data. Is there some way to do this?
>>>
>>> Again, this is my first serious foray into Catalyst. Any specific
>>> implementation guidance is appreciated!
>>>
>>> Nick
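[Editor's note: the multi-pass approach described in the original message can be sketched in plain Python, independent of Spark. This is a hypothetical illustration of the idea only, not the sketch Nicholas posted on the JIRA ticket; it assumes integer data with an odd number of elements for simplicity. Each iteration makes one counting pass over the data and bisects the value range, so memory use stays constant while the number of passes is logarithmic in the value range.]

```python
def multipass_median(data):
    """Find the median of `data` with repeated counting passes,
    never materializing a sorted copy of the data.

    Assumes integer values and an odd number of elements; for even n
    you would need to locate the two middle ranks and average them.
    """
    n = len(data)
    target = n // 2                 # 0-based rank of the median (odd n)
    lo, hi = min(data), max(data)   # one pass each over the data
    while lo < hi:
        mid = (lo + hi) // 2
        # One full pass: count how many values are <= the candidate pivot.
        count = sum(1 for x in data if x <= mid)
        if count > target:
            hi = mid        # median is at or below the pivot
        else:
            lo = mid + 1    # median is above the pivot
    return lo
```

The counting pass is the only part that touches the full dataset, which is why the approach distributes well: in Spark terms each iteration would be a cheap aggregation (a filtered count), with the bisection loop running on the driver.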