I want to filter a DataFrame based on a Date column. 

 

If the DataFrame is built from an RDD of Scala case class instances, the
filter works (whether the column is compared against a String literal or
against a value cast to DATE). But if the DataFrame is created by applying
an explicit schema to an RDD of Rows, the filter fails. The exception and
the test code are below.

 

Do you have any idea about the error? Thank you very much!

 

================exception=================

java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2$$anonfun$apply$6.apply(Cast.scala:116)
    at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:111)
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2.apply(Cast.scala:116)
    at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:426)
    at org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.eval(predicates.scala:305)
    at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
    at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 

================code=================

 

    // Reproduction script: the same two date predicates are applied to two
    // DataFrames that hold the same single date (2015-05-07) but are built in
    // different ways. Each result is kept in its own `val` so the four cases
    // cannot be confused with one another.
    val conf = new SparkConf().setAppName("DFTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlCtx = new HiveContext(sc)
    import sqlCtx.implicits._

    case class Test(dt: java.sql.Date)

    // Date.valueOf replaces the deprecated Date(year - 1900, month - 1, day)
    // constructor; Date(115, 4, 7) and valueOf("2015-05-07") denote the same day.
    val sampleDate = java.sql.Date.valueOf("2015-05-07")

    // Case 1: DataFrame derived from an RDD of case class instances.
    // Both predicates are reported to work on this DataFrame.
    val df = sc.makeRDD(Seq(Test(sampleDate))).toDF

    val caseClassVsString = df.filter("dt >= '2015-05-06'")
    caseClassVsString.explain(true)
    caseClassVsString.show()
    println("======")

    val caseClassVsDate = df.filter("dt >= cast('2015-05-06' as DATE)")
    caseClassVsDate.explain(true)
    caseClassVsDate.show()
    println("======")

    // Case 2: DataFrame built from an RDD[Row] plus an explicit schema.
    // Filtering this DataFrame is what fails with
    // "java.sql.Date cannot be cast to java.lang.Integer".
    // NOTE(review): the stack trace fails in Cast.castToString while unboxing
    // to Int, which suggests the DateType column is expected to hold an Int
    // while the Row still carries a java.sql.Date -- confirm against the
    // Spark version in use.
    val rdd2 = sc.makeRDD(Seq(Row(sampleDate)))
    val schema = StructType(Array(StructField("dt", DateType, nullable = false)))

    // createDataFrame is the non-deprecated replacement for applySchema.
    val df2 = sqlCtx.createDataFrame(rdd2, schema)

    val schemaVsString = df2.filter("dt >= '2015-05-06'")
    schemaVsString.explain(true)
    schemaVsString.show()
    println("======")

    val schemaVsDate = df2.filter("dt >= cast('2015-05-06' as DATE)")
    schemaVsDate.explain(true)
    schemaVsDate.show()

 

Reply via email to