I'm trying to create a new aggregate function. It's my first time working
with Catalyst, so it's exciting---but I'm also in a bit over my head.

My goal is to create a function to calculate the median
<https://issues.apache.org/jira/browse/SPARK-26589>.

As a very simple solution, I could just define median to be an alias
of `Percentile(col,
0.5)`. However, the leading comment on the Percentile expression
<https://github.com/apache/spark/blob/08123a3795683238352e5bf55452de381349fdd9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala#L37-L39>
highlights that it's very memory-intensive and can easily lead to
OutOfMemory errors.

So instead of using Percentile, I'm trying to create an Expression that
calculates the median without needing to hold everything in memory at once.
I'm considering two different approaches:

1. Define Median as a combination of existing expressions: The median can
perhaps be built out of the existing expressions for Count
<https://github.com/apache/spark/blob/9af338cd685bce26abbc2dd4d077bde5068157b1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala#L48>
and NthValue
<https://github.com/apache/spark/blob/568ad6aa4435ce76ca3b5d9966e64259ea1f9b38/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala#L675>
.

I don't see a template I can follow for building a new expression out of
existing expressions (i.e. without having to implement a bunch of methods
for DeclarativeAggregate or ImperativeAggregate). I also don't know how I
would wrap NthValue to make it usable as a regular aggregate function. The
wrapped NthValue would need an implicit window that provides the necessary
ordering.


Is there any potential to this idea? Any pointers on how to implement it?


2. Another memory-light approach to calculating the median requires
multiple passes over the data to converge on the answer. The approach
is described
here
<https://www.quora.com/Distributed-Algorithms/What-is-the-distributed-algorithm-to-determine-the-median-of-arrays-of-integers-located-on-different-computers>.
(I posted a sketch implementation of this approach using Spark's user-level
API here
<https://issues.apache.org/jira/browse/SPARK-26589?focusedCommentId=17452081&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17452081>
.)

I am also struggling to understand how I would build an aggregate function
like this, since it requires multiple passes over the data. From what I can
see, Catalyst's aggregate functions are designed to work with a single pass
over the data.

We don't seem to have an interface for AggregateFunction that supports
multiple passes over the data. Is there some way to do this?


Again, this is my first serious foray into Catalyst. Any specific
implementation guidance is appreciated!

Nick

Reply via email to