[ https://issues.apache.org/jira/browse/SPARK-38823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521894#comment-17521894 ]
Bruce Robbins commented on SPARK-38823:
---------------------------------------

By the way, here is some code that demos the issue in spark-shell:
{noformat}
// repro in scala using Java APIs
import org.apache.spark.api.java.function.{MapFunction, ReduceFunction}
import org.apache.spark.sql.Encoders
import collection.JavaConverters._

class Item(var k: String, var v: Int) extends java.io.Serializable {
  def setK(value: String): Unit = { k = value }
  def setV(value: Int): Unit = { v = value }
  def getK: String = k
  def getV: Int = v

  def this() = this("", 0)

  def addValue(inc: Int): Item = new Item(k, v + inc)

  override def toString: String = s"Item($k,$v)"
}

val items = Seq(
  new Item("a", 1),
  new Item("b", 3),
  new Item("c", 2),
  new Item("a", 7)
)

val ds = spark.createDataFrame(items.asJava, classOf[Item])
  .as(Encoders.bean(classOf[Item]))
  .coalesce(1)

val mf = new MapFunction[Item, String] {
  override def call(item: Item): String = {
    println(s"Key is ${item.k} for item $item")
    item.k
  }
}

val kvgd1 = ds.groupByKey(mf, Encoders.STRING)

val rf = new ReduceFunction[Item] {
  override def call(item1: Item, item2: Item): Item = {
    val sameRef = item1 eq item2
    val msg = s"item1 $item1; item2 $item2"
    val newItem = item1.addValue(item2.v)
    println(s"$msg; new item is $newItem; sameRef is $sameRef")
    newItem
  }
}

kvgd1.reduceGroups(rf).show(10)
{noformat}
This will return
{noformat}
+---+----------------------------------------------------------------------------+
|key|ReduceAggregator($line20.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Item)|
+---+----------------------------------------------------------------------------+
|  a|                                                                      {a, 7}|
|  b|                                                                      {a, 7}|
|  c|                                                                      {a, 7}|
+---+----------------------------------------------------------------------------+
{noformat}
However, it should return
{noformat}
+---+----------------------------------------------------------------------------+
|key|ReduceAggregator($line20.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Item)|
+---+----------------------------------------------------------------------------+
|  a|                                                                      {a, 8}|
|  b|                                                                      {b, 3}|
|  c|                                                                      {c, 2}|
+---+----------------------------------------------------------------------------+
{noformat}
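If you need correct results before a fix lands, one possible workaround is to skip reduceGroups and fold each group by hand with mapGroups. This is an untested sketch building on the repro above; it assumes (as the sameRef check hints) that the deserialized objects may be reused across rows, so it defensively copies the first element of each group:
{noformat}
import org.apache.spark.api.java.function.MapGroupsFunction

// Hypothetical workaround sketch: manual fold per group instead of reduceGroups.
val mg = new MapGroupsFunction[String, Item, Item] {
  override def call(key: String, values: java.util.Iterator[Item]): Item = {
    val first = values.next()
    // Copy the first element, in case the iterator hands back one reused Item instance.
    var acc = new Item(first.getK, first.getV)
    while (values.hasNext) {
      acc = acc.addValue(values.next().getV) // addValue already returns a fresh Item
    }
    acc
  }
}

kvgd1.mapGroups(mg, Encoders.bean(classOf[Item])).show(10)
{noformat}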
> Incorrect result of dataset reduceGroups in java
> ------------------------------------------------
>
>              Key: SPARK-38823
>              URL: https://issues.apache.org/jira/browse/SPARK-38823
>          Project: Spark
>       Issue Type: Bug
>       Components: Java API
> Affects Versions: 3.3.0, 3.4.0
>         Reporter: IKozar
>         Priority: Major
>           Labels: correctness
>
> {code:java}
> @Data
> @NoArgsConstructor
> @AllArgsConstructor
> public static class Item implements Serializable {
>     private String x;
>     private String y;
>     private int z;
>
>     public Item addZ(int z) {
>         return new Item(x, y, this.z + z);
>     }
> }
> {code}
> {code:java}
> List<Item> items = List.of(
>         new Item("X1", "Y1", 1),
>         new Item("X2", "Y1", 1),
>         new Item("X1", "Y1", 1),
>         new Item("X2", "Y1", 1),
>         new Item("X3", "Y1", 1),
>         new Item("X1", "Y1", 1),
>         new Item("X1", "Y2", 1),
>         new Item("X2", "Y1", 1));
>
> Dataset<Item> ds = spark.createDataFrame(items, Item.class).as(Encoders.bean(Item.class));
>
> ds.groupByKey((MapFunction<Item, Tuple2<String, String>>) item ->
>             Tuple2.apply(item.getX(), item.getY()),
>         Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
>     .reduceGroups((ReduceFunction<Item>) (item1, item2) -> item1.addZ(item2.getZ()))
>     .show(10);
> {code}
> The result is
> {noformat}
> +--------+----------------------------------------------+
> |     key|ReduceAggregator(poc.job.JavaSparkReduce$Item)|
> +--------+----------------------------------------------+
> |{X1, Y1}|                                   {X2, Y1, 2}| -- expected 3
> |{X2, Y1}|                                   {X2, Y1, 2}| -- expected 3
> |{X1, Y2}|                                   {X2, Y1, 1}|
> |{X3, Y1}|                                   {X2, Y1, 1}|
> +--------+----------------------------------------------+
> {noformat}
> Pay attention that the key doesn't match the value.
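For completeness, the per-key sums the reporter expects can be checked with plain Scala collections, which takes Spark and its encoders out of the picture entirely. PlainItem is a hypothetical stand-in for the Lombok bean above, not a name from the report:
{noformat}
// Plain-Scala sanity check of the expected per-key sums (no Spark involved).
case class PlainItem(x: String, y: String, z: Int)

val plainItems = Seq(
  PlainItem("X1", "Y1", 1), PlainItem("X2", "Y1", 1), PlainItem("X1", "Y1", 1),
  PlainItem("X2", "Y1", 1), PlainItem("X3", "Y1", 1), PlainItem("X1", "Y1", 1),
  PlainItem("X1", "Y2", 1), PlainItem("X2", "Y1", 1)
)

// groupBy + sum computes the totals that reduceGroups should produce
plainItems.groupBy(i => (i.x, i.y))
  .map { case (key, group) => (key, group.map(_.z).sum) }
  .foreach(println)
// prints, in some order: ((X1,Y1),3), ((X2,Y1),3), ((X3,Y1),1), ((X1,Y2),1)
{noformat}
This matches the "expected 3" annotations in the table above: keys (X1, Y1) and (X2, Y1) each cover three items with z=1.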