Thanks Chris, that is what I wanted to know :)

A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred
(M) 880-175-5592433
Twitter | Blog | Facebook
Check out The Academy, your #1 source for free content marketing resources

On Mar 2, 2015, at 2:04 AM, Chris Fregly <ch...@fregly.com> wrote:

> hey AKM!
>
> this is a very common problem. the streaming programming guide addresses
> this issue here, actually:
> http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
> the tl;dr is this:
> 1) you want to use foreachPartition() to operate on a whole partition
> versus a single record with foreach()
> 2) you want to get/release the ConnectionPool within each worker
> 3) make sure you initialize the ConnectionPool first - or do it lazily
> upon getting the first connection.
>
> here's the sample code referenced in the link above with some additional
> comments for clarity:
>
> dstream.foreachRDD { rdd =>
>   // everything within here runs on the Driver
>
>   rdd.foreachPartition { partitionOfRecords =>
>     // everything within here runs on the Worker and operates on a
>     // partition of records
>
>     // ConnectionPool is a static, lazily initialized singleton pool of
>     // connections that runs within the Worker JVM
>
>     // retrieve a connection from the pool
>     val connection = ConnectionPool.getConnection()
>
>     // perform the application logic here - parse and write to mongodb
>     // using the connection
>     partitionOfRecords.foreach(record => connection.send(record))
>
>     // return the connection to the pool for future reuse
>     ConnectionPool.returnConnection(connection)
>   }
> }
>
> hope that helps!
>
> -chris
>
>
> On Sun, Mar 1, 2015 at 4:00 AM, A.K.M.
> Ashrafuzzaman <ashrafuzzaman...@gmail.com> wrote:
>
> Sorry guys, my bad.
> Here is a high-level code sample:
>
> val unionStreams = ssc.union(kinesisStreams)
> unionStreams.foreachRDD(rdd => {
>   rdd.foreach { tweet =>
>     val strTweet = new String(tweet, "UTF-8")
>     val interaction = InteractionParser.parser(strTweet)
>     interactionDAL.insert(interaction)
>   }
> })
>
> Here I have to close the connection for interactionDAL, otherwise the JVM
> gives me an error that the connection is open. I tried with a sticky
> connection as well, with keep_alive true. So my guess was that at the point
> of "unionStreams.foreachRDD" or at "rdd.foreach" the code is marshaled and
> sent to the workers, and the workers unmarshal it and execute the process,
> which is why the connection is always opened for each RDD. I might be
> completely wrong. I would love to know what is going on underneath.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
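[Editor's note: a minimal sketch of the lazily initialized singleton pool Chris describes. `Connection` and its `send`/`close` methods are hypothetical stand-ins for a real client (e.g. a MongoDB driver), not an actual API; the pooling itself is just a concurrent queue.]

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for a real database connection; `send` and
// `close` are placeholders for illustration only.
class Connection {
  def send(record: String): Unit = println(s"sent: $record")
  def close(): Unit = ()
}

// A Scala `object` is initialized lazily, on first use, once per JVM --
// so each Spark worker JVM gets its own pool, created only when the
// first task on that worker asks for a connection. Nothing here is
// captured in the closure, so nothing needs to be serialized to workers.
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  // Reuse a pooled connection if one is available, otherwise open a new one.
  def getConnection(): Connection =
    Option(pool.poll()).getOrElse(new Connection)

  // Return the connection for reuse by later partitions on the same worker.
  def returnConnection(conn: Connection): Unit = pool.offer(conn)
}
```

Because the pool lives in a singleton `object` referenced by name inside `foreachPartition`, only the class name travels with the serialized closure; the connections themselves are created and reused worker-side, which is exactly why the per-record `rdd.foreach` version above leaks connections while the pooled `foreachPartition` version does not.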