I think you can maintain a connection pool, or keep the connection as a
long-lived object on the executor side (for example, a lazily created singleton
inside an object { } in Scala), so that each task can reuse this connection
instead of creating a new one. That should work well for your scenario, since
creating a connection for each task is quite expensive.
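A minimal sketch of the singleton approach, in plain Scala so it runs without
Spark. DbConnection, ConnectionHolder and runTask are hypothetical names: in a
real job the body of runTask would run inside something like
rdd.foreachPartition, and the connection would come from your database client.
A Scala object is initialized at most once per JVM, and its lazy val defers
opening the connection until the first task asks for it.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a real, expensive-to-create database connection.
class DbConnection(val id: Int) {
  def insert(record: String): Unit = () // a real client would write `record` here
}

// An `object` is initialized once per JVM (i.e. once per executor), and the
// `lazy val` opens the connection only when a task first touches it.
object ConnectionHolder {
  val created = new AtomicInteger(0) // how many connections were opened
  lazy val connection: DbConnection = new DbConnection(created.incrementAndGet())
}

// Simulates one task: it fetches the shared connection instead of opening one.
def runTask(records: Seq[String]): Int = {
  val conn = ConnectionHolder.connection
  records.foreach(conn.insert)
  conn.id
}

// Five tasks running in the same (simulated) executor JVM.
val ids = (1 to 5).map(i => runTask(Seq(s"record-$i")))
println(s"connections created: ${ConnectionHolder.created.get}, ids seen: ${ids.distinct}")
```

Because every task in the executor then shares one handle, the client has to be
thread-safe; if it is not, a small pool (the other option above) is the safer
choice. Closing the connection is also left out of this sketch.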
Thanks
Jerry
From: Juan Rodríguez Hortalá
Sent: Tuesday, July 08, 2014 5:19 PM
To: Tobias Pfeiffer
Subject: Re: Which is the best way to get a connection to an external database per task in Spark Streaming?
Hi Tobias, thanks for your help. I understand that with that code we obtain a
database connection per partition, but I also suspect that a new database
connection is created on every execution of the function passed as an argument
to mapPartitions(). That would be very inefficient, because a new object and a
new database connection would be created for each batch of the DStream. But my
knowledge about the lifecycle of Functions in Spark Streaming is very limited,
so maybe I'm wrong. What do you think?
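The concern can be made concrete with a little arithmetic: the function given
to mapPartitions() is invoked once per partition of every batch RDD, so a
connection opened inside it is opened (number of batches) times (number of
partitions) times. A plain-Scala simulation with no Spark involved; perPartition
and the 3-batch, 4-partition sizes are made up for illustration:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Counts how many (hypothetical) connections get opened.
val opened = new AtomicInteger(0)

// Models the function passed to mapPartitions(): it opens a connection every
// time it is invoked, i.e. once per partition of every batch RDD.
def perPartition(records: Iterator[Int]): Iterator[Int] = {
  opened.incrementAndGet() // stands in for `new DbConnection()`
  records
}

// Simulate a DStream of 3 batches, each split into 4 partitions.
val batches: Seq[Seq[Seq[Int]]] = Seq.fill(3)(Seq.fill(4)(Seq(1, 2, 3)))
batches.foreach { partitions =>
  partitions.foreach(p => perPartition(p.iterator).foreach(_ => ()))
}
println(s"connections opened: ${opened.get}") // 3 batches x 4 partitions
```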
Greetings,
Juan
2014-07-08 3:30 GMT+02:00 Tobias Pfeiffer:
Juan,
I am doing something similar, except that instead of "insert into SQL
database" I "issue some RPC call". I think mapPartitions() may be helpful to
you. You could do something like:
  dstream.mapPartitions(iter => {
    val db = new DbConnection() // one connection per partition, per batch
    // maybe only do the above if !iter.isEmpty
    iter.map(item => {
      db.call(...)
      // do some cleanup (e.g. close db) here once !iter.hasNext
      item
    })
  }).count().print() // DStream.count() is lazy; print() forces output
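Scala's Iterator.map is lazy, so in the snippet above db.call(...) only runs
when something consumes the iterator returned from mapPartitions; that is why
the snippet ends with an operation meant to force evaluation. A plain-Scala
illustration of that laziness, where the sent buffer is a hypothetical stand-in
for the database:

```scala
import scala.collection.mutable.ArrayBuffer

// Records which items were "sent" to the hypothetical database.
val sent = ArrayBuffer.empty[Int]

val partition = Iterator(1, 2, 3)
// Like `iter.map(...)` in the snippet: defining the mapping runs nothing yet.
val mapped = partition.map { item =>
  sent += item // stands in for `db.call(...)`
  item
}
val sentBeforeConsuming = sent.size // still 0

// Consuming the iterator is what finally triggers the side effects.
val n = mapped.size
println(s"before: $sentBeforeConsuming, after: ${sent.size}, count: $n")
```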
Keep in mind, though, that the whole idea behind RDDs is that operations are
idempotent: in theory they could be run on multiple hosts (to take the result
from the fastest server) or multiple times (to deal with failures/timeouts),
so the same records may be written more than once. That is maybe something you
want to deal with in your SQL.
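One common way to make the writes tolerate such re-execution is to key them by
a record id and use an upsert instead of a plain insert (for example
INSERT ... ON CONFLICT in PostgreSQL, or MERGE in standard SQL). A sketch of the
difference in plain Scala, with in-memory collections standing in for the
table; Row, insertAll and upsertAll are hypothetical:

```scala
import scala.collection.mutable

// Hypothetical record with a stable id.
final case class Row(id: String, value: Int)

// Plain INSERT: re-running the same task appends duplicate rows.
def insertAll(table: mutable.ArrayBuffer[Row], rows: Seq[Row]): Unit =
  table ++= rows

// Upsert keyed by id: re-running the same task leaves the table unchanged.
def upsertAll(table: mutable.Map[String, Int], rows: Seq[Row]): Unit =
  rows.foreach(r => table(r.id) = r.value)

val batch = Seq(Row("a", 1), Row("b", 2))

val appendTable = mutable.ArrayBuffer.empty[Row]
insertAll(appendTable, batch)
insertAll(appendTable, batch) // task retried after a timeout

val upsertTable = mutable.Map.empty[String, Int]
upsertAll(upsertTable, batch)
upsertAll(upsertTable, batch) // same retry

println(s"insert: ${appendTable.size} rows, upsert: ${upsertTable.size} rows")
```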
Tobias
On Tue, Jul 8, 2014 at 3:40 AM, Juan Rodríguez Hortalá wrote:
Hi list,
I'm writing a Spark Streaming program that reads from a Kafka topic, performs
some transformations on the data, and then inserts each record into a database
with foreachRDD. I was wondering what the best way is to handle the connection
to the database so that each worker, or even each task, uses a different
connection, and the database inserts/updates are performed in parallel.
- I understand that using a final variable in the driver code is not a good
idea, because then the communication with the database would be performed by
the driver, which leads to a bottleneck, according to
http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/
- I think creating a new connection in the call() method of the Function passed
to foreachRDD is also a bad idea, because then I wouldn't be reusing the
database connection across the batch RDDs of the DStream.
- I'm not sure that a broadcast variable with the connection handler is a good
idea if the target database is distributed, because if the same handler is
used by all the nodes of the Spark cluster, that could have a negative effect
on the data locality of the connection to the database.
- From
http://apache-spark-user-list.1001560.n3.nabble.com/Database-connection-per-worker-td1280.html
I understand that by using a static variable and referencing it in the call()
method of the Function passed to foreachRDD, we get a different connection per
Spark worker; I guess that's because there is a different JVM per worker. But
then all the tasks in the same worker would share the same database handler
object, am I right?
- Another idea is to use updateStateByKey() with the database handler as the
state, but I guess that would only work for Serializable database handlers,
and not, for example, for an org.apache.hadoop.hbase.client.HTable object.
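On the static-variable point: a JVM-wide handle is indeed shared by every task
in the executor, since Spark executors run tasks on threads of a pool inside
that JVM. One alternative, not proposed in this thread, is a ThreadLocal
handle, which gives each task thread its own connection and reuses it for later
tasks scheduled on the same thread. A minimal sketch under that assumption;
DbConnection and simulateTasks are hypothetical, and closing the connections is
omitted:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical connection type; a real one would come from your database client.
class DbConnection(val id: Int)

// Runs `numThreads` simulated task threads and returns
// (connections created, threads that got the same handle on both calls).
def simulateTasks(numThreads: Int): (Int, Int) = {
  val created = new AtomicInteger(0)
  val reused = new AtomicInteger(0)

  // One connection per thread: opened lazily on a thread's first get(),
  // then returned again for every later get() on that same thread.
  val perThreadConnection = new ThreadLocal[DbConnection] {
    override def initialValue(): DbConnection =
      new DbConnection(created.incrementAndGet())
  }

  val threads = (1 to numThreads).map { _ =>
    new Thread(new Runnable {
      def run(): Unit = {
        val first = perThreadConnection.get()
        val second = perThreadConnection.get() // same thread, same handle
        if (first eq second) reused.incrementAndGet()
      }
    })
  }
  threads.foreach(_.start())
  threads.foreach(_.join())
  (created.get, reused.get)
}

val (createdCount, reusedCount) = simulateTasks(4)
println(s"connections created: $createdCount, reused within a thread: $reusedCount")
```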
So my question is: what is the best way to get a connection to an external
database per task in Spark Streaming? Or at least per worker. In
http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-td1343.html
there is a partial solution to this question, but there the database handler
object is missing. This other question
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Shared-hashmaps-td3247.html
is closer to mine, but there is no answer to it yet.
Thanks in advance,
Greetings,
Juan