Use Spark Aggregator in PySpark
Hi Spark Community,

I have implemented a custom Spark Aggregator (a subclass of org.apache.spark.sql.expressions.Aggregator). Now I'm trying to use it in a PySpark application, but I can't get the function to run. Here is what I'm doing; could someone take a look? Thanks.

    from pyspark.sql import functions
    from pyspark.sql.types import ArrayType, LongType

    spark = self._gen_spark_session()
    spark.udf.registerJavaFunction(
        name="MyAggrator",
        javaClassName="my.package.MyAggrator",
        returnType=ArrayType(elementType=LongType()),
    )

The above code runs successfully. To call it, I assume I should do something like the following:

    df = df.groupBy().agg(
        functions.expr("MyAggrator(input)").alias("output"),
    )

But this gives me the following error:

    pyspark.sql.utils.AnalysisException: UDF class my.package.MyAggrator doesn't implement any UDF interface

My question is: how can I use a Spark Aggregator defined in a jar file from PySpark?

Thanks.
Thomas
Re: Spark Aggregator with ARRAY input and ARRAY output
Thanks Raghavendra,

Could you be more specific about how I can use ExpressionEncoder()? In particular, how can I make it conform to the return type Encoder<List<Long>>?

Thomas

On Sun, Apr 23, 2023 at 9:42 AM Raghavendra Ganesh wrote:
> For simple array types, setting the encoder to ExpressionEncoder() should work.
> --
> Raghavendra
Spark Aggregator with ARRAY input and ARRAY output
Hi Spark Community,

I'm trying to implement a custom Spark Aggregator (a subclass of org.apache.spark.sql.expressions.Aggregator). Correct me if I'm wrong, but I'm assuming I will be able to use it as an aggregation function like SUM.

I have a column of type ARRAY<BOOLEAN>, and I would like to GROUP BY another column and compute an element-wise sum that adds 1 at each position where the flag is True. The result of the aggregation should be an ARRAY<LONG>.

Here is my implementation so far:

    package mypackage.udf;

    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.expressions.Aggregator;

    import java.util.ArrayList;
    import java.util.List;

    // IN = one row's flags, BUF = running per-position counts, OUT = final counts.
    public class ElementWiseAgg extends Aggregator<List<Boolean>, List<Long>, List<Long>> {

        @Override
        public List<Long> zero() {
            return new ArrayList<>();
        }

        @Override
        public List<Long> reduce(List<Long> b, List<Boolean> a) {
            if (a == null) return b;
            // Pad the buffer with zeros so it is at least as long as the input array.
            int diff = a.size() - b.size();
            for (int i = 0; i < diff; i++) {
                b.add(0L);
            }
            // Add 1 at every position whose flag is true (a null element counts as false).
            for (int i = 0; i < a.size(); i++) {
                if (Boolean.TRUE.equals(a.get(i))) b.set(i, b.get(i) + 1);
            }
            return b;
        }

        @Override
        public List<Long> merge(List<Long> b1, List<Long> b2) {
            // Add the shorter buffer into the longer one, position by position.
            List<Long> longer;
            List<Long> shorter;
            if (b1.size() > b2.size()) {
                longer = b1;
                shorter = b2;
            } else {
                longer = b2;
                shorter = b1;
            }
            for (int i = 0; i < shorter.size(); i++) {
                longer.set(i, longer.get(i) + shorter.get(i));
            }
            return longer;
        }

        @Override
        public List<Long> finish(List<Long> reduction) {
            return reduction;
        }

        @Override
        public Encoder<List<Long>> bufferEncoder() {
            return null; // Placeholder: which Encoder works for List<Long>?
        }

        @Override
        public Encoder<List<Long>> outputEncoder() {
            return null; // Placeholder: same question as bufferEncoder().
        }
    }

The part I'm not sure about is how to override bufferEncoder and outputEncoder. The built-in Encoders do not provide an encoder for List<Long>.

Can someone point me in the right direction? Thanks!

Thomas
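The reduce/merge logic can be sanity-checked separately from the encoder question. The following is a plain-Python model of the same zero/reduce/merge/finish contract, not Spark code; the function names and the way rows are split into partitions are illustrative assumptions. Spark may reduce rows in several partitions and then merge the partial buffers, so the result should not depend on how the rows are split.

```python
from functools import reduce as fold
from typing import Iterable, List, Optional

Buffer = List[int]


def zero() -> Buffer:
    return []


def reduce_row(buf: Buffer, flags: Optional[List[bool]]) -> Buffer:
    """Model of ElementWiseAgg.reduce: count True flags per position."""
    if flags is None:
        return buf
    # Pad with zeros so the buffer is at least as long as the input array.
    buf.extend([0] * (len(flags) - len(buf)))
    for i, flag in enumerate(flags):
        if flag:  # None (SQL NULL) is treated as False here
            buf[i] += 1
    return buf


def merge(b1: Buffer, b2: Buffer) -> Buffer:
    """Model of ElementWiseAgg.merge: add the shorter buffer into the longer one."""
    longer, shorter = (b1, b2) if len(b1) > len(b2) else (b2, b1)
    for i, value in enumerate(shorter):
        longer[i] += value
    return longer


def finish(buf: Buffer) -> Buffer:
    return buf


def aggregate(partitions: Iterable[Iterable[Optional[List[bool]]]]) -> Buffer:
    """Reduce each partition starting from zero(), then merge the partial buffers."""
    partials = [fold(reduce_row, part, zero()) for part in partitions]
    return finish(fold(merge, partials, zero()))


rows = [[True, False], [True, True, True], None, [False, True]]
print(aggregate([rows]))                                       # [2, 2, 1]
print(aggregate([[rows[0], rows[2]], [rows[1]], [rows[3]]]))   # [2, 2, 1]
```

Because merge only adds position-wise counts, splitting the same rows differently across partitions yields the same per-position totals.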
eqNullSafe breaks Sorted Merge Bucket Join?
Hi,

I have two tables, t1 and t2. Both are bucketed and sorted on user_id into 32 buckets. When I use a regular equality join, Spark triggers the expected Sorted Merge Bucket Join. Please see my code and the physical plan below.

    from pyspark.sql import SparkSession


    def _gen_spark_session(job_name: str) -> SparkSession:
        return (
            SparkSession
            .builder
            .enableHiveSupport()
            .appName(f'Job: {job_name}')
            .config(
                key='spark.sql.sources.bucketing.enabled',
                value='true',
            ).config(
                # Legacy flag: let bucketed scans report the tables' sort order.
                key='spark.sql.legacy.bucketedTableScan.outputOrdering',
                value='true',
            ).config(
                key='spark.hadoop.mapreduce.fileoutputcommitter'
                    '.algorithm.version',
                value='2',
            ).config(
                key='spark.speculation',
                value='false',
            ).getOrCreate()
        )


    def run() -> None:
        spark = _gen_spark_session(job_name='TEST')
        joined_df = spark.sql('''
            SELECT COALESCE(t1.user_id, t2.user_id) AS user_id
            FROM t1 FULL OUTER JOIN t2
            ON t1.user_id = t2.user_id
        ''')
        joined_df.explain(True)


    if __name__ == '__main__':
        run()

Physical Plan:

    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- Project [coalesce(user_id#0L, user_id#6L) AS user_id#12L]
       +- SortMergeJoin [user_id#0L], [user_id#6L], FullOuter
          :- FileScan parquet Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct, SelectedBucketsCount: 32 out of 32
          +- FileScan parquet Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct, SelectedBucketsCount: 32 out of 32

As you can see, there is no Exchange or Sort before the SortMergeJoin step. However, if I switch to eqNullSafe as the join condition, Spark no longer triggers the Sorted Merge Bucket Join.
    def run() -> None:
        spark = _gen_spark_session(job_name='TEST')
        joined_df = spark.sql('''
            SELECT COALESCE(t1.user_id, t2.user_id) AS user_id
            FROM t1 FULL OUTER JOIN t2
            ON t1.user_id <=> t2.user_id
        ''')
        joined_df.explain(True)

The join operator (= vs. <=>) is the only thing I changed between the two runs.

Physical Plan:

    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- Project [coalesce(user_id#0L, user_id#6L) AS user_id#12L]
       +- SortMergeJoin [coalesce(user_id#0L, 0), isnull(user_id#0L)], [coalesce(user_id#6L, 0), isnull(user_id#6L)], FullOuter
          :- Sort [coalesce(user_id#0L, 0) ASC NULLS FIRST, isnull(user_id#0L) ASC NULLS FIRST], false, 0
          :  +- Exchange hashpartitioning(coalesce(user_id#0L, 0), isnull(user_id#0L), 1000), ENSURE_REQUIREMENTS, [id=#23]
          :     +- FileScan parquet [user_id#0L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
          +- Sort [coalesce(user_id#6L, 0) ASC NULLS FIRST, isnull(user_id#6L) ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(coalesce(user_id#6L, 0), isnull(user_id#6L), 1000), ENSURE_REQUIREMENTS, [id=#26]
                +- FileScan parquet [user_id#6L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct

If I read this correctly, is eqNullSafe just syntactic sugar that automatically applies a COALESCE to 0? Does Spark consider potential key collisions in this case (e.g., if my original dataset contains user_id = 0)?

I know that applying a UDF in the join condition breaks bucketing, which is why the data gets re-shuffled and re-sorted. However, I'm wondering whether it can be made to work in this special case as well.

Thanks.
Thomas
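Reading the second plan, the rewritten join key is the pair (coalesce(user_id, 0), isnull(user_id)), not the COALESCE alone, and both the Exchange and the Sort use that pair. The isnull flag keeps a real user_id = 0, which becomes (0, false), apart from a NULL, which becomes (0, true). The plain-Python sketch below models that reasoning; it is not Spark code, None stands in for SQL NULL, and the function names are hypothetical. It checks that comparing the pairs gives exactly the <=> semantics.

```python
from itertools import product
from typing import Iterable, Optional, Tuple


def null_safe_key(user_id: Optional[int]) -> Tuple[int, bool]:
    """Hypothetical model of the plan's key: (coalesce(user_id, 0), isnull(user_id))."""
    return (0 if user_id is None else user_id, user_id is None)


def null_safe_equal(a: Optional[int], b: Optional[int]) -> bool:
    """SQL a <=> b: true if both are NULL, false if exactly one is NULL, else a = b."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


def keys_agree_with_eq_null_safe(values: Iterable[Optional[int]]) -> bool:
    """True if comparing rewritten keys matches <=> for every pair of values."""
    vals = list(values)
    return all(
        (null_safe_key(a) == null_safe_key(b)) == null_safe_equal(a, b)
        for a, b in product(vals, repeat=2)
    )


# 0 is the value a bare COALESCE(user_id, 0) would confuse with NULL.
print(null_safe_key(None), null_safe_key(0))           # (0, True) (0, False)
print(keys_agree_with_eq_null_safe([None, 0, 1, 42]))  # True
```

A bare COALESCE(user_id, 0) key would map NULL and 0 to the same value; the second component is what prevents that collision. Since the tables are bucketed on user_id while the join now requires rows to be co-located by this expression pair, the existing bucketing does not match, which is consistent with the Exchange nodes in the plan.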