Yes, sorry I wasn't clear -- I still have to trigger the computation of the RDD 
at the end of each iteration; otherwise all of the lookup tables are shipped to 
the cluster at the same time, resulting in memory errors. This turns the work 
into several map jobs instead of one, and each consecutive map is slower than 
the one before. I'll try the checkpoint, thanks for the suggestion. 
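
Roughly what I'm planning to try -- just a sketch, reusing `func` from my 
original snippet, with `data` standing in for whatever I parallelize and the 
lookup tables kept in a plain list; the checkpoint directory and the 
every-5-iterations interval are arbitrary:

import pyspark

sc = pyspark.SparkContext(appName="iterative-maps")
sc.setCheckpointDir("/tmp/spark-checkpoints")   # any shared or local path

def create_func(lookup_table):
    # bind the current table in its own closure, as Davies suggested
    return lambda x: func(x, lookup_table)

rdd = sc.parallelize(data)
for i, lookup_table in enumerate(lookup_tables):
    rdd = rdd.map(create_func(lookup_table))
    if (i + 1) % 5 == 0:
        rdd.checkpoint()   # truncate the lineage every few iterations
        rdd.count()        # force evaluation so the checkpoint actually happens

The hope is that truncating the lineage every few iterations keeps both the 
task size and the per-iteration overhead bounded instead of letting them grow 
with every map.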


On Feb 12, 2015, at 12:13 AM, Davies Liu <dav...@databricks.com> wrote:

> On Wed, Feb 11, 2015 at 2:43 PM, Rok Roskar <rokros...@gmail.com> wrote:
>> the runtime for each consecutive iteration is still roughly twice as long as 
>> for the previous one -- is there a way to reduce whatever overhead is 
>> accumulating?
> 
> Sorry, I didn't fully understand your question -- which two are you comparing?
> 
> PySpark will try to combine the multiple map() calls together, so you end up
> with a single task that needs all of the lookup_tables (the same total size
> as before).
> 
> You could add a checkpoint after some of the iterations.
> 
>> On Feb 11, 2015, at 8:11 PM, Davies Liu <dav...@databricks.com> wrote:
>> 
>>> On Wed, Feb 11, 2015 at 10:47 AM, rok <rokros...@gmail.com> wrote:
>>>> I was having trouble with memory exceptions when broadcasting a large lookup
>>>> table, so I've resorted to processing it iteratively -- but how can I modify
>>>> an RDD iteratively?
>>>> 
>>>> I'm trying something like :
>>>> 
>>>> rdd = sc.parallelize(...)
>>>> lookup_tables = {...}
>>>> 
>>>> for lookup_table in lookup_tables:
>>>>     rdd = rdd.map(lambda x: func(x, lookup_table))
>>>> 
>>>> If I leave it as is, then only the last "lookup_table" is applied instead of
>>>> stringing together all the maps. However, if I add a .cache() to the .map
>>>> then it seems to work fine.
>>> 
>>> This is something related to Python's closure implementation; you should
>>> do it like this:
>>> 
>>> def create_func(lookup_table):
>>>     return lambda x: func(x, lookup_table)
>>> 
>>> for lookup_table in lookup_tables:
>>>     rdd = rdd.map(create_func(lookup_table))
>>> 
>>> A Python closure remembers the variable itself, not a copy of its value. In
>>> the loop, `lookup_table` is always the same variable, so when we serialize the
>>> final rdd, all the closures refer to the same `lookup_table`, which points to
>>> the last value.
>>> 
>>> When we create the closure inside a function, Python creates a separate
>>> variable for each closure, so it works.
>>> 
>>>> A second problem is that the runtime roughly doubles at each iteration, so
>>>> this clearly doesn't seem to be the way to do it. What is the preferred way of
>>>> doing such repeated modifications to an RDD, and how can the accumulation of
>>>> overhead be minimized?
>>>> 
>>>> Thanks!
>>>> 
>>>> Rok
>>>> 
>>>> 
>>>> 
>> 

