Re: Re: spark dataframe jdbc read/write using dbcp connection pool

2016-01-20 Thread fightf...@163.com
OK. I am trying to use the jdbc read data source with predicates, like the following:
   sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
I can see that the job now goes to 62 partitions, but I still get the exception and the
parquet file is not written successfully. Do I need to increase the number of partitions,
or are there any other alternatives I can choose to tune this?

Best,
Sun.



fightf...@163.com
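
A note on this overload, with a sketch of a fuller predicate set: each element of the
predicates array becomes the WHERE clause of exactly one partition, and rows matched by
no predicate are silently skipped, so the predicates need to jointly cover every row you
want. Assuming the added_year column described in the later messages, one predicate per
year would read the whole 1985-2015 range in 31 parallel queries:

// Sketch, assuming Spark 1.x and the added_year column from the later messages:
// one predicate per year, so the 31 partitions jointly cover every row.
val predicates = (1985 to 2015).map(y => s"added_year = $y").toArray
val jdbcDF = sqlContext.read.jdbc(url1, "video3", predicates, prop)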
 



Re: Re: spark dataframe jdbc read/write using dbcp connection pool

2016-01-20 Thread 刘虓
Hi,
I think you can check the Spark job UI to find out whether the partitioning works or not.
Pay attention to the partition sizes on the storage page, and to which stage / task fails.
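
To complement the UI, the partition layout can also be checked programmatically; a small
sketch, assuming the jdbcDF1 DataFrame from the earlier message:

// Sketch: the number of read partitions equals the number of concurrent JDBC queries.
println(jdbcDF1.rdd.partitions.length)
// Per-partition row counts, to spot skew (note: this scans the data).
jdbcDF1.rdd.mapPartitionsWithIndex((i, rows) => Iterator((i, rows.size)))
  .collect().foreach(println)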



Re: Re: spark dataframe jdbc read/write using dbcp connection pool

2016-01-20 Thread fightf...@163.com
OK. From the Spark job UI I can see that the read does go to more partitions when I use
predicates, but each task then fails with the same error message:

   com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

   The last packet successfully received from the server was 377,769 milliseconds ago.  The last packet sent successfully to the server was 377,790 milliseconds ago.

Do I need to increase the number of partitions? Or shall I write a parquet file for each
partition in an iterative way?

Thanks a lot for your advice.

Best,
Sun.



fightf...@163.com
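
On the per-partition idea: one possibility, sketched below under the assumption that
added_year splits the table into small enough slices, is to loop over the years on the
driver and append one parquet chunk per slice, so that a dropped connection only fails
one short query that can simply be rerun:

// Sketch: push each year's filter into MySQL as a derived table,
// then append that slice to the target parquet directory.
for (year <- 1985 to 2015) {
  val slice = sqlContext.read.jdbc(
    url1, s"(SELECT * FROM video3 WHERE added_year = $year) AS t", prop)
  slice.write.mode("append").parquet("hdfs://172.16.54.138:8020/perf4")
}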
 




spark dataframe jdbc read/write using dbcp connection pool

2016-01-19 Thread fightf...@163.com
Hi,
I want to load a really large volume of data from mysql using the spark dataframe api
and then save it as a parquet or orc file so it can be queried from hive / Impala. The
dataset size is about 1 billion records, and when I run the following naive code, errors
occur and executors are lost.

val prop = new java.util.Properties
prop.setProperty("user", "test")
prop.setProperty("password", "test")

val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
val jdbcDF1 = sqlContext.read.jdbc(url1, "video", prop)
val jdbcDF2 = sqlContext.read.jdbc(url2, "video", prop)

val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf")

The executor log shows messages like the following. From the log I can see that the
wait_timeout threshold was reached, and there is no retry mechanism in the read path.
So I am asking you experts for help on tuning this. Or should I try to use a jdbc
connection pool to increase parallelism?

   16/01/19 17:04:28 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 377,769 milliseconds ago.  The last packet sent successfully to the server was 377,790 milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 1 bytes before connection was unexpectedly lost.
at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:1996)
... 22 more
16/01/19 17:10:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4
16/01/19 17:10:47 INFO jdbc.JDBCRDD: closed connection
16/01/19 17:10:47 ERROR executor.Executor: Exception in task 1.1 in stage 0.0 (TID 2)
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure





fightf...@163.com
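
One detail worth calling out in the code above: without any partitioning information,
each read.jdbc call produces a single partition, i.e. one task streaming the whole table
through one JDBC connection. On a billion rows that single long-running query can easily
outlive MySQL's wait_timeout, which is consistent with the communications failure in the
log. A quick sketch to confirm:

// Sketch: an unpartitioned JDBC read yields exactly one partition per source.
println(jdbcDF1.rdd.partitions.length)  // 1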


Re: Re: spark dataframe jdbc read/write using dbcp connection pool

2016-01-19 Thread fightf...@163.com
Hi,
Thanks a lot for your suggestion. I then tried the following code:

val prop = new java.util.Properties
prop.setProperty("user", "test")
prop.setProperty("password", "test")
prop.setProperty("partitionColumn", "added_year")
prop.setProperty("lowerBound", "1985")
prop.setProperty("upperBound", "2015")
prop.setProperty("numPartitions", "200")

val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
val jdbcDF1 = sqlContext.read.jdbc(url1, "video3", prop)
val jdbcDF2 = sqlContext.read.jdbc(url2, "video3", prop)

val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf4")

The added_year column in the mysql table covers the range 1985-2015, and I pass the
numPartitions property to get the read partitioned. Is this what you recommend? Can you
advise a little more on the implementation?

Best,
Sun.



fightf...@163.com
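
A caveat that may explain the behavior seen here: in Spark 1.x, the three-argument
read.jdbc(url, table, properties) overload does not pick partitionColumn, lowerBound,
upperBound or numPartitions out of the Properties object, so the read above still runs
unpartitioned. Partitioned reads go through the explicit overload; a sketch:

// Sketch, assuming Spark 1.x. Note the bounds only control how the ranges are
// split; they do not filter out rows outside 1985-2015.
val jdbcDF1 = sqlContext.read.jdbc(
  url1, "video3",
  "added_year", // partition column, must be numeric
  1985L, 2015L, // lowerBound, upperBound
  31,           // numPartitions
  prop)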
 



Re: spark dataframe jdbc read/write using dbcp connection pool

2016-01-19 Thread 刘虓
Hi,
I suggest you partition the JDBC read on an indexed column of the mysql table.
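
The index matters because Spark compiles a partitioned JDBC read into one range query
per partition, roughly SELECT * FROM video WHERE added_year >= x AND added_year < y;
with an index on the column, MySQL can serve each range directly instead of scanning
the full table once per partition. A sketch of such a read, assuming added_year is
indexed:

// Sketch: 31 range queries on the indexed added_year column, one per partition.
val df = sqlContext.read.jdbc(url1, "video", "added_year", 1985L, 2015L, 31, prop)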
