Thanks Raghavendra, Could you be more specific about how I can use ExpressionEncoder()? More specifically, how can I conform to the return type of Encoder<List<Long>>?
Thomas On Sun, Apr 23, 2023 at 9:42 AM Raghavendra Ganesh <raghavendr...@gmail.com> wrote: > For simple array types setting encoder to ExpressionEncoder() should work. > -- > Raghavendra > > > On Sun, Apr 23, 2023 at 9:20 PM Thomas Wang <w...@datability.io> wrote: > >> Hi Spark Community, >> >> I'm trying to implement a custom Spark Aggregator (a subclass to >> org.apache.spark.sql.expressions.Aggregator). Correct me if I'm wrong, >> but I'm assuming I will be able to use it as an aggregation function like >> SUM. >> >> What I'm trying to do is that I have a column of ARRAY<BOOLEAN> and I >> would like to GROUP BY another column and perform element-wise SUM if >> the boolean flag is set to True. The result of such aggregation should >> return ARRAY<LONG>. >> >> Here is my implementation so far: >> >> package mypackage.udf; >> >> import org.apache.spark.sql.Encoder; >> import org.apache.spark.sql.expressions.Aggregator; >> >> import java.util.ArrayList; >> import java.util.List; >> >> public class ElementWiseAgg extends Aggregator<List<Boolean>, List<Long>, >> List<Long>> { >> >> @Override >> public List<Long> zero() { >> return new ArrayList<>(); >> } >> >> @Override >> public List<Long> reduce(List<Long> b, List<Boolean> a) { >> if (a == null) return b; >> int diff = a.size() - b.size(); >> for (int i = 0; i < diff; i++) { >> b.add(0L); >> } >> for (int i = 0; i < a.size(); i++) { >> if (a.get(i)) b.set(i, b.get(i) + 1); >> } >> return b; >> } >> >> @Override >> public List<Long> merge(List<Long> b1, List<Long> b2) { >> List<Long> longer; >> List<Long> shorter; >> if (b1.size() > b2.size()) { >> longer = b1; >> shorter = b2; >> } else { >> longer = b2; >> shorter = b1; >> } >> for (int i = 0; i < shorter.size(); i++) { >> longer.set(i, longer.get(i) + shorter.get(i)); >> } >> return longer; >> } >> >> @Override >> public List<Long> finish(List<Long> reduction) { >> return reduction; >> } >> >> @Override >> public Encoder<List<Long>> bufferEncoder() { >> return null; >> } >> >> @Override >> public Encoder<List<Long>> outputEncoder() { >> return null; >> } >> } >> >> The part I'm not quite sure is how to override bufferEncoder and >> outputEncoder. The default Encoders list does not provide encoding for >> List. >> >> Can someone point me to the right direction? Thanks! >> >> >> Thomas >> >> >>