[ 
https://issues.apache.org/jira/browse/SPARK-38823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521894#comment-17521894
 ] 

Bruce Robbins commented on SPARK-38823:
---------------------------------------

By the way, here is some code that demos the issue in spark-shell:
{noformat}
// repro in scala using Java APIs
import org.apache.spark.api.java.function.{MapFunction, ReduceFunction}
import org.apache.spark.sql.Encoders;
import collection.JavaConverters._

/**
 * Mutable, Java-bean style key/value holder used by this repro.
 *
 * It exposes `getK`/`setK` and `getV`/`setV` accessors plus a no-argument
 * constructor, and is passed to `Encoders.bean` later in this snippet
 * (presumably that bean shape is what the bean encoder needs -- confirm
 * against the Spark documentation).
 *
 * @param k the grouping key
 * @param v the integer value carried by the item
 */
class Item(var k: String, var v: Int) extends java.io.Serializable {

  /** No-argument constructor: creates an item with an empty key and a value of 0. */
  def this() = this("", 0)

  /** Returns the current key. */
  def getK: String = k

  /** Replaces the key with `value`. */
  def setK(value: String): Unit = k = value

  /** Returns the current value. */
  def getV: Int = v

  /** Replaces the value with `value`. */
  def setV(value: Int): Unit = v = value

  /**
   * Returns a new item with the same key and `v + inc` as its value.
   * The receiver itself is not modified.
   *
   * @param inc the amount to add to this item's value
   * @return a freshly allocated `Item`
   */
  def addValue(inc: Int): Item = new Item(k, v + inc)

  /** Renders the item as `Item(<k>,<v>)`. */
  override def toString: String = s"Item($k,$v)"
}

// Sample input: four items over three distinct keys; key "a" appears twice
// (values 1 and 7), so it is the only group with more than one element.
val items = Seq("a" -> 1, "b" -> 3, "c" -> 2, "a" -> 7).map {
  case (key, value) => new Item(key, value)
}

// Bean encoder for Item, named so the Dataset construction below stays on one line.
val itemEncoder = Encoders.bean(classOf[Item])

// Build a DataFrame from the Java list of beans, view it as a typed Dataset of Item,
// and coalesce to 1 partition (presumably so every row is processed by the same task
// and the repro is deterministic -- confirm).
val ds = spark.createDataFrame(items.asJava, classOf[Item]).as(itemEncoder).coalesce(1)

// Key extractor written against the Java MapFunction interface: logs each
// item it is called with and returns that item's `k` field as the group key.
val mf = new MapFunction[Item, String] {
  override def call(item: Item): String = {
    val key = item.k
    println(s"Key is $key for item $item")
    key
  }
}

// Group the dataset by the extracted string key.
val kvgd1 = ds.groupByKey(mf, Encoders.STRING)

// Reducer written against the Java ReduceFunction interface: combines two
// items into a new one via `addValue` and logs both inputs, the result, and
// whether the two inputs are the very same object reference.
val rf = new ReduceFunction[Item] {
  override def call(item1: Item, item2: Item): Item = {
    val merged = item1.addValue(item2.v)
    val aliased = item1 eq item2
    println(s"item1 $item1; item2 $item2; new item is $merged; sameRef is $aliased")
    merged
  }
}

// Reduce every group with `rf` and print up to 10 result rows.
kvgd1.reduceGroups(rf).show(10)
{noformat}
This will return
{noformat}
+---+----------------------------------------------------------------------------+
|key|ReduceAggregator($line20.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Item)|
+---+----------------------------------------------------------------------------+
|  a|                                                                      {a, 
7}|
|  b|                                                                      {a, 
7}|
|  c|                                                                      {a, 
7}|
+---+----------------------------------------------------------------------------+
{noformat}
However, it should return
{noformat}
+---+----------------------------------------------------------------------------+
|key|ReduceAggregator($line20.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Item)|
+---+----------------------------------------------------------------------------+
|  a|                                                                      {a, 
8}|
|  b|                                                                      {b, 
3}|
|  c|                                                                      {c, 
2}|
+---+----------------------------------------------------------------------------+
{noformat}

> Incorrect result of dataset reduceGroups in java
> ------------------------------------------------
>
>                 Key: SPARK-38823
>                 URL: https://issues.apache.org/jira/browse/SPARK-38823
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 3.3.0, 3.4.0
>            Reporter: IKozar
>            Priority: Major
>              Labels: correctness
>
> {code:java}
>   @Data
>   @NoArgsConstructor
>   @AllArgsConstructor
>   public static class Item implements Serializable {
>     private String x;
>     private String y;
>     private int z;
>     public Item addZ(int z) {
>       return new Item(x, y, this.z + z);
>     }
>   } {code}
> {code:java}
> List<Item> items = List.of(
>  new Item("X1", "Y1", 1),
>  new Item("X2", "Y1", 1),
>  new Item("X1", "Y1", 1),
>  new Item("X2", "Y1", 1),
>  new Item("X3", "Y1", 1),
>  new Item("X1", "Y1", 1),
>  new Item("X1", "Y2", 1),
>  new Item("X2", "Y1", 1)); 
> Dataset<Item> ds = spark.createDataFrame(items, 
> Item.class).as(Encoders.bean(Item.class)); 
> ds.groupByKey((MapFunction<Item, Tuple2<String, String>>) item -> 
> Tuple2.apply(item.getX(), item.getY()),
>     Encoders.tuple(Encoders.STRING(), Encoders.STRING())) 
> .reduceGroups((ReduceFunction<Item>) (item1, item2) -> 
>   item1.addZ(item2.getZ()))
>  .show(10);
> {code}
> result is
> {noformat}
> +--------+----------------------------------------------+
> |     key|ReduceAggregator(poc.job.JavaSparkReduce$Item)|
> +--------+----------------------------------------------+
> |{X1, Y1}|                                   {X2, Y1, 2}|-- expected 3
> |{X2, Y1}|                                   {X2, Y1, 2}|-- expected 3
> |{X1, Y2}|                                   {X2, Y1, 1}|
> |{X3, Y1}|                                   {X2, Y1, 1}|
> +--------+----------------------------------------------+{noformat}
> Note that the key doesn't match the value.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to