Re: Joining DataFrames - Causing Cartesian Product
This is fixed in Spark 1.6. On Fri, Dec 18, 2015 at 3:06 PM, Prasad Ravilla wrote: > Changing equality check from “<=>”to “===“ solved the problem. > Performance skyrocketed. > > I am wondering why “<=>” cause a performance degrade? > > val dates = new RetailDates() > val dataStructures = new DataStructures() > > // Reading CSV Format input files -- retailDates > // This DF has 75 records > val retailDatesWithSchema = sqlContext.read > .format("com.databricks.spark.csv") > .option("delimiter", ",") > .schema(dates.retailDatesSchema) > .load(datesFile) > .coalesce(1) > .cache() > > // Create UDF to convert String to Date > val dateUDF: (String => java.sql.Date) = (dateString: String) => new > java.sql.Date(customerDateFormat.parse(dateString).getTime()) > val stringToDateUDF = udf(dateUDF) > > // Reading Avro Format Input Files > // This DF has 500 million records > val userInputDf = sqlContext.read.avro(“customerLocation") > val userDf = userInputDf.withColumn("CAL_DT", > stringToDateUDF(col("CAL_DT"))).select( > "CAL_DT","USER_ID","USER_CNTRY_ID" > ) > > val userDimDf = > sqlContext.read.avro(userDimFiles).select("USER_ID","USER_CNTRY_ID","PRIMARY_USER_ID") > // This DF has 800 million records > > val retailDatesWithSchemaBroadcast = sc.broadcast(retailDatesWithSchema) > val userDimDfBroadcast = sc.broadcast(userDimDf) > > val userAndRetailDates = userDnaSdDf > .join((retailDatesWithSchemaBroadcast.value).as("retailDates"), > userDf("CAL_DT") between($"retailDates.WEEK_BEGIN_DATE", > $"retailDates.WEEK_END_DATE") > , "inner") > > > > val userAndRetailDatesAndUserDim = userAndRetailDates > .join((userDimDfBroadcast.value) > .withColumnRenamed("USER_ID", "USER_DIM_USER_ID") > .withColumnRenamed("USER_CNTRY_ID","USER_DIM_COUNTRY_ID") > .as("userdim") > , userAndRetailDates("USER_ID") <=> $"userdim.USER_DIM_USER_ID" > && userAndRetailDates("USER_CNTRY_ID") <=> > $"userdim.USER_DIM_COUNTRY_ID" > , "inner") > > userAndRetailDatesAndUserDim.show() > > > From: Prasad Ravilla > Date: Friday, December 18, 2015 at 7:38 AM > To: user > Subject: Joining DataFrames - Causing Cartesian Product > > Hi, > > I am running into performance issue when joining data frames created from > avro files using spark-avro library. > > The data frames are created from 120K avro files and the total size is > around 1.5 TB. > The two data frames are very huge with billions of records. > > *The join for these two DataFrames runs forever.* > This process runs on a yarn cluster with 300 executors with 4 executor > cores and 8GB memory. > > Any insights on this join will help. I have posted the explain plan below. > I notice a CartesianProduct in the Physical Plan. I am wondering if this > is causing the performance issue. > > > Below is the logical plan and the physical plan. ( Due to the confidential > nature, I am unable to post any of the column names or the file names here ) > > == Optimized Logical Plan == > Limit 21 > Join Inner, [ Join Conditions ] > Join Inner, [ Join Conditions ] >Project [ List of columns ] > Relation [ List of columns ] AvroRelation[ fileName1 ] -- Another > large file >InMemoryRelation [List of columsn ], true, 1, StorageLevel(true, > true, false, true, 1), (Repartition 1, false), None > Project [ List of Columns ] >Relation[ List of Columns] AvroRelation[ filename2 ] -- This is a very > large file > > == Physical Plan == > Limit 21 > Filter (filter conditions) > CartesianProduct >Filter (more filter conditions) > CartesianProduct > Project (selecting a few columns and applying a UDF to one column) > Scan AvroRelation[avro file][ columns in Avro File ] > InMemoryColumnarTableScan [List of columns ], true, 1, > StorageLevel(true, true, false, true, 1), (Repartition 1, false), None) >Project [ List of Columns ] > Scan AvroRelation[Avro File][List of Columns] > > Code Generation: true > > > Thanks, > Prasad. >
Re: Joining DataFrames - Causing Cartesian Product
Changing equality check from “<=>”to “===“ solved the problem. Performance skyrocketed. I am wondering why “<=>” cause a performance degrade? val dates = new RetailDates() val dataStructures = new DataStructures() // Reading CSV Format input files -- retailDates // This DF has 75 records val retailDatesWithSchema = sqlContext.read .format("com.databricks.spark.csv") .option("delimiter", ",") .schema(dates.retailDatesSchema) .load(datesFile) .coalesce(1) .cache() // Create UDF to convert String to Date val dateUDF: (String => java.sql.Date) = (dateString: String) => new java.sql.Date(customerDateFormat.parse(dateString).getTime()) val stringToDateUDF = udf(dateUDF) // Reading Avro Format Input Files // This DF has 500 million records val userInputDf = sqlContext.read.avro(“customerLocation") val userDf = userInputDf.withColumn("CAL_DT", stringToDateUDF(col("CAL_DT"))).select( "CAL_DT","USER_ID","USER_CNTRY_ID" ) val userDimDf = sqlContext.read.avro(userDimFiles).select("USER_ID","USER_CNTRY_ID","PRIMARY_USER_ID") // This DF has 800 million records val retailDatesWithSchemaBroadcast = sc.broadcast(retailDatesWithSchema) val userDimDfBroadcast = sc.broadcast(userDimDf) val userAndRetailDates = userDnaSdDf .join((retailDatesWithSchemaBroadcast.value).as("retailDates"), userDf("CAL_DT") between($"retailDates.WEEK_BEGIN_DATE", $"retailDates.WEEK_END_DATE") , "inner") val userAndRetailDatesAndUserDim = userAndRetailDates .join((userDimDfBroadcast.value) .withColumnRenamed("USER_ID", "USER_DIM_USER_ID") .withColumnRenamed("USER_CNTRY_ID","USER_DIM_COUNTRY_ID") .as("userdim") , userAndRetailDates("USER_ID") <=> $"userdim.USER_DIM_USER_ID" && userAndRetailDates("USER_CNTRY_ID") <=> $"userdim.USER_DIM_COUNTRY_ID" , "inner") userAndRetailDatesAndUserDim.show() From: Prasad Ravilla Date: Friday, December 18, 2015 at 7:38 AM To: user Subject: Joining DataFrames - Causing Cartesian Product Hi, I am running into performance issue when joining data frames created from avro files using spark-avro library. The data frames are created from 120K avro files and the total size is around 1.5 TB. The two data frames are very huge with billions of records. The join for these two DataFrames runs forever. This process runs on a yarn cluster with 300 executors with 4 executor cores and 8GB memory. Any insights on this join will help. I have posted the explain plan below. I notice a CartesianProduct in the Physical Plan. I am wondering if this is causing the performance issue. Below is the logical plan and the physical plan. ( Due to the confidential nature, I am unable to post any of the column names or the file names here ) == Optimized Logical Plan == Limit 21 Join Inner, [ Join Conditions ] Join Inner, [ Join Conditions ] Project [ List of columns ] Relation [ List of columns ] AvroRelation[ fileName1 ] -- Another large file InMemoryRelation [List of columsn ], true, 1, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None Project [ List of Columns ] Relation[ List of Columns] AvroRelation[ filename2 ] -- This is a very large file == Physical Plan == Limit 21 Filter (filter conditions) CartesianProduct Filter (more filter conditions) CartesianProduct Project (selecting a few columns and applying a UDF to one column) Scan AvroRelation[avro file][ columns in Avro File ] InMemoryColumnarTableScan [List of columns ], true, 1, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None) Project [ List of Columns ] Scan AvroRelation[Avro File][List of Columns] Code Generation: true Thanks, Prasad.
Re: Joining DataFrames - Causing Cartesian Product
Can you try the lastest 1.6.0 RC which includes SPARK-1 ? Cheers On Fri, Dec 18, 2015 at 7:38 AM, Prasad Ravilla wrote: > Hi, > > I am running into performance issue when joining data frames created from > avro files using spark-avro library. > > The data frames are created from 120K avro files and the total size is > around 1.5 TB. > The two data frames are very huge with billions of records. > > *The join for these two DataFrames runs forever.* > This process runs on a yarn cluster with 300 executors with 4 executor > cores and 8GB memory. > > Any insights on this join will help. I have posted the explain plan below. > I notice a CartesianProduct in the Physical Plan. I am wondering if this > is causing the performance issue. > > > Below is the logical plan and the physical plan. ( Due to the confidential > nature, I am unable to post any of the column names or the file names here ) > > == Optimized Logical Plan == > Limit 21 > Join Inner, [ Join Conditions ] > Join Inner, [ Join Conditions ] >Project [ List of columns ] > Relation [ List of columns ] AvroRelation[ fileName1 ] -- Another > large file >InMemoryRelation [List of columsn ], true, 1, StorageLevel(true, > true, false, true, 1), (Repartition 1, false), None > Project [ List of Columns ] >Relation[ List of Columns] AvroRelation[ filename2 ] -- This is a very > large file > > == Physical Plan == > Limit 21 > Filter (filter conditions) > CartesianProduct >Filter (more filter conditions) > CartesianProduct > Project (selecting a few columns and applying a UDF to one column) > Scan AvroRelation[avro file][ columns in Avro File ] > InMemoryColumnarTableScan [List of columns ], true, 1, > StorageLevel(true, true, false, true, 1), (Repartition 1, false), None) >Project [ List of Columns ] > Scan AvroRelation[Avro File][List of Columns] > > Code Generation: true > > > Thanks, > Prasad. >