hey AKM!
this is a very common problem. the streaming programming guide addresses
this issue here, actually:
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
the tl;dr is this:
1) you want to use foreachPartition() to operate on a whole partition of
records at a time, versus a single record at a time with rdd.foreach()
2) you want to get/release connections from the ConnectionPool within each
worker, not on the driver
3) make sure you initialize the ConnectionPool first, or do it lazily upon
getting the first connection.
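note that ConnectionPool in the guide's example isn't a Spark class; you supply it yourself. here's a minimal sketch of one, assuming a per-JVM singleton is what you want. `Connection`, this `ConnectionPool` implementation and `processPartition` are all hypothetical stand-ins (swap in your real DB client); the point is that a Scala `object` is initialized lazily on first access, once per JVM, so each executor builds its own pool the first time a task touches it:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a real database client connection.
class Connection(val id: Int) {
  def send(record: String): Unit = println(s"connection $id <- $record")
}

// A Scala `object` is initialized the first time it is accessed, once per JVM,
// so every executor lazily builds its own pool on first use.
object ConnectionPool {
  private val opened = new AtomicInteger(0)
  // Thread-safe, because several tasks can run concurrently in one executor JVM.
  private val idle = new ConcurrentLinkedQueue[Connection]()

  // Reuse an idle connection if there is one; otherwise open a new one.
  def getConnection(): Connection = {
    val pooled = idle.poll()
    if (pooled != null) pooled else new Connection(opened.incrementAndGet())
  }

  // Return the connection so later partitions on this JVM can reuse it.
  def returnConnection(connection: Connection): Unit = idle.offer(connection)

  // Number of connections opened so far (handy for checking reuse).
  def openedCount: Int = opened.get()
}

// Hypothetical body of an rdd.foreachPartition call: one connection per partition,
// always handed back to the pool, even if a write throws.
def processPartition(records: Iterator[String]): Unit = {
  val connection = ConnectionPool.getConnection()
  try records.foreach(connection.send)
  finally ConnectionPool.returnConnection(connection)
}
```

a real pool would also cap its size, validate or expire stale connections, and close them on shutdown; this sketch does none of that.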
here's the sample code referenced in the link above with some additional
comments for clarity:
dstream.foreachRDD { rdd =>
  // everything within here runs on the Driver
  rdd.foreachPartition { partitionOfRecords =>
    // everything within here runs on the Worker and operates on a partition of records
    // ConnectionPool is a static, lazily initialized singleton pool of connections
    // that lives within the Worker JVM
    // retrieve a connection from the pool
    val connection = ConnectionPool.getConnection()
    try {
      // perform the application logic here: parse and write to mongodb using the connection
      partitionOfRecords.foreach(record => connection.send(record))
    } finally {
      // return to the pool for future reuse, even if a write fails
      ConnectionPool.returnConnection(connection)
    }
  }
}
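to make the difference concrete, here's a toy, Spark-free count in plain Scala. `partitions` is just a nested Seq standing in for an RDD, and `openConnection` is a hypothetical counter, not a real client. opening a connection inside the per-record function costs one connection per record, while the foreachPartition shape costs one per partition:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for "open a database connection": it only counts calls.
val opened = new AtomicInteger(0)
def openConnection(): Int = opened.incrementAndGet()

// A simulated RDD: 3 partitions holding 6 records in total.
val partitions: Seq[Seq[String]] = Seq(Seq("a", "b"), Seq("c"), Seq("d", "e", "f"))

// Shape of rdd.foreach with the connection opened inside the per-record function.
def connectionsPerRecord(): Int = {
  opened.set(0)
  partitions.foreach(partition => partition.foreach(_ => openConnection()))
  opened.get()
}

// Shape of rdd.foreachPartition: one connection shared by every record in a partition.
def connectionsPerPartition(): Int = {
  opened.set(0)
  partitions.foreach { partition =>
    val connection = openConnection()
    partition.foreach(record => println(s"connection $connection <- $record"))
  }
  opened.get()
}

// last line printed: per record: 6, per partition: 3
println(s"per record: ${connectionsPerRecord()}, per partition: ${connectionsPerPartition()}")
```

with the pool from the guide's example on top, those per-partition connections are also reused across partitions and batches on the same executor, so the steady-state count is roughly one per concurrently running task on each executor.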
hope that helps!
-chris
On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman <[email protected]> wrote:
> Sorry guys, my bad.
> Here is a high-level code sample:
>
> val unionStreams = ssc.union(kinesisStreams)
> unionStreams.foreachRDD(rdd => {
>   rdd.foreach { tweet =>
>     val strTweet = new String(tweet, "UTF-8")
>     val interaction = InteractionParser.parser(strTweet)
>     interactionDAL.insert(interaction)
>   }
> })
>
> Here I have to close the connection for interactionDAL, otherwise the JVM
> gives me an error that the connection is open. I tried a sticky connection
> as well, with keep_alive set to true. So my guess was that at the point of
> “unionStreams.foreachRDD” or at “rdd.foreach” the code is marshaled and
> sent to the workers, and the workers unmarshal and execute it, which is
> why the connection is always opened for each RDD. I might be completely
> wrong. I would love to know what is going on underneath.