[ https://issues.apache.org/jira/browse/SPARK-38823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521414#comment-17521414 ]
Bruce Robbins commented on SPARK-38823:
---------------------------------------

This appears to be an optimization bug that results in corruption of the buffers in {{AggregationIterator}}.

On master and 3.3, {{NewInstance}} with no arguments is considered foldable. As a result, the {{ConstantFolding}} rule turns {{NewInstance}} into a {{Literal}} holding an instance of the user's specified Java bean. The instance becomes a singleton that gets reused for each input record (although its fields get updated by {{InitializeJavaBean}}).

Because the instance gets reused, sometimes multiple buffers in {{AggregationIterator}} actually refer to the same Java bean instance.

Take, for example, the test I added [here|https://github.com/bersprockets/spark/blob/17a8ad64f5bc39cb26d25b63f3692e7b8632baf8/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java#L560].

The input is:
{noformat}
List<Item> items = Arrays.asList(
    new Item("a", 1),
    new Item("b", 3),
    new Item("c", 2),
    new Item("a", 7));
{noformat}
As {{ObjectAggregationIterator}} reads the input, the buffers get set up as follows (note that the first field of Item should be the same as the key):
{noformat}
- Read Item("a", 1)

- Buffers are now:
  Key "a" -> Item("a", 1)

- Read Item("b", 3)

- Buffers are now:
  Key "a" -> Item("b", 3)
  Key "b" -> Item("b", 3)
{noformat}
The buffer for key "a" now contains {{Item("b", 3)}}. That's because both buffers contain a reference to the same Item instance, and that Item instance's fields were updated when {{Item("b", 3)}} was read.

When {{AggregationIterator}} finally calls the test's reduce function, it will pass the same Item instance ({{Item("a", 7)}}) as both the buffer and the input record. At that point, the buffers for "a", "b", and "c" will all contain {{Item("a", 7)}}.

I _think_ the fix for this is to make {{NewInstance}} non-foldable. My linked test passes with that change (and fails without it).
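To make the aliasing concrete outside of Spark, here is a minimal plain-Java sketch of the same pattern. It is not Spark code: {{SharedBeanDemo}}, its two-field {{Item}}, and the {{reduce}} function (which returns whichever argument has the larger value, without copying) are hypothetical stand-ins. The {{Supplier}} plays the role of {{NewInstance}}, and {{set}} plays the role of {{InitializeJavaBean}}.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

class SharedBeanDemo {
    // Hypothetical two-field bean standing in for the test's Item class.
    static final class Item {
        String key;
        int value;

        void set(String key, int value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "Item(" + key + ", " + value + ")";
        }
    }

    // Assumed reduce function: returns whichever argument has the larger
    // value, without copying it.
    static Item reduce(Item buffer, Item input) {
        return buffer.value >= input.value ? buffer : input;
    }

    // Reads the four input records and keeps one buffer per key.
    // newInstance stands in for NewInstance; set() stands in for InitializeJavaBean.
    static Map<String, String> aggregate(Supplier<Item> newInstance) {
        String[] keys = {"a", "b", "c", "a"};
        int[] values = {1, 3, 2, 7};
        Map<String, Item> buffers = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            Item item = newInstance.get();
            item.set(keys[i], values[i]);
            Item buffer = buffers.get(item.key);
            buffers.put(item.key, buffer == null ? item : reduce(buffer, item));
        }
        // Snapshot the buffers as strings so the result is easy to compare.
        Map<String, String> result = new LinkedHashMap<>();
        buffers.forEach((k, v) -> result.put(k, v.toString()));
        return result;
    }

    // Folded case: one instance is reused for every record.
    static Map<String, String> runShared() {
        Item singleton = new Item();
        return aggregate(() -> singleton);
    }

    // Non-folded case: every record gets its own instance.
    static Map<String, String> runFresh() {
        return aggregate(Item::new);
    }

    public static void main(String[] args) {
        // prints: shared: {a=Item(a, 7), b=Item(a, 7), c=Item(a, 7)}
        System.out.println("shared: " + runShared());
        // prints: fresh:  {a=Item(a, 7), b=Item(b, 3), c=Item(c, 2)}
        System.out.println("fresh:  " + runFresh());
    }
}
```

With the shared instance, every buffer is the same object, so all three keys end up showing the last record read. With a fresh instance per record, each key keeps its own value, which is the behavior one would expect if the instance were not folded into a constant.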
I will run the unit tests and hopefully make a PR tomorrow, assuming the proposed fix doesn't break something else besides {{ConstantFoldingSuite}}.

> Incorrect result of dataset reduceGroups in java
> ------------------------------------------------
>
>                 Key: SPARK-38823
>                 URL: https://issues.apache.org/jira/browse/SPARK-38823
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 3.4.0
>            Reporter: IKozar
>            Priority: Major
>
> {code:java}
>   @Data
>   @NoArgsConstructor
>   @AllArgsConstructor
>   public static class Item implements Serializable {
>     private String x;
>     private String y;
>     private int z;
>     public Item addZ(int z) {
>       return new Item(x, y, this.z + z);
>     }
>   } {code}
> {code:java}
> List<Item> items = List.of(
>  new Item("X1", "Y1", 1),
>  new Item("X2", "Y1", 1),
>  new Item("X1", "Y1", 1),
>  new Item("X2", "Y1", 1),
>  new Item("X3", "Y1", 1),
>  new Item("X1", "Y1", 1),
>  new Item("X1", "Y2", 1),
>  new Item("X2", "Y1", 1));
> Dataset<Item> ds = spark.createDataFrame(items,
>     Item.class).as(Encoders.bean(Item.class));
> ds.groupByKey((MapFunction<Item, Tuple2<String, String>>) item ->
>         Tuple2.apply(item.getX(), item.getY()),
>     Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
>  .reduceGroups((ReduceFunction<Item>) (item1, item2) ->
>         item1.addZ(item2.getZ()))
>  .show(10);
> {code}
> result is
> {noformat}
> +--------+----------------------------------------------+
> |     key|ReduceAggregator(poc.job.JavaSparkReduce$Item)|
> +--------+----------------------------------------------+
> |{X1, Y1}|                                   {X2, Y1, 2}|-- expected 3
> |{X2, Y1}|                                   {X2, Y1, 2}|-- expected 3
> |{X1, Y2}|                                   {X2, Y1, 1}|
> |{X3, Y1}|                                   {X2, Y1, 1}|
> +--------+----------------------------------------------+{noformat}
> pay attention that key doesn't match with value

--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org