Sure, thanks for the clarification. I gather what you are alluding to is
this: in a distributed environment, operations that involve shuffling or
repartitioning of data give no guarantee about the order in which the data
is processed across partitions. When a dataframe is repartitioned, the data
is redistributed across partitions, and each partition processes its
portion of the data independently, which is what makes debugging
distributed systems challenging.
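
As a quick illustration of that non-determinism, here is a small PySpark
sketch (the sizes and partition counts are just ones I picked for the
example) that prints the contents of each partition before and after a
repartition, so you can see that rows move and their grouping changes:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A small DataFrame with a known insertion order (ids 0..7).
df = spark.range(8)

# glom() gathers each partition into a list so the layout can be inspected.
before = df.rdd.map(lambda r: r.id).glom().collect()
after = df.repartition(3).rdd.map(lambda r: r.id).glom().collect()

print(before)  # initial layout depends on the default parallelism
print(after)   # rows have been shuffled; their grouping and order are not guaranteed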

I hope that makes sense.

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 13 Feb 2024 at 21:25, Jack Goodson <jackagood...@gmail.com> wrote:

> Apologies if it wasn't clear, I was meaning the difficulty of debugging,
> not floating point precision :)
>
> On Wed, Feb 14, 2024 at 2:03 AM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi Jack,
>>
>> "....  most SQL engines suffer from the same issue... ""
>>
>> Sure. This behavior is not a bug, but rather a consequence of the
>> limitations of floating-point precision. The numbers involved in the
>> example (see SPIP [SPARK-47024] Sum of floats/doubles may be incorrect
>> depending on partitioning - ASF JIRA (apache.org)
>> <https://issues.apache.org/jira/browse/SPARK-47024> exceed the precision
>> of the double-precision floating-point representation used by default in
>> Spark and others Interesting to have a look and test the code
>>
>> This is the code:
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import col, sum
>>
>> SUM_EXAMPLE = [(1.0,), (0.0,), (1.0,), (9007199254740992.0,)]
>>
>> spark = SparkSession.builder.config("spark.log.level", "ERROR").getOrCreate()
>>
>> def compare_sums(data, num_partitions):
>>     # Sum the values in a single partition.
>>     df = spark.createDataFrame(data, "val double").coalesce(1)
>>     result1 = df.agg(sum(col("val"))).collect()[0][0]
>>     # Sum the same values spread across num_partitions partitions.
>>     df = spark.createDataFrame(data, "val double").repartition(num_partitions)
>>     result2 = df.agg(sum(col("val"))).collect()[0][0]
>>     assert result1 == result2, f"{result1}, {result2}"
>>
>> if __name__ == "__main__":
>>     print(compare_sums(SUM_EXAMPLE, 2))
>> In Python, floating-point numbers are implemented using the IEEE 754
>> double-precision standard
>> <https://stackoverflow.com/questions/73340696/how-is-pythons-decimal-and-other-precise-decimal-libraries-implemented-and-wh>,
>> which has limited precision. When one performs operations with very large
>> numbers, or numbers with many decimal places, one may encounter precision
>> errors.
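>>
>> A quick way to see that limit, independent of Spark (plain Python, just
>> for illustration):
>>
>> x = 9007199254740992.0   # 2**53; above this, doubles can no longer represent every integer
>> print(x + 1.0 == x)      # True: 2**53 + 1 is not representable, so the 1.0 is lost to rounding
>> print(x + 2.0)           # 9007199254740994.0: 2**53 + 2 is representable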
>>
>>     print(compare_sums(SUM_EXAMPLE, 2))
>>   File "issue01.py", line 23, in compare_sums
>>     assert result1 == result2, f"{result1}, {result2}"
>> AssertionError: 9007199254740994.0, 9007199254740992.0
>> In this case, the result of the aggregation (sum) is affected by the
>> precision limits of floating-point representation. The difference between
>> 9007199254740994.0 and 9007199254740992.0 is within the expected
>> precision limitations of double-precision floating-point numbers.
>>
>> The likely cause in this example:
>>
>> When one performs an aggregate operation like sum on a DataFrame, the
>> result may be affected by the order of the data, and here that order is
>> influenced by the number of partitions in Spark. result2 above is computed
>> from a new DataFrame df with the same data, but explicitly repartitioned
>> into two partitions (repartition(num_partitions)). Repartitioning can
>> shuffle the data across partitions, introducing a different order for the
>> subsequent aggregation. The sum operation is then performed on the data in
>> a different order, leading to a slightly different result from result1.
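>>
>> A plain-Python sketch of the same effect (the two-way split below is only
>> an illustration; the actual split Spark produces depends on the
>> partitioning):
>>
>> vals = [1.0, 0.0, 1.0, 9007199254740992.0]
>>
>> # One partition: a single left-to-right sum.
>> print(sum(vals))                  # 9007199254740994.0
>>
>> # Two partitions: partial sums are computed independently, then combined.
>> part1, part2 = [1.0, 0.0], [1.0, 9007199254740992.0]
>> print(sum(part1) + sum(part2))    # 9007199254740992.0 -- both 1.0s are lost to rounding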
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 13 Feb 2024 at 03:06, Jack Goodson <jackagood...@gmail.com>
>> wrote:
>>
>>> I may be ignorant of other debugging methods in Spark, but the best
>>> success I've had is using smaller datasets (if runs take a long time) and
>>> adding intermediate output steps. This is quite different from application
>>> development in non-distributed systems, where a debugger is trivial to
>>> attach, but I believe it's one of the trade-offs of using a system like
>>> Spark for data processing; most SQL engines suffer from the same issue. If
>>> you do believe there is a bug in Spark, using the explain function like
>>> Herman mentioned helps, as does looking at the Spark plan in the Spark UI.
>>>
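>>> A rough sketch of that workflow (the DataFrame, column name, and output
>>> path below are just placeholders for the example):
>>>
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import sum
>>>
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> # Stand-in for a large input; in practice this would be the real source.
>>> big_df = spark.range(1_000_000).withColumnRenamed("id", "val")
>>>
>>> # 1. Work on a smaller slice so each run is quick.
>>> sample = big_df.limit(10_000).cache()
>>>
>>> # 2. Materialise an intermediate step so it can be inspected on its own.
>>> step1 = sample.select(sum("val"))
>>> step1.write.mode("overwrite").parquet("/tmp/debug_step1")
>>>
>>> # 3. Look at the plan (the same information appears in the SQL tab of the Spark UI).
>>> step1.explain(True)
>>>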
>>> On Tue, Feb 13, 2024 at 9:24 AM Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
>>>> OK, I figured it out. The details are in SPARK-47024
>>>> <https://issues.apache.org/jira/browse/SPARK-47024> for anyone who’s
>>>> interested.
>>>>
>>>> It turned out to be a floating point arithmetic “bug”. The main reason
>>>> I was able to figure it out was because I’ve been investigating another,
>>>> unrelated bug (a real bug) related to floats, so these weird float corner
>>>> cases have been top of mind.
>>>>
>>>> If it weren't for that, I wonder how much progress I would have made.
>>>> Though I could inspect the generated code, I couldn’t figure out how to get
>>>> logging statements placed in the generated code to print somewhere I could
>>>> see them.
>>>>
>>>> Depending on how often we find ourselves debugging aggregates like
>>>> this, it would be really helpful if we added some way to trace the
>>>> aggregation buffer.
>>>>
>>>> In any case, mystery solved. Thank you for the pointer!
>>>>
>>>>
>>>> On Feb 12, 2024, at 8:39 AM, Herman van Hovell <her...@databricks.com>
>>>> wrote:
>>>>
>>>> There is no really easy way of getting the state of the aggregation
>>>> buffer, unless you are willing to modify the code generation and sprinkle
>>>> in some logging.
>>>>
>>>> What I would start with is dumping the generated code by calling
>>>> explain('codegen') on the DataFrame. That helped me to find similar issues
>>>> in most cases.
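>>>>
>>>> For reference, a minimal way to do that from PySpark, using the
>>>> aggregate from the original question:
>>>>
>>>> from pyspark.sql import SparkSession
>>>> from pyspark.sql.functions import sum
>>>>
>>>> spark = SparkSession.builder.getOrCreate()
>>>>
>>>> # Prints the generated whole-stage-codegen Java source for each codegen subtree.
>>>> spark.range(4).repartition(2).select(sum("id")).explain("codegen")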
>>>>
>>>> HTH
>>>>
>>>> On Sun, Feb 11, 2024 at 11:26 PM Nicholas Chammas <
>>>> nicholas.cham...@gmail.com> wrote:
>>>>
>>>>> Consider this example:
>>>>>
>>>>> >>> from pyspark.sql.functions import sum
>>>>> >>> spark.range(4).repartition(2).select(sum("id")).show()
>>>>> +-------+
>>>>> |sum(id)|
>>>>> +-------+
>>>>> |      6|
>>>>> +-------+
>>>>>
>>>>>
>>>>> I’m trying to understand how this works because I’m investigating a
>>>>> bug in this kind of aggregate.
>>>>>
>>>>> I see that doProduceWithoutKeys
>>>>> <https://github.com/apache/spark/blob/d02fbba6491fd17dc6bfc1a416971af7544952f3/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala#L98>
>>>>>  and doConsumeWithoutKeys
>>>>> <https://github.com/apache/spark/blob/d02fbba6491fd17dc6bfc1a416971af7544952f3/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala#L193>
>>>>>  are
>>>>> called, and I believe they are responsible for computing a declarative
>>>>> aggregate like `sum`. But I’m not sure how I would debug the generated
>>>>> code, or the inputs that drive what code gets generated.
>>>>>
>>>>> Say you were running the above example and it was producing an
>>>>> incorrect result, and you knew the problem was somehow related to the sum.
>>>>> How would you troubleshoot it to identify the root cause?
>>>>>
>>>>> Ideally, I would like some way to track how the aggregation buffer
>>>>> mutates as the computation is executed, so I can see something roughly 
>>>>> like:
>>>>>
>>>>> [0, 1, 2, 3]
>>>>> [1, 5]
>>>>> [6]
>>>>>
>>>>>
>>>>> Is there some way to trace a declarative aggregate like this?
>>>>>
>>>>> Nick
>>>>>
>>>>>
>>>>
