[ 
https://issues.apache.org/jira/browse/SPARK-47024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas updated SPARK-47024:
-------------------------------------
    Description: 
I found this problem using 
[Hypothesis|https://hypothesis.readthedocs.io/en/latest/].

Here's a reproduction that fails on {{master}}, 3.5.0, 3.4.2, and 3.3.4 
(and probably all prior versions as well):
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

SUM_EXAMPLE = [
    (1.0,),
    (0.0,),
    (1.0,),
    (9007199254740992.0,),
]

spark = (
    SparkSession.builder
    .config("spark.log.level", "ERROR")
    .getOrCreate()
)


def compare_sums(data, num_partitions):
    df = spark.createDataFrame(data, "val double").coalesce(1)
    result1 = df.agg(sum(col("val"))).collect()[0][0]
    df = spark.createDataFrame(data, "val double").repartition(num_partitions)
    result2 = df.agg(sum(col("val"))).collect()[0][0]
    assert result1 == result2, f"{result1}, {result2}"


if __name__ == "__main__":
    compare_sums(SUM_EXAMPLE, 2)
{code}
This fails as follows:
{code:python}
AssertionError: 9007199254740994.0, 9007199254740992.0
{code}
I suspected some kind of problem related to code generation, so I tried setting 
all of the following to {{false}} (one way to set them is sketched just after the list):
 * {{spark.sql.codegen.wholeStage}}
 * {{spark.sql.codegen.aggregate.map.twolevel.enabled}}
 * {{spark.sql.codegen.aggregate.splitAggregateFunc.enabled}}
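
For completeness, here is one way to flip these at runtime. This is only a sketch, assuming the {{spark}} session from the repro above; these appear to be ordinary SQL confs, so {{spark.conf.set}} should work:
{code:python}
# Sketch: disable the codegen-related confs listed above at runtime,
# using the `spark` session from the reproduction script.
for conf in [
    "spark.sql.codegen.wholeStage",
    "spark.sql.codegen.aggregate.map.twolevel.enabled",
    "spark.sql.codegen.aggregate.splitAggregateFunc.enabled",
]:
    spark.conf.set(conf, "false")
{code}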

But this did not change the behavior.

Somehow, the partitioning of the data affects the computed sum.
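
For what it's worth, the same discrepancy shows up with plain Python floats if you mimic per-partition partial sums by hand. The two-way split below is hypothetical (I have not checked how {{repartition(2)}} actually distributes these four rows), but any split that pairs a {{1.0}} with {{9007199254740992.0}} reproduces it:
{code:python}
# Plain Python, no Spark: compare one global left-to-right sum against
# combining two per-"partition" partial sums.
VALUES = [1.0, 0.0, 1.0, 9007199254740992.0]

one_partition = 0.0
for v in VALUES:
    one_partition += v                      # ends at 9007199254740994.0

partial_1 = 1.0 + 0.0                       # 1.0
partial_2 = 1.0 + 9007199254740992.0        # 9007199254740992.0 (the 1.0 is absorbed)
two_partitions = partial_1 + partial_2      # 9007199254740992.0 (absorbed again)

print(one_partition, two_partitions)        # 9007199254740994.0 9007199254740992.0
{code}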

  was:Will fill in the details shortly.

        Summary: Sum of floats/doubles may be incorrect depending on 
partitioning  (was: Sum is incorrect (exact cause currently unknown))

Sadly, I think this is a case where we may not be able to do anything. The 
problem appears to be a classic case of floating-point addition not being associative.
{code:scala}
scala> 9007199254740992.0 + 1.0
val res0: Double = 9.007199254740992E15

scala> 9007199254740992.0 + 2.0
val res1: Double = 9.007199254740994E15
{code}
Notice how adding {{1.0}} did not change the large value, whereas adding 
{{2.0}} did. That is because 9007199254740992 is 2^53: from that value onward, 
consecutive doubles are 2.0 apart, so an added 1.0 lands exactly halfway between 
two representable values and gets rounded back down to the original one.
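
For reference, a quick check of this from plain Python (using {{math.ulp}}, which requires Python 3.9 or newer):
{code:python}
import math

# 9007199254740992 is 2**53: the first double whose gap to the next
# representable double is 2.0, so adding 1.0 rounds back to the same value.
assert 9007199254740992.0 == 2.0 ** 53
assert math.ulp(9007199254740992.0) == 2.0

assert 9007199254740992.0 + 1.0 == 9007199254740992.0   # 1.0 is absorbed
assert 9007199254740992.0 + 2.0 == 9007199254740994.0   # 2.0 is not
{code}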

So what I believe is happening is that, depending on the order in which the 
rows happen to be added, we either hit or do not hit this corner case.

In other words, if the aggregation goes like this:
{code:java}
(1.0 + 1.0) + (0.0 + 9007199254740992.0)
2.0 + 9007199254740992.0
9007199254740994.0
{code}
Then there is no problem.

However, if we are unlucky and it goes like this:
{code:java}
(1.0 + 0.0) + (1.0 + 9007199254740992.0)
1.0 + 9007199254740992.0
9007199254740992.0
{code}
Then we get the incorrect result shown in the description above.

This violates what I believe should be an invariant in Spark: That declarative 
aggregates like {{sum}} do not compute different results depending on accidents 
of row order or partitioning.

However, given that this is a basic problem of floating point arithmetic, I 
doubt we can really do anything here.
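
Just for comparison (not a suggestion to change how Spark sums doubles), a correctly rounded summation is order-independent. Python's {{math.fsum}} illustrates this on the same values:
{code:python}
import itertools
import math

VALUES = [1.0, 0.0, 1.0, 9007199254740992.0]

# math.fsum keeps exact intermediate partial sums, so it returns the
# correctly rounded true sum regardless of the order of the inputs.
print({math.fsum(p) for p in itertools.permutations(VALUES)})
# {9007199254740994.0}

# Plain left-to-right float addition, by contrast, depends on the order.
print({sum(p) for p in itertools.permutations(VALUES)})
# {9007199254740992.0, 9007199254740994.0}
{code}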

Note that there are many such "special" numbers that have this problem, not 
just 9007199254740992.0:
{code:scala}
scala> 1.7168917017330176e+16 + 1.0
val res2: Double = 1.7168917017330176E16

scala> 1.7168917017330176e+16 + 2.0
val res3: Double = 1.7168917017330178E16
{code}
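
The pattern is the same: this value also lies in [2^53, 2^54), where consecutive doubles are 2.0 apart, so an extra 1.0 falls exactly halfway and, under round-half-to-even, is lost for values like this one. A quick Python check of the example above:
{code:python}
import math

x = 1.7168917017330176e+16

# Like 2**53, this value lies in [2**53, 2**54), where the gap between
# consecutive doubles is 2.0; the added 1.0 falls exactly halfway and,
# with round-half-to-even, rounds back to x.
assert 2.0 ** 53 <= x < 2.0 ** 54
assert math.ulp(x) == 2.0
assert x + 1.0 == x
assert x + 2.0 == 1.7168917017330178e+16
{code}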

> Sum of floats/doubles may be incorrect depending on partitioning
> ----------------------------------------------------------------
>
>                 Key: SPARK-47024
>                 URL: https://issues.apache.org/jira/browse/SPARK-47024
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.2, 3.5.0, 3.3.4
>            Reporter: Nicholas Chammas
>            Priority: Major
>              Labels: correctness
>


