Thanks Chris,
That is what I wanted to know :)

A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred

(M) 880-175-5592433
Twitter | Blog | Facebook

Check out The Academy, your #1 source
for free content marketing resources

On Mar 2, 2015, at 2:04 AM, Chris Fregly <ch...@fregly.com> wrote:

> hey AKM!
> 
> this is a very common problem.  the streaming programming guide addresses 
> this issue here, actually:  
> http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
> 
> the tl;dr is this:
> 1) you want to use foreachPartition() to operate on a whole partition versus 
> a single record with foreachRDD()
> 2) you want to get/release the ConnectionPool within each worker
> 3) make sure you initialize the ConnectionPool first - or do it lazily upon 
> getting the first connection.
> 
> here's the sample code referenced in the link above with some additional 
> comments for clarity:
> 
> dstream.foreachRDD { rdd =>
>   // everything within here runs on the Driver
> 
>   rdd.foreachPartition { partitionOfRecords =>
>    // everything within here runs on the Worker and operates on a partition 
> of records
> 
>     // ConnectionPool is a static, lazily initialized singleton pool of 
> connections that runs within the Worker JVM 
> 
>     // retrieve a connection from the pool
>     val connection = ConnectionPool.getConnection()
> 
>     // perform the application logic here - parse and write to mongodb using 
> the connection
>     partitionOfRecords.foreach(record => connection.send(record))
> 
>     // return to the pool for future reuse
>     ConnectionPool.returnConnection(connection)
>   }
> }
> 
> hope that helps!
> 
> -chris
> 
> 
> 
> 
> On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman 
> <ashrafuzzaman...@gmail.com> wrote:
> Sorry guys may bad,
> Here is a high level code sample,
> 
> val unionStreams = ssc.union(kinesisStreams)
> unionStreams.foreachRDD(rdd => {
>   rdd.foreach(tweet =>
>     val strTweet = new String(tweet, "UTF-8")
>     val interaction = InteractionParser.parser(strTweet)
>     interactionDAL.insert(interaction)
>   )
> })
> 
> Here I have to close the connection for interactionDAL other wise the JVM 
> gives me error that the connection is open. I tried with sticky connection as 
> well with keep_alive true. So my guess was that at the point of 
> “unionStreams.foreachRDD” or at “rdd.foreach” the code is marshaled and send 
> to workers and workers un-marshals and execute the process, which is why the 
> connection is alway opened for each RDD. I might be completely wrong. I would 
> love to know what is going on underneath.
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 

Reply via email to