This is fixed in Spark 1.6.

On Fri, Dec 18, 2015 at 3:06 PM, Prasad Ravilla <pras...@slalom.com> wrote:

> Changing the equality check from "<=>" to "===" solved the problem.
> Performance skyrocketed.
>
> I am wondering why "<=>" causes such a performance degradation?
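>
> (For context, a minimal sketch of the semantic difference, using toy
> data that is not from the real job: "===" is plain equality and yields
> null when either side is null, while "<=>" is null-safe equality and
> always yields true or false. My understanding -- hedged -- is that the
> 1.5 planner only extracts "===" predicates as equi-join keys, so a
> "<=>" condition falls back to a CartesianProduct plus a filter.)
>
> import org.apache.spark.sql.functions.col
>
> // Toy data, purely illustrative.
> val eqDemo = sqlContext.createDataFrame(Seq(
>   (Some(1), Some(1)),
>   (Some(1), None: Option[Int]),
>   (None: Option[Int], None: Option[Int])
> )).toDF("a", "b")
>
> eqDemo.select(
>   (col("a") === col("b")).as("plain_eq"),     // true, null, null
>   (col("a") <=> col("b")).as("null_safe_eq")  // true, false, true
> ).show()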
>
> val dates = new RetailDates()
> val dataStructures = new DataStructures()
>
> // Reading CSV Format input files -- retailDates
> // This DF has 75 records
> val retailDatesWithSchema = sqlContext.read
>   .format("com.databricks.spark.csv")
>   .option("delimiter", ",")
>   .schema(dates.retailDatesSchema)
>   .load(datesFile)
>   .coalesce(1)
>   .cache()
>
> // Create UDF to convert String to Date
> val dateUDF: (String => java.sql.Date) = (dateString: String) =>
>   new java.sql.Date(customerDateFormat.parse(dateString).getTime())
> val stringToDateUDF = udf(dateUDF)
>
> // Reading Avro Format Input Files
> // This DF has 500 million records
> val userInputDf = sqlContext.read.avro("customerLocation")
> val userDf = userInputDf
>   .withColumn("CAL_DT", stringToDateUDF(col("CAL_DT")))
>   .select("CAL_DT", "USER_ID", "USER_CNTRY_ID")
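>
> (A side note on the UDF: a UDF is opaque to the Catalyst optimizer. A
> minimal sketch of the same conversion with only built-in functions --
> the "yyyy-MM-dd" pattern here is an assumption, substitute whatever
> pattern customerDateFormat actually uses:)
>
> import org.apache.spark.sql.functions.{col, unix_timestamp}
>
> // unix_timestamp(col, fmt) parses the string into epoch seconds;
> // casting through timestamp and then date yields a date column.
> val userDfNoUdf = userInputDf.withColumn(
>   "CAL_DT",
>   unix_timestamp(col("CAL_DT"), "yyyy-MM-dd").cast("timestamp").cast("date"))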
>
> // This DF has 800 million records
> val userDimDf = sqlContext.read.avro(userDimFiles)
>   .select("USER_ID", "USER_CNTRY_ID", "PRIMARY_USER_ID")
>
> val retailDatesWithSchemaBroadcast = sc.broadcast(retailDatesWithSchema)
> val userDimDfBroadcast = sc.broadcast(userDimDf)
>
> val userAndRetailDates = userDf
>   .join((retailDatesWithSchemaBroadcast.value).as("retailDates"),
>     userDf("CAL_DT").between($"retailDates.WEEK_BEGIN_DATE", $"retailDates.WEEK_END_DATE"),
>     "inner")
>
> val userAndRetailDatesAndUserDim = userAndRetailDates
>   .join((userDimDfBroadcast.value)
>       .withColumnRenamed("USER_ID", "USER_DIM_USER_ID")
>       .withColumnRenamed("USER_CNTRY_ID", "USER_DIM_COUNTRY_ID")
>       .as("userdim"),
>     userAndRetailDates("USER_ID") <=> $"userdim.USER_DIM_USER_ID"
>       && userAndRetailDates("USER_CNTRY_ID") <=> $"userdim.USER_DIM_COUNTRY_ID",
>     "inner")
>
> userAndRetailDatesAndUserDim.show()
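>
> (To verify a fix like this, it helps to inspect the physical plan
> before triggering an action -- once the join keys are recognized as
> equi-join keys, the CartesianProduct node should disappear:)
>
> // Prints the physical plan; with "===" the dimension join should show
> // up as a hash or sort-merge join instead of a CartesianProduct.
> userAndRetailDatesAndUserDim.explain()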
>
>
> From: Prasad Ravilla
> Date: Friday, December 18, 2015 at 7:38 AM
> To: user
> Subject: Joining DataFrames - Causing Cartesian Product
>
> Hi,
>
> I am running into a performance issue when joining DataFrames created
> from Avro files using the spark-avro library.
>
> The DataFrames are created from 120K Avro files, with a total size of
> around 1.5 TB. The two DataFrames are very large, with billions of
> records.
>
> *The join of these two DataFrames runs forever.*
> This process runs on a YARN cluster with 300 executors, each with 4
> cores and 8 GB of memory.
>
> Any insights into this join would help. I have posted the explain plan
> below. I notice a CartesianProduct in the physical plan and am
> wondering if this is causing the performance issue.
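>
> (For scale: pairing every row of the 500-million-row DataFrame with
> every row of the 800-million-row one is on the order of
> 5x10^8 * 8x10^8 = 4x10^17 candidate row pairs before any filtering,
> which alone would explain a join that never finishes.)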
>
>
> Below are the logical plan and the physical plan. (Due to the
> confidential nature of the data, I am unable to post any of the column
> names or file names here.)
>
> == Optimized Logical Plan ==
> Limit 21
>  Join Inner, [ Join Conditions ]
>   Join Inner, [ Join Conditions ]
>    Project [ List of columns ]
>     Relation [ List of columns ] AvroRelation[ fileName1 ] -- Another
> large file
>    InMemoryRelation [List of columns], true, 10000, StorageLevel(true,
> true, false, true, 1), (Repartition 1, false), None
>   Project [ List of Columns ]
>    Relation[ List of Columns] AvroRelation[ filename2 ] -- This is a very
> large file
>
> == Physical Plan ==
> Limit 21
>  Filter (filter conditions)
>   CartesianProduct
>    Filter (more filter conditions)
>     CartesianProduct
>      Project (selecting a few columns and applying a UDF to one column)
>       Scan AvroRelation[avro file][ columns in Avro File ]
>      InMemoryColumnarTableScan [List of columns], true, 10000,
> StorageLevel(true, true, false, true, 1), (Repartition 1, false), None
>    Project [ List of Columns ]
>     Scan AvroRelation[Avro File][List of Columns]
>
> Code Generation: true
>
>
> Thanks,
> Prasad.
>
