[ 
https://issues.apache.org/jira/browse/SPARK-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul Shearer resolved SPARK-12824.
----------------------------------
    Resolution: Not A Problem

This is not so much a Spark issue as a Python gotcha. The lambda looks up 
key_value late, when the RDD is evaluated, not when the closure is defined. If 
collect() is called inside the loop, evaluation happens at the "right" time, 
while key_value still holds that iteration's value. If not, it happens after 
the last iteration of the loop, and by then key_value is the final value in 
the loop.
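
The same behavior can be reproduced in plain Python, without Spark. Below is a minimal sketch; the sample rows and the helper names make_filters_late and apply_filters are made up for illustration and are not part of the original script.

```python
# Hypothetical stand-in for the RDD rows; not taken from the original script.
rows = [{'color': 'red'}, {'color': 'green'}, {'color': 'blue'}]

def make_filters_late(key_field, key_values):
    """Build one filter per key, the same way the loop in the report does."""
    filters = {}
    for key_value in key_values:
        # key_value is looked up when the lambda is called, not when it is defined.
        filters[key_value] = lambda row: row[key_field] == key_value
    return filters

def apply_filters(filters, rows):
    """Evaluate every filter only after the loop has finished."""
    result = {}
    for name, keep in filters.items():
        result[name] = [row for row in rows if keep(row)]
    return result

late_result = apply_filters(make_filters_late('color', ['red', 'green', 'blue']), rows)
# Every entry holds only the 'blue' row, because each lambda sees the
# final value of key_value.
print(late_result)
```

All three filters end up selecting the last key, which mirrors the "run WITHOUT collect in loop" output in the report.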

A quick hack to force early binding is to add a default argument:

{noformat}
lambda row, key_value=key_value: row[key_field] == key_value
{noformat}

Another way, suggested by Nick Chammas, is to use functools.partial.
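
A rough sketch of the functools.partial approach follows. The helper names row_matches and make_filters_partial are hypothetical, and the filters are applied to plain dictionaries here rather than to an RDD; in the original script the partial object would presumably be passed to rdd.filter in place of the lambda.

```python
from functools import partial

def row_matches(key_value, key_field, row):
    """Return True if the row's key_field equals key_value."""
    return row[key_field] == key_value

def make_filters_partial(key_field, key_values):
    """Build one filter per key, binding key_value at definition time."""
    filters = {}
    for key_value in key_values:
        # partial() stores the current key_value immediately, so later
        # iterations of the loop cannot change it.
        filters[key_value] = partial(row_matches, key_value, key_field)
    return filters

filters = make_filters_partial('color', ['red', 'green'])
red_row = {'color': 'red', 'size': 3}
print(filters['red'](red_row))    # the 'red' filter accepts a red row
print(filters['green'](red_row))  # the 'green' filter rejects it
```

Unlike the default-argument hack, this keeps the predicate's signature explicit, at the cost of a small named helper function.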

> Failure to maintain consistent RDD references in pyspark
> --------------------------------------------------------
>
>                 Key: SPARK-12824
>                 URL: https://issues.apache.org/jira/browse/SPARK-12824
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.5.2
>         Environment: Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
>            Reporter: Paul Shearer
>
> Below is a simple {{pyspark}} script that tries to split an RDD into a 
> dictionary containing several RDDs.
>
> As the *sample run* shows, the script only works if we do a {{collect()}} on 
> the intermediate RDDs as they are created. Of course I would not want to do 
> that in practice, since it doesn't scale.
>
> What's really strange is, I'm not assigning the intermediate {{collect()}} 
> results to any variable. So the difference in behavior is due solely to a 
> hidden side-effect of the computation triggered by the {{collect()}} call.
>
> Spark is supposed to be a very functional framework with minimal side 
> effects. Why is it only possible to get the desired behavior by triggering 
> some mysterious side effect using {{collect()}}?
>
> It seems that all the keys in the dictionary are referencing the same object 
> even though in the code they are clearly supposed to be different objects.
>
> The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
> h3. spark_script.py
> {noformat}
>     from pprint import PrettyPrinter
>     pp = PrettyPrinter(indent=4).pprint
>     logger = sc._jvm.org.apache.log4j
>     logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
>     logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
>     
>     def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
>         d = dict()
>         for key_value in key_values:
>             d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
>             if collect_in_loop:
>                 d[key_value].collect()
>         return d
>     def print_results(d):
>         for k in d:
>             print k
>             pp(d[k].collect())    
>     
>     rdd = sc.parallelize([
>         {'color':'red','size':3},
>         {'color':'red', 'size':7},
>         {'color':'red', 'size':8},    
>         {'color':'red', 'size':10},
>         {'color':'green', 'size':9},
>         {'color':'green', 'size':5},
>         {'color':'green', 'size':50},    
>         {'color':'blue', 'size':4},
>         {'color':'purple', 'size':6}])
>     key_field = 'color'
>     key_values = ['red', 'green', 'blue', 'purple']
>     
>     print '### run WITH collect in loop: '
>     d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
>     print_results(d)
>     print '### run WITHOUT collect in loop: '
>     d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
>     print_results(d)
> {noformat}
> h3. Sample run in IPython shell
> {noformat}
>     In [1]: execfile('spark_script.py')
>     ### run WITH collect in loop: 
>     blue
>     [{   'color': 'blue', 'size': 4}]
>     purple
>     [{   'color': 'purple', 'size': 6}]
>     green
>     [   {   'color': 'green', 'size': 9},
>         {   'color': 'green', 'size': 5},
>         {   'color': 'green', 'size': 50}]
>     red
>     [   {   'color': 'red', 'size': 3},
>         {   'color': 'red', 'size': 7},
>         {   'color': 'red', 'size': 8},
>         {   'color': 'red', 'size': 10}]
>     ### run WITHOUT collect in loop: 
>     blue
>     [{   'color': 'purple', 'size': 6}]
>     purple
>     [{   'color': 'purple', 'size': 6}]
>     green
>     [{   'color': 'purple', 'size': 6}]
>     red
>     [{   'color': 'purple', 'size': 6}]
> {noformat}


