Re: Connection pool in workers
Thanks Chris, that is what I wanted to know :)

A.K.M. Ashrafuzzaman
Lead Software Engineer, NewsCred
Re: Connection pool in workers
hey AKM!

this is a very common problem. the streaming programming guide addresses this issue here, actually:
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

the tl;dr is this:
1) you want to use foreachPartition() to operate on a whole partition of records, versus a single record at a time with rdd.foreach()
2) you want to get/release the ConnectionPool within each worker
3) make sure you initialize the ConnectionPool first - or do it lazily upon getting the first connection.

here's the sample code referenced in the link above with some additional comments for clarity:

dstream.foreachRDD { rdd =>
  // everything within here runs on the Driver

  rdd.foreachPartition { partitionOfRecords =>
    // everything within here runs on a Worker and operates on one
    // partition of records

    // ConnectionPool is a static, lazily initialized singleton pool of
    // connections that lives within the Worker JVM

    // retrieve a connection from the pool
    val connection = ConnectionPool.getConnection()

    // perform the application logic here - parse and write to mongodb
    // using the connection
    partitionOfRecords.foreach(record => connection.send(record))

    // return the connection to the pool for future reuse
    ConnectionPool.returnConnection(connection)
  }
}

hope that helps!

-chris
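The ConnectionPool object above is referenced but never shown. A minimal sketch of what such a static, lazily initialized, per-JVM singleton might look like (the Connection class and the create-on-demand policy are illustrative assumptions, not code from the guide):

import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for the application's real connection type
// (e.g. a MongoDB client wrapper).
class Connection {
  def send(record: Array[Byte]): Unit = { /* write to the datastore */ }
  def close(): Unit = { /* release the underlying socket */ }
}

object ConnectionPool {
  // A Scala `object` is initialized lazily, on first use, once per JVM --
  // so each Worker builds its own pool and nothing is serialized from
  // the Driver.
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection = {
    val conn = pool.poll()
    // create a new connection on demand when the pool is empty
    if (conn != null) conn else new Connection()
  }

  def returnConnection(conn: Connection): Unit = pool.offer(conn)
}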
Re: Connection pool in workers
Sorry guys, my bad.
Here is a high level code sample:

val unionStreams = ssc.union(kinesisStreams)
unionStreams.foreachRDD(rdd => {
  rdd.foreach { tweet =>
    val strTweet = new String(tweet, "UTF-8")
    val interaction = InteractionParser.parser(strTweet)
    interactionDAL.insert(interaction)
  }
})

Here I have to close the connection for interactionDAL, otherwise the JVM gives me an error that the connection is open. I tried with a sticky connection as well, with keep_alive true. So my guess was that at the point of "unionStreams.foreachRDD" or at "rdd.foreach" the code is marshaled and sent to the workers, and the workers unmarshal and execute it, which is why the connection is always opened for each RDD. I might be completely wrong. I would love to know what is going on underneath.
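For reference, Chris's foreachPartition pattern applied to this snippet would look roughly like the sketch below. InteractionParser and interactionDAL are kept from the message above; the InteractionDAL constructor and close() call are assumptions about how the DAL manages its connection:

val unionStreams = ssc.union(kinesisStreams)
unionStreams.foreachRDD { rdd =>
  rdd.foreachPartition { tweets =>
    // runs on the worker: build (or borrow from a pool) the DAL here,
    // once per partition, instead of serializing it from the driver
    val interactionDAL = new InteractionDAL() // hypothetical constructor
    tweets.foreach { tweet =>
      val strTweet = new String(tweet, "UTF-8")
      val interaction = InteractionParser.parser(strTweet)
      interactionDAL.insert(interaction)
    }
    // close (or return to the pool) once per partition, not per record
    interactionDAL.close()
  }
}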
Connection pool in workers
Hi guys,
I am new to Spark and we are running a small project that collects data from Kinesis and inserts it into Mongo. I would like to share a high level view of how it is done and would love your input on it.

I am fetching Kinesis data, and for each RDD -> parsing the String data -> inserting into Mongo storage.

So what I understand is that when "we are parsing data" in each RDD, that code is serialized and sent to the workers. So when I want to write to Mongo, each worker creates a new connection to write the data. Is there any way I can use a connection pool? By the way, I am using Scala and Spark Streaming.

A.K.M. Ashrafuzzaman
Lead Software Engineer, NewsCred
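For context on the ingestion side described above, the kinesisStreams that later get unioned are typically built with one receiver per shard. A rough sketch against the Spark 1.2-era API follows; the app name, stream name, endpoint URL, intervals, and shard count are all placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val sparkConf = new SparkConf().setAppName("KinesisToMongo") // placeholder name
val ssc = new StreamingContext(sparkConf, Seconds(10))

// one receiver per shard is a common setup; numShards is a placeholder
val numShards = 2
val kinesisStreams = (1 to numShards).map { _ =>
  KinesisUtils.createStream(
    ssc,
    "my-stream",                               // placeholder stream name
    "https://kinesis.us-east-1.amazonaws.com", // placeholder endpoint
    Seconds(10),                               // checkpoint interval
    InitialPositionInStream.LATEST,
    StorageLevel.MEMORY_AND_DISK_2)
}

// merge the per-shard streams into one DStream of Array[Byte] records
val unionStreams = ssc.union(kinesisStreams)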