[ https://issues.apache.org/jira/browse/SPARK-32989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ondrej Kokes updated SPARK-32989:
---------------------------------
    Description: 
When I create a map using str_to_map and select more than a single value from it, I see a notable performance regression in 3.0.1 compared to 2.4.7. When selecting a single value, performance is the same. Plans are identical between the two versions.

It seems that in 2.x the map produced by str_to_map is computed once per row, while in 3.x it is recomputed for each selected column. One hint that this is the case: when I forced materialisation of the map in 3.x (via a coalesce; I don't know of a better way), performance returned roughly to 2.x levels.
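
If that's the case, the three-column select below effectively behaves as if each projection re-parsed the raw string, i.e. roughly like this sketch (my reading of the symptoms, not confirmed against the generated code):
{code:python}
# hypothetical expansion: each selected column re-runs str_to_map on
# the raw string instead of reusing one map per row
dd = df.select(
    f.expr('str_to_map(foo, "&", "=")')['foo'],
    f.expr('str_to_map(foo, "&", "=")')['bar'],
    f.expr('str_to_map(foo, "&", "=")')['baz'],
)
{code}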

Here's a reproducer (the CSV in question gets autogenerated by the Python code):
{code:java}
$ head regression.csv 
foo
foo=bar&baz=bak&bar=foo
foo=bar&baz=bak&bar=foo
foo=bar&baz=bak&bar=foo
foo=bar&baz=bak&bar=foo
foo=bar&baz=bak&bar=foo
... (10M more rows)
{code}
{code:python}
import os
import time

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

if __name__ == '__main__':
    print(pyspark.__version__)
    spark = SparkSession.builder.getOrCreate()

    # generate the input CSV if it's not there yet
    filename = 'regression.csv'
    if not os.path.isfile(filename):
        with open(filename, 'wt') as fw:
            fw.write('foo\n')
            for _ in range(10_000_000):
                fw.write('foo=bar&baz=bak&bar=foo\n')

    df = spark.read.option('header', True).csv(filename)
    t = time.time()
    dd = (df
            .withColumn('my_map', f.expr('str_to_map(foo, "&", "=")'))
            .select(
                f.col('my_map')['foo'],
            )
        )
    dd.write.mode('overwrite').csv('tmp')
    t2 = time.time()
    print('selected one', t2 - t)

    dd = (df
            .withColumn('my_map', f.expr('str_to_map(foo, "&", "=")'))
            # .coalesce(100)  # forcing evaluation before selection speeds it up in 3.0.1
            .select(
                f.col('my_map')['foo'],
                f.col('my_map')['bar'],
                f.col('my_map')['baz'],
            )
        )
    dd.explain(True)
    dd.write.mode('overwrite').csv('tmp')
    t3 = time.time()
    print('selected three', t3 - t2)
{code}
Results for 2.4.7 and 3.0.1 (both installed from PyPI; Python 3.7, macOS; times are in seconds):
{code:java}
# 3.0.1
# selected one 6.375471830368042
# selected three 14.847578048706055

# 2.4.7
# selected one 6.679579019546509
# selected three 6.5622029304504395
{code}
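
For completeness, the workaround mentioned above is just the commented-out coalesce from the reproducer, applied; the partition count of 100 is arbitrary:
{code:python}
# workaround sketch: force materialisation of the map before selecting
# from it; in my runs this brought 3.0.1 roughly back to 2.4.7 levels
dd = (df
        .withColumn('my_map', f.expr('str_to_map(foo, "&", "=")'))
        .coalesce(100)  # arbitrary partition count, just to force a boundary
        .select(
            f.col('my_map')['foo'],
            f.col('my_map')['bar'],
            f.col('my_map')['baz'],
        )
    )
{code}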

> Performance regression when selecting from str_to_map
> -----------------------------------------------------
>
>                 Key: SPARK-32989
>                 URL: https://issues.apache.org/jira/browse/SPARK-32989
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.1
>            Reporter: Ondrej Kokes
>            Priority: Minor