Here is the output:
== Parsed Logical Plan ==Project [400+ columns]+- Project [400+ columns]   +- 
Project [400+ columns]      +- Project [400+ columns]         +- Join Inner, 
Some((((visid_high#460L = visid_high#948L) && (visid_low#461L = 
visid_low#949L)) && (date_time#25L > date_time#513L)))            :- 
Relation[400+ columns] ParquetRelation            +- BroadcastHint              
 +- Project [soid_e1#30 AS 
account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]             
     +- Filter (instr(event_list#105,202) > 0)                     +- 
Relation[400+ columns] ParquetRelation
== Analyzed Logical Plan ==400+ columnsProject [400+ columns]+- Project [400+ 
columns]   +- Project [400+ columns]      +- Project [400+ columns]         +- 
Join Inner, Some((((visid_high#460L = visid_high#948L) && (visid_low#461L = 
visid_low#949L)) && (date_time#25L > date_time#513L)))            :- 
Relation[400+ columns] ParquetRelation            +- BroadcastHint              
 +- Project [soid_e1#30 AS 
account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]             
     +- Filter (instr(event_list#105,202) > 0)                     +- 
Relation[400+ columns] ParquetRelation
== Optimized Logical Plan ==Project [400+ columns]+- Join Inner, 
Some((((visid_high#460L = visid_high#948L) && (visid_low#461L = 
visid_low#949L)) && (date_time#25L > date_time#513L)))   :- Relation[400+ 
columns] ParquetRelation   +- Project 
[date_time#25L,visid_low#461L,visid_high#460L,account_id#976]      +- 
BroadcastHint         +- Project [soid_e1#30 AS 
account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]            
+- Filter (instr(event_list#105,202) > 0)               +- Relation[400+ 
columns] ParquetRelation
== Physical Plan ==Project [400+ columns]+- Filter (date_time#25L > 
date_time#513L)   +- SortMergeJoin [visid_high#948L,visid_low#949L], 
[visid_high#460L,visid_low#461L]      :- Sort [visid_high#948L 
ASC,visid_low#949L ASC], false, 0      :  +- TungstenExchange 
hashpartitioning(visid_high#948L,visid_low#949L,200), None      :     +- Scan 
ParquetRelation[400+ columns] InputPaths: hdfs://xxx/2015/12/17, 
hdfs://xxx/2015/12/18, hdfs://xxx/2015/12/19, hdfs://xxx/2015/12/20, 
hdfs://xxx/2015/12/21, hdfs://xxx/2015/12/22, hdfs://xxx/2015/12/23, 
hdfs://xxx/2015/12/24, hdfs://xxx/2015/12/25, hdfs://xxx/2015/12/26, 
hdfs://xxx/2015/12/27, hdfs://xxx/2015/12/28, hdfs://xxx/2015/12/29, 
hdfs://xxx/2015/12/30, hdfs://xxx/2015/12/31, hdfs://xxx/2016/01/01, 
hdfs://xxx/2016/01/02, hdfs://xxx/2016/01/03, hdfs://xxx/2016/01/04, 
hdfs://xxx/2016/01/05, hdfs://xxx/2016/01/06, hdfs://xxx/2016/01/07, 
hdfs://xxx/2016/01/08, hdfs://xxx/2016/01/09, hdfs://xxx/2016/01/10, 
hdfs://xxx/2016/01/11, hdfs://xxx/2016/01/12, hdfs://xxx/2016/01/13, 
hdfs://xxx/2016/01/14, hdfs://xxx/2016/01/15, hdfs://xxx/2016/01/16, 
hdfs://xxx/2016/01/17, hdfs://xxx/2016/01/18, hdfs://xxx/2016/01/19, 
hdfs://xxx/2016/01/20, hdfs://xxx/2016/01/21, hdfs://xxx/2016/01/22, 
hdfs://xxx/2016/01/23, hdfs://xxx/2016/01/24, hdfs://xxx/2016/01/25, 
hdfs://xxx/2016/01/26, hdfs://xxx/2016/01/27, hdfs://xxx/2016/01/28, 
hdfs://xxx/2016/01/29, hdfs://xxx/2016/01/30, hdfs://xxx/2016/01/31, 
hdfs://xxx/2016/02/01, hdfs://xxx/2016/02/02, hdfs://xxx/2016/02/03, 
hdfs://xxx/2016/02/04, hdfs://xxx/2016/02/05, hdfs://xxx/2016/02/06, 
hdfs://xxx/2016/02/07, hdfs://xxx/2016/02/08, hdfs://xxx/2016/02/09, 
hdfs://xxx/2016/02/10, hdfs://xxx/2016/02/11, hdfs://xxx/2016/02/12, 
hdfs://xxx/2016/02/13, hdfs://xxx/2016/02/14, hdfs://xxx/2016/02/15, 
hdfs://xxx/2016/02/16, hdfs://xxx/2016/02/17, hdfs://xxx/2016/02/18, 
hdfs://xxx/2016/02/19, hdfs://xxx/2016/02/20, hdfs://xxx/2016/02/21, 
hdfs://xxx/2016/02/22, hdfs://xxx/2016/02/23, hdfs://xxx/2016/02/24, 
hdfs://xxx/2016/02/25, hdfs://xxx/2016/02/26, hdfs://xxx/2016/02/27, 
hdfs://xxx/2016/02/28, hdfs://xxx/2016/02/29, hdfs://xxx/2016/03/01, 
hdfs://xxx/2016/03/02, hdfs://xxx/2016/03/03, hdfs://xxx/2016/03/04, 
hdfs://xxx/2016/03/05, hdfs://xxx/2016/03/06, hdfs://xxx/2016/03/07, 
hdfs://xxx/2016/03/08, hdfs://xxx/2016/03/09, hdfs://xxx/2016/03/10, 
hdfs://xxx/2016/03/11, hdfs://xxx/2016/03/12, hdfs://xxx/2016/03/13, 
hdfs://xxx/2016/03/14, hdfs://xxx/2016/03/15, hdfs://xxx/2016/03/16, 
hdfs://xxx/2016/03/17      +- Sort [visid_high#460L ASC,visid_low#461L ASC], 
false, 0         +- TungstenExchange 
hashpartitioning(visid_high#460L,visid_low#461L,200), None            +- 
Project [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]           
    +- Project [soid_e1#30 AS 
account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]             
     +- Filter (instr(event_list#105,202) > 0)                     +- Scan 
ParquetRelation[visid_low#461L,ip#127,soid_e1#30,event_list#105,visid_high#460L,date_time#25L]
 InputPaths: hdfs://xxx/2016/03/17
This dataset has more than 480 columns in parquet file, so I replaced them with 
"400+ columns", without blow out the email, but I don't think this could do 
anything with "broadcast" problem.
Thanks
Yong

> Date: Wed, 23 Mar 2016 10:14:19 -0700
> Subject: Re: Spark 1.5.2, why the broadcast join shuffle so much data in the 
> last step
> From: dav...@databricks.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
> 
> The broadcast hint does not work as expected in this case, could you
> also how the logical plan by 'explain(true)'?
> 
> On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang <java8...@hotmail.com> wrote:
> >
> > So I am testing this code to understand "broadcast" feature of DF on Spark 
> > 1.6.1.
> > This time I am not disable "tungsten". Everything is default value, except 
> > setting memory and cores of my job on 1.6.1.
> >
> > I am testing the join2 case
> >
> > val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> > historyRaw("visid_high") &&  trialRaw("visid_low") === 
> > historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
> >
> > and here is the DAG visualization in the runtime of my testing job:
> >
> >
> >
> >
> >
> > So now, I don't understand how the "broadcast" works on DateFrame in Spark. 
> > I originally thought it will be the same as "mapjoin" in the hive, but can 
> > someone explain the DAG above me?
> >
> > I have one day data about 1.5G compressed parquet file, filter by 
> > "instr(loadRaw("event_list"), "202") > 0", which will only output about 
> > 1494 rows (very small), and it is the "trailRaw" DF in my example.
> > Stage 3 has a filter, which I thought is for the trailRaw data, but the 
> > stage statics doesn't match with the data. I don't know why the input is 
> > only 78M, and shuffle write is about 97.6KB
> >
> >
> >
> >
> > The historyRaw will be about 90 days history data, which should be about 
> > 100G, so it looks like stage 4 is scanning it
> >
> >
> >
> >
> > Now, my original thought is that small data will be broadcasted to all the 
> > nodes, and most of history data will be filtered out by the join keys, at 
> > least that will be the "mapjoin" in Hive will do, but from the DAG above, I 
> > didn't see it working this way.
> > It is more like that Spark use the SortMerge join to shuffle both data 
> > across network, and filter on the "reducers" side by the join keys, to get 
> > the final output. But that is not the "broadcast" join supposed to do, 
> > correct?
> > In the last stage, it will be very slow, until it reach and process all the 
> > history data,  shown below as "shuffle read" reaching 720G, to finish.
> >
> >
> >
> >
> > One thing I notice that if tungsten is enable, the shuffle write volume on 
> > stage 4 is larger (720G) than when tungsten is disable (506G) in my 
> > originally run, for the exactly same input. It is an interesting point, 
> > does anyone have some idea about this?
> >
> >
> > Overall, for my test case, "broadcast" join is the exactly most optimized 
> > way I should use; but somehow, I cannot make it do the same way as 
> > "mapjoin" of Hive, even in Spark 1.6.1.
> >
> > As I said, this is a just test case. We have some business cases making 
> > sense to use "broadcast" join, but until I understand exactly how to make 
> > it work as I expect in Spark, I don't know what to do.
> >
> > Yong
> >
> > ________________________________
> > From: java8...@hotmail.com
> > To: user@spark.apache.org
> > Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in 
> > the last step
> > Date: Tue, 22 Mar 2016 13:08:31 -0400
> >
> >
> > Please help me understand how the "broadcast" will work on DF in Spark 
> > 1.5.2.
> >
> > Below are the 2 joins I tested and the physical plan I dumped:
> >
> > val join1 = historyRaw.join(trialRaw, trialRaw("visid_high") === 
> > historyRaw("visid_high") &&  trialRaw("visid_low") === 
> > historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
> > val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> > historyRaw("visid_high") &&  trialRaw("visid_low") === 
> > historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
> >
> > join1.explain(true)
> > == Physical Plan ==
> > Filter (date_time#25L > date_time#513L)
> >  SortMergeJoin [visid_high#948L,visid_low#949L], 
> > [visid_high#460L,visid_low#461L]
> >   ExternalSort [visid_high#948L ASC,visid_low#949L ASC], false
> >    Exchange hashpartitioning(visid_high#948L,visid_low#949L)
> >     Scan ParquetRelation[hdfs://xxxxxxxx]
> >   ExternalSort [visid_high#460L ASC,visid_low#461L ASC], false
> >    Exchange hashpartitioning(visid_high#460L,visid_low#461L)
> >     Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
> >      Filter (instr(event_list#105,202) > 0)
> >       Scan 
> > ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
> >
> > join2.explain(true)
> > == Physical Plan ==
> > Filter (date_time#25L > date_time#513L)
> >  BroadcastHashJoin [visid_high#948L,visid_low#949L], 
> > [visid_high#460L,visid_low#461L], BuildRight
> >     Scan ParquetRelation[hdfs://xxxxxxxx]
> > Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
> >    Filter (instr(event_list#105,202) > 0)
> >     Scan 
> > ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
> >
> > Obvious, the explain plans are different, but the performance and the job 
> > execution steps are almost exactly same, as shown in the original picture 
> > in the email below.
> > Keep in mind that I have to run with "--conf 
> > spark.sql.tungsten.enabled=false", otherwise, the execution plan will do 
> > the tungsten sort.
> >
> > Now what confusing me is following:
> > When using the broadcast join, the job still generates 3 stages, same as 
> > SortMergeJoin, but I am not sure this makes sense.
> > Ideally, in "Broadcast", the first stage scan the "trialRaw" data, using 
> > the filter (instr(event_list#105,202) > 0), which BTW will filter out 99% 
> > of data, then "broadcasting" remaining data to all the nodes. Then scan 
> > "historyRaw", while filtering by join with broadcasted data. In the end, we 
> > can say there is one more stage to save the data in the default "200" 
> > partitions. So there should be ONLY 2 stages enough for this case. Why 
> > there are still 3 stages in this case, just same as "SortMergeJoin", it 
> > looks like "broadcast" not taking effect at all? But the physical plan 
> > clearly shows that "Broadcast" hint?
> >
> > Thanks
> >
> > Yong
> >
> >
> > ________________________________
> > From: java8...@hotmail.com
> > To: user@spark.apache.org
> > Subject: Spark 1.5.2, why the broadcast join shuffle so much data in the 
> > last step
> > Date: Fri, 18 Mar 2016 16:54:16 -0400
> >
> > Hi, Sparkers:
> >
> > I have some questions related to generate the parquet output in Spark 1.5.2.
> >
> > I have 2 data sets to join, and I know one is much smaller than the other 
> > one, so I have the following test code:
> >
> > val loadRaw = sqlContext.read.parquet("one days of data in parquet format")
> > val historyRaw = sqlContext.read.parquet("90 days of history data in 
> > parquet format")
> >
> > // the trailRaw will be very small, normally only thousands of row from 20M 
> > of one day's data
> > val trialRaw = loadRaw.filter(instr(loadRaw("event_list"), "202") > 
> > 0).selectExpr("e1 as account_id", "visid_high", "visid_low", "ip")
> >
> > trialRaw.count
> > res0: Long = 1494
> >
> > // so the trailRaw data is small
> >
> > val join = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> > historyRaw("visid_high") &&  trialRaw("visid_low") === 
> > historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
> >
> > val col_1 = trialRaw("visid_high")
> > val col_2 = trialRaw("visid_low")
> > val col_3 = trialRaw("date_time")
> > val col_4 = trialRaw("ip")
> >
> > // drop the duplicate columns after join
> > val output = join.drop(col1).drop(col2).drop(col3).drop(col4)
> > output.write.parquet("hdfs location")
> >
> > First problem, I think I am facing Spark-10309
> >
> > Caused by: java.io.IOException: Unable to acquire 67108864 bytes of memory
> > at 
> > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
> > at 
> > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
> > at 
> > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
> >
> >
> > so I have to disable tungsten (spark.sql.tungsten.enabled=false),
> >
> > Now the problem is the Spark finishes this job very slow, even worse than 
> > same logic done in  Hive.
> > The explain shows the broadcast join is used:
> > join.explain(true)
> >
> > .....
> > == Physical Plan ==
> > Filter (date_time#25L > date_time#519L)
> >  BroadcastHashJoin [visid_high#954L,visid_low#955L], 
> > [visid_high#460L,visid_low#461L], BuildRight
> >   ConvertToUnsafe
> >    Scan ParquetRelation[hdfs://xxxxxx][400+ columns shown up here]
> >   ConvertToUnsafe
> >    Project [soid_e1#30 AS 
> > account_id#488,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >     Filter (instr(event_list#105,202) > 0)
> >      Scan 
> > ParquetRelation[hdfs:xxx/data/event_parquet/2016/03/17][visid_high#460L,ip#127,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
> > Code Generation: true
> >
> >  I don't understand the statistics shown in the GUI below:
> >
> >
> >
> > It looks like the last task will shuffle read all 506.6G data, but this 
> > DOESN'T make any sense. The final output of 200 files shown below:
> >
> > hadoop fs -ls hdfs://finalPath | sort -u -k5n
> > Found 203 items
> > -rw-r--r--   3 biginetl biginetl      44237 2016-03-18 16:47 
> > finalPath/_common_metadata
> > -rw-r--r--   3 biginetl biginetl     105534 2016-03-18 15:45 
> > finalPath/part-r-00069-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> > -rw-r--r--   3 biginetl biginetl     107098 2016-03-18 16:24 
> > finalPath/part-r-00177-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> > .............
> > -rw-r--r--   3 biginetl biginetl    1031400 2016-03-18 16:35 
> > finalPath/part-r-00187-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> > -rw-r--r--   3 biginetl biginetl    1173678 2016-03-18 16:21 
> > finalPath/part-r-00120-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> > -rw-r--r--   3 biginetl biginetl   12257423 2016-03-18 16:47 
> > finalPath/_metadata
> >
> > As we can see, the largest file is only 1.1M, so the total output is just 
> > about 150M for all 200 files.
> > I really don't understand why stage 5 is so slow, and why the shuffle read 
> > is so BIG.
> > Understanding the "broadcast" join in Spark 1.5 is very important for our 
> > use case, Please tell me what could the reasons behind this.
> >
> > Thanks
> >
> > Yong
> >
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
                                          

Reply via email to