On 12 Jun 2015, at 22:59, Cody Koeninger <c...@koeninger.org> wrote:

> Close.  The mapPartitions call doesn't need to do anything at all to the iter.
> 
> mapPartitions { iter =>
>   SomeDb.conn.init
>   iter
> }

Yes, thanks!
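
Just to check my understanding, here is a minimal sketch without Spark. Conn, 
initCount and LazyInitDemo are names I made up for illustration, and the three 
sequences merely stand in for partitions handled inside one JVM. It only shows 
that touching the lazy val repeatedly runs the initializer once; it says 
nothing about how Spark actually schedules partitions.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for a real connection holder; Conn and initCount are illustrative only.
object SomeDb {
  // Counts how often the lazy initializer actually runs in this JVM.
  val initCount = new AtomicInteger(0)

  final class Conn {
    // No-op: calling it merely forces evaluation of the lazy val below.
    def init(): Unit = ()
  }

  // A lazy val in an object is initialized at most once per JVM, thread-safely.
  lazy val conn: Conn = {
    initCount.incrementAndGet()
    new Conn
  }
}

object LazyInitDemo {
  // Same shape as the function passed to mapPartitions:
  // touch the connection, hand the iterator back untouched.
  def touchConnection[A](iter: Iterator[A]): Iterator[A] = {
    SomeDb.conn.init()
    iter
  }

  // Simulates three partitions processed in the same JVM.
  def run(): Seq[List[Int]] = {
    val partitions = Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))
    partitions.map(p => touchConnection(p.iterator).toList)
  }

  def main(args: Array[String]): Unit = {
    val out = run()
    println(s"data unchanged: $out, initializer ran ${SomeDb.initCount.get} time(s)")
  }
}
```

So the data passes through unchanged and the initializer runs a single time, 
however often touchConnection is called in that JVM.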

Maybe you can confirm two more things, and then you will have helped me make a 
giant leap today:

a) When using Spark Streaming, will this happen exactly once per executor? I 
mean: is mapPartitions called once per executor for the lifetime of the stream?

Or should I rather think once per stage?


b) I actually need an ActorSystem and FlowMaterializer (for making an Akka-HTTP 
request to store the data), not a DB connection - I presume this does not 
change the concept?
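
What I have in mind is roughly the sketch below. HttpResources, 
HttpResourcesHolder and HolderDemo are placeholder names I invented; 
HttpResources only stands in for the ActorSystem plus FlowMaterializer and is 
not Akka's API. The shutdown hook is my own assumption about how cleanup could 
be handled.

```scala
// Placeholder for a heavyweight, non-serializable resource (for example an
// ActorSystem plus materializer). Hypothetical class, not Akka's API.
final class HttpResources(val name: String) {
  @volatile private var open = true

  def isOpen: Boolean = open

  // Stand-in for issuing the HTTP request that stores a record.
  def store(record: String): String = {
    require(open, "resources already shut down")
    s"$name stored $record"
  }

  def shutdown(): Unit = {
    open = false
  }
}

object HttpResourcesHolder {
  // Created lazily on first access inside each JVM, so nothing has to be
  // serialized on the driver and shipped to the executors.
  lazy val resources: HttpResources = {
    val r = new HttpResources("akka-http-client")
    // Assumption: release the resources when the JVM exits.
    sys.addShutdownHook(r.shutdown())
    r
  }
}

object HolderDemo {
  // Shaped like the body of an update function: reuse the shared instance
  // instead of creating one per call.
  def save(record: String): String =
    HttpResourcesHolder.resources.store(record)

  def main(args: Array[String]): Unit =
    println(save("session-1"))
}
```

The update function would then call HttpResourcesHolder.resources in the same 
way it would call SomeDb.conn.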


Jan



> 
> On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 <algermissen1...@icloud.com> 
> wrote:
> Cody,
> 
> On 12 Jun 2015, at 17:26, Cody Koeninger <c...@koeninger.org> wrote:
> 
> > There are several database apis that use a thread local or singleton 
> > reference to a connection pool (we use ScalikeJDBC currently, but there are 
> > others).
> >
> > You can use mapPartitions earlier in the chain to make sure the connection 
> > pool is set up on that executor, then use it inside updateStateByKey
> >
> 
> Thanks. You are saying I should just make an arbitrary use of the 
> ‘connection’ to invoke the ‘lazy’. E.g. like this:
> 
> object SomeDb {
> 
>   lazy val conn = new SomeDb("some serializable config")
> 
> }
> 
> 
> Then somewhere else:
> 
> theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
>   SomeDb.conn.init
>   pair
>    }
> )).updateStateByKey[Session](myUpdateFunction _)
> 
> 
> And in myUpdateFunction:
> 
> def myUpdateFunction( …) {
> 
>     SomeDb.conn.store(  … )
> 
> }
> 
> 
> Correct?
> 
> Jan
> 
> 
> 
> 
> > On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 
> > <algermissen1...@icloud.com> wrote:
> > Hi,
> >
> > I have a scenario with Spark Streaming, where I need to write to a database 
> > from within updateStateByKey[1].
> >
> > That means that inside my update function I need a connection.
> >
> > I have so far understood that I should create a new (lazy) connection for 
> > every partition. But since I am not working in foreachRDD I wonder where I 
> > can iterate over the partitions.
> >
> > Should I use mapPartitions() somewhere up the chain?
> >
> > Jan
> >
> >
> >
> > [1] The use case being saving ‘done’ sessions during web tracking.
> >
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
> >
> 
> 

