[ https://issues.apache.org/jira/browse/SPARK-28067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mark Sirek updated SPARK-28067: ------------------------------- Description: The following test case involving a join followed by a sum aggregation returns the wrong answer for the sum: {code:java} val df = Seq( (BigDecimal("10000000000000000000"), 1), (BigDecimal("10000000000000000000"), 1), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum")) scala> df2.show(40,false) --------------------------------------- sum(decNum) --------------------------------------- 40000000000000000000.000000000000000000 --------------------------------------- {code} The result should be 1040000000000000000000.0000000000000000. It appears a partial sum is computed for each join key, as the result returned would be the answer for all rows matching intNum === 1. If only the rows with intNum === 2 are included, the answer given is null: {code:java} scala> val df3 = df.filter($"intNum" === lit(2)) df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [decNum: decimal(38,18), intNum: int] scala> val df4 = df3.withColumnRenamed("decNum", "decNum2").join(df3, "intNum").agg(sum("decNum")) df4: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)] scala> df4.show(40,false) ----------- sum(decNum) ----------- null ----------- {code} The correct answer, 1000000000000000000000.0000000000000000, doesn't fit in the DataType picked for the result, decimal(38,18), so the overflow is converted to null. The first example, which doesn't filter out the intNum === 1 values should also return null, indicating overflow, but it doesn't. This may mislead the user to think a valid sum was computed. If whole-stage code gen is turned off: spark.conf.set("spark.sql.codegen.wholeStage", false) ... incorrect results are not returned because the overflow is caught as an exception: java.lang.IllegalArgumentException: requirement failed: Decimal precision 39 exceeds max precision 38 was: The following test case involving a join followed by a sum aggregation returns the wrong answer for the sum: val df = Seq( (BigDecimal("10000000000000000000"), 1), (BigDecimal("10000000000000000000"), 1), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2), (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum")) scala> df2.show(40,false) +---------------------------------------+ |sum(decNum)| +---------------------------------------+ |40000000000000000000.000000000000000000| +---------------------------------------+ The result should be 1040000000000000000000.0000000000000000. It appears a partial sum is computed for each join key, as the result returned would be the answer for all rows matching intNum === 1. If only the rows with intNum === 2 are included, the answer given is null: scala> val df3 = df.filter($"intNum" === lit(2)) df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [decNum: decimal(38,18), intNum: int] scala> val df4 = df3.withColumnRenamed("decNum", "decNum2").join(df3, "intNum").agg(sum("decNum")) df4: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)] scala> df4.show(40,false) +-----------+ |sum(decNum)| +-----------+ |null| +-----------+ The correct answer, 1000000000000000000000.0000000000000000, doesn't fit in the DataType picked for the result, decimal(38,18), so an overflow occurs, which seems to be converted to null. The first example, which doesn't filter out the intNum === 1 values should also return null, indicating overflow, but it doesn't. This may mislead the user to think a valid sum was computed. If whole-stage code gen is turned off: spark.conf.set("spark.sql.codegen.wholeStage", false) ... incorrect results are not returned because the overflow is caught as an exception: java.lang.IllegalArgumentException: requirement failed: Decimal precision 39 exceeds max precision 38 > Incorrect results in decimal aggregation with whole-stage code gen enabled > -------------------------------------------------------------------------- > > Key: SPARK-28067 > URL: https://issues.apache.org/jira/browse/SPARK-28067 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 2.3.0, 2.4.0 > Environment: Ubuntu LTS 16.04 > Oracle Java 1.8.0_201 > spark-2.4.3-bin-without-hadoop > spark-shell > Reporter: Mark Sirek > Priority: Minor > > The following test case involving a join followed by a sum aggregation > returns the wrong answer for the sum: > > {code:java} > val df = Seq( > (BigDecimal("10000000000000000000"), 1), > (BigDecimal("10000000000000000000"), 1), > (BigDecimal("10000000000000000000"), 2), > (BigDecimal("10000000000000000000"), 2), > (BigDecimal("10000000000000000000"), 2), > (BigDecimal("10000000000000000000"), 2), > (BigDecimal("10000000000000000000"), 2), > (BigDecimal("10000000000000000000"), 2), > (BigDecimal("10000000000000000000"), 2), > (BigDecimal("10000000000000000000"), 2), > (BigDecimal("10000000000000000000"), 2), > (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") > val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, > "intNum").agg(sum("decNum")) > scala> df2.show(40,false) > --------------------------------------- > sum(decNum) > --------------------------------------- > 40000000000000000000.000000000000000000 > --------------------------------------- > > {code} > > The result should be 1040000000000000000000.0000000000000000. > It appears a partial sum is computed for each join key, as the result > returned would be the answer for all rows matching intNum === 1. > If only the rows with intNum === 2 are included, the answer given is null: > > {code:java} > scala> val df3 = df.filter($"intNum" === lit(2)) > df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [decNum: > decimal(38,18), intNum: int] > scala> val df4 = df3.withColumnRenamed("decNum", "decNum2").join(df3, > "intNum").agg(sum("decNum")) > df4: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)] > scala> df4.show(40,false) > ----------- > sum(decNum) > ----------- > null > ----------- > > {code} > > The correct answer, 1000000000000000000000.0000000000000000, doesn't fit in > the DataType picked for the result, decimal(38,18), so the overflow is > converted to null. > The first example, which doesn't filter out the intNum === 1 values should > also return null, indicating overflow, but it doesn't. This may mislead the > user to think a valid sum was computed. > If whole-stage code gen is turned off: > spark.conf.set("spark.sql.codegen.wholeStage", false) > ... incorrect results are not returned because the overflow is caught as an > exception: > java.lang.IllegalArgumentException: requirement failed: Decimal precision 39 > exceeds max precision 38 > > > > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org