Re: Which is the best way to get a connection to an external database per task in Spark Streaming?

2014-07-09 Thread Juan Rodríguez Hortalá
Hi Jerry, it's all clear to me now. I will try something like Apache DBCP
for the connection pool.
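
Something like this is what I have in mind (just a sketch, assuming
commons-dbcp 1.x; the driver class, URL and credentials below are made up),
keeping the pool in a lazily initialized static member so there is one pool
per executor JVM:

import org.apache.commons.dbcp.BasicDataSource;

public class ConnectionPool {
  // One pool per executor JVM, created lazily on first use.
  private static BasicDataSource dataSource = null;

  public static synchronized BasicDataSource getDataSource() {
    if (dataSource == null) {
      dataSource = new BasicDataSource();
      // Hypothetical driver, URL and credentials, just for illustration
      dataSource.setDriverClassName("com.mysql.jdbc.Driver");
      dataSource.setUrl("jdbc:mysql://dbhost:3306/mydb");
      dataSource.setUsername("user");
      dataSource.setPassword("secret");
      dataSource.setMaxActive(10); // cap the connections per executor
    }
    return dataSource;
  }
}

Each task would then call ConnectionPool.getDataSource().getConnection() and
close the connection when done, which just returns it to the pool.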

Thanks a lot for your help!



Re: Which is the best way to get a connection to an external database per task in Spark Streaming?

2014-07-08 Thread Juan Rodríguez Hortalá
Hi Tobias, thanks for your help. I understand that with that code we obtain
a database connection per partition, but I also suspect that a new database
connection is created for each execution of the function passed to
mapPartitions(). That would be very inefficient, because a new object and a
new database connection would be created for every batch of the DStream. My
knowledge about the lifecycle of Functions in Spark Streaming is very
limited, though, so maybe I'm wrong. What do you think?


Greetings,

Juan



RE: Which is the best way to get a connection to an external database per task in Spark Streaming?

2014-07-08 Thread Shao, Saisai
I think you can maintain a connection pool or keep the connection as a
long-lived object on the executor side (like a lazily created singleton in
an object { } in Scala), so each task can reuse that connection instead of
creating a new one. That would be good for your scenario, since creating a
connection is quite expensive to do for every task.
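
For reference, a rough sketch of how a task could reuse such a long-lived
pool with the Java API (used elsewhere in this thread).
ConnectionPool.getDataSource() stands for a lazily initialized,
executor-local pool such as the hypothetical DBCP holder sketched in the
newest message at the top of this page; the table and column names are made
up, and this assumes a Spark version whose Java API exposes
foreachPartition (otherwise the mapPartitions().count() trick from Tobias'
example elsewhere in the thread works the same way):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Iterator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;

public class StreamToDb {
  // records stands for the transformed stream read from Kafka.
  public static void saveToDb(JavaDStream<String> records) {
    records.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
          @Override
          public void call(Iterator<String> partition) throws Exception {
            // Runs on the executor: borrow one connection per partition/task
            // from the long-lived pool instead of opening a new one every time.
            Connection conn = ConnectionPool.getDataSource().getConnection();
            try {
              PreparedStatement stmt =
                  conn.prepareStatement("INSERT INTO events (payload) VALUES (?)");
              while (partition.hasNext()) {
                stmt.setString(1, partition.next());
                stmt.executeUpdate();
              }
              stmt.close();
            } finally {
              conn.close(); // with a pool, close() just returns the connection
            }
          }
        });
        return null;
      }
    });
  }
}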

Thanks
Jerry



Re: Which is the best way to get a connection to an external database per task in Spark Streaming?

2014-07-08 Thread Juan Rodríguez Hortalá
Hi Jerry, thanks for your answer. I'm using the Java API for Spark
Streaming and only have rudimentary knowledge of Scala. How could I
recreate in Java the lazy creation of a singleton object that you propose
for Scala? Maybe a static class member in Java holding the connection would
be the solution?

Thanks again for your help,

Best Regards,

Juan



RE: Which is the best way to get a connection to an external database per task in Spark Streaming?

2014-07-08 Thread Shao, Saisai
Yes, the Java equivalent would be to use a static class member, but you
should program carefully to prevent resource leaks. A good choice is to use
a third-party DB connection library that supports connection pooling; that
will reduce your programming effort.
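
To make the leak-prevention point concrete, a small sketch of the kind of
thing I mean (Java, with DBCP's BasicDataSource as an example pool held in
a static member; the URL is made up): close the pool when the executor JVM
shuts down, and always return connections in a finally block on the task
side.

import java.sql.Connection;
import org.apache.commons.dbcp.BasicDataSource;

public final class PooledConnections {
  // Static member: one pool per executor JVM.
  private static final BasicDataSource pool = createPool();

  private static BasicDataSource createPool() {
    final BasicDataSource ds = new BasicDataSource();
    ds.setUrl("jdbc:mysql://dbhost:3306/mydb"); // plus driver, credentials, pool size
    // Close the pool when this JVM exits so pooled connections are not leaked.
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        try { ds.close(); } catch (Exception ignored) { }
      }
    });
    return ds;
  }

  public static Connection borrow() throws Exception {
    return pool.getConnection();
  }
}

On the task side the connection should then be used inside
try { ... } finally { conn.close(); }, so a failed task returns its
connection to the pool instead of leaking it.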

Thanks
Jerry


Re: Which is the best way to get a connection to an external database per task in Spark Streaming?

2014-07-07 Thread Tobias Pfeiffer
Juan,

I am doing something similar, just not inserting into an SQL database but
issuing an RPC call. I think mapPartitions() may be helpful to you. You
could do something like:

dstream.mapPartitions(iter => {
  val db = new DbConnection()
  // maybe only do the above if !iter.isEmpty
  iter.map(item => {
    db.call(...)
    // do some cleanup if !iter.hasNext here
    item
  })
}).count() // force output

Keep in mind, though, that the whole idea behind RDDs is that operations
should be idempotent: in theory they could be run on multiple hosts (to
take the result from the fastest one) or multiple times (to deal with
failures/timeouts), which is maybe something you want to deal with in your
SQL.
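
For example (just a sketch; the table, columns and the MySQL-style
"ON DUPLICATE KEY UPDATE" syntax are my assumptions, not something from
your setup), an upsert keyed on the record id makes the write safe to
replay when a task is retried:

import java.sql.Connection;
import java.sql.PreparedStatement;

public class IdempotentWriter {
  // Re-running this for the same id overwrites the row instead of duplicating it.
  public static void upsert(Connection conn, String id, String payload) throws Exception {
    PreparedStatement stmt = conn.prepareStatement(
        "INSERT INTO events (id, payload) VALUES (?, ?) "
        + "ON DUPLICATE KEY UPDATE payload = VALUES(payload)");
    try {
      stmt.setString(1, id);
      stmt.setString(2, payload);
      stmt.executeUpdate();
    } finally {
      stmt.close();
    }
  }
}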

Tobias



On Tue, Jul 8, 2014 at 3:40 AM, Juan Rodríguez Hortalá 
juan.rodriguez.hort...@gmail.com wrote:

 Hi list,

 I'm writing a Spark Streaming program that reads from a Kafka topic,
 performs some transformations on the data, and then inserts each record
 into a database with foreachRDD. I was wondering which is the best way to
 handle the connection to the database so that each worker, or even each
 task, uses a different connection to the database, and database
 inserts/updates are performed in parallel.
 - I understand that using a final variable in the driver code is not a
 good idea, because then the communication with the database would be
 performed in the driver code, which leads to a bottleneck, according to
 http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/
 - I think creating a new connection in the call() method of the Function
 passed to foreachRDD is also a bad idea, because then I wouldn't be reusing
 the connection to the database across the batch RDDs of the DStream.
 - I'm not sure that a broadcast variable with the connection handler is a
 good idea in case the target database is distributed, because if the same
 handler is used for all the nodes of the Spark cluster then that could have
 a negative effect on the data locality of the connection to the database.
 - From
 http://apache-spark-user-list.1001560.n3.nabble.com/Database-connection-per-worker-td1280.html
 I understand that by using a static variable and referencing it in the
 call() method of the Function passed to foreachRDD we get a different
 connection per Spark worker, I guess because there is a different JVM per
 worker. But then all the tasks in the same worker would share the same
 database handler object, am I right?
 - Another idea is using updateStateByKey() with the database handler as
 the state, but I guess that would only work for Serializable database
 handlers, and for example not for an org.apache.hadoop.hbase.client.HTable
 object.

 So my question is, which is the best way to get a connection to an
 external database per task in Spark Streaming? Or at least per worker. In
 http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-td1343.html
 there is a partial solution to this question, but the database handler
 object is missing there. This other question
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Shared-hashmaps-td3247.html
 is closer to mine, but there is no answer for it yet.

 Thanks in advance,

 Greetings,

 Juan