Jerry Lam created SPARK-14309:
---------------------------------

             Summary: Dataframe returns wrong results due to parsing incorrectly
                 Key: SPARK-14309
                 URL: https://issues.apache.org/jira/browse/SPARK-14309
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.5.1
            Reporter: Jerry Lam
I observed the behavior below using DataFrames. Selecting d2("label") from the join should return 60, but the only way I found to get that value is to convert the DataFrame into an RDD and read it from the Row. I have also included the equivalent SQL statement. It returns the correct result, I believe because it goes through the Hive parser.

In the DataFrame plans, d2("label") resolves to label#3, which is d1's column, rather than to d2's label#7. The SQL query resolves d2.label to label#15, which is d2's column.

{code}
val base = sc.parallelize((0 to 49).zip(0 to 49) ++ (30 to 79).zip(50 to 99)).toDF("id", "label")
val d1 = base.where($"label" < 60).as("d1")
val d2 = base.where($"label" === 60).as("d2")

d1.join(d2, "id").show
+---+-----+-----+
| id|label|label|
+---+-----+-----+
| 40|   40|   60|
+---+-----+-----+

d1.join(d2, "id").select(d1("label")).show
+-----+
|label|
+-----+
|   40|
+-----+
(expected answer: 40, right!)

// Workaround: convert to an RDD and read the third column (d2's label) from each Row
d1.join(d2, "id").map{row => row.getAs[Int](2)}

d1.join(d2, "id").select(d2("label")).show
+-----+
|label|
+-----+
|   40|
+-----+
(expected answer: 60, wrong!)

scala> d1.join(d2, "id").select(d2("label")).explain(true)
== Parsed Logical Plan ==
Project [label#3]
 Project [id#2,label#3,label#7]
  Join Inner, Some((id#2 = id#6))
   Subquery d1
    Filter (label#3 < 60)
     Project [_1#0 AS id#2,_2#1 AS label#3]
      LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
   Subquery d2
    Filter (label#7 = 60)
     Project [_1#0 AS id#6,_2#1 AS label#7]
      LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Analyzed Logical Plan ==
label: int
Project [label#3]
 Project [id#2,label#3,label#7]
  Join Inner, Some((id#2 = id#6))
   Subquery d1
    Filter (label#3 < 60)
     Project [_1#0 AS id#2,_2#1 AS label#3]
      LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
   Subquery d2
    Filter (label#7 = 60)
     Project [_1#0 AS id#6,_2#1 AS label#7]
      LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Optimized Logical Plan ==
Project [label#3]
 Join Inner, Some((id#2 = id#6))
  Project [_1#0 AS id#2,_2#1 AS label#3]
   Filter (_2#1 < 60)
    LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
  Project [_1#0 AS id#6]
   Filter (_2#1 = 60)
    LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Physical Plan ==
TungstenProject [label#3]
 SortMergeJoin [id#2], [id#6]
  TungstenSort [id#2 ASC], false, 0
   TungstenExchange hashpartitioning(id#2)
    TungstenProject [_1#0 AS id#2,_2#1 AS label#3]
     Filter (_2#1 < 60)
      Scan PhysicalRDD[_1#0,_2#1]
  TungstenSort [id#6 ASC], false, 0
   TungstenExchange hashpartitioning(id#6)
    TungstenProject [_1#0 AS id#6]
     Filter (_2#1 = 60)
      Scan PhysicalRDD[_1#0,_2#1]

base.registerTempTable("base")
sqlContext.sql("select d2.label from (select * from base where label < 60) as d1 inner join (select * from base where label = 60) as d2 on d1.id = d2.id").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('d2.label)]
 'Join Inner, Some(('d1.id = 'd2.id))
  'Subquery d1
   'Project [unresolvedalias(*)]
    'Filter ('label < 60)
     'UnresolvedRelation [base], None
  'Subquery d2
   'Project [unresolvedalias(*)]
    'Filter ('label = 60)
     'UnresolvedRelation [base], None

== Analyzed Logical Plan ==
label: int
Project [label#15]
 Join Inner, Some((id#2 = id#14))
  Subquery d1
   Project [id#2,label#3]
    Filter (label#3 < 60)
     Subquery base
      Project [_1#0 AS id#2,_2#1 AS label#3]
       LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
  Subquery d2
   Project [id#14,label#15]
    Filter (label#15 = 60)
     Subquery base
      Project [_1#0 AS id#14,_2#1 AS label#15]
       LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Optimized Logical Plan ==
Project [label#15]
 Join Inner, Some((id#2 = id#14))
  Project [_1#0 AS id#2]
   Filter (_2#1 < 60)
    LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
  Project [_1#0 AS id#14,_2#1 AS label#15]
   Filter (_2#1 = 60)
    LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Physical Plan ==
TungstenProject [label#15]
 SortMergeJoin [id#2], [id#14]
  TungstenSort [id#2 ASC], false, 0
   TungstenExchange hashpartitioning(id#2)
    TungstenProject [_1#0 AS id#2]
     Filter (_2#1 < 60)
      Scan PhysicalRDD[_1#0,_2#1]
  TungstenSort [id#14 ASC], false, 0
   TungstenExchange hashpartitioning(id#14)
    TungstenProject [_1#0 AS id#14,_2#1 AS label#15]
     Filter (_2#1 = 60)
      Scan PhysicalRDD[_1#0,_2#1]
{code}
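The plans suggest the following sequence. d2("label") is bound to label#3 when it is created. The join then renames d2's columns (label#7) to keep the two sides of the self-join apart. That leaves the reference pointing at d1's column. The toy model below, written in plain Scala and assuming exactly that mechanism, shows why the wrong value comes back. Attr, Relation and SelfJoinModel are hypothetical names chosen for illustration. They are not Spark's analyzer classes or API.

```scala
// Toy model only: Attr, Relation and SelfJoinModel are hypothetical names,
// not Spark classes. A column is identified by a unique exprId (like the
// "#3" in label#3); the name is only a display label.
final case class Attr(name: String, exprId: Int)

// A relation is its output columns plus one row of values keyed by exprId.
final case class Relation(output: Seq[Attr], row: Map[Int, Int])

object SelfJoinModel {
  // Resolve a column by name against one relation, before any join,
  // the way d2("label") is resolved against d2 alone.
  def resolve(rel: Relation, name: String): Attr =
    rel.output.find(_.name == name).getOrElse(sys.error(s"no column named $name"))

  // Join two relations. When the right side reuses exprIds that already
  // appear on the left (a self-join), give those columns fresh ids so the
  // two sides stay distinguishable. Assumed to mirror label#3 -> label#7
  // in the DataFrame plans above.
  def join(left: Relation, right: Relation): Relation = {
    val leftIds = left.output.map(_.exprId).toSet
    val conflicting = right.output.map(_.exprId).distinct.filter(leftIds)
    val start = (left.output ++ right.output).map(_.exprId).max + 1
    val remap: Map[Int, Int] =
      conflicting.zipWithIndex.map { case (id, i) => id -> (start + i) }.toMap
    def renamed(id: Int): Int = remap.getOrElse(id, id)
    val rightOut = right.output.map(a => a.copy(exprId = renamed(a.exprId)))
    val rightRow = right.row.map { case (id, v) => renamed(id) -> v }
    Relation(left.output ++ rightOut, left.row ++ rightRow)
  }

  // Select by exprId: whichever column carries that id is returned,
  // regardless of which side the reference was originally resolved against.
  def select(rel: Relation, attr: Attr): Int = rel.row(attr.exprId)
}
```

To model the report's id 40 row, give d1 and d2 the same columns (id#2, label#3) but labels 40 and 60. After `SelfJoinModel.join(d1, d2)`, selecting `resolve(d1, "label")` and `resolve(d2, "label")` both return 40, because both references carry exprId 3. The renamed right-side column, `joined.output(3)`, returns 60.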