Re: iteratively modifying an RDD
On Wed, Feb 11, 2015 at 10:47 AM, rok <rokros...@gmail.com> wrote:
> I was having trouble with memory exceptions when broadcasting a large
> lookup table, so I've resorted to processing it iteratively -- but how can
> I modify an RDD iteratively? I'm trying something like:
>
>     rdd = sc.parallelize(...)
>     lookup_tables = {...}
>
>     for lookup_table in lookup_tables:
>         rdd = rdd.map(lambda x: func(x, lookup_table))
>
> If I leave it as is, then only the last lookup_table is applied instead of
> stringing together all the maps. However, if I add a .cache() to the .map,
> then it seems to work fine.

This is related to how Python closures are implemented; you should do it
like this:

    def create_func(lookup_table):
        return lambda x: func(x, lookup_table)

    for lookup_table in lookup_tables:
        rdd = rdd.map(create_func(lookup_table))

A Python closure just remembers the variable, it does not copy its value. In
the loop, `lookup_table` is the same variable the whole time, so when we
serialize the final rdd, all of the closures refer to the same
`lookup_table`, which points to the last value. When we create the closure
inside a function, Python creates a new variable for each closure, so it
works.

> A second problem is that the runtime roughly doubles at each iteration, so
> this clearly doesn't seem to be the way to do it. What is the preferred way
> of doing such repeated modifications to an RDD, and how can the
> accumulation of overhead be minimized?
>
> Thanks!
>
> Rok
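For illustration, an equivalent way to bind the current value of the loop
variable (instead of the factory function above) is a default argument on
the lambda. This is a minimal sketch, not code from the thread: `func`,
`lookup_tables`, and the sample data are placeholders, and `sc` is assumed
to be a live SparkContext (e.g. in a pyspark shell).

    # Minimal sketch (not from the thread): bind the loop variable's current
    # value with a default argument instead of a factory function.
    lookup_tables = [{1: "a"}, {"a": "b"}]     # placeholder data

    def func(x, table):
        return table.get(x, x)                 # hypothetical per-element lookup

    rdd = sc.parallelize([1, 2, 3])
    for lookup_table in lookup_tables:
        # lt=lookup_table is evaluated now, so each closure keeps its own
        # table instead of sharing the loop variable.
        rdd = rdd.map(lambda x, lt=lookup_table: func(x, lt))

    print(rdd.collect())   # [1, 2, 3] -> ['b', 2, 3] after both tables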
Re: iteratively modifying an RDD
Aha, great! Thanks for the clarification!

On Feb 11, 2015 8:11 PM, Davies Liu <dav...@databricks.com> wrote:
> A Python closure just remembers the variable, it does not copy its value.
Re: iteratively modifying an RDD
The runtime for each consecutive iteration is still roughly twice as long as
for the previous one -- is there a way to reduce whatever overhead is
accumulating?
Re: iteratively modifying an RDD
On Wed, Feb 11, 2015 at 2:43 PM, Rok Roskar <rokros...@gmail.com> wrote:
> The runtime for each consecutive iteration is still roughly twice as long
> as for the previous one -- is there a way to reduce whatever overhead is
> accumulating?

Sorry, I didn't fully understand your question -- which two runtimes are you
comparing?

PySpark will try to combine the multiple map() calls together, so you end up
with a single task that needs all of the lookup_tables (the same total size
as before). You could add a checkpoint after some of the iterations.
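For illustration, a minimal sketch of checkpointing every few iterations;
the checkpoint directory, the interval, and the `func`/`lookup_tables`
names are placeholders, not from the thread.

    # Minimal sketch (assumes a live SparkContext `sc`; `func` and
    # `lookup_tables` stand in for the poster's own code).
    sc.setCheckpointDir("/tmp/spark-checkpoints")   # hypothetical directory

    rdd = sc.parallelize(range(1000))
    for i, lookup_table in enumerate(lookup_tables):
        rdd = rdd.map(lambda x, lt=lookup_table: func(x, lt))
        if i % 5 == 4:
            # Truncate the lineage so later stages don't carry every earlier
            # closure and its lookup table.
            rdd.checkpoint()
            rdd.count()   # checkpointing is lazy; an action forces it to be written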
Re: iteratively modifying an RDD
Yes, sorry, I wasn't clear -- I still have to trigger the calculation of the
RDD at the end of each iteration; otherwise all of the lookup tables are
shipped to the cluster at the same time, resulting in memory errors.
Therefore this becomes several map jobs instead of one, and each consecutive
map is slower than the one before. I'll try the checkpoint, thanks for the
suggestion.

On Feb 12, 2015, at 12:13 AM, Davies Liu <dav...@databricks.com> wrote:
> You could add a checkpoint after some of the iterations.
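For illustration, one way the per-iteration triggering described above might
look, combined with unpersisting the previous iteration's data; again a
sketch with placeholder names, not the poster's actual code.

    # Minimal sketch (placeholder names; assumes a live SparkContext `sc`).
    from pyspark import StorageLevel

    rdd = sc.parallelize(range(1000))
    prev = None
    for lookup_table in lookup_tables:
        rdd = rdd.map(lambda x, lt=lookup_table: func(x, lt))
        rdd.persist(StorageLevel.MEMORY_AND_DISK)
        rdd.count()              # run this map now, shipping only one lookup table
        if prev is not None:
            prev.unpersist()     # free the previous iteration's cached partitions
        prev = rdd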