[ 
https://issues.apache.org/jira/browse/SPARK-54090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aimilios Tsouvelekakis updated SPARK-54090:
-------------------------------------------
    Description: 
When assertDataFrameEqual is called on two dataframes that do not have the same 
number of rows, the reported differences cascade from the first mismatch all the 
way to the end. Why this happens:

[https://github.com/apache/spark/blob/067969ff946712eeabf47040415f25000837cd87/python/pyspark/testing/utils.py#L1036]
{code:python}
    def assert_rows_equal(
        rows1: List[Row], rows2: List[Row], maxErrors: int = None, showOnlyDiff: bool = False
    ):
        __tracebackhide__ = True
        zipped = list(zip_longest(rows1, rows2))
        diff_rows_cnt = 0
        diff_rows = []
        has_diff_rows = False
        rows_str1 = ""
        rows_str2 = ""

        # count different rows
        for r1, r2 in zipped:
            if not compare_rows(r1, r2):
                diff_rows_cnt += 1
                has_diff_rows = True
                if includeDiffRows:
                    diff_rows.append((r1, r2))
                rows_str1 += str(r1) + "\n"
                rows_str2 += str(r2) + "\n"
                if maxErrors is not None and diff_rows_cnt >= maxErrors:
                    break
            elif not showOnlyDiff:
                rows_str1 += str(r1) + "\n"
                rows_str2 += str(r2) + "\n"
        generated_diff = _context_diff(
            actual=rows_str1.splitlines(), expected=rows_str2.splitlines(), n=len(zipped)
        )
        if has_diff_rows:
            error_msg = "Results do not match: "
            percent_diff = (diff_rows_cnt / len(zipped)) * 100
            error_msg += "( %.5f %% )" % percent_diff
            error_msg += "\n" + "\n".join(generated_diff)
            data = diff_rows if includeDiffRows else None
            raise PySparkAssertionError(
                errorClass="DIFFERENT_ROWS", messageParameters={"error_msg": error_msg}, data=data
            ){code}
The problem lies in the way the rows are zipped:
{code:python}
zipped = list(zip_longest(rows1, rows2)){code}
zip_longest assumes the rows are in order and compares them position by 
position, which does not work well with checkRowOrder, which defaults to False.

If one row differs in a 100-row dataframe, the reported percentage is not 1%: 
every row from the first difference onward is counted as a mismatch.

The best solution here would be a set-based comparison, reporting the 
percentage and the differing rows based on that.

A sample of what zip_longest does:
{code:python}
from itertools import zip_longest

rows1 = ['A', 'B', 'C', 'D', 'E']
rows2 = ['A', 'C', 'D']
zipped = list(zip_longest(rows1, rows2))
zipped{code}
Result:
{code:none}
[('A', 'A'), ('B', 'C'), ('C', 'D'), ('D', None), ('E', None)]{code}
So in this case 4 of the 5 zipped pairs differ, an 80% row failure, even though 
only 'B' and 'E' are actually missing from rows2.

This follows directly from checkRowOrder: when it is True we do not sort, and 
zip_longest makes sense; when it is False the rows have already been sorted, so 
a set-based comparison makes sense.
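A minimal sketch of that idea, comparing the current positional counting with a multiset-based one (collections.Counter handles duplicate rows; the function names here are illustrative, not the actual PySpark internals):
{code:python}
from itertools import zip_longest
from collections import Counter

def percent_diff_positional(rows1, rows2):
    # Current behaviour: position-by-position comparison via zip_longest,
    # so one missing row marks every later position as different.
    zipped = list(zip_longest(rows1, rows2))
    diff_cnt = sum(1 for r1, r2 in zipped if r1 != r2)
    return (diff_cnt / len(zipped)) * 100

def percent_diff_multiset(rows1, rows2):
    # Sketch of the proposed behaviour for checkRowOrder=False: compare as
    # multisets, so only rows genuinely missing from one side count.
    c1, c2 = Counter(rows1), Counter(rows2)
    diff_cnt = sum(((c1 - c2) + (c2 - c1)).values())
    return (diff_cnt / max(len(rows1), len(rows2))) * 100

rows1 = ['A', 'B', 'C', 'D', 'E']
rows2 = ['A', 'C', 'D']
print(percent_diff_positional(rows1, rows2))  # 80.0: 4 of 5 zipped pairs differ
print(percent_diff_multiset(rows1, rows2))    # 40.0: only 'B' and 'E' are missing{code}
Note that a real implementation would need to handle Row values that are not hashable (e.g. by falling back to sorted or serialized representations).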

 

> AssertDataframeEqual carries rows when showing differences
> ----------------------------------------------------------
>
>                 Key: SPARK-54090
>                 URL: https://issues.apache.org/jira/browse/SPARK-54090
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 4.0.1
>            Reporter: Aimilios Tsouvelekakis
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
