[ https://issues.apache.org/jira/browse/SPARK-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Paul Shearer updated SPARK-12824:
---------------------------------
    Description: 

Below is a simple `pyspark` script that tries to split an RDD into a dictionary containing several RDDs.

As the **sample run** shows, the script only works if we do a `collect()` on the intermediate RDDs as they are created. Of course I would not want to do that in practice, since it doesn't scale.

What's really strange is that I'm not assigning the intermediate `collect()` results to any variable, so the difference in behavior is due solely to a hidden side effect of the computation triggered by the `collect()` call. Spark is supposed to be a largely functional framework with minimal side effects. Why is it only possible to get the desired behavior by triggering some mysterious side effect using `collect()`?

The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.

h3. spark_script.py

{noformat}
from pprint import PrettyPrinter
pp = PrettyPrinter(indent=4).pprint
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )

def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
    d = dict()
    for key_value in key_values:
        d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
        if collect_in_loop:
            d[key_value].collect()
    return d

def print_results(d):
    for k in d:
        print k
        pp(d[k].collect())

rdd = sc.parallelize([
    {'color':'red',    'size':3},
    {'color':'red',    'size':7},
    {'color':'red',    'size':8},
    {'color':'red',    'size':10},
    {'color':'green',  'size':9},
    {'color':'green',  'size':5},
    {'color':'green',  'size':50},
    {'color':'blue',   'size':4},
    {'color':'purple', 'size':6}])
key_field  = 'color'
key_values = ['red', 'green', 'blue', 'purple']

print '### run WITH collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
print_results(d)
print '### run WITHOUT collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
print_results(d)
{noformat}

h3. Sample run in IPython shell

{noformat}
In [1]: execfile('spark_script.py')
### run WITH collect in loop: 
blue
[{ 'color': 'blue', 'size': 4}]
purple
[{ 'color': 'purple', 'size': 6}]
green
[ { 'color': 'green', 'size': 9},
  { 'color': 'green', 'size': 5},
  { 'color': 'green', 'size': 50}]
red
[ { 'color': 'red', 'size': 3},
  { 'color': 'red', 'size': 7},
  { 'color': 'red', 'size': 8},
  { 'color': 'red', 'size': 10}]
### run WITHOUT collect in loop: 
blue
[{ 'color': 'purple', 'size': 6}]
purple
[{ 'color': 'purple', 'size': 6}]
green
[{ 'color': 'purple', 'size': 6}]
red
[{ 'color': 'purple', 'size': 6}]
{noformat}
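One plausible, Spark-independent explanation for the observed behavior: the lambda in `split_RDD_by_key` closes over the loop variable `key_value` itself, not its value at the time the lambda is created, and PySpark appears to serialize the filter closure lazily. Without an eager `collect()`, every closure would be pickled after the loop has finished, when `key_value` is `'purple'`; a `collect()` inside the loop would force serialization while `key_value` still holds the current color. A minimal plain-Python sketch of the late-binding effect and the usual default-argument workaround (no Spark required; `filters` and `fixed` are illustrative names, and `key_field` is simplified to a literal `'color'`):

```python
key_values = ['red', 'green', 'blue', 'purple']

# A lambda defined in a loop captures the *variable* key_value,
# not the value it held on that iteration.
filters = {}
for key_value in key_values:
    filters[key_value] = lambda row: row['color'] == key_value

# After the loop, key_value == 'purple', so every filter tests for 'purple'.
print(filters['red']({'color': 'red'}))     # False
print(filters['red']({'color': 'purple'}))  # True

# Binding the current value as a default argument freezes it per iteration.
fixed = {}
for key_value in key_values:
    fixed[key_value] = lambda row, kv=key_value: row['color'] == kv

print(fixed['red']({'color': 'red'}))       # True
print(fixed['purple']({'color': 'red'}))    # False
```

If this is indeed the cause, applying the same `kv=key_value` default-argument binding inside `split_RDD_by_key` should make both runs produce the correct per-color results without any `collect()` in the loop.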
> Failure to maintain consistent RDD references in pyspark
> --------------------------------------------------------
>
>                 Key: SPARK-12824
>                 URL: https://issues.apache.org/jira/browse/SPARK-12824
>             Project: Spark
>          Issue Type: Bug
>         Environment: Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
>            Reporter: Paul Shearer
>
> Below is a simple `pyspark` script that tries to split an RDD into a
> dictionary containing several RDDs.
> As the **sample run** shows, the script only works if we do a `collect()` on
> the intermediate RDDs as they are created. Of course I would not want to do
> that in practice, since it doesn't scale.
> What's really strange is, I'm not assigning the intermediate `collect()`
> results to any variable. So the difference in behavior is due solely to a
> hidden side-effect of the computation triggered by the `collect()` call.
> Spark is supposed to be a very functional framework with minimal side
> effects. Why is it only possible to get the desired behavior by triggering
> some mysterious side effect using `collect()`?
> The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
> h3. spark_script.py
> {noformat}
> from pprint import PrettyPrinter
> pp = PrettyPrinter(indent=4).pprint
> logger = sc._jvm.org.apache.log4j
> logger.LogManager.getLogger("org").setLevel( logger.Level.ERROR )
> logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
> 
> def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
>     d = dict()
>     for key_value in key_values:
>         d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
>         if collect_in_loop:
>             d[key_value].collect()
>     return d
> def print_results(d):
>     for k in d:
>         print k
>         pp(d[k].collect())
> 
> rdd = sc.parallelize([
>     {'color':'red','size':3},
>     {'color':'red', 'size':7},
>     {'color':'red', 'size':8},
>     {'color':'red', 'size':10},
>     {'color':'green', 'size':9},
>     {'color':'green', 'size':5},
>     {'color':'green', 'size':50},
>     {'color':'blue', 'size':4},
>     {'color':'purple', 'size':6}])
> key_field = 'color'
> key_values = ['red', 'green', 'blue', 'purple']
> 
> print '### run WITH collect in loop: '
> d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
> print_results(d)
> print '### run WITHOUT collect in loop: '
> d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
> print_results(d)
> {noformat}
> h3. Sample run in IPython shell
> {noformat}
> In [1]: execfile('spark_script.py')
> ### run WITH collect in loop: 
> blue
> [{ 'color': 'blue', 'size': 4}]
> purple
> [{ 'color': 'purple', 'size': 6}]
> green
> [ { 'color': 'green', 'size': 9},
>   { 'color': 'green', 'size': 5},
>   { 'color': 'green', 'size': 50}]
> red
> [ { 'color': 'red', 'size': 3},
>   { 'color': 'red', 'size': 7},
>   { 'color': 'red', 'size': 8},
>   { 'color': 'red', 'size': 10}]
> ### run WITHOUT collect in loop: 
> blue
> [{ 'color': 'purple', 'size': 6}]
> purple
> [{ 'color': 'purple', 'size': 6}]
> green
> [{ 'color': 'purple', 'size': 6}]
> red
> [{ 'color': 'purple', 'size': 6}]
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org