For simple array types, setting the encoders (bufferEncoder and outputEncoder) to ExpressionEncoder() should work.
--
Raghavendra


On Sun, Apr 23, 2023 at 9:20 PM Thomas Wang <w...@datability.io> wrote:

> Hi Spark Community,
>
> I'm trying to implement a custom Spark Aggregator (a subclass of
> org.apache.spark.sql.expressions.Aggregator). Correct me if I'm wrong,
> but I'm assuming I will be able to use it as an aggregation function like
> SUM.
>
> I have a column of type ARRAY<BOOLEAN>. I would like to GROUP BY another
> column and perform an element-wise SUM that counts, for each position, the
> rows where the boolean flag is true. The result of the aggregation should
> be ARRAY<LONG>.
>
> Here is my implementation so far:
>
> package mypackage.udf;
>
> import org.apache.spark.sql.Encoder;
> import org.apache.spark.sql.expressions.Aggregator;
>
> import java.util.ArrayList;
> import java.util.List;
>
> public class ElementWiseAgg extends Aggregator<List<Boolean>, List<Long>, List<Long>> {
>
>     @Override
>     public List<Long> zero() {
>         return new ArrayList<>();
>     }
>
>     @Override
>     public List<Long> reduce(List<Long> b, List<Boolean> a) {
>         if (a == null) return b;
>         int diff = a.size() - b.size();
>         for (int i = 0; i < diff; i++) {
>             b.add(0L);
>         }
>         for (int i = 0; i < a.size(); i++) {
>             if (a.get(i)) b.set(i, b.get(i) + 1);
>         }
>         return b;
>     }
>
>     @Override
>     public List<Long> merge(List<Long> b1, List<Long> b2) {
>         List<Long> longer;
>         List<Long> shorter;
>         if (b1.size() > b2.size()) {
>             longer = b1;
>             shorter = b2;
>         } else {
>             longer = b2;
>             shorter = b1;
>         }
>         for (int i = 0; i < shorter.size(); i++) {
>             longer.set(i, longer.get(i) + shorter.get(i));
>         }
>         return longer;
>     }
>
>     @Override
>     public List<Long> finish(List<Long> reduction) {
>         return reduction;
>     }
>
>     @Override
>     public Encoder<List<Long>> bufferEncoder() {
>         return null;
>     }
>
>     @Override
>     public Encoder<List<Long>> outputEncoder() {
>         return null;
>     }
> }
>
> The part I'm not quite sure about is how to override bufferEncoder and
> outputEncoder. The default Encoders list does not provide an encoder for
> List.
>
> Can someone point me in the right direction? Thanks!
>
>
> Thomas
>
>
>
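
The reduce/merge logic in the quoted class does not depend on Spark, so it can be checked on its own before the encoder question is settled. Below is a minimal sketch in plain Java. ElementWiseSum is a hypothetical helper name, not a Spark class, and treating a null array element as false is an assumption rather than something stated in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Spark): the same reduce/merge idea as the
// quoted Aggregator, written as plain static methods so it runs without Spark.
public class ElementWiseSum {

    // Fold one input row of flags into the running per-position counts.
    public static List<Long> reduce(List<Long> buffer, List<Boolean> flags) {
        if (flags == null) return buffer;
        // Pad the buffer with zeros so it covers every index of this row.
        while (buffer.size() < flags.size()) buffer.add(0L);
        for (int i = 0; i < flags.size(); i++) {
            // Assumption: a null element counts as false instead of throwing.
            if (Boolean.TRUE.equals(flags.get(i))) buffer.set(i, buffer.get(i) + 1);
        }
        return buffer;
    }

    // Combine two partial buffers index by index; the longer one keeps its tail.
    public static List<Long> merge(List<Long> b1, List<Long> b2) {
        List<Long> longer = b1.size() >= b2.size() ? b1 : b2;
        List<Long> shorter = (longer == b1) ? b2 : b1;
        List<Long> out = new ArrayList<>(longer); // copy, so inputs stay untouched
        for (int i = 0; i < shorter.size(); i++) {
            out.set(i, out.get(i) + shorter.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // Two partial buffers, as if built on two different partitions.
        List<Long> p1 = reduce(new ArrayList<>(), Arrays.asList(true, false, true));
        List<Long> p2 = reduce(new ArrayList<>(), Arrays.asList(true, true));
        p2 = reduce(p2, Arrays.asList(false, null, true, true));
        System.out.println(merge(p1, p2)); // prints [2, 1, 2, 1]
    }
}
```

Unlike the quoted merge, this version copies the longer buffer instead of updating it in place. That is a design choice for easier testing, not a requirement of the Aggregator contract.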
