Re: iteratively modifying an RDD

2015-02-11 Thread Davies Liu
On Wed, Feb 11, 2015 at 10:47 AM, rok rokros...@gmail.com wrote:
 I was having trouble with memory exceptions when broadcasting a large lookup
 table, so I've resorted to processing it iteratively -- but how can I modify
 an RDD iteratively?

 I'm trying something like:

 rdd = sc.parallelize(...)
 lookup_tables = {...}

 for lookup_table in lookup_tables:
     rdd = rdd.map(lambda x: func(x, lookup_table))

 If I leave it as is, then only the last lookup_table is applied instead of
 stringing together all the maps. However, if I add a .cache() to the .map then
 it seems to work fine.

This is related to how Python closures work; you should do it like this:

def create_func(lookup_table):
    return lambda x: func(x, lookup_table)

for lookup_table in lookup_tables:
    rdd = rdd.map(create_func(lookup_table))

A Python closure just remembers the variable; it does not copy its value.
In the loop, `lookup_table` is the same variable throughout, so when we
serialize the final rdd, all the closures refer to that same `lookup_table`,
which by then points to the last value.

When we create the closure inside a function, Python creates a separate
binding for each closure, so it works.
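
Here is a minimal, self-contained sketch of the pitfall and the fix. The data,
func, and lookup tables below are placeholders made up for illustration (not
your actual ones), and it assumes a standalone script rather than the pyspark
shell, where sc already exists:

from pyspark import SparkContext

sc = SparkContext(appName="closure-demo")

def func(x, lookup_table):
    # placeholder transform: replace x if the table knows about it
    return lookup_table.get(x, x)

lookup_tables = [{1: "a"}, {2: "b"}, {3: "c"}]
rdd = sc.parallelize([1, 2, 3])

# Broken: every lambda captures the *variable* lookup_table, so by the time
# the job is serialized they all see the last table.
broken = rdd
for lookup_table in lookup_tables:
    broken = broken.map(lambda x: func(x, lookup_table))

# Fixed: the factory function gives each closure its own binding.
def create_func(lookup_table):
    return lambda x: func(x, lookup_table)

fixed = rdd
for lookup_table in lookup_tables:
    fixed = fixed.map(create_func(lookup_table))

print(broken.collect())   # only the last table was applied
print(fixed.collect())    # each table was applied in turn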

 A second problem is that the runtime roughly doubles with each iteration, so
 this clearly doesn't seem to be the way to do it. What is the preferred way of
 doing such repeated modifications to an RDD, and how can the accumulation of
 overhead be minimized?

 Thanks!

 Rok






Re: iteratively modifying an RDD

2015-02-11 Thread Rok Roskar
Aha great! Thanks for the clarification!



Re: iteratively modifying an RDD

2015-02-11 Thread Rok Roskar
the runtime for each consecutive iteration is still roughly twice as long as 
for the previous one -- is there a way to reduce whatever overhead is 
accumulating? 




Re: iteratively modifying an RDD

2015-02-11 Thread Davies Liu
On Wed, Feb 11, 2015 at 2:43 PM, Rok Roskar rokros...@gmail.com wrote:
 the runtime for each consecutive iteration is still roughly twice as long as 
 for the previous one -- is there a way to reduce whatever overhead is 
 accumulating?

Sorry, I didn't fully understand your question -- which two are you comparing?

PySpark will try to combine the multiple map() calls into a single task, so
that task still needs all of the lookup_tables (the same total size as before).

You could add a checkpoint after some of the iterations.
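
For example, a rough sketch of what that could look like -- the checkpoint
directory, the every-5-iterations interval, and create_func (from my earlier
reply) are just illustrative choices, not something you have to copy exactly:

sc.setCheckpointDir("/tmp/spark-checkpoints")

for i, lookup_table in enumerate(lookup_tables):
    rdd = rdd.map(create_func(lookup_table))
    if (i + 1) % 5 == 0:
        rdd.cache()        # keep the materialized data around
        rdd.checkpoint()   # cut the lineage (and the serialized closure chain)
        rdd.count()        # run an action so the checkpoint actually happens

After a checkpoint, later tasks no longer carry the earlier map functions and
their lookup tables with them.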




Re: iteratively modifying an RDD

2015-02-11 Thread Rok Roskar
Yes, sorry, I wasn't clear -- I still have to trigger the calculation of the RDD
at the end of each iteration. Otherwise all of the lookup tables are shipped to
the cluster at the same time, resulting in memory errors. Therefore this becomes
several map jobs instead of one, and each consecutive map is slower than the one
before. I'll try the checkpoint, thanks for the suggestion.
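
Concretely, what I mean by triggering the calculation each iteration, with the
suggested checkpoint folded in, would be something like this (the checkpoint
directory and create_func are illustrative, as in the earlier sketch):

sc.setCheckpointDir("/tmp/spark-checkpoints")

for lookup_table in lookup_tables:
    rdd = rdd.map(create_func(lookup_table))
    rdd.cache()
    rdd.checkpoint()   # cut the lineage so the next map job only ships
                       # this iteration's lookup table
    rdd.count()        # trigger the calculation for this iteration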

