Re: Connection pool in workers

2015-03-02 Thread A.K.M. Ashrafuzzaman
Thanks Chris,
That is what I wanted to know :)

A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred

(M) 880-175-5592433

On Mar 2, 2015, at 2:04 AM, Chris Fregly wrote:

> [...]



Re: Connection pool in workers

2015-03-01 Thread Chris Fregly
hey AKM!

this is a very common problem.  the streaming programming guide addresses
this issue here, actually:
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

the tl;dr is this:
1) you want to use foreachPartition() to operate on a whole partition at a
time versus a single record at a time with foreach()
2) you want to get/release the ConnectionPool within each worker
3) make sure you initialize the ConnectionPool first - or do it lazily upon
getting the first connection.

here's the sample code referenced in the link above with some additional
comments for clarity:

dstream.foreachRDD { rdd =>
  // everything within here runs on the Driver

  rdd.foreachPartition { partitionOfRecords =>
    // everything within here runs on the Worker and operates on a
    // partition of records

    // ConnectionPool is a static, lazily initialized singleton pool of
    // connections that runs within the Worker JVM

    // retrieve a connection from the pool
    val connection = ConnectionPool.getConnection()

    // perform the application logic here - parse and write to mongodb
    // using the connection
    partitionOfRecords.foreach(record => connection.send(record))

    // return to the pool for future reuse
    ConnectionPool.returnConnection(connection)
  }
}
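
for reference, here's a rough sketch of what that ConnectionPool singleton
could look like - note that the Connection class, the pool size, and the
internals here are illustrative assumptions on my part, not actual code from
the guide:

import java.util.concurrent.ConcurrentLinkedQueue

// hypothetical stand-in for a real client connection (e.g. a mongo client)
class Connection {
  def send(record: Any): Unit = { /* write the record to the datastore */ }
  def close(): Unit = { /* release the underlying socket */ }
}

// a Scala object is initialized lazily, the first time a task on this
// Worker JVM touches it - so each Worker gets exactly one pool
object ConnectionPool {
  private val maxIdle = 10
  // thread-safe queue of idle connections shared by all tasks in this JVM
  private val idle = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection =
    Option(idle.poll()).getOrElse(new Connection())

  def returnConnection(connection: Connection): Unit = {
    // keep up to maxIdle connections around for reuse; close the rest
    if (idle.size < maxIdle) idle.offer(connection)
    else connection.close()
  }
}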

hope that helps!

-chris




On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman <
ashrafuzzaman...@gmail.com> wrote:

> [...]


Re: Connection pool in workers

2015-03-01 Thread A.K.M. Ashrafuzzaman
Sorry guys, my bad,
Here is a high-level code sample:

val unionStreams = ssc.union(kinesisStreams)
unionStreams.foreachRDD(rdd => {
  rdd.foreach { tweet =>
    val strTweet = new String(tweet, "UTF-8")
    val interaction = InteractionParser.parser(strTweet)
    interactionDAL.insert(interaction)
  }
})

Here I have to close the connection for interactionDAL, otherwise the JVM
gives me an error that the connection is still open. I tried a sticky
connection as well, with keep_alive set to true. So my guess was that at the
point of “unionStreams.foreachRDD” or “rdd.foreach” the closure is serialized
and sent to the workers, and the workers deserialize and execute it, which is
why a new connection is opened for each RDD. I might be completely wrong. I
would love to know what is going on underneath.
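
For reference, applying the foreachPartition pattern from Chris's reply above
to this snippet would look roughly like the following sketch (the
per-partition InteractionDAL construction and its close() method are
assumptions for illustration, not the actual DAL API):

val unionStreams = ssc.union(kinesisStreams)
unionStreams.foreachRDD(rdd => {
  rdd.foreachPartition { partitionOfTweets =>
    // runs on the worker: one DAL (and connection) per partition, not per record
    val dal = new InteractionDAL() // hypothetical constructor
    partitionOfTweets.foreach { tweet =>
      val strTweet = new String(tweet, "UTF-8")
      val interaction = InteractionParser.parser(strTweet)
      dal.insert(interaction)
    }
    dal.close() // hypothetical cleanup method
  }
})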



Connection pool in workers

2015-02-28 Thread A.K.M. Ashrafuzzaman
Hi guys,
I am new to Spark, and we are running a small project that collects data from
Kinesis and inserts it into Mongo.
I would like to share a high-level view of how it is done and would love your
input on it.

I am fetching Kinesis data, and for each RDD:
  -> Parsing the string data
  -> Inserting it into Mongo storage

So what I understand is that when “we are parsing data” in each RDD, that
code is serialized and sent to the workers. So when I want to write to Mongo,
each worker creates a new connection to write the data.

Is there any way I can use a connection pool? By the way, I am using Scala and
Spark Streaming.


A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred

(M) 880-175-5592433