This is fixed in Spark 1.6.
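For reference, a minimal sketch of the two predicates being compared (the column names and helper functions here are illustrative, not taken from the job in the thread): "===" is Catalyst's EqualTo, while "<=>" is the null-safe EqualNullSafe, which also matches when both sides are NULL. Prior to 1.6 the planner did not recognise "<=>" as an equi-join key, which is consistent with the CartesianProduct in the plans quoted below.

import org.apache.spark.sql.DataFrame

// Plain equality: rows where either key is NULL never match.
def equiJoin(users: DataFrame, userDim: DataFrame): DataFrame =
  users.join(userDim, users("USER_ID") === userDim("USER_DIM_USER_ID"), "inner")

// Null-safe equality: NULL <=> NULL is true, so those rows do match.
def nullSafeJoin(users: DataFrame, userDim: DataFrame): DataFrame =
  users.join(userDim, users("USER_ID") <=> userDim("USER_DIM_USER_ID"), "inner")

On releases older than 1.6, the practical workaround is the one described below: use "===" (with explicit IS NULL handling if null keys matter) so the planner can pick a hash join.
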
On Fri, Dec 18, 2015 at 3:06 PM, Prasad Ravilla <pras...@slalom.com> wrote:
> Changing the equality check from "<=>" to "===" solved the problem.
> Performance skyrocketed.
>
> I am wondering why "<=>" causes the performance degradation?
>
> val dates = new RetailDates()
> val dataStructures = new DataStructures()
>
> // Reading CSV format input files -- retailDates
> // This DF has 75 records
> val retailDatesWithSchema = sqlContext.read
>   .format("com.databricks.spark.csv")
>   .option("delimiter", ",")
>   .schema(dates.retailDatesSchema)
>   .load(datesFile)
>   .coalesce(1)
>   .cache()
>
> // Create UDF to convert String to Date
> val dateUDF: (String => java.sql.Date) = (dateString: String) =>
>   new java.sql.Date(customerDateFormat.parse(dateString).getTime())
> val stringToDateUDF = udf(dateUDF)
>
> // Reading Avro format input files
> // This DF has 500 million records
> val userInputDf = sqlContext.read.avro("customerLocation")
> val userDf = userInputDf
>   .withColumn("CAL_DT", stringToDateUDF(col("CAL_DT")))
>   .select("CAL_DT", "USER_ID", "USER_CNTRY_ID")
>
> // This DF has 800 million records
> val userDimDf = sqlContext.read.avro(userDimFiles)
>   .select("USER_ID", "USER_CNTRY_ID", "PRIMARY_USER_ID")
>
> val retailDatesWithSchemaBroadcast = sc.broadcast(retailDatesWithSchema)
> val userDimDfBroadcast = sc.broadcast(userDimDf)
>
> val userAndRetailDates = userDf
>   .join(retailDatesWithSchemaBroadcast.value.as("retailDates"),
>     userDf("CAL_DT").between($"retailDates.WEEK_BEGIN_DATE", $"retailDates.WEEK_END_DATE"),
>     "inner")
>
> val userAndRetailDatesAndUserDim = userAndRetailDates
>   .join(userDimDfBroadcast.value
>       .withColumnRenamed("USER_ID", "USER_DIM_USER_ID")
>       .withColumnRenamed("USER_CNTRY_ID", "USER_DIM_COUNTRY_ID")
>       .as("userdim"),
>     userAndRetailDates("USER_ID") <=> $"userdim.USER_DIM_USER_ID"
>       && userAndRetailDates("USER_CNTRY_ID") <=> $"userdim.USER_DIM_COUNTRY_ID",
>     "inner")
>
> userAndRetailDatesAndUserDim.show()
>
>
> From: Prasad Ravilla
> Date: Friday, December 18, 2015 at 7:38 AM
> To: user
> Subject: Joining DataFrames - Causing Cartesian Product
>
> Hi,
>
> I am running into a performance issue when joining data frames created from
> Avro files using the spark-avro library.
>
> The data frames are created from 120K Avro files and the total size is
> around 1.5 TB. The two data frames are very large, with billions of records.
>
> *The join for these two DataFrames runs forever.*
> This process runs on a YARN cluster with 300 executors, each with 4 executor
> cores and 8 GB of memory.
>
> Any insights on this join will help. I have posted the explain plan below.
> I notice a CartesianProduct in the physical plan. I am wondering if this
> is causing the performance issue.
>
>
> Below is the logical plan and the physical plan.
> (Due to the confidential nature, I am unable to post any of the column names
> or the file names here.)
>
> == Optimized Logical Plan ==
> Limit 21
>  Join Inner, [ Join Conditions ]
>   Join Inner, [ Join Conditions ]
>    Project [ List of columns ]
>     Relation [ List of columns ] AvroRelation[ fileName1 ] -- Another large file
>    InMemoryRelation [ List of columns ], true, 10000, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None
>   Project [ List of columns ]
>    Relation [ List of columns ] AvroRelation[ fileName2 ] -- This is a very large file
>
> == Physical Plan ==
> Limit 21
>  Filter (filter conditions)
>   CartesianProduct
>    Filter (more filter conditions)
>     CartesianProduct
>      Project (selecting a few columns and applying a UDF to one column)
>       Scan AvroRelation[avro file][ columns in Avro file ]
>      InMemoryColumnarTableScan [ List of columns ], true, 10000, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None)
>    Project [ List of columns ]
>     Scan AvroRelation[Avro file][ List of columns ]
>
> Code Generation: true
>
>
> Thanks,
> Prasad.
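
One more note on the snippet above (a hedged sketch reusing the DataFrame names from the quoted mail; treat it as illustrative rather than a drop-in fix): sc.broadcast on a DataFrame only ships the DataFrame reference to the executors, it does not change the join strategy. The usual way to ask for a broadcast join is the broadcast() hint in org.apache.spark.sql.functions (available since 1.5), and explain() will confirm whether the CartesianProduct is gone.

import org.apache.spark.sql.functions.{broadcast, col}

// Hint that the 75-row dates table should be broadcast to every executor.
// Because the date condition is a range (non-equi) predicate, expect a
// broadcast nested loop join here rather than a shuffle or a CartesianProduct
// over the 500-million-row side.
val userAndRetailDatesHinted = userDf
  .join(broadcast(retailDatesWithSchema).as("retailDates"),
    userDf("CAL_DT").between(
      col("retailDates.WEEK_BEGIN_DATE"),
      col("retailDates.WEEK_END_DATE")),
    "inner")

// Check the chosen strategy before launching the full 1.5 TB job.
userAndRetailDatesHinted.explain()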