Shrikant created SPARK-26231:
--------------------------------

             Summary: Dataframes inner join on double datatype columns resulting in Cartesian product
                 Key: SPARK-26231
                 URL: https://issues.apache.org/jira/browse/SPARK-26231
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.6.1, 1.6.0
            Reporter: Shrikant

The following code snippet demonstrates the bug. An inner join on the Double columns results in a Cartesian product; when both columns are cast to String, the join works as expected. Please see the explain plans below.

Error:

scala> cartesianJoinErr.explain()
== Physical Plan ==
CartesianProduct
:- ConvertToSafe
:  +- Project [name#143,group#144,data#145,name#143 AS name1#146]
:     +- Filter (name#143 = name#143)
:        +- Scan ExistingRDD[name#143,group#144,data#145]
+- Scan ExistingRDD[name#147,group#148,data#149]

-----------------------------------------------------------

Explain plan after conversion to String:

scala> stringColJoinWorks.explain()
== Physical Plan ==
SortMergeJoin [name1String#151], [name2String#152]
:- Sort [name1String#151 ASC], false, 0
:  +- TungstenExchange hashpartitioning(name1String#151,200), None
:     +- Project [name#143,group#144,data#145,cast(name#143 as string) AS name1String#151]
:        +- Scan ExistingRDD[name#143,group#144,data#145]
+- Sort [name2String#152 ASC], false, 0
   +- TungstenExchange hashpartitioning(name2String#152,200), None
      +- Project [name#153,group#154,data#155,cast(name#153 as string) AS name2String#152]
         +- Scan ExistingRDD[name#153,group#154,data#155]

Code to reproduce:

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions

val doubleRDD = sc.parallelize(Seq(
  Row(11111.0, 2, 1),
  Row(22222.0, 8, 2),
  Row(33333.0, 10, 3),
  Row(44444.0, 10, 4)))

val testSchema = StructType(Seq(
  StructField("name", DoubleType, nullable = true),
  StructField("group", IntegerType, nullable = true),
  StructField("data", IntegerType, nullable = true)))

val doubleRDDCartesian = sqlContext.createDataFrame(doubleRDD, testSchema)
val cartNewCol = doubleRDDCartesian.select($"name", $"group", $"data")
val newColName1DF = cartNewCol.withColumn("name1", $"name")

// Join on the Double columns: the plan shows a CartesianProduct
val cartesianJoinErr = newColName1DF.join(doubleRDDCartesian,
  newColName1DF("name1") === doubleRDDCartesian("name"))
cartesianJoinErr.show
cartesianJoinErr.explain()

// Convert both into StringType: the plan shows a SortMergeJoin
val stringColDF1 = doubleRDDCartesian.withColumn("name1String", $"name".cast("String"))
val stringColDF2 = cartNewCol.withColumn("name2String", $"name".cast("String"))
val stringColJoinWorks = stringColDF1.join(stringColDF2,
  stringColDF1("name1String") === stringColDF2("name2String"))
stringColJoinWorks.show
stringColJoinWorks.explain()
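
A possible variant to try, offered only as a sketch: the Filter (name#143 = name#143) in the failing plan suggests that both sides of the join condition resolved to the same attribute, which can happen when the two DataFrames share lineage. Under that assumption, giving each side an explicit alias and referring to the columns by qualified name may let the planner tell them apart without casting to String. The object name, the helper names (buildInput, aliasedJoin) and the aliases "l" and "r" are hypothetical and not part of the report, and this has not been verified against 1.6.0 or 1.6.1.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Hypothetical sketch: same data as in the report, joined through aliases.
object AliasedSelfJoinSketch {

  // Rebuilds the DataFrame from the report (Double "name", Integer "group" and "data").
  def buildInput(sqlContext: SQLContext): DataFrame = {
    val rows = sqlContext.sparkContext.parallelize(Seq(
      Row(11111.0, 2, 1),
      Row(22222.0, 8, 2),
      Row(33333.0, 10, 3),
      Row(44444.0, 10, 4)))
    val schema = StructType(Seq(
      StructField("name", DoubleType, nullable = true),
      StructField("group", IntegerType, nullable = true),
      StructField("data", IntegerType, nullable = true)))
    sqlContext.createDataFrame(rows, schema)
  }

  // Aliases both sides ("l" and "r" are assumed names) and joins on
  // qualified column names instead of newColName1DF("name1") and
  // doubleRDDCartesian("name").
  def aliasedJoin(base: DataFrame): DataFrame = {
    val left = base.withColumn("name1", col("name")).as("l")
    val right = base.as("r")
    left.join(right, col("l.name1") === col("r.name"))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("aliased-self-join").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    try {
      val joined = aliasedJoin(buildInput(sqlContext))
      joined.explain() // inspect whether the plan still contains CartesianProduct
      joined.show()
    } finally {
      sc.stop()
    }
  }
}
```

If the explain output for the aliased join still shows CartesianProduct, the assumption about shared lineage does not hold for this case and the String cast from the report remains the known way to get a SortMergeJoin.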