Andrei created SPARK-37547:
------------------------------

             Summary: Unexpected NullPointerException when Aggregator.finish returns null
                 Key: SPARK-37547
                 URL: https://issues.apache.org/jira/browse/SPARK-37547
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.2.0, 3.1.2
            Reporter: Andrei
I'm migrating existing Java 8 code from Spark 2.4 to Spark 3 and I get a NullPointerException when an Aggregator returns null from its finish method for a custom class. I've created a simple snippet to reproduce the issue.

{code:java}
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("name").setMaster("local[*]");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        List<String> data = Arrays.asList("1", "2", "3");
        Dataset<String> dataset = spark.createDataset(data, Encoders.STRING());
        Dataset<Row> aggDataset = dataset.groupBy("value").agg(new EntityAggregator().toColumn().name("agg"));
        aggDataset.show();
    }
}
{code}
{code:java}
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Aggregator;

public class EntityAggregator extends Aggregator<Row, EntityAgg, EntityAgg> {
    public EntityAgg zero() { return new EntityAgg(0L); }

    public EntityAgg reduce(EntityAgg agg, Row row) { return agg; }

    public EntityAgg merge(EntityAgg e1, EntityAgg e2) { return e1; }

    public Encoder<EntityAgg> bufferEncoder() { return Encoders.bean(EntityAgg.class); }

    public Encoder<EntityAgg> outputEncoder() { return Encoders.bean(EntityAgg.class); }

    public EntityAgg finish(EntityAgg reduction) { return null; }
}
{code}
{code:java}
public class EntityAgg {
    private long field;

    public EntityAgg() { }

    public EntityAgg(long field) { this.field = field; }

    public long getField() { return field; }

    public void setField(long field) { this.field = field; }
}
{code}
The expected behavior is to print a table like this:
{noformat}
+-----+----+
|value| agg|
+-----+----+
|    3|null|
|    1|null|
|    2|null|
+-----+----+
{noformat}
This code works fine on Spark 2.4 but fails with the following stack trace on Spark 3 (I tested 3.1.2 and 3.2.0):
{noformat}
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:49)
	at 
org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:259)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:85)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:32)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{noformat}
Another observation: if I change EntityAgg to String in the Aggregator, it works fine.

I've found a test on GitHub that should check for this behavior: [https://github.com/apache/spark/blob/branch-3.1/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala#L338]

I haven't found a similar issue, so please point me to an open ticket if there is one.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
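As a follow-up: until this is fixed, a possible workaround (my own sketch, not something from Spark docs) is to never return a bare null from finish when the output encoder is a bean encoder, and instead return a sentinel aggregate that downstream code can translate back to "no result". The sentinel value -1L and the FinishGuard class below are hypothetical names; Spark classes are omitted so the guard logic can be shown in isolation:

{code:java}
// Minimal sketch of a null-guard for Aggregator.finish. The NPE above is
// triggered when finish returns null and the output encoder is a bean
// encoder, so the guard substitutes a sentinel aggregate instead of null.
public class FinishGuard {
    // Hypothetical "no result" marker; pick a value that cannot occur in real data.
    public static final long SENTINEL = -1L;

    // Same shape as EntityAggregator.finish, but never returns null.
    public static EntityAgg finish(EntityAgg reduction) {
        return reduction != null ? reduction : new EntityAgg(SENTINEL);
    }

    // Bean class copied from the report, nested here so the sketch is self-contained.
    public static class EntityAgg {
        private long field;
        public EntityAgg() { }
        public EntityAgg(long field) { this.field = field; }
        public long getField() { return field; }
        public void setField(long field) { this.field = field; }
    }
}
{code}

After collecting results, rows whose field equals the sentinel would be treated as null by the caller; whether that is acceptable depends on the value domain of the aggregate.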