Re: Which is the best way to get a connection to an external database per task in Spark Streaming?
Hi Jerry, it's all clear to me now. I will try something like Apache Commons DBCP for the connection pool. Thanks a lot for your help!
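As a rough illustration of what a pooling library like Commons DBCP provides, here is a minimal hand-rolled sketch in plain Java. `DbConnection` is a hypothetical stand-in for a real JDBC connection; a real library adds validation, eviction, and dynamic sizing on top of the same borrow/return semantics.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoolSketch {
    // Hypothetical stand-in for a real JDBC connection.
    static class DbConnection {
        static int created = 0;
        DbConnection() { created++; }
    }

    // Minimal fixed-size pool; a library such as Commons DBCP offers the
    // same borrow/return semantics plus validation, eviction, and sizing.
    static class ConnectionPool {
        private final BlockingQueue<DbConnection> idle;

        ConnectionPool(int size) {
            idle = new ArrayBlockingQueue<>(size);
            for (int i = 0; i < size; i++) {
                idle.add(new DbConnection()); // eagerly filled for simplicity
            }
        }

        DbConnection borrow() throws InterruptedException {
            return idle.take(); // blocks if every connection is in use
        }

        void giveBack(DbConnection c) {
            idle.offer(c);
        }
    }

    public static void main(String[] args) throws Exception {
        ConnectionPool pool = new ConnectionPool(2);
        // Simulate five tasks that each borrow and return a connection.
        for (int i = 0; i < 5; i++) {
            DbConnection c = pool.borrow();
            pool.giveBack(c);
        }
        // Only the two pooled connections were ever created.
        System.out.println("connections created: " + DbConnection.created);
    }
}
```

Kept as a static member on the executor side, such a pool lets every task reuse connections instead of paying the setup cost per task.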
Re: Which is the best way to get a connection to an external database per task in Spark Streaming?
Hi Tobias, thanks for your help. I understand that with that code we obtain a database connection per partition, but I also suspect that a new database connection is created on each execution of the function passed to mapPartitions(). That would be very inefficient, because a new object and a new database connection would be created for each batch of the DStream. My knowledge of the lifecycle of Functions in Spark Streaming is very limited, though, so maybe I'm wrong. What do you think?

Greetings,
Juan
RE: Which is the best way to get a connection to an external database per task in Spark Streaming?
I think you can maintain a connection pool, or keep the connection as a long-lived object on the executor side (like lazily creating a singleton inside object { } in Scala), so each task can get this connection when it executes instead of creating a new one. That would be good for your scenario, since creating a connection for each task is quite expensive.

Thanks,
Jerry
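The lazy-singleton idea Jerry describes can be sketched in Java with a static member and double-checked locking. `DbConnection` is a hypothetical stand-in for a real database handle; the point is that every task running in the same executor JVM gets the same long-lived object.

```java
public class ConnectionHolder {
    // Hypothetical connection type standing in for a real JDBC/HBase handle.
    static class DbConnection {
        static int created = 0;
        DbConnection() { created++; }
    }

    private static volatile DbConnection instance;

    // Lazily create one connection per JVM (i.e. per executor): the Java
    // analogue of lazily initializing a singleton in a Scala object { }.
    static DbConnection get() {
        if (instance == null) {
            synchronized (ConnectionHolder.class) {
                if (instance == null) {
                    instance = new DbConnection();
                }
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        // Simulate several tasks on the same executor asking for a connection.
        DbConnection a = get();
        DbConnection b = get();
        System.out.println(a == b);               // same object is reused
        System.out.println(DbConnection.created); // only one ever created
    }
}
```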
Re: Which is the best way to get a connection to an external database per task in Spark Streaming?
Hi Jerry, thanks for your answer. I'm using Spark Streaming from Java, and I only have rudimentary knowledge of Scala. How could I recreate in Java the lazy creation of a singleton object that you propose for Scala? Maybe a static class member in Java holding the connection would be the solution?

Thanks again for your help.

Best regards,
Juan
RE: Which is the best way to get a connection to an external database per task in Spark Streaming?
Yes, that would be the Java equivalent of a static class member, but you should program carefully to prevent resource leaks. A good choice is to use a third-party DB connection library that supports connection pooling; that will reduce your programming effort.

Thanks,
Jerry
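One hedged way to address the leak concern Jerry raises is the initialization-on-demand holder idiom combined with a JVM shutdown hook, so the static connection is both lazily created and eventually closed. `DbConnection` is again a hypothetical stand-in with an explicit close(), as a real JDBC Connection would have.

```java
public class SafeHolder {
    // Hypothetical connection with an explicit close(), like a JDBC Connection.
    static class DbConnection {
        volatile boolean closed = false;
        void close() { closed = true; }
    }

    // Initialization-on-demand holder: the connection is created on first
    // use, and a shutdown hook closes it so the executor JVM does not leak it.
    private static class Holder {
        static final DbConnection CONN = new DbConnection();
        static {
            Runtime.getRuntime().addShutdownHook(new Thread(CONN::close));
        }
    }

    static DbConnection get() { return Holder.CONN; }

    public static void main(String[] args) {
        DbConnection c = get();
        System.out.println(c.closed); // still open while the JVM is running
        // (the shutdown hook calls close() when the JVM exits)
    }
}
```

A pooling library handles this bookkeeping for you, which is why Jerry recommends one.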
Re: Which is the best way to get a connection to an external database per task in Spark Streaming?
Juan,

I am doing something similar, just not inserting into an SQL database but issuing some RPC calls. I think mapPartitions() may be helpful to you. You could do something like:

    dstream.mapPartitions(iter => {
      val db = new DbConnection()
      // maybe only do the above if !iter.isEmpty
      iter.map(item => {
        db.call(...)
        // do some cleanup if !iter.hasNext here
        item
      })
    }).count() // force output

Keep in mind, though, that the whole idea behind RDDs is that operations are idempotent and in theory could be run on multiple hosts (to take the result from the fastest server) or multiple times (to deal with failures/timeouts), which is maybe something you want to deal with in your SQL.

Tobias

On Tue, Jul 8, 2014 at 3:40 AM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote:

Hi list, I'm writing a Spark Streaming program that reads from a Kafka topic, performs some transformations on the data, and then inserts each record in a database with foreachRDD. I was wondering which is the best way to handle the connection to the database, so that each worker, or even each task, uses a different connection, and database inserts/updates are performed in parallel.
- I understand that using a final variable in the driver code is not a good idea, because then the communication with the database would be performed in the driver code, which leads to a bottleneck, according to http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/
- I think creating a new connection in the call() method of the Function passed to foreachRDD is also a bad idea, because then I wouldn't be reusing the database connection across the batch RDDs of the DStream.
- I'm not sure that a broadcast variable with the connection handler is a good idea in case the target database is distributed, because if the same handler is used for all the nodes of the Spark cluster, that could have a negative effect on the data locality of the connection to the database.
- From http://apache-spark-user-list.1001560.n3.nabble.com/Database-connection-per-worker-td1280.html I understand that by using a static variable and referencing it in the call() method of the Function passed to foreachRDD, we get a different connection per Spark worker; I guess that's because there is a different JVM per worker. But then all the tasks in the same worker would share the same database handler object, am I right?
- Another idea is using updateStateByKey() with the database handler as the state, but I guess that would only work for Serializable database handlers, and not, for example, for an org.apache.hadoop.hbase.client.HTable object.

So my question is: which is the best way to get a connection to an external database per task in Spark Streaming? Or at least per worker. In http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-td1343.html there is a partial solution to this question, but there the database handler object is missing.
This other question, http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Shared-hashmaps-td3247.html, is closer to mine, but there is no answer for it yet.

Thanks in advance. Greetings,
Juan
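The per-partition pattern from Tobias's reply above can be sketched outside Spark in plain Java: the function receives the whole partition iterator, opens one connection for it, and cleans up once the iterator is drained. `DbConnection` is a hypothetical stand-in for a real database handle.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PerPartitionSketch {
    // Hypothetical stand-in for a real database handle.
    static class DbConnection {
        static int opened = 0;
        DbConnection() { opened++; }
        void call(String item) { /* pretend to write the record */ }
        void close() { /* release the handle */ }
    }

    // Mirrors dstream.mapPartitions: the function receives the whole
    // partition iterator, so one connection serves every record in it.
    static Iterator<String> mapPartition(final Iterator<String> iter) {
        final DbConnection db = new DbConnection();
        return new Iterator<String>() {
            public boolean hasNext() { return iter.hasNext(); }
            public String next() {
                String item = iter.next();
                db.call(item);
                if (!iter.hasNext()) {
                    db.close(); // cleanup once the partition is drained
                }
                return item;
            }
        };
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("a", "b", "c");
        Iterator<String> out = mapPartition(partition.iterator());
        int count = 0;
        while (out.hasNext()) { out.next(); count++; }
        System.out.println(count);               // all records processed
        System.out.println(DbConnection.opened); // one connection per partition
    }
}
```

Because the returned iterator is lazy, the connection is only used as Spark actually pulls records through it, which is why the cleanup has to happen inside next() rather than after the map.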