Juan,

I am doing something similar, just not "insert into SQL database", but
"issue some RPC call". I think mapPartitions() may be helpful to you. You
could do something like

dstream.mapPartitions(iter => {
  val db = new DbConnection()
  // maybe only create the connection if !iter.isEmpty
  iter.map(item => {
    db.call(...)
    // do connection cleanup here once !iter.hasNext
    item
  })
}).foreachRDD(rdd => rdd.count()) // force evaluation; count() alone on a DStream is lazy
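
If you only need the side effect (the insert) and not a transformed stream, the
same idea can be written on the output side with foreachRDD plus
foreachPartition, one connection per partition per batch. A rough sketch,
assuming a JDBC-style connection (DbConnection, call() and close() are just
placeholders):

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    if (iter.nonEmpty) {
      val db = new DbConnection()   // one connection per partition per batch
      try {
        iter.foreach(item => db.call(item))
      } finally {
        db.close()                  // release the connection even if a call fails
      }
    }
  }
}

The try/finally makes sure the connection is released per partition, so you
avoid both serializing a driver-side connection and opening one connection per
record.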

Keep in mind, though, that the whole idea behind RDDs is that operations are
idempotent: a task may in theory run on multiple hosts (speculative execution
takes the result from the fastest one) or multiple times (to recover from
failures/timeouts), so your inserts can be replayed, which is something you may
want to handle on the SQL side.
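
One common way to handle that on the SQL side is to make the insert an upsert
keyed on the record id, so a replayed task just overwrites the same row. A
hedged sketch with plain JDBC and MySQL-style syntax (table and column names
are made up):

import java.sql.Connection

// Hypothetical table and columns; the point is that replaying the same
// insert for the same id leaves the row unchanged (MySQL-style upsert).
def upsertRecord(conn: Connection, id: String, payload: String): Unit = {
  val stmt = conn.prepareStatement(
    "INSERT INTO events (id, payload) VALUES (?, ?) " +
    "ON DUPLICATE KEY UPDATE payload = VALUES(payload)")
  try {
    stmt.setString(1, id)
    stmt.setString(2, payload)
    stmt.executeUpdate()
  } finally {
    stmt.close()
  }
}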

Tobias



On Tue, Jul 8, 2014 at 3:40 AM, Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi list,
>
> I'm writing a Spark Streaming program that reads from a Kafka topic,
> performs some transformations on the data, and then inserts each record
> into a database with foreachRDD. I was wondering what the best way is to
> handle the connection to the database so that each worker, or even each
> task, uses a different connection to the database, and database
> inserts/updates are performed in parallel.
> - I understand that using a final variable in the driver code is not a
> good idea because then the communication with the database would be
> performed in the driver code, which leads to a bottleneck, according to
> http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/
> - I think creating a new connection in the call() method of the Function
> passed to foreachRDD is also a bad idea, because then I wouldn't be reusing
> the database connection across the batch RDDs of the DStream.
> - I'm not sure that a broadcast variable with the connection handler is a
> good idea in case the target database is distributed, because if the same
> handler is used for all the nodes of the Spark cluster then that could
> have a negative effect on the data locality of the connection to the
> database.
> - From
> http://apache-spark-user-list.1001560.n3.nabble.com/Database-connection-per-worker-td1280.html
> I understand that by using a static variable and referencing it in the
> call() method of the Function passed to foreachRDD we get a different
> connection per Spark worker; I guess that's because there is a different
> JVM per worker. But then all the tasks in the same worker would share the
> same database handler object, am I right?
> - Another idea is to use updateStateByKey() with the database handler as
> the state, but I guess that would only work for Serializable database
> handlers, and for example not for an org.apache.hadoop.hbase.client.HTable
> object.
>
> So my question is: what is the best way to get a connection to an
> external database per task in Spark Streaming? Or at least per worker. In
> http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-td1343.html
> there is a partial solution to this question, but there the database
> handler object is missing. This other question
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Shared-hashmaps-td3247.html
> is closer to mine, but it has no answer yet.
>
> Thanks in advance,
>
> Greetings,
>
> Juan
>
>
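
Regarding the static-variable approach in the quoted message: in Scala you can
get the same effect (one connection per worker JVM, shared by all tasks on that
worker) with a lazy val inside an object, since the object is initialized at
most once per JVM. A minimal sketch, again using the placeholder DbConnection
from above:

object ConnectionHolder {
  // initialized lazily, once per worker JVM; shared by all tasks running
  // in that JVM, so the underlying handler must be thread-safe
  lazy val db = new DbConnection()
}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    iter.foreach(item => ConnectionHolder.db.call(item))
  }
}

And yes, all tasks in that JVM would then share the same handler object, so it
needs to be safe for concurrent use.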
