I am using Spark 1.5.2 with MemSQL as a persistent repository.

I am trying to update rows (based on the primary key) when the same key appears more than once, i.e., run the save/load as an upsert operation.

val upsertConf = SaveToMemSQLConf(msc.memSQLConf,
                                  Some(SaveMode.Overwrite),
                                  //Some(SaveMode.Append),
                                  Map(
                                    "duplicate_key_behavior" -> "Replace"
                                    //, "insertBatchSize" -> "100"
                                  ))
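
For completeness, this is roughly how I apply the conf (trimmed from my job; "gl_balance" and df are placeholders for my real table and DataFrame, and I may be misremembering the exact saveToMemSQL overload):

import com.memsql.spark.connector._   // implicits that add saveToMemSQL to DataFrame

// placeholder table name; df is the DataFrame built from the input file
df.saveToMemSQL("gl_balance", upsertConf)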

When I set the SaveMode to "Overwrite", I get the following error because of object locking:

[cloudera@quickstart scala-2.10]$ spark-submit GLBalance-assembly-1.0.jar /home/cloudera/Downloads/cloud_code/output/FULL/2016-09-09_15-44-17

(number of records ,204919)

[Stage 7:>                                                          (0 + 4) / 7]16/09/09 21:26:01 ERROR Executor: Exception in task 1.0 in stage 7.0 (TID 22)
java.sql.SQLException: Leaf Error (127.0.0.1:3308): Lock wait timeout exceeded; try restarting transaction
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:996)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:870)
    at com.mysql.jdbc.MysqlIO.sendFileToServer(MysqlIO.java:3790)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:2995)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2245)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2638)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2526)
    at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1618)
    at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1549)
    at org.apache.commons.dbcp2.DelegatingStatement.executeUpdate(DelegatingStatement.java:234)
    at org.apache.commons.dbcp2.DelegatingStatement.executeUpdate(DelegatingStatement.java:234)

If I change the SaveMode to "Append", the load completes, but all the duplicate records get rejected. I was told that there is a known bug:

https://issues.apache.org/jira/browse/SPARK-13699

Is there another way for me to save the data (in an upsert mode)?
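
The only fallback I can think of is to bypass the connector and run the upsert myself over plain JDBC, since MemSQL speaks the MySQL protocol. A rough sketch of what I have in mind (gl_balance, acct_id, balance, and the connection details are placeholders for my real schema):

import java.sql.DriverManager

// upsert each partition over plain JDBC; MemSQL accepts MySQL's
// INSERT ... ON DUPLICATE KEY UPDATE syntax
df.rdd.foreachPartition { rows =>
  val conn = DriverManager.getConnection(
    "jdbc:mysql://127.0.0.1:3306/db", "user", "password")
  val stmt = conn.prepareStatement(
    "INSERT INTO gl_balance (acct_id, balance) VALUES (?, ?) " +
    "ON DUPLICATE KEY UPDATE balance = VALUES(balance)")
  try {
    rows.foreach { row =>
      stmt.setString(1, row.getString(0))
      stmt.setDouble(2, row.getDouble(1))
      stmt.addBatch()
    }
    stmt.executeBatch()   // one batch round trip per partition
  } finally {
    stmt.close()
    conn.close()
  }
}

But that gives up the connector's parallel load path, so I would prefer a connector-native option if one exists.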

Subhs
