Can you provide the analyzed and optimized plans (explain(true))?

On Thu, Jan 28, 2016 at 12:26 PM, Srikanth <srikanth...@gmail.com> wrote:
> Hello,
>
> I have a use case where one large table has to be joined with several
> smaller tables. I've added a broadcast hint to all of the small tables in
> the joins.
>
> val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")
>
> val metaActionDF = sqlContext.read.format("json")
> val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
> val metaLocationDF = sqlContext.read.format("json")
>   .option("samplingRatio", "0.1").load(metaLocationFile)
>   .join(broadcast(metaActionDF), "campaign_id")
>   .join(broadcast(cidOrgDF), List("organization_id"), "left_outer")
>
> val metaCreativeDF = sqlContext.read.format("json")
> val metaExchangeDF = sqlContext.read.format("json")
> val localizationDF = sqlContext.read.format("com.databricks.spark.csv")
> val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")
>
> val joinedBidderDF = largeTableDF.as("BID")
>   .join(broadcast(metaLocationDF), "strategy_id")
>   .join(broadcast(metaCreativeDF), "creative_id")
>   .join(broadcast(metaExchangeDF), $"exchange_id" === $"id", "left_outer")
>   .join(broadcast(techKeyDF).as("TK"), $"BID.tech_id" === $"TK.tech_key", "left_outer")
>   .join(broadcast(localizationDF).as("BL"), $"BID.language" === $"BL.id", "left_outer")
>
> When I look at the execution plan, all the joins are marked as broadcast
> joins. But when I look at the Spark job UI, the DAG visualization shows that
> some joins are sort-merge joins with a shuffle involved. The ones I've
> highlighted in yellow were shuffled. The DAG can be viewed here:
> https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0
>
> Why is the actual execution, as seen in the DAG, different from the physical
> plan pasted below? I'm trying not to shuffle my large table. Any idea what
> is causing this?
>
> == Physical Plan ==
> BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None
> :- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None
> :  :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L], LeftOuter, None
> :  :  :- Project [...]
> :  :  :  +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#131L], BuildRight
> :  :  :     :- Project [...]
> :  :  :     :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#127L], BuildRight
> :  :  :     :     :- ConvertToUnsafe
> :  :  :     :     :  +- Scan CsvRelation(<function0>,Some(file:///shared/data/bidder/*.lzo),false,
> :  :  :     :     +- Project [...]
> :  :  :     :        +- BroadcastHashOuterJoin [organization_id#90L], [cast(organization_id#102 as bigint)], LeftOuter, None
> :  :  :     :           :- Project [...]
> :  :  :     :           :  +- BroadcastHashJoin [campaign_id#105L], [campaign_id#75L], BuildRight
> :  :  :     :           :     :- Project [...]
> :  :  :     :           :     :  +- Scan JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths: file:/shared/data/t1_meta/t1_meta_strategy.jsonl
> :  :  :     :           :     +- Scan JSONRelation[] InputPaths: file:/shared/data/t1_meta/t1_meta_campaign.jsonl
> :  :  :     :           +- ConvertToUnsafe
> :  :  :     :              +- Scan CsvRelation(<function0>,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,
> :  :  :     +- Scan JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
> :  :  +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: file:/shared/data/t1_meta/t1_meta_exchange.jsonl
> :  +- ConvertToUnsafe
> :     +- Scan CsvRelation(<function0>,Some(file:///shared/data/t1_meta/technology_key.txt),false,
> +- ConvertToUnsafe
>    +- Scan CsvRelation(<function0>,Some(file:///shared/data/t1_meta/browser_languages.osv),false
>
> Srikanth
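For reference, here is a minimal Scala sketch of how to get the plans requested at the top of this reply. It assumes Spark 1.6-era APIs (`SQLContext`, `functions.broadcast`) and uses made-up toy data. The object `ExplainJoinPlans`, its helpers `buildJoined` and `plansFor`, and the tables `bidDF`/`strategyDF` are all hypothetical and do not come from the thread. `explain(true)` prints the parsed, analyzed and optimized logical plans before the physical plan. `plansFor` returns the same sections as a string through `queryExecution`, which Spark marks as a developer API.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.broadcast

// Hypothetical example object; not part of the original thread.
object ExplainJoinPlans {

  // Toy stand-ins for largeTableDF and one small lookup table (made-up data).
  def buildJoined(sqlContext: SQLContext): DataFrame = {
    import sqlContext.implicits._
    val bidDF = Seq((1L, 10L), (2L, 20L)).toDF("strategy_id", "creative_id")
    val strategyDF = Seq((1L, "s1"), (2L, "s2")).toDF("strategy_id", "strategy_name")
    // Same pattern as in the thread: hint the small side with broadcast()
    bidDF.join(broadcast(strategyDF), "strategy_id")
  }

  // queryExecution is a developer API; its toString contains the parsed,
  // analyzed and optimized logical plans followed by the physical plan.
  def plansFor(df: DataFrame): String = df.queryExecution.toString

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("explain-join-plans").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    val joined = buildJoined(sqlContext)

    // Prints all four plans to stdout (extended = true).
    joined.explain(true)

    sc.stop()
  }
}
```

In the job from the thread, the equivalent call would be `joinedBidderDF.explain(true)`. That would show the analyzed and optimized plans alongside the physical plan already pasted in the quoted message.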