[ 
https://issues.apache.org/jira/browse/SPARK-38823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521414#comment-17521414
 ] 

Bruce Robbins commented on SPARK-38823:
---------------------------------------

This appears to be an optimization bug that results in corruption of the 
buffers in {{AggregationIterator}}.

On master and 3.3, {{NewInstance}} with no arguments is considered foldable. As 
a result, the {{ConstantFolding}} rule turns {{NewInstance}} into a {{Literal}} 
holding an instance of the user's specified Java bean. The instance becomes a 
singleton that gets reused for each input record (although its fields get 
updated by {{InitializeJavaBean}}).

Because the instance gets reused, sometimes multiple buffers in 
{{AggregationIterator}} are actually referring to the same Java bean instance.

Take, for example, the test I added 
[here|https://github.com/bersprockets/spark/blob/17a8ad64f5bc39cb26d25b63f3692e7b8632baf8/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java#L560].

The input is:
{noformat}
List<Item> items = Arrays.asList(
    new Item("a", 1),
    new Item("b", 3),
    new Item("c", 2),
    new Item("a", 7));
{noformat}
As {{ObjectAggregationIterator}} reads the input, the buffers get set up as 
follows (note that the first field of Item should be the same as the key):
{noformat}
- Read Item("a", 1)

- Buffers are now:
  Key "a" -> Item("a", 1)

- Read Item("b", 3)

- Buffers are now:
  Key "a" -> Item("b", 3)
  Key "b" -> Item("b", 3)
{noformat}
The buffer for key "a" now contains Item("b", 3). That's because both buffers 
contain a reference to the same Item instance, and that Item instance's fields 
were updated when {{Item("b", 3)}} was read.
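
The aliasing in the walkthrough above can be reproduced outside Spark. The following is only a plain-Java sketch of the effect, not Spark code: {{SharedBeanDemo}}, its nested {{Item}} class, and {{fillBuffers}} are hypothetical names, and the map is a stand-in for the aggregation buffers.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation; none of these names are Spark classes.
class SharedBeanDemo {
    // Hypothetical stand-in for the test's Item bean.
    static final class Item {
        String key;
        int value;

        @Override
        public String toString() {
            return "Item(\"" + key + "\", " + value + ")";
        }
    }

    // Mimics a deserializer whose "new Item()" was folded into a constant:
    // every row is written into the same instance, much as the bean's
    // fields are overwritten for each input record.
    static Map<String, Item> fillBuffers(List<Map.Entry<String, Integer>> rows) {
        Item singleton = new Item();
        Map<String, Item> buffers = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> row : rows) {
            singleton.key = row.getKey();
            singleton.value = row.getValue();
            // The buffer stores a reference to the singleton, not a copy.
            buffers.putIfAbsent(singleton.key, singleton);
        }
        return buffers;
    }

    public static void main(String[] args) {
        Map<String, Item> buffers = fillBuffers(List.of(
            Map.entry("a", 1), Map.entry("b", 3),
            Map.entry("c", 2), Map.entry("a", 7)));
        buffers.forEach((k, v) -> System.out.println("Key \"" + k + "\" -> " + v));
    }
}
```

Because every map entry points at the same object, all three keys print whatever was read last, which for this input is {{Item("a", 7)}}.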

When {{AggregationIterator}} finally calls the test's reduce function, it will 
pass the same Item instance ({{Item("a", 7)}}) as both the buffer and the input 
record. At that point, the buffers for "a", "b", and "c" will all contain 
{{Item("a", 7)}}.

I _think_ the fix for this is to make {{NewInstance}} non-foldable. My linked 
test passes with that change (and fails without it). I will run the unit tests 
and hopefully make a PR tomorrow, assuming the proposed fix doesn't break 
something else besides {{ConstantFoldingSuite}}.
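
To illustrate why allocating a fresh instance per record should avoid the problem, here is a rough plain-Java sketch. It is not the Catalyst change itself: {{FreshInstanceDemo}}, {{Item}}, and {{reduceByKey}} are hypothetical names, the {{Supplier}} merely plays the role of {{NewInstance}}, and the reduce step (summing the values) is an assumption rather than the reduce function from my linked test.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java simulation; none of these names are Spark classes.
class FreshInstanceDemo {
    // Hypothetical stand-in for the test's Item bean.
    static final class Item {
        String key;
        int value;
    }

    // 'newInstance' plays the role of NewInstance: a supplier that always
    // returns one object behaves like the folded constant, while Item::new
    // behaves like a non-foldable expression evaluated for every record.
    static Map<String, Integer> reduceByKey(
            List<Map.Entry<String, Integer>> rows, Supplier<Item> newInstance) {
        Map<String, Item> buffers = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> row : rows) {
            Item record = newInstance.get();
            record.key = row.getKey();
            record.value = row.getValue();
            Item buffer = buffers.get(record.key);
            if (buffer == null) {
                buffers.put(record.key, record);
            } else {
                // Assumed reduce function: sum the values into a new Item.
                Item reduced = new Item();
                reduced.key = buffer.key;
                reduced.value = buffer.value + record.value;
                buffers.put(record.key, reduced);
            }
        }
        Map<String, Integer> totals = new LinkedHashMap<>();
        buffers.forEach((k, v) -> totals.put(k, v.value));
        return totals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> rows = List.of(
            Map.entry("a", 1), Map.entry("b", 3),
            Map.entry("c", 2), Map.entry("a", 7));
        Item shared = new Item();
        System.out.println(reduceByKey(rows, () -> shared)); // {a=14, b=7, c=7}
        System.out.println(reduceByKey(rows, Item::new));    // {a=8, b=3, c=2}
    }
}
```

With the shared instance, the reduce step for the second "a" record receives the same object as both buffer and input, so it adds 7 to itself, and the "b" and "c" buffers also show the last record's value. With a fresh instance per record, each buffer keeps its own data.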


> Incorrect result of dataset reduceGroups in java
> ------------------------------------------------
>
>                 Key: SPARK-38823
>                 URL: https://issues.apache.org/jira/browse/SPARK-38823
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 3.4.0
>            Reporter: IKozar
>            Priority: Major
>
> {code:java}
>   @Data
>   @NoArgsConstructor
>   @AllArgsConstructor
>   public static class Item implements Serializable {
>     private String x;
>     private String y;
>     private int z;
>     public Item addZ(int z) {
>       return new Item(x, y, this.z + z);
>     }
>   } {code}
> {code:java}
> List<Item> items = List.of(
>     new Item("X1", "Y1", 1),
>     new Item("X2", "Y1", 1),
>     new Item("X1", "Y1", 1),
>     new Item("X2", "Y1", 1),
>     new Item("X3", "Y1", 1),
>     new Item("X1", "Y1", 1),
>     new Item("X1", "Y2", 1),
>     new Item("X2", "Y1", 1));
> Dataset<Item> ds = spark.createDataFrame(items, Item.class)
>     .as(Encoders.bean(Item.class));
> ds.groupByKey(
>         (MapFunction<Item, Tuple2<String, String>>) item ->
>             Tuple2.apply(item.getX(), item.getY()),
>         Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
>     .reduceGroups((ReduceFunction<Item>) (item1, item2) ->
>         item1.addZ(item2.getZ()))
>     .show(10);
> {code}
> The result is:
> {noformat}
> +--------+----------------------------------------------+
> |     key|ReduceAggregator(poc.job.JavaSparkReduce$Item)|
> +--------+----------------------------------------------+
> |{X1, Y1}|                                   {X2, Y1, 2}|-- expected 3
> |{X2, Y1}|                                   {X2, Y1, 2}|-- expected 3
> |{X1, Y2}|                                   {X2, Y1, 1}|
> |{X3, Y1}|                                   {X2, Y1, 1}|
> +--------+----------------------------------------------+{noformat}
> Note that the key doesn't match the value.


