Shrikant created SPARK-26231:
--------------------------------

             Summary: Dataframes inner join on double datatype columns 
resulting in Cartesian product
                 Key: SPARK-26231
                 URL: https://issues.apache.org/jira/browse/SPARK-26231
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.6.1, 1.6.0
            Reporter: Shrikant


Following code snippet explains the bug. The join on the Double columns results 
in catersian , when both columns typecasted to String it works.

please see the explain plan belolw

Error: scala> cartesianJoinErr.explain()
== Physical Plan ==
CartesianProduct
:- ConvertToSafe
:  +- Project [name#143,group#144,data#145,name#143 AS name1#146]
:     +- Filter (name#143 = name#143)
:        +- Scan ExistingRDD[name#143,group#144,data#145]
+- Scan ExistingRDD[name#147,group#148,data#149]

-----------------------------------------------------------

After conversion to String explain plan

stringColJoinWorks.explain()
== Physical Plan ==
SortMergeJoin [name1String#151], [name2String#152]
:- Sort [name1String#151 ASC], false, 0
:  +- TungstenExchange hashpartitioning(name1String#151,200), None
:     +- Project [name#143,group#144,data#145,cast(name#143 as string) AS 
name1String#151]
:        +- Scan ExistingRDD[name#143,group#144,data#145]
+- Sort [name2String#152 ASC], false, 0
   +- TungstenExchange hashpartitioning(name2String#152,200), None
      +- Project [name#153,group#154,data#155,cast(name#153 as string) AS 
name2String#152]
         +- Scan ExistingRDD[name#153,group#154,data#155]

 

 

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions

val doubleRDD = sc.parallelize(Seq(
    Row(11111.0, 2, 1),
    Row(22222.0, 8, 2),
    Row(33333.0, 10, 3),
    Row(44444.0, 10, 4)))
    
val testSchema = StructType(Seq(
    StructField("name", DoubleType, nullable = true),
    StructField("group", IntegerType, nullable = true),
    StructField("data", IntegerType, nullable = true)))
    
val doubleRDDCartesian = sqlContext.createDataFrame(doubleRDD, testSchema)

val cartNewCol = doubleRDDCartesian.select($"name" , $"group", $"data")

val newColName1DF = cartNewCol.withColumn("name1", $"name")
val cartesianJoinErr = newColName1DF.join(doubleRDDCartesian, 
newColName1DF("name1")===(doubleRDDCartesian("name")))
cartesianJoinErr.show
cartesianJoinErr.explain()

//Convert both into StringType
val stringColDF1 = 
doubleRDDCartesian.withColumn("name1String",$"name".cast("String"))
val stringColDF2 = cartNewCol.withColumn("name2String", $"name".cast("String"))

val stringColJoinWorks = stringColDF1.join(stringColDF2, 
stringColDF1("name1String")===(stringColDF2("name2String")))
stringColJoinWorks.show
stringColJoinWorks.explain()

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to