Hi,
Thanks a lot for your suggestion. I then tried the following code:
    val prop = new java.util.Properties
    prop.setProperty("user", "test")
    prop.setProperty("password", "test")
    prop.setProperty("partitionColumn", "added_year")
    prop.setProperty("lowerBound", "1985")
    prop.setProperty("upperBound", "2015")
    prop.setProperty("numPartitions", "200")

    val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
    val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
    val jdbcDF1 = sqlContext.read.jdbc(url1, "video3", prop)
    val jdbcDF2 = sqlContext.read.jdbc(url2, "video3", prop)

    val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
    jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf4")
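One caveat worth noting: on Spark 1.x, the `jdbc(url, table, properties)` overload does not read partitioning keys from the `Properties` object; those entries are simply forwarded to the JDBC driver as connection properties, so the read stays single-partition. Partitioned reads go through the explicit `DataFrameReader.jdbc` overload instead. A minimal sketch, reusing the table, column, and bounds from the code above:

```scala
// Sketch: explicitly partitioned JDBC read via the Spark 1.x
// DataFrameReader.jdbc overload. Credentials/URL match the code above.
val connProps = new java.util.Properties
connProps.setProperty("user", "test")
connProps.setProperty("password", "test")

// Spark issues one query per partition, each with a WHERE clause on
// added_year, so 200 tasks scan the 1985-2015 range in parallel.
val partitionedDF = sqlContext.read.jdbc(
  url = "jdbc:mysql://172.16.54.136:3306/db1",
  table = "video3",
  columnName = "added_year",   // numeric, ideally indexed column
  lowerBound = 1985L,
  upperBound = 2015L,
  numPartitions = 200,
  connectionProperties = connProps)
```

The same overload would be applied to `url2`, and the two DataFrames unioned as before.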

The added_year column in the MySQL table ranges from 1985 to 2015, and I pass
the numPartitions property to partition the read. Is this what you recommend?
Could you advise a bit more on the implementation?

Best,
Sun.



fightf...@163.com
 
From: 刘虓
Date: 2016-01-20 11:26
To: fightf...@163.com
CC: user
Subject: Re: spark dataframe jdbc read/write using dbcp connection pool
Hi,
I suggest you partition the JDBC read on an indexed column of the MySQL table.

2016-01-20 10:11 GMT+08:00 fightf...@163.com <fightf...@163.com>:
Hi,
I want to load a really large dataset from MySQL using the Spark DataFrame
API, and then save it as a Parquet or ORC file so it can be queried with
Hive / Impala. The dataset is about 1 billion records, and when I run the
following naive code, an error occurs and executors are lost.

    val prop = new java.util.Properties
    prop.setProperty("user", "test")
    prop.setProperty("password", "test")

    val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
    val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
    val jdbcDF1 = sqlContext.read.jdbc(url1, "video", prop)
    val jdbcDF2 = sqlContext.read.jdbc(url2, "video", prop)

    val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
    jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf")
    
I can see from the executor log that the wait_timeout threshold was reached,
and there is no retry mechanism in the code. So I am asking you experts for
help tuning this. Or should I try a JDBC connection pool to increase
parallelism?

16/01/19 17:04:28 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 377,769 milliseconds ago.  The last packet sent successfully to the server was 377,790 milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 1 bytes before connection was unexpectedly lost.
at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:1996)
... 22 more
16/01/19 17:10:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4
16/01/19 17:10:47 INFO jdbc.JDBCRDD: closed connection
16/01/19 17:10:47 ERROR executor.Executor: Exception in task 1.1 in stage 0.0 (TID 2)
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
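One knob that may help independently of partitioning: the stack trace above shows the server dropping a connection that sat idle past MySQL's wait_timeout, which happens when a single task holds one connection open far longer than the server allows. A hedged sketch, assuming MySQL Connector/J is the driver, of passing keep-alive/timeout settings through the same Properties object (tcpKeepAlive and socketTimeout are Connector/J options, not Spark ones, and only treat the symptom; partitioning the read is the structural fix):

```scala
val prop = new java.util.Properties
prop.setProperty("user", "test")
prop.setProperty("password", "test")
// Connector/J options: keep the TCP connection alive and disable the
// client-side socket read timeout (0 = no timeout).
prop.setProperty("tcpKeepAlive", "true")
prop.setProperty("socketTimeout", "0")

val df = sqlContext.read.jdbc("jdbc:mysql://172.16.54.136:3306/db1", "video", prop)
```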





fightf...@163.com
