hey AKM! this is a very common problem. the streaming programming guide addresses this issue here, actually: http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
the tl;dr is this:

1) use rdd.foreachPartition() to operate on a whole partition of records at a time, rather than rdd.foreach(), which operates on a single record
2) get/release the connection from the ConnectionPool within each worker
3) make sure the ConnectionPool is initialized on the worker first - or initialize it lazily when the first connection is requested

here's the sample code referenced in the link above, with some additional comments for clarity (see the bottom of this message for a rough sketch of the ConnectionPool side and a per-partition version of your snippet):

dstream.foreachRDD { rdd =>
  // everything within here runs on the Driver
  rdd.foreachPartition { partitionOfRecords =>
    // everything within here runs on the Worker and operates on a partition of records

    // ConnectionPool is a static, lazily initialized singleton pool of connections
    // that runs within the Worker JVM

    // retrieve a connection from the pool
    val connection = ConnectionPool.getConnection()

    // perform the application logic here - parse and write to mongodb using the connection
    partitionOfRecords.foreach(record => connection.send(record))

    // return the connection to the pool for future reuse
    ConnectionPool.returnConnection(connection)
  }
}

hope that helps!

-chris

On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman <ashrafuzzaman...@gmail.com> wrote:

> Sorry guys, my bad.
> Here is a high level code sample:
>
> val unionStreams = ssc.union(kinesisStreams)
> unionStreams.foreachRDD(rdd => {
>   rdd.foreach(tweet => {
>     val strTweet = new String(tweet, "UTF-8")
>     val interaction = InteractionParser.parser(strTweet)
>     interactionDAL.insert(interaction)
>   })
> })
>
> Here I have to close the connection for interactionDAL, otherwise the JVM
> gives me an error that the connection is open. I tried with a sticky connection
> as well, with keep_alive true. So my guess was that at the point of
> “unionStreams.foreachRDD” or at “rdd.foreach” the code is marshaled and
> sent to workers, and the workers unmarshal and execute the process, which is
> why the connection is always open for each RDD. I might be completely
> wrong. I would love to know what is going on underneath.
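PS - since most of the heavy lifting above hides inside ConnectionPool, here's a rough sketch of what such a lazily initialized singleton could look like. this is not the actual class from the guide - the Connection stand-in, the queue-based pool, and the pool size of 10 are all placeholders you'd swap for your real client (your mongodb DAL, a commons-pool instance, etc.):

import java.util.concurrent.ConcurrentLinkedQueue

// "Connection" is a stand-in - replace it with whatever client your DAL actually uses
class Connection {
  def send(record: Array[Byte]): Unit = { /* write the record to the external system */ }
  def close(): Unit = { /* tear down the underlying client */ }
}

object ConnectionPool {
  // lazy val => the pool is built once per Worker JVM, the first time
  // getConnection() is called there; nothing is serialized on the Driver
  private lazy val pool: ConcurrentLinkedQueue[Connection] = {
    val q = new ConcurrentLinkedQueue[Connection]()
    (1 to 10).foreach(_ => q.add(new Connection))   // pre-fill with a handful of connections (arbitrary size)
    q
  }

  def getConnection(): Connection =
    Option(pool.poll()).getOrElse(new Connection)   // poll() returns null when the pool is empty, so fall back to a fresh connection

  def returnConnection(conn: Connection): Unit =
    pool.add(conn)
}

because it's a Scala object (a static singleton per JVM), each worker gets exactly one pool, and the connections inside it never have to be serialized and shipped from the driver.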
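and to map that onto the snippet you posted: assuming interactionDAL wraps a single connection you can construct and close on the worker (the new InteractionDAL() and close() calls below are guesses at your API - adjust them to whatever your DAL actually exposes), the per-partition version would look roughly like this:

val unionStreams = ssc.union(kinesisStreams)

unionStreams.foreachRDD { rdd =>
  // this closure runs on the Driver, once per batch
  rdd.foreachPartition { tweets =>
    // this closure runs on a Worker, once per partition
    val interactionDAL = new InteractionDAL()   // open one connection per partition (hypothetical constructor)
    try {
      tweets.foreach { tweet =>
        val strTweet = new String(tweet, "UTF-8")
        val interaction = InteractionParser.parser(strTweet)
        interactionDAL.insert(interaction)
      }
    } finally {
      interactionDAL.close()                    // release it before the task finishes (hypothetical method)
    }
  }
}

that way you open and close one connection per partition per batch instead of one per record, and nothing connection-related has to be serialized on the driver and shipped to the workers.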