Looks like your DF is backed by a MySQL database via JDBC, and the error is thrown from MySQL itself. Can you see what SQL finally gets fired against MySQL? Spark is pushing the predicate down to MySQL, so it's not a Spark problem per se.
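Two ways to dig in (a sketch only — testDF is from your mail, while the JDBC url, table name, and alias below are placeholders you'd replace with your own; you could also turn on MySQL's general query log on the server side to capture the statement):

```scala
// 1) Print the extended physical plan; the JDBC scan and the filter that
//    Spark pushed down appear in the output, so you can see what SQL
//    fragment reaches MySQL.
testDF.filter(testDF("status") === "new").explain(true)

// 2) Workaround: push the WHERE clause yourself by giving the JDBC source
//    a subquery instead of a bare table name, bypassing Spark's filter
//    pushdown entirely. (url, mytable, and the alias t are hypothetical.)
val filtered = sqlContext.load("jdbc", Map(
  "url"     -> "jdbc:mysql://host:3306/db?user=...&password=...",
  "dbtable" -> "(SELECT * FROM mytable WHERE status = 'new') AS t"))
filtered.first()
```

Comparing the plan from (1) with the failing query should show whether the string literal is being sent to MySQL unquoted.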
On Wed, Apr 29, 2015 at 9:56 PM, Francesco Bigarella <[email protected]> wrote:

> Hi all,
>
> I was testing the DataFrame filter functionality and I found what I think
> is a strange behaviour.
> My dataframe testDF, obtained loading a MySQL table via jdbc, has the
> following schema:
> root
>  |-- id: long (nullable = false)
>  |-- title: string (nullable = true)
>  |-- value: string (nullable = false)
>  |-- status: string (nullable = false)
>
> What I want to do is filter my dataset to obtain all rows that have a
> status = "new".
>
> scala> testDF.filter(testDF("id") === 1234).first()
> works fine (also with the integer value within double quotes), however if
> I try to use the same statement to filter on the status column (also with
> changes in the syntax - see below), suddenly the program breaks.
>
> Any of the following
> scala> testDF.filter(testDF("status") === "new")
> scala> testDF.filter("status = 'new'")
> scala> testDF.filter($"status" === "new")
>
> generates the error:
>
> INFO scheduler.DAGScheduler: Job 3 failed: runJob at SparkPlan.scala:121,
> took 0.277907 s
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 3.0 (TID 12, <node name>):
> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column
> 'new' in 'where clause'
>
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> at com.mysql.jdbc.Util.getInstance(Util.java:386)
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1052)
> at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3597)
> at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3529)
> at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1990)
> at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2151)
> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2625)
> at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2119)
> at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2283)
> at org.apache.spark.sql.jdbc.JDBCRDD$anon$1.<init>(JDBCRDD.scala:328)
> at org.apache.spark.sql.jdbc.JDBCRDD.compute(JDBCRDD.scala:309)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Does filter work only on columns of the integer type? What is the exact
> behaviour of the filter function and what is the best way to handle the
> query I am trying to execute?
>
> Thank you,
> Francesco

--
Best Regards,
Ayan Guha
