Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread algermissen1971
Cody,

On 12 Jun 2015, at 17:26, Cody Koeninger c...@koeninger.org wrote:

 There are several database APIs that use a thread-local or singleton 
 reference to a connection pool (we use ScalikeJDBC currently, but there are 
 others).
  
 You can use mapPartitions earlier in the chain to make sure the connection 
 pool is set up on that executor, then use it inside updateStateByKey.
 

Thanks. You are saying I should just make an arbitrary use of the ‘connection’ 
to trigger the ‘lazy’ initialization. E.g. like this:

object SomeDB {

  lazy val conn = new SomeDB("some serializable config")

}


Then somewhere else:

theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
  SomeDB.conn.init
  pair
}
)).updateStateByKey[Session](myUpdateFunction _)


And in myUpdateFunction:

def myUpdateFunction( …) {

  SomeDB.conn.store(  … )

}


Correct?

Jan





Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread algermissen1971


On 12 Jun 2015, at 23:19, Cody Koeninger c...@koeninger.org wrote:

 A. No, it's called once per partition.  Usually you have more partitions than 
 executors, so it will end up getting called multiple times per executor.  But 
 you can use a lazy val, singleton, etc to make sure the setup only takes 
 place once per JVM.
 
 B.  I can't speak to the specifics there ... but as long as you're making sure 
 the setup gets called at most once per executor, before the work that needs 
 it ... should be ok.
 

Great, thanks so much!

(I guess I am not yet clear about the relationship of partition / executor / 
stage, but I get the idea.)

Jan





Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread Cody Koeninger
A. No, it's called once per partition.  Usually you have more partitions
than executors, so it will end up getting called multiple times per
executor.  But you can use a lazy val, singleton, etc to make sure the
setup only takes place once per JVM.

B.  I can't speak to the specifics there ... but as long as you're making
sure the setup gets called at most once per executor, before the work that
needs it ... should be ok.
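
Regarding B, here is a minimal sketch of how the same trick might carry over
to Akka; this is an assumption rather than a tested recipe, since the
specifics aren't settled in this thread. The idea is the same: keep the
ActorSystem in a lazy val on a singleton object and force it from
mapPartitions, just like the connection. The materializer setup is omitted
because its constructor changed across Akka Streams milestones.

import akka.actor.ActorSystem

// Hypothetical holder for the Akka machinery, mirroring the SomeDB
// pattern in this thread: the lazy val means the ActorSystem is
// created at most once per executor JVM.
object HttpSink {

  lazy val system: ActorSystem = ActorSystem("tracking-sink")

  // Calling this from mapPartitions forces the lazy val, nothing more.
  def init(): Unit = { system; () }
}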




Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread Cody Koeninger
Close.  The mapPartitions call doesn't need to do anything at all to the
iter.

mapPartitions { iter =>
  SomeDB.conn.init
  iter
}
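
For reference, a sketch of what the SomeDB class itself might look like. It
is hypothetical (the thread never shows it), and only the shape matters: the
constructor does the expensive setup, init is an empty method that merely
forces the lazy val, and store stands in for the write done in
myUpdateFunction.

// Hypothetical sketch of the SomeDB class assumed by the snippets in
// this thread.
class SomeDB(config: String) {

  println(s"opening connection with $config")  // runs once per executor JVM

  // No-op: calling SomeDB.conn.init from mapPartitions evaluates the
  // lazy val below and thereby runs the constructor.
  def init(): Unit = ()

  // Placeholder for the actual write performed in myUpdateFunction.
  def store(session: Any): Unit = println(s"storing $session")
}

object SomeDB {
  // Created lazily, at most once per JVM; the object is re-created on
  // each executor rather than serialized from the driver.
  lazy val conn = new SomeDB("some serializable config")
}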




Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread algermissen1971

On 12 Jun 2015, at 22:59, Cody Koeninger c...@koeninger.org wrote:

 Close.  The mapPartitions call doesn't need to do anything at all to the iter.
 
 mapPartitions { iter =>
   SomeDB.conn.init
   iter
 }

Yes, thanks!

Maybe you can confirm two more things, and then you will have helped me make a 
giant leap today:

a) When using Spark Streaming, will this happen exactly once per executor? I 
mean: is mapPartitions called once per executor for the lifetime of the stream?

Or should I rather think once per stage?


b) I actually need an ActorSystem and FlowMaterializer (for making an Akka-HTTP 
request to store the data), not a DB connection - I presume this does not 
change the concept?


Jan



 



Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread Cody Koeninger
There are several database APIs that use a thread-local or singleton
reference to a connection pool (we use ScalikeJDBC currently, but there are
others).

You can use mapPartitions earlier in the chain to make sure the connection
pool is set up on that executor, then use it inside updateStateByKey.
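
As a concrete illustration of that approach, here is a minimal sketch using
ScalikeJDBC. The driver, JDBC URL, credentials, and table are made up; the
point is only the shape: register the singleton pool once per JVM from
mapPartitions, then borrow from it wherever the writes happen.

import scalikejdbc._

object Pool {
  // A lazy val of type Unit is a cheap once-per-JVM guard: the body
  // runs the first time Pool.init is referenced on an executor.
  lazy val init: Unit = {
    Class.forName("org.h2.Driver")  // hypothetical driver
    ConnectionPool.singleton("jdbc:h2:mem:sessions", "user", "pass")
  }
}

// Earlier in the chain:
//   stream.mapPartitions { iter => Pool.init; iter }

// Inside the updateStateByKey update function:
def saveSession(id: String): Unit =
  DB.autoCommit { implicit session =>
    sql"insert into sessions (id) values ($id)".update.apply()
  }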

On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 
algermissen1...@icloud.com wrote:

 Hi,

 I have a scenario with Spark Streaming where I need to write to a
 database from within updateStateByKey[1].

 That means that inside my update function I need a connection.

 I have so far understood that I should create a new (lazy) connection for
 every partition. But since I am not working in foreachRDD, I wonder where I
 can iterate over the partitions.

 Should I use mapPartitions() somewhere up the chain?

 Jan



 [1] The use case being saving ‘done’ sessions during web tracking.

