Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection
Cody,

On 12 Jun 2015, at 17:26, Cody Koeninger c...@koeninger.org wrote:

> There are several database APIs that use a thread-local or singleton reference to a connection pool (we use ScalikeJDBC currently, but there are others). You can use mapPartitions earlier in the chain to make sure the connection pool is set up on that executor, then use it inside updateStateByKey.

Thanks. You are saying I should just make an arbitrary use of the ‘connection’ to invoke the ‘lazy’. E.g. like this:

object SomeDB {
  lazy val conn = new SomeDB("some serializable config")
}

Then somewhere else:

theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map(pair => {
  SomeDB.conn.init
  pair
})).updateStateByKey[Session](myUpdateFunction _)

And in myUpdateFunction:

def myUpdateFunction(...) {
  SomeDB.conn.store(...)
}

Correct?

Jan
Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection
On 12 Jun 2015, at 23:19, Cody Koeninger c...@koeninger.org wrote:

> A. No, it's called once per partition. Usually you have more partitions than executors, so it will end up getting called multiple times per executor. But you can use a lazy val, singleton, etc. to make sure the setup only takes place once per JVM.
>
> B. I can't speak to the specifics there ... but as long as you're making sure the setup gets called at most once per executor, before the work that needs it ... it should be ok.

Great, thanks so much - I guess I am not yet clear about the relationship of partition / executor / stage, but I get the idea.

Jan
Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection
A. No, it's called once per partition. Usually you have more partitions than executors, so it will end up getting called multiple times per executor. But you can use a lazy val, singleton, etc. to make sure the setup only takes place once per JVM.

B. I can't speak to the specifics there ... but as long as you're making sure the setup gets called at most once per executor, before the work that needs it ... it should be ok.

On Fri, Jun 12, 2015 at 4:11 PM, algermissen1971 algermissen1...@icloud.com wrote:

> Maybe you can confirm two more things and then you helped me make a giant leap today:
>
> a) When using Spark Streaming, will this happen exactly once per executor? I mean: is mapPartitions called once per executor for the lifetime of the stream? Or should I rather think once per stage?
>
> b) I actually need an ActorSystem and FlowMaterializer (for making an Akka HTTP request to store the data), not a DB connection - I presume this does not change the concept?
>
> Jan
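To make "once per JVM" concrete, here is a minimal sketch of the lazy-val singleton under discussion; SomeDB and its config string are placeholders from this thread, not a real library:

class SomeDB(config: String) {               // placeholder connection class
  def init: Unit = ()                        // e.g. open the physical connection here
  def store(session: Any): Unit = ()         // e.g. persist a finished session
}

object SomeDB {
  // A lazy val is initialized at most once per JVM, with thread-safe
  // initialization, so every partition processed on this executor ends up
  // sharing the same instance, no matter how often the mapPartitions
  // closure runs.
  lazy val conn: SomeDB = new SomeDB("some serializable config")
}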
Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection
Close. The mapPartitions call doesn't need to do anything at all to the iter:

mapPartitions { iter =>
  SomeDB.conn.init
  iter
}

On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 algermissen1...@icloud.com wrote:

> You are saying I should just make an arbitrary use of the ‘connection’ to invoke the ‘lazy’. E.g. like this:
>
> theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map(pair => {
>   SomeDB.conn.init
>   pair
> })).updateStateByKey[Session](myUpdateFunction _)
>
> Correct?
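Pulling the thread together, a rough end-to-end sketch of the pattern being discussed. Event, Session, toPairs and theTrackingEvents are hypothetical placeholders invented for illustration, and SomeDB is the lazy singleton sketched earlier; the only Spark API assumed is map, mapPartitions and updateStateByKey on a DStream:

import org.apache.spark.streaming.dstream.DStream

case class Event(sessionId: String, isLast: Boolean)     // hypothetical event shape
case class Session(events: Seq[Event], done: Boolean)    // hypothetical session state

def toPairs(e: Event): (String, Event) = (e.sessionId, e)

def myUpdateFunction(events: Seq[Event], state: Option[Session]): Option[Session] = {
  val current = state.getOrElse(Session(Nil, done = false))
  val updated = current.copy(events = current.events ++ events,
                             done   = current.done || events.exists(_.isLast))
  if (updated.done) {
    SomeDB.conn.store(updated)   // assumes the connection has been set up on this executor
    None                         // drop finished sessions from the streaming state
  } else {
    Some(updated)
  }
}

def wireUp(theTrackingEvents: DStream[Event]): DStream[(String, Session)] =
  theTrackingEvents
    .map(toPairs)                      // key events by session id
    .mapPartitions { iter =>
      SomeDB.conn.init                 // touch the lazy singleton, once per partition
      iter                             // pass the partition through unchanged
    }
    .updateStateByKey[Session](myUpdateFunction _)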
Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection
On 12 Jun 2015, at 22:59, Cody Koeninger c...@koeninger.org wrote:

> Close. The mapPartitions call doesn't need to do anything at all to the iter:
>
> mapPartitions { iter =>
>   SomeDB.conn.init
>   iter
> }

Yes, thanks! Maybe you can confirm two more things and then you helped me make a giant leap today:

a) When using Spark Streaming, will this happen exactly once per executor? I mean: is mapPartitions called once per executor for the lifetime of the stream? Or should I rather think once per stage?

b) I actually need an ActorSystem and FlowMaterializer (for making an Akka HTTP request to store the data), not a DB connection - I presume this does not change the concept?

Jan
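For b), the same singleton idea as for the connection should carry over: keep the ActorSystem and materializer in a lazy holder object so they are created at most once per executor JVM. A rough sketch; HttpContext and the system name are invented here, and the exact materializer type (FlowMaterializer vs. ActorMaterializer) depends on the akka-streams version in use:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer   // older akka-streams releases call this FlowMaterializer

object HttpContext {
  // Created lazily on first use, once per executor JVM - same pattern as SomeDB.conn.
  lazy val system: ActorSystem = ActorSystem("tracking-writer")
  lazy val materializer: ActorMaterializer = ActorMaterializer()(system)
}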
Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection
There are several database APIs that use a thread-local or singleton reference to a connection pool (we use ScalikeJDBC currently, but there are others). You can use mapPartitions earlier in the chain to make sure the connection pool is set up on that executor, then use it inside updateStateByKey.

On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 algermissen1...@icloud.com wrote:

> Hi,
>
> I have a scenario with Spark Streaming where I need to write to a database from within updateStateByKey [1]. That means that inside my update function I need a connection.
>
> I have so far understood that I should create a new (lazy) connection for every partition. But since I am not working in foreachRDD, I wonder where I can iterate over the partitions. Should I use mapPartitions() somewhere up the chain?
>
> Jan
>
> [1] The use case being saving ‘done’ sessions during web tracking.
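For concreteness, a sketch of what the per-executor pool setup could look like with ScalikeJDBC, the library Cody mentions above. The JDBC URL, credentials and the `events` stream are placeholders, and this is an untested illustration of the idea rather than a recipe:

import scalikejdbc._

object Pool {
  // Safe to call from every partition; only the first call in each executor
  // JVM actually creates the pool. Assumes the JDBC driver is on the classpath.
  def setup(): Unit = synchronized {
    if (!ConnectionPool.isInitialized()) {
      ConnectionPool.singleton("jdbc:postgresql://db-host/tracking", "user", "pass")
    }
  }
}

// Somewhere earlier in the chain, before updateStateByKey:
// events.mapPartitions { iter => Pool.setup(); iter }

Inside updateStateByKey the pool can then be used in the usual ScalikeJDBC way (e.g. DB.autoCommit { implicit session => ... }).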