[ 
https://issues.apache.org/jira/browse/SPARK-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul Shearer updated SPARK-12824:
---------------------------------
    Description: 
Below is a simple {{pyspark}} script that tries to split an RDD into a 
dictionary containing several RDDs. 

As the *sample run* shows, the script only works if we do a {{collect()}} on 
the intermediate RDDs as they are created. Of course I would not want to do 
that in practice, since it doesn't scale.

What's really strange is, I'm not assigning the intermediate {{collect()}} 
results to any variable. So the difference in behavior is due solely to a 
hidden side-effect of the computation triggered by the {{collect()}} call. 

Spark is supposed to be a very functional framework with minimal side effects. 
Why is it only possible to get the desired behavior by triggering some 
mysterious side effect using {{collect()}}? 

It seems that all the keys in the dictionary are referencing the same object 
even though in the code they are clearly supposed to be different objects.

The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.

h3. spark_script.py

{noformat}
    from pprint import PrettyPrinter
    pp = PrettyPrinter(indent=4).pprint
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
    logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
    
    def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
        d = dict()
        for key_value in key_values:
            d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
            if collect_in_loop:
                d[key_value].collect()
        return d
    def print_results(d):
        for k in d:
            print k
            pp(d[k].collect())    
    
    rdd = sc.parallelize([
        {'color':'red','size':3},
        {'color':'red', 'size':7},
        {'color':'red', 'size':8},    
        {'color':'red', 'size':10},
        {'color':'green', 'size':9},
        {'color':'green', 'size':5},
        {'color':'green', 'size':50},    
        {'color':'blue', 'size':4},
        {'color':'purple', 'size':6}])
    key_field = 'color'
    key_values = ['red', 'green', 'blue', 'purple']
    
    print '### run WITH collect in loop: '
    d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
    print_results(d)
    print '### run WITHOUT collect in loop: '
    d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
    print_results(d)
{noformat}

h3. Sample run in IPython shell

{noformat}
    In [1]: execfile('spark_script.py')
    ### run WITH collect in loop: 
    blue
    [{   'color': 'blue', 'size': 4}]
    purple
    [{   'color': 'purple', 'size': 6}]
    green
    [   {   'color': 'green', 'size': 9},
        {   'color': 'green', 'size': 5},
        {   'color': 'green', 'size': 50}]
    red
    [   {   'color': 'red', 'size': 3},
        {   'color': 'red', 'size': 7},
        {   'color': 'red', 'size': 8},
        {   'color': 'red', 'size': 10}]
    ### run WITHOUT collect in loop: 
    blue
    [{   'color': 'purple', 'size': 6}]
    purple
    [{   'color': 'purple', 'size': 6}]
    green
    [{   'color': 'purple', 'size': 6}]
    red
    [{   'color': 'purple', 'size': 6}]
{noformat}

  was:
Below is a simple {{pyspark}} script that tries to split an RDD into a 
dictionary containing several RDDs. 

As the *sample run* shows, the script only works if we do a {{collect()}} on 
the intermediate RDDs as they are created. Of course I would not want to do 
that in practice, since it doesn't scale.

What's really strange is, I'm not assigning the intermediate {{collect()}} 
results to any variable. So the difference in behavior is due solely to a 
hidden side-effect of the computation triggered by the {{collect()}} call. 

Spark is supposed to be a very functional framework with minimal side effects. 
Why is it only possible to get the desired behavior by triggering some 
mysterious side effect using {{collect()}}? 

It seems that pyspark is not keeping consistent track of the filter 
transformation applied to the RDD, so the object assigned to the dictionary is 
always the same, even though the RDDs are supposed to be different.

The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.

h3. spark_script.py

{noformat}
    from pprint import PrettyPrinter
    pp = PrettyPrinter(indent=4).pprint
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
    logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
    
    def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
        d = dict()
        for key_value in key_values:
            d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
            if collect_in_loop:
                d[key_value].collect()
        return d
    def print_results(d):
        for k in d:
            print k
            pp(d[k].collect())    
    
    rdd = sc.parallelize([
        {'color':'red','size':3},
        {'color':'red', 'size':7},
        {'color':'red', 'size':8},    
        {'color':'red', 'size':10},
        {'color':'green', 'size':9},
        {'color':'green', 'size':5},
        {'color':'green', 'size':50},    
        {'color':'blue', 'size':4},
        {'color':'purple', 'size':6}])
    key_field = 'color'
    key_values = ['red', 'green', 'blue', 'purple']
    
    print '### run WITH collect in loop: '
    d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
    print_results(d)
    print '### run WITHOUT collect in loop: '
    d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
    print_results(d)
{noformat}

h3. Sample run in IPython shell

{noformat}
    In [1]: execfile('spark_script.py')
    ### run WITH collect in loop: 
    blue
    [{   'color': 'blue', 'size': 4}]
    purple
    [{   'color': 'purple', 'size': 6}]
    green
    [   {   'color': 'green', 'size': 9},
        {   'color': 'green', 'size': 5},
        {   'color': 'green', 'size': 50}]
    red
    [   {   'color': 'red', 'size': 3},
        {   'color': 'red', 'size': 7},
        {   'color': 'red', 'size': 8},
        {   'color': 'red', 'size': 10}]
    ### run WITHOUT collect in loop: 
    blue
    [{   'color': 'purple', 'size': 6}]
    purple
    [{   'color': 'purple', 'size': 6}]
    green
    [{   'color': 'purple', 'size': 6}]
    red
    [{   'color': 'purple', 'size': 6}]
{noformat}


> Failure to maintain consistent RDD references in pyspark
> --------------------------------------------------------
>
>                 Key: SPARK-12824
>                 URL: https://issues.apache.org/jira/browse/SPARK-12824
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.5.2
>         Environment: Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
>            Reporter: Paul Shearer
>
> Below is a simple {{pyspark}} script that tries to split an RDD into a 
> dictionary containing several RDDs. 
> As the *sample run* shows, the script only works if we do a {{collect()}} on 
> the intermediate RDDs as they are created. Of course I would not want to do 
> that in practice, since it doesn't scale.
> What's really strange is, I'm not assigning the intermediate {{collect()}} 
> results to any variable. So the difference in behavior is due solely to a 
> hidden side-effect of the computation triggered by the {{collect()}} call. 
> Spark is supposed to be a very functional framework with minimal side 
> effects. Why is it only possible to get the desired behavior by triggering 
> some mysterious side effect using {{collect()}}? 
> It seems that all the keys in the dictionary are referencing the same object 
> even though in the code they are clearly supposed to be different objects.
> The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
> h3. spark_script.py
> {noformat}
>     from pprint import PrettyPrinter
>     pp = PrettyPrinter(indent=4).pprint
>     logger = sc._jvm.org.apache.log4j
>     logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
>     logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
>     
>     def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
>         d = dict()
>         for key_value in key_values:
>             d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
>             if collect_in_loop:
>                 d[key_value].collect()
>         return d
>     def print_results(d):
>         for k in d:
>             print k
>             pp(d[k].collect())    
>     
>     rdd = sc.parallelize([
>         {'color':'red','size':3},
>         {'color':'red', 'size':7},
>         {'color':'red', 'size':8},    
>         {'color':'red', 'size':10},
>         {'color':'green', 'size':9},
>         {'color':'green', 'size':5},
>         {'color':'green', 'size':50},    
>         {'color':'blue', 'size':4},
>         {'color':'purple', 'size':6}])
>     key_field = 'color'
>     key_values = ['red', 'green', 'blue', 'purple']
>     
>     print '### run WITH collect in loop: '
>     d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
>     print_results(d)
>     print '### run WITHOUT collect in loop: '
>     d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
>     print_results(d)
> {noformat}
> h3. Sample run in IPython shell
> {noformat}
>     In [1]: execfile('spark_script.py')
>     ### run WITH collect in loop: 
>     blue
>     [{   'color': 'blue', 'size': 4}]
>     purple
>     [{   'color': 'purple', 'size': 6}]
>     green
>     [   {   'color': 'green', 'size': 9},
>         {   'color': 'green', 'size': 5},
>         {   'color': 'green', 'size': 50}]
>     red
>     [   {   'color': 'red', 'size': 3},
>         {   'color': 'red', 'size': 7},
>         {   'color': 'red', 'size': 8},
>         {   'color': 'red', 'size': 10}]
>     ### run WITHOUT collect in loop: 
>     blue
>     [{   'color': 'purple', 'size': 6}]
>     purple
>     [{   'color': 'purple', 'size': 6}]
>     green
>     [{   'color': 'purple', 'size': 6}]
>     red
>     [{   'color': 'purple', 'size': 6}]
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to