Hi Jerry, it's all clear to me now, I will try with something like Apache
DBCP for the connection pool

Thanks a lot for your help!


2014-07-09 3:08 GMT+02:00 Shao, Saisai <saisai.s...@intel.com>:

>  Yes, that would be the Java equivalence to use static class member, but
> you should carefully program to prevent resource leakage. A good choice is
> to use third-party DB connection library which supports connection pool,
> that will alleviate your programming efforts.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Juan Rodríguez Hortalá [mailto:juan.rodriguez.hort...@gmail.com]
> *Sent:* Tuesday, July 08, 2014 6:54 PM
> *To:* user@spark.apache.org
>
> *Subject:* Re: Which is the best way to get a connection to an external
> database per task in Spark Streaming?
>
>
>
> Hi Jerry, thanks for your answer. I'm using Spark Streaming for Java, and
> I only have rudimentary knowledge about Scala, how could I recreate in Java
> the lazy creation of a singleton object that you propose for Scala? Maybe a
> static class member in Java for the connection would be the solution?
>
> Thanks again for your help,
>
> Best Regards,
>
> Juan
>
>
>
> 2014-07-08 11:44 GMT+02:00 Shao, Saisai <saisai.s...@intel.com>:
>
> I think you can maintain a connection pool or keep the connection as a
> long-lived object in executor side (like lazily creating a singleton object
> in object { } in Scala), so your task can get this connection each time
> executing a task, not creating a new one, that would be good for your
> scenario, since create a connection is quite expensive for each task.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Juan Rodríguez Hortalá [mailto:juan.rodriguez.hort...@gmail.com]
> *Sent:* Tuesday, July 08, 2014 5:19 PM
> *To:* Tobias Pfeiffer
> *Cc:* user@spark.apache.org
> *Subject:* Re: Which is the best way to get a connection to an external
> database per task in Spark Streaming?
>
>
>
> Hi Tobias, thanks for your help. I understand that with that code we
> obtain a database connection per partition, but I also suspect that with
> that code a new database connection is created per each execution of the
> function used as argument for mapPartitions(). That would be very
> inefficient because a new object and a new database connection would be
> created for each batch of the DStream. But my knowledge about the lifecycle
> of Functions in Spark Streaming is very limited, so maybe I'm wrong, what
> do you think?
>
> Greetings,
>
> Juan
>
>
>
> 2014-07-08 3:30 GMT+02:00 Tobias Pfeiffer <t...@preferred.jp>:
>
> Juan,
>
>
>
> I am doing something similar, just not "insert into SQL database", but
> "issue some RPC call". I think mapPartitions() may be helpful to you. You
> could do something like
>
>
>
> dstream.mapPartitions(iter => {
>
>   val db = new DbConnection()
>
>   // maybe only do the above if !iter.isEmpty
>
>   iter.map(item => {
>
>     db.call(...)
>
>     // do some cleanup if !iter.hasNext here
>
>     item
>
>   })
>
> }).count() // force output
>
>
>
> Keep in mind though that the whole idea about RDDs is that operations are
> idempotent and in theory could be run on multiple hosts (to take the result
> from the fastest server) or multiple times (to deal with failures/timeouts)
> etc., which is maybe something you want to deal with in your SQL.
>
>
>
> Tobias
>
>
>
>
>
> On Tue, Jul 8, 2014 at 3:40 AM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
> Hi list,
>
> I'm writing a Spark Streaming program that reads from a kafka topic,
> performs some transformations on the data, and then inserts each record in
> a database with foreachRDD. I was wondering which is the best way to handle
> the connection to the database so each worker, or even each task, uses a
> different connection to the database, and then database inserts/updates
> would be performed in parallel.
> - I understand that using a final variable in the driver code is not a
> good idea because then the communication with the database would be
> performed in the driver code, which leads to a bottleneck, according to
> http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/
> - I think creating a new connection in the call() method of the Function
> passed to foreachRDD is also a bad idea, because then I wouldn't be reusing
> the connection to the database for each batch RDD in the DStream
> - I'm not sure that a broadcast variable with the connection handler is a
> good idea in case the target database is distributed, because if the same
> handler is used for all the nodes of the Spark cluster then than could have
> a negative effect in the data locality of the connection to the database.
> - From
> http://apache-spark-user-list.1001560.n3.nabble.com/Database-connection-per-worker-td1280.html
> I understand that by using an static variable and referencing it in the
> call() method of the Function passed to foreachRDD we get a different
> connection per Spark worker, I guess it's because there is a different JVM
> per worker. But then all the tasks in the same worker would share the same
> database handler object, am I right?
> - Another idea is using updateStateByKey() using the database handler as
> the state, but I guess that would only work for Serializable database
> handlers, and for example not for an org.apache.hadoop.hbase.client.HTable
> object.
>
> So my question is, which is the best way to get a connection to an
> external database per task in Spark Streaming? Or at least per worker. In
> http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-td1343.html
> there is a partial solution to this question, but there the database
> handler object is missing. This other question
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Shared-hashmaps-td3247.html
> is closer to mine, but there is no answer for it yet
>
> Thanks in advance,
>
> Greetings,
>
> Juan
>
>
>
>
>
>
>
>
>

Reply via email to