Hello, I'm trying to develop with the new DataFrame API, but I'm running into
an error.

I have an existing MySQL database and I want to insert rows into it.
I create a DataFrame from an RDD of JavaBeans, then call the "insertIntoJDBC"
method. It appears that the DataFrame reorders the columns: judging from the
error below, the bean's fields come out in alphabetical order (avalue,
zvalue), while my table's columns are (zvalue, avalue). As a result the
insert fails, because the values are not bound to the columns in the proper
order.

Is there a way to specify the names or the order of the columns when the
DataFrame is written to the database?
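
As a workaround, would it be enough to put the columns back in the table's
order before the insert? Something like the following, applied to the
DataFrame "df" built in the example below (an untested sketch; I am assuming
that select(String, String...) preserves the order of the columns it is
given):
==========
// Hypothetical workaround: reorder the columns to match the MySQL table
// (zvalue first, then avalue) so the generated INSERT binds the values
// to the right columns.
DataFrame reordered = df.select("zvalue", "avalue");
reordered.insertIntoJDBC("jdbc:mysql://172.17.0.2:3306/mysql?user=root&password=pass", "reference", false);
==========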

If this is a bug, here is an example that reproduces it:

My table:
==========
CREATE TABLE `reference` (
  `zvalue` text,
  `avalue` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
==========


My class:
==========
package org.mypackage.testspark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class MysqlInsert {

    public static class RowStruct implements java.io.Serializable {

        private static final long serialVersionUID = 1L;

        public String zvalue;

        public Integer avalue;

        public RowStruct() {
        }

        public String getZvalue() {
            return this.zvalue;
        }

        public Integer getAvalue() {
            return this.avalue;
        }

        public void setZvalue(String zvalue) {
            this.zvalue = zvalue;
        }

        public void setAvalue(Integer avalue) {
            this.avalue = avalue;
        }

    }

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("myApp");
        sparkConf.setMaster("local[2]");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        RowStruct row = new RowStruct();
        row.setZvalue("test");
        row.setAvalue(1);

        JavaRDD<RowStruct> rdd = ctx.parallelize(Arrays.asList(row));
        SQLContext sqlCtx = new SQLContext(ctx);

        // The schema is inferred by reflection from the bean class; the
        // resulting column order does not match the table's (zvalue, avalue)
        // order, which seems to cause the failure below.
        DataFrame df = sqlCtx.createDataFrame(rdd, RowStruct.class);
        df.insertIntoJDBC("jdbc:mysql://172.17.0.2:3306/mysql?user=root&password=pass", "reference", false);
    }

}
==========

My error log:
==========
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/27 18:31:19 INFO SparkContext: Running Spark version 1.3.0-SNAPSHOT
15/03/27 18:31:19 WARN Utils: Your hostname, Tlnd-pbailly resolves to a loopback address: 127.0.1.1; using 10.42.20.124 instead (on interface wlan0)
15/03/27 18:31:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/03/27 18:31:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/27 18:31:20 INFO SecurityManager: Changing view acls to: pbailly
15/03/27 18:31:20 INFO SecurityManager: Changing modify acls to: pbailly
15/03/27 18:31:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(pbailly); users with modify permissions: Set(pbailly)
15/03/27 18:31:20 INFO Slf4jLogger: Slf4jLogger started
15/03/27 18:31:20 INFO Remoting: Starting remoting
15/03/27 18:31:20 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.42.20.124:58185]
15/03/27 18:31:20 INFO Utils: Successfully started service 'sparkDriver' on port 58185.
15/03/27 18:31:20 INFO SparkEnv: Registering MapOutputTracker
15/03/27 18:31:20 INFO SparkEnv: Registering BlockManagerMaster
15/03/27 18:31:20 INFO DiskBlockManager: Created local directory at /tmp/spark-1baef5a9-8c70-4c88-aaa6-7462f473c5b6/blockmgr-20176350-a69c-4170-b704-6621ca393889
15/03/27 18:31:20 INFO MemoryStore: MemoryStore started with capacity 947.7 MB
15/03/27 18:31:20 INFO HttpFileServer: HTTP File server directory is /tmp/spark-1ff51d4d-6172-4231-98c0-5e69edc6e64e/httpd-4eb77dcf-da49-438e-b5db-ecbf07193245
15/03/27 18:31:20 INFO HttpServer: Starting HTTP Server
15/03/27 18:31:20 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/27 18:31:20 INFO AbstractConnector: Started SocketConnector@0.0.0.0:43576
15/03/27 18:31:20 INFO Utils: Successfully started service 'HTTP file server' on port 43576.
15/03/27 18:31:20 INFO SparkEnv: Registering OutputCommitCoordinator
15/03/27 18:31:20 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/27 18:31:20 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/03/27 18:31:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/27 18:31:20 INFO SparkUI: Started SparkUI at http://10.42.20.124:4040
15/03/27 18:31:20 INFO Executor: Starting executor ID <driver> on host localhost
15/03/27 18:31:20 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.42.20.124:58185/user/HeartbeatReceiver
15/03/27 18:31:21 INFO NettyBlockTransferService: Server created on 43013
15/03/27 18:31:21 INFO BlockManagerMaster: Trying to register BlockManager
15/03/27 18:31:21 INFO BlockManagerMasterActor: Registering block manager localhost:43013 with 947.7 MB RAM, BlockManagerId(<driver>, localhost, 43013)
15/03/27 18:31:21 INFO BlockManagerMaster: Registered BlockManager
15/03/27 18:31:21 INFO SparkContext: Starting job: foreachPartition at DataFrame.scala:778
15/03/27 18:31:21 INFO DAGScheduler: Got job 0 (foreachPartition at DataFrame.scala:778) with 2 output partitions (allowLocal=false)
15/03/27 18:31:21 INFO DAGScheduler: Final stage: Stage 0(foreachPartition at DataFrame.scala:778)
15/03/27 18:31:21 INFO DAGScheduler: Parents of final stage: List()
15/03/27 18:31:21 INFO DAGScheduler: Missing parents: List()
15/03/27 18:31:21 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at map at DataFrame.scala:853), which has no missing parents
15/03/27 18:31:21 INFO MemoryStore: ensureFreeSpace(3968) called with curMem=0, maxMem=993735475
15/03/27 18:31:21 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.9 KB, free 947.7 MB)
15/03/27 18:31:21 INFO MemoryStore: ensureFreeSpace(2836) called with curMem=3968, maxMem=993735475
15/03/27 18:31:21 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.8 KB, free 947.7 MB)
15/03/27 18:31:21 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:43013 (size: 2.8 KB, free: 947.7 MB)
15/03/27 18:31:21 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/27 18:31:21 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839
15/03/27 18:31:21 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at map at DataFrame.scala:853)
15/03/27 18:31:21 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/03/27 18:31:21 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1262 bytes)
15/03/27 18:31:22 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1450 bytes)
15/03/27 18:31:22 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/03/27 18:31:22 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/03/27 18:31:22 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 620 bytes result sent to driver
15/03/27 18:31:22 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 228 ms on localhost (1/2)
15/03/27 18:31:22 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.sql.SQLException: Incorrect integer value: 'test' for column 'avalue' at row 1
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:996)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2530)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1907)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2141)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2077)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2062)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$.savePartition(jdbc.scala:95)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:180)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:179)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1504)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1504)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/03/27 18:31:22 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.sql.SQLException: Incorrect integer value: 'test' for column 'avalue' at row 1
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:996)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2530)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1907)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2141)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2077)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2062)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$.savePartition(jdbc.scala:95)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:180)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:179)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1504)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1504)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

15/03/27 18:31:22 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job
15/03/27 18:31:22 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/03/27 18:31:22 INFO TaskSchedulerImpl: Cancelling stage 0
15/03/27 18:31:22 INFO DAGScheduler: Job 0 failed: foreachPartition at DataFrame.scala:778, took 0.367799 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.sql.SQLException: Incorrect integer value: 'test' for column 'avalue' at row 1
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:996)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2530)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1907)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2141)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2077)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2062)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$.savePartition(jdbc.scala:95)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:180)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:179)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1504)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1504)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1211)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1199)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1401)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

==========
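
The only other way I can see to pin the column order is to skip the bean
class and build the DataFrame from Rows with an explicit schema. Again an
untested sketch (using "ctx" and "sqlCtx" from the class above; I am assuming
createDataFrame(JavaRDD<Row>, StructType) keeps the schema's column order):
==========
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Declare the schema explicitly, in the same order as the MySQL table
// (zvalue text, then avalue int), instead of relying on bean reflection.
StructType schema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("zvalue", DataTypes.StringType, true),
        DataTypes.createStructField("avalue", DataTypes.IntegerType, true)));

JavaRDD<Row> rows = ctx.parallelize(Arrays.asList(RowFactory.create("test", 1)));

DataFrame df2 = sqlCtx.createDataFrame(rows, schema);
df2.insertIntoJDBC("jdbc:mysql://172.17.0.2:3306/mysql?user=root&password=pass", "reference", false);
==========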

My jar versions:
spark-sql_2.10:1.3.0
mysql-connector-java:5.1.34


Thank you for your advice,
--
Pierre Bailly
Talend
