[https://issues.apache.org/jira/browse/SPARK-38823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

IKozar updated SPARK-38823:
---------------------------
    Description: 
{code:java}
  @Data
  @NoArgsConstructor
  @AllArgsConstructor
  public static class Item implements Serializable {
    private String x;
    private String y;
    private int z;

    public Item addZ(int z) {
      return new Item(x, y, this.z + z);
    }
  } {code}
{code:java}
List<Item> items = List.of(
 new Item("X1", "Y1", 1),
 new Item("X2", "Y1", 1),
 new Item("X1", "Y1", 1),
 new Item("X2", "Y1", 1),
 new Item("X3", "Y1", 1),
 new Item("X1", "Y1", 1),
 new Item("X1", "Y2", 1),
 new Item("X2", "Y1", 1)); 

Dataset<Item> ds = spark.createDataFrame(items, 
Item.class).as(Encoders.bean(Item.class)); 
ds.groupByKey((MapFunction<Item, Tuple2<String, String>>) item -> 
Tuple2.apply(item.getX(), item.getY()),
    Encoders.tuple(Encoders.STRING(), Encoders.STRING())) 
.reduceGroups((ReduceFunction<Item>) (item1, item2) -> 
  item1.addZ(item2.getZ()))
 .show(10);
{code}
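The result is shown below. The expected rows are {X1, Y1, 3}, {X2, Y1, 3}, {X1, Y2, 1} and {X3, Y1, 1}. Besides the wrong z sums, every value row shows x=X2 and y=Y1 regardless of its key: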
{noformat}
+--------+----------------------------------------------+
|     key|ReduceAggregator(poc.job.JavaSparkReduce$Item)|
+--------+----------------------------------------------+
|{X1, Y1}|                                   {X2, Y1, 2}|-- expected 3
|{X2, Y1}|                                   {X2, Y1, 2}|-- expected 3
|{X1, Y2}|                                   {X2, Y1, 1}|
|{X3, Y1}|                                   {X2, Y1, 1}|
+--------+----------------------------------------------+{noformat}

  was:
{code:java}
  @Data
  @NoArgsConstructor
  @AllArgsConstructor
  public static class Item implements Serializable {
    private String x;
    private String y;
    private int z;

    public Item addZ(int z) {
      return new Item(x, y, this.z + z);
    }
  } {code}
{code:java}
List<Item> items = List.of( new Item("X1", "Y1", 1), new Item("X2", "Y1", 1), 
new Item("X1", "Y1", 1), new Item("X2", "Y1", 1), new Item("X3", "Y1", 1), new 
Item("X1", "Y1", 1), new Item("X1", "Y2", 1), new Item("X2", "Y1", 1)); 
Dataset<Item> ds = spark.createDataFrame(items, 
Item.class).as(Encoders.bean(Item.class)); ds.groupByKey((MapFunction<Item, 
Tuple2<String, String>>) item -> Tuple2.apply(item.getX(), item.getY()), 
Encoders.tuple(Encoders.STRING(), Encoders.STRING())) 
.reduceGroups((ReduceFunction<Item>) (item1, item2) -> 
item1.addZ(item2.getZ())) .show(10);
{code}
result is
{noformat}
+--------+----------------------------------------------+
|     key|ReduceAggregator(poc.job.JavaSparkReduce$Item)|
+--------+----------------------------------------------+
|{X1, Y1}|                                   {X2, Y1, 2}|-- expected 3
|{X2, Y1}|                                   {X2, Y1, 2}|-- expected 3
|{X1, Y2}|                                   {X2, Y1, 1}|
|{X3, Y1}|                                   {X2, Y1, 1}|
+--------+----------------------------------------------+{noformat}


> Incorrect result of dataset reduceGroups in java
> ------------------------------------------------
>
>                 Key: SPARK-38823
>                 URL: https://issues.apache.org/jira/browse/SPARK-38823
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 3.2.1
>            Reporter: IKozar
>            Priority: Major
>
> {code:java}
>   @Data
>   @NoArgsConstructor
>   @AllArgsConstructor
>   public static class Item implements Serializable {
>     private String x;
>     private String y;
>     private int z;
>     public Item addZ(int z) {
>       return new Item(x, y, this.z + z);
>     }
>   } {code}
> {code:java}
> List<Item> items = List.of(
>  new Item("X1", "Y1", 1),
>  new Item("X2", "Y1", 1),
>  new Item("X1", "Y1", 1),
>  new Item("X2", "Y1", 1),
>  new Item("X3", "Y1", 1),
>  new Item("X1", "Y1", 1),
>  new Item("X1", "Y2", 1),
>  new Item("X2", "Y1", 1)); 
> Dataset<Item> ds = spark.createDataFrame(items, 
> Item.class).as(Encoders.bean(Item.class)); 
> ds.groupByKey((MapFunction<Item, Tuple2<String, String>>) item -> 
> Tuple2.apply(item.getX(), item.getY()),
>     Encoders.tuple(Encoders.STRING(), Encoders.STRING())) 
> .reduceGroups((ReduceFunction<Item>) (item1, item2) -> 
>   item1.addZ(item2.getZ()))
>  .show(10);
> {code}
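> The result is shown below. The expected rows are {X1, Y1, 3}, {X2, Y1, 3}, {X1, Y2, 1} and {X3, Y1, 1}. Besides the wrong z sums, every value row shows x=X2 and y=Y1 regardless of its key: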
> {noformat}
> +--------+----------------------------------------------+
> |     key|ReduceAggregator(poc.job.JavaSparkReduce$Item)|
> +--------+----------------------------------------------+
> |{X1, Y1}|                                   {X2, Y1, 2}|-- expected 3
> |{X2, Y1}|                                   {X2, Y1, 2}|-- expected 3
> |{X1, Y2}|                                   {X2, Y1, 1}|
> |{X3, Y1}|                                   {X2, Y1, 1}|
> +--------+----------------------------------------------+{noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to