Re: Re: spark dataframe jdbc read/write using dbcp connection pool
OK. I am now trying the jdbc read data source with predicates, like the following:

    sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

I can see that the job runs with 62 partitions, but I still get the exception and the parquet file is not written successfully. Do I need to increase the number of partitions? Or are there any other alternatives I can try to tune this?

Best,
Sun.

fightf...@163.com
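For reference, the predicate-based overload issues one query per array element, so the predicates should be non-overlapping and should together cover every row. A minimal sketch (borrowing the added_year column described later in this thread; column name and range are assumptions):

```scala
// Sketch: one non-overlapping predicate per year, so every row is read
// exactly once. Assumes an integer column added_year in [1985, 2015].
val predicates = (1985 to 2015).map(y => s"added_year = $y").toArray

// Each predicate becomes one partition (31 here), i.e. one MySQL query.
val df = sqlContext.read.jdbc(url1, "video3", predicates, prop)
```

If two predicates match the same row, that row is read twice; if none matches it, it is silently dropped, so covering the full range matters.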
Re: Re: spark dataframe jdbc read/write using dbcp connection pool
Hi,

I think you can check the Spark job UI to find out whether the partitioning works or not. Pay attention to the Storage page for the partition sizes, and note which stage / task fails.
Re: Re: spark dataframe jdbc read/write using dbcp connection pool
OK. I can see from the Spark job UI that the read does go to more partitions when I use predicates. But each task then fails with the same error message:

    com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

    The last packet successfully received from the server was 377,769 milliseconds ago.
    The last packet sent successfully to the server was 377,790 milliseconds ago.

Do I need to increase the partitions? Or shall I write a parquet file for each partition in an iterative way?

Thanks a lot for your advice.

Best,
Sun.

fightf...@163.com
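One way to keep each JDBC cursor short-lived, along the lines of the per-partition write asked about above, is to read and append one slice at a time. A hedged sketch, reusing names from this thread (url1, prop, added_year, the perf4 path are all taken from earlier messages; mode("append") assumes the target directory is reserved for this job):

```scala
// Sketch: read one year slice at a time and append it to the same parquet
// directory, so each MySQL result set is consumed well before wait_timeout.
for (year <- 1985 to 2015) {
  val slice = sqlContext.read.jdbc(
    url1, "video3", Array(s"added_year = $year"), prop)
  slice.write.mode("append").format("parquet")
       .save("hdfs://172.16.54.138:8020/perf4")
}
```

This trades one large job for many small ones, which is slower overall but makes each individual read far less likely to hit the server-side timeout.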
spark dataframe jdbc read/write using dbcp connection pool
Hi,

I want to load really large-volume datasets from MySQL using the Spark DataFrame API, and then save them as parquet or ORC files for use with Hive / Impala. The dataset is about 1 billion records, and when I run the following naive code, an error occurs and an executor is lost:

    val prop = new java.util.Properties
    prop.setProperty("user", "test")
    prop.setProperty("password", "test")

    val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
    val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
    val jdbcDF1 = sqlContext.read.jdbc(url1, "video", prop)
    val jdbcDF2 = sqlContext.read.jdbc(url2, "video", prop)

    val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
    jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf")

The executor log shows the messages below. I can see that the wait_timeout threshold is reached and there is no retry mechanism in this code path. So I am asking you experts for help tuning this. Or should I try to use a JDBC connection pool to increase parallelism?

    16/01/19 17:04:28 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

    The last packet successfully received from the server was 377,769 milliseconds ago.
    The last packet sent successfully to the server was 377,790 milliseconds ago.
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 1 bytes before connection was unexpectedly lost.
        at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
        at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:1996)
        ... 22 more

    16/01/19 17:10:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4
    16/01/19 17:10:47 INFO jdbc.JDBCRDD: closed connection
    16/01/19 17:10:47 ERROR executor.Executor: Exception in task 1.1 in stage 0.0 (TID 2)
    com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

fightf...@163.com
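For reference, DataFrameReader also has an explicitly partitioned jdbc overload that splits the read by a numeric column, so each task runs a bounded range query instead of one task scanning the whole table. A sketch (the column name and bounds here come from a later message in this thread and are assumptions about the schema):

```scala
// Sketch of the explicitly partitioned overload of DataFrameReader.jdbc.
// Spark issues one query per partition with a range WHERE clause on the
// column, e.g. WHERE added_year >= x AND added_year < y.
val df = sqlContext.read.jdbc(
  url1,          // jdbc:mysql://... URL
  "video",       // table name
  "added_year",  // numeric partition column (ideally indexed)
  1985L,         // lowerBound
  2015L,         // upperBound
  31,            // numPartitions
  prop)          // connection properties (user / password)
```

Note that lowerBound and upperBound only control how the range is split, not which rows are read; rows outside the bounds still land in the first or last partition.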
Re: Re: spark dataframe jdbc read/write using dbcp connection pool
Hi,

Thanks a lot for your suggestion. I then tried the following code:

    val prop = new java.util.Properties
    prop.setProperty("user", "test")
    prop.setProperty("password", "test")
    prop.setProperty("partitionColumn", "added_year")
    prop.setProperty("lowerBound", "1985")
    prop.setProperty("upperBound", "2015")
    prop.setProperty("numPartitions", "200")

    val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
    val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
    val jdbcDF1 = sqlContext.read.jdbc(url1, "video3", prop)
    val jdbcDF2 = sqlContext.read.jdbc(url2, "video3", prop)

    val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
    jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf4")

The added_year column in the MySQL table contains the range 1985-2015, and I pass the numPartitions property to get the reads partitioned. Is this what you recommend? Can you advise a little more on the implementation?

Best,
Sun.

fightf...@163.com
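A caveat with the code above: in the jdbc(url, table, properties) overload the Properties object is handed to the JDBC driver as connection properties, so partitionColumn, lowerBound, upperBound, and numPartitions set there may simply be ignored (which would explain the unpartitioned read observed earlier in the thread). The options-based form of the JDBC source does accept them as data source options. A hedged sketch of the alternative:

```scala
// Sketch: pass the partition settings as data source options rather than
// as connection Properties, so the JDBC relation actually sees them.
val df = sqlContext.read.format("jdbc")
  .option("url", url1)
  .option("dbtable", "video3")
  .option("user", "test")
  .option("password", "test")
  .option("partitionColumn", "added_year")
  .option("lowerBound", "1985")
  .option("upperBound", "2015")
  .option("numPartitions", "31")
  .load()
```

With only 31 distinct years in the range, numPartitions much larger than 31 (such as 200) mostly produces empty partitions, so a value near the number of distinct column values seems a more natural choice.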
Re: spark dataframe jdbc read/write using dbcp connection pool
Hi,

I suggest you partition the JDBC read on an indexed column of the MySQL table.