[ https://issues.apache.org/jira/browse/SPARK-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Paul Shearer updated SPARK-12824:
---------------------------------
Description:
Below is a simple {{pyspark}} script that tries to split an RDD into a dictionary containing several RDDs.

As the *sample run* shows, the script only works if we do a {{collect()}} on the intermediate RDDs as they are created. Of course I would not want to do that in practice, since it doesn't scale.

What's really strange is that I'm not assigning the intermediate {{collect()}} results to any variable. So the difference in behavior is due solely to a hidden side effect of the computation triggered by the {{collect()}} call. Spark is supposed to be a very functional framework with minimal side effects. Why is it only possible to get the desired behavior by triggering some mysterious side effect using {{collect()}}?

It seems that pyspark is not keeping consistent track of the filter transformation applied to the RDD, so the RDD assigned to each dictionary key behaves as if it were the same one (every key returns only the {{purple}} row), even though the RDDs are supposed to be different.

The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.

h3. spark_script.py
{noformat}
from pprint import PrettyPrinter
pp = PrettyPrinter(indent=4).pprint

# Reduce Spark's console logging to errors only (sc is the existing SparkContext)
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)

def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
    # Build a dict mapping each key value to the RDD of rows with that value
    d = dict()
    for key_value in key_values:
        d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
        if collect_in_loop:
            # The result is not used; this only triggers evaluation
            d[key_value].collect()
    return d

def print_results(d):
    # Print each key followed by the rows of its RDD
    for k in d:
        print k
        pp(d[k].collect())

rdd = sc.parallelize([
    {'color': 'red', 'size': 3},
    {'color': 'red', 'size': 7},
    {'color': 'red', 'size': 8},
    {'color': 'red', 'size': 10},
    {'color': 'green', 'size': 9},
    {'color': 'green', 'size': 5},
    {'color': 'green', 'size': 50},
    {'color': 'blue', 'size': 4},
    {'color': 'purple', 'size': 6}])
key_field = 'color'
key_values = ['red', 'green', 'blue', 'purple']

print '### run WITH collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
print_results(d)
print '### run WITHOUT collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
print_results(d)
{noformat}

h3. Sample run in IPython shell
{noformat}
In [1]: execfile('spark_script.py')
### run WITH collect in loop:
blue
[{ 'color': 'blue', 'size': 4}]
purple
[{ 'color': 'purple', 'size': 6}]
green
[ { 'color': 'green', 'size': 9},
  { 'color': 'green', 'size': 5},
  { 'color': 'green', 'size': 50}]
red
[ { 'color': 'red', 'size': 3},
  { 'color': 'red', 'size': 7},
  { 'color': 'red', 'size': 8},
  { 'color': 'red', 'size': 10}]
### run WITHOUT collect in loop:
blue
[{ 'color': 'purple', 'size': 6}]
purple
[{ 'color': 'purple', 'size': 6}]
green
[{ 'color': 'purple', 'size': 6}]
red
[{ 'color': 'purple', 'size': 6}]
{noformat}

was:
Below is a simple {{pyspark}} script that tries to split an RDD into a dictionary containing several RDDs.

As the *sample run* shows, the script only works if we do a {{collect()}} on the intermediate RDDs as they are created. Of course I would not want to do that in practice, since it doesn't scale.

What's really strange is that I'm not assigning the intermediate {{collect()}} results to any variable. So the difference in behavior is due solely to a hidden side effect of the computation triggered by the {{collect()}} call. Spark is supposed to be a very functional framework with minimal side effects. Why is it only possible to get the desired behavior by triggering some mysterious side effect using {{collect()}}?

The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.

h3. spark_script.py
{noformat}
from pprint import PrettyPrinter
pp = PrettyPrinter(indent=4).pprint

# Reduce Spark's console logging to errors only (sc is the existing SparkContext)
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)

def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
    # Build a dict mapping each key value to the RDD of rows with that value
    d = dict()
    for key_value in key_values:
        d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
        if collect_in_loop:
            # The result is not used; this only triggers evaluation
            d[key_value].collect()
    return d

def print_results(d):
    # Print each key followed by the rows of its RDD
    for k in d:
        print k
        pp(d[k].collect())

rdd = sc.parallelize([
    {'color': 'red', 'size': 3},
    {'color': 'red', 'size': 7},
    {'color': 'red', 'size': 8},
    {'color': 'red', 'size': 10},
    {'color': 'green', 'size': 9},
    {'color': 'green', 'size': 5},
    {'color': 'green', 'size': 50},
    {'color': 'blue', 'size': 4},
    {'color': 'purple', 'size': 6}])
key_field = 'color'
key_values = ['red', 'green', 'blue', 'purple']

print '### run WITH collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
print_results(d)
print '### run WITHOUT collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
print_results(d)
{noformat}

h3. Sample run in IPython shell
{noformat}
In [1]: execfile('spark_script.py')
### run WITH collect in loop:
blue
[{ 'color': 'blue', 'size': 4}]
purple
[{ 'color': 'purple', 'size': 6}]
green
[ { 'color': 'green', 'size': 9},
  { 'color': 'green', 'size': 5},
  { 'color': 'green', 'size': 50}]
red
[ { 'color': 'red', 'size': 3},
  { 'color': 'red', 'size': 7},
  { 'color': 'red', 'size': 8},
  { 'color': 'red', 'size': 10}]
### run WITHOUT collect in loop:
blue
[{ 'color': 'purple', 'size': 6}]
purple
[{ 'color': 'purple', 'size': 6}]
green
[{ 'color': 'purple', 'size': 6}]
red
[{ 'color': 'purple', 'size': 6}]
{noformat}

> Failure to maintain consistent RDD references in pyspark
> --------------------------------------------------------
>
>                 Key: SPARK-12824
>                 URL: https://issues.apache.org/jira/browse/SPARK-12824
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>         Environment: Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
>            Reporter: Paul Shearer
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
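
A possible explanation worth ruling out before treating this as a PySpark bug is Python's late binding of closure variables: the {{lambda}} passed to {{filter()}} refers to the loop variable {{key_value}} itself, and its value is looked up when the function runs, not when the {{lambda}} is created. The sketch below reproduces the same symptom without Spark. {{LazyFiltered}} and {{split_by_key}} are hypothetical stand-ins for a lazily evaluated RDD and for {{split_RDD_by_key}}; they are not PySpark APIs. If PySpark serializes the filter function only when the first action runs (an assumption not verified here against Spark 1.5.2), this would also account for {{collect()}} inside the loop appearing to fix the result.

```python
# Hypothetical stand-in for a lazily evaluated RDD (not a PySpark API):
# it only stores the predicate, which runs when collect() is called.
class LazyFiltered(object):
    def __init__(self, rows, predicate):
        self.rows = rows
        self.predicate = predicate

    def collect(self):
        return [row for row in self.rows if self.predicate(row)]


# Hypothetical analogue of split_RDD_by_key from the script above.
def split_by_key(rows, key_field, key_values, bind_now=False):
    d = {}
    for key_value in key_values:
        if bind_now:
            # The default argument is evaluated once, when the lambda is
            # created, so each predicate keeps its own key value.
            pred = lambda row, kv=key_value: row[key_field] == kv
        else:
            # key_value is looked up only when the lambda runs, i.e. after
            # the loop has finished and key_value holds the last element.
            pred = lambda row: row[key_field] == key_value
        d[key_value] = LazyFiltered(rows, pred)
    return d


rows = [{'color': 'red', 'size': 3}, {'color': 'green', 'size': 9},
        {'color': 'blue', 'size': 4}, {'color': 'purple', 'size': 6}]
keys = ['red', 'green', 'blue', 'purple']

late = split_by_key(rows, 'color', keys)
early = split_by_key(rows, 'color', keys, bind_now=True)

print([row['color'] for row in late['red'].collect()])   # ['purple']
print([row['color'] for row in early['red'].collect()])  # ['red']
```

In the original script the corresponding change would be to bind the value when the {{lambda}} is defined, for example {{rdd.filter(lambda row, key_value=key_value: row[key_field] == key_value)}}; {{functools.partial}} is another common way to do the same. Whether that fully resolves the behavior on Spark 1.5.2 has not been checked here.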