SPARK-13383 is fixed in 2.0 only, as of this moment. Any chance of backporting to branch-1.6 ?
Thanks On Wed, Mar 23, 2016 at 4:20 PM, Davies Liu <dav...@databricks.com> wrote: > On Wed, Mar 23, 2016 at 10:35 AM, Yong Zhang <java8...@hotmail.com> wrote: > > Here is the output: > > > > == Parsed Logical Plan == > > Project [400+ columns] > > +- Project [400+ columns] > > +- Project [400+ columns] > > +- Project [400+ columns] > > +- Join Inner, Some((((visid_high#460L = visid_high#948L) && > > (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L))) > > :- Relation[400+ columns] ParquetRelation > > +- BroadcastHint > > +- Project [soid_e1#30 AS > > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127] > > +- Filter (instr(event_list#105,202) > 0) > > +- Relation[400+ columns] ParquetRelation > > > > == Analyzed Logical Plan == > > 400+ columns > > Project [400+ columns] > > +- Project [400+ columns] > > +- Project [400+ columns] > > +- Project [400+ columns] > > +- Join Inner, Some((((visid_high#460L = visid_high#948L) && > > (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L))) > > :- Relation[400+ columns] ParquetRelation > > +- BroadcastHint > > +- Project [soid_e1#30 AS > > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127] > > +- Filter (instr(event_list#105,202) > 0) > > +- Relation[400+ columns] ParquetRelation > > > > == Optimized Logical Plan == > > Project [400+ columns] > > +- Join Inner, Some((((visid_high#460L = visid_high#948L) && > (visid_low#461L > > = visid_low#949L)) && (date_time#25L > date_time#513L))) > > :- Relation[400+ columns] ParquetRelation > > +- Project > [date_time#25L,visid_low#461L,visid_high#460L,account_id#976] > > +- BroadcastHint > > +- Project [soid_e1#30 AS > > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127] > > +- Filter (instr(event_list#105,202) > 0) > > +- Relation[400+ columns] ParquetRelation > > There is a Project on top of BroadcastHint, which is inserted by > column pruning rule, that make > the SparkStratege can not regonize BroadcastHint anymore, it's fixed > recently in master [1] > > https://github.com/apache/spark/pull/11260 > > Your join should run as expected in master. > > > == Physical Plan == > > Project [400+ columns] > > +- Filter (date_time#25L > date_time#513L) > > +- SortMergeJoin [visid_high#948L,visid_low#949L], > > [visid_high#460L,visid_low#461L] > > :- Sort [visid_high#948L ASC,visid_low#949L ASC], false, 0 > > : +- TungstenExchange > > hashpartitioning(visid_high#948L,visid_low#949L,200), None > > : +- Scan ParquetRelation[400+ columns] InputPaths: > > hdfs://xxx/2015/12/17, hdfs://xxx/2015/12/18, hdfs://xxx/2015/12/19, > > hdfs://xxx/2015/12/20, hdfs://xxx/2015/12/21, hdfs://xxx/2015/12/22, > > hdfs://xxx/2015/12/23, hdfs://xxx/2015/12/24, hdfs://xxx/2015/12/25, > > hdfs://xxx/2015/12/26, hdfs://xxx/2015/12/27, hdfs://xxx/2015/12/28, > > hdfs://xxx/2015/12/29, hdfs://xxx/2015/12/30, hdfs://xxx/2015/12/31, > > hdfs://xxx/2016/01/01, hdfs://xxx/2016/01/02, hdfs://xxx/2016/01/03, > > hdfs://xxx/2016/01/04, hdfs://xxx/2016/01/05, hdfs://xxx/2016/01/06, > > hdfs://xxx/2016/01/07, hdfs://xxx/2016/01/08, hdfs://xxx/2016/01/09, > > hdfs://xxx/2016/01/10, hdfs://xxx/2016/01/11, hdfs://xxx/2016/01/12, > > hdfs://xxx/2016/01/13, hdfs://xxx/2016/01/14, hdfs://xxx/2016/01/15, > > hdfs://xxx/2016/01/16, hdfs://xxx/2016/01/17, hdfs://xxx/2016/01/18, > > hdfs://xxx/2016/01/19, hdfs://xxx/2016/01/20, hdfs://xxx/2016/01/21, > > hdfs://xxx/2016/01/22, hdfs://xxx/2016/01/23, hdfs://xxx/2016/01/24, > > hdfs://xxx/2016/01/25, hdfs://xxx/2016/01/26, hdfs://xxx/2016/01/27, > > hdfs://xxx/2016/01/28, hdfs://xxx/2016/01/29, hdfs://xxx/2016/01/30, > > hdfs://xxx/2016/01/31, hdfs://xxx/2016/02/01, hdfs://xxx/2016/02/02, > > hdfs://xxx/2016/02/03, hdfs://xxx/2016/02/04, hdfs://xxx/2016/02/05, > > hdfs://xxx/2016/02/06, hdfs://xxx/2016/02/07, hdfs://xxx/2016/02/08, > > hdfs://xxx/2016/02/09, hdfs://xxx/2016/02/10, hdfs://xxx/2016/02/11, > > hdfs://xxx/2016/02/12, hdfs://xxx/2016/02/13, hdfs://xxx/2016/02/14, > > hdfs://xxx/2016/02/15, hdfs://xxx/2016/02/16, hdfs://xxx/2016/02/17, > > hdfs://xxx/2016/02/18, hdfs://xxx/2016/02/19, hdfs://xxx/2016/02/20, > > hdfs://xxx/2016/02/21, hdfs://xxx/2016/02/22, hdfs://xxx/2016/02/23, > > hdfs://xxx/2016/02/24, hdfs://xxx/2016/02/25, hdfs://xxx/2016/02/26, > > hdfs://xxx/2016/02/27, hdfs://xxx/2016/02/28, hdfs://xxx/2016/02/29, > > hdfs://xxx/2016/03/01, hdfs://xxx/2016/03/02, hdfs://xxx/2016/03/03, > > hdfs://xxx/2016/03/04, hdfs://xxx/2016/03/05, hdfs://xxx/2016/03/06, > > hdfs://xxx/2016/03/07, hdfs://xxx/2016/03/08, hdfs://xxx/2016/03/09, > > hdfs://xxx/2016/03/10, hdfs://xxx/2016/03/11, hdfs://xxx/2016/03/12, > > hdfs://xxx/2016/03/13, hdfs://xxx/2016/03/14, hdfs://xxx/2016/03/15, > > hdfs://xxx/2016/03/16, hdfs://xxx/2016/03/17 > > +- Sort [visid_high#460L ASC,visid_low#461L ASC], false, 0 > > +- TungstenExchange > > hashpartitioning(visid_high#460L,visid_low#461L,200), None > > +- Project > > [date_time#25L,visid_low#461L,visid_high#460L,account_id#976] > > +- Project [soid_e1#30 AS > > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127] > > +- Filter (instr(event_list#105,202) > 0) > > +- Scan > > > ParquetRelation[visid_low#461L,ip#127,soid_e1#30,event_list#105,visid_high#460L,date_time#25L] > > InputPaths: hdfs://xxx/2016/03/17 > > > > This dataset has more than 480 columns in parquet file, so I replaced > them > > with "400+ columns", without blow out the email, but I don't think this > > could do anything with "broadcast" problem. > > > > Thanks > > > > Yong > > > > > >> Date: Wed, 23 Mar 2016 10:14:19 -0700 > >> Subject: Re: Spark 1.5.2, why the broadcast join shuffle so much data in > >> the last step > >> From: dav...@databricks.com > >> To: java8...@hotmail.com > >> CC: user@spark.apache.org > > > >> > >> The broadcast hint does not work as expected in this case, could you > >> also how the logical plan by 'explain(true)'? > >> > >> On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang <java8...@hotmail.com> > wrote: > >> > > >> > So I am testing this code to understand "broadcast" feature of DF on > >> > Spark 1.6.1. > >> > This time I am not disable "tungsten". Everything is default value, > >> > except setting memory and cores of my job on 1.6.1. > >> > > >> > I am testing the join2 case > >> > > >> > val join2 = historyRaw.join(broadcast(trialRaw), > trialRaw("visid_high") > >> > === historyRaw("visid_high") && trialRaw("visid_low") === > >> > historyRaw("visid_low") && trialRaw("date_time") > > historyRaw("date_time")) > >> > > >> > and here is the DAG visualization in the runtime of my testing job: > >> > > >> > > >> > > >> > > >> > > >> > So now, I don't understand how the "broadcast" works on DateFrame in > >> > Spark. I originally thought it will be the same as "mapjoin" in the > hive, > >> > but can someone explain the DAG above me? > >> > > >> > I have one day data about 1.5G compressed parquet file, filter by > >> > "instr(loadRaw("event_list"), "202") > 0", which will only output > about 1494 > >> > rows (very small), and it is the "trailRaw" DF in my example. > >> > Stage 3 has a filter, which I thought is for the trailRaw data, but > the > >> > stage statics doesn't match with the data. I don't know why the input > is > >> > only 78M, and shuffle write is about 97.6KB > >> > > >> > > >> > > >> > > >> > The historyRaw will be about 90 days history data, which should be > about > >> > 100G, so it looks like stage 4 is scanning it > >> > > >> > > >> > > >> > > >> > Now, my original thought is that small data will be broadcasted to all > >> > the nodes, and most of history data will be filtered out by the join > keys, > >> > at least that will be the "mapjoin" in Hive will do, but from the DAG > above, > >> > I didn't see it working this way. > >> > It is more like that Spark use the SortMerge join to shuffle both data > >> > across network, and filter on the "reducers" side by the join keys, > to get > >> > the final output. But that is not the "broadcast" join supposed to do, > >> > correct? > >> > In the last stage, it will be very slow, until it reach and process > all > >> > the history data, shown below as "shuffle read" reaching 720G, to > finish. > >> > > >> > > >> > > >> > > >> > One thing I notice that if tungsten is enable, the shuffle write > volume > >> > on stage 4 is larger (720G) than when tungsten is disable (506G) in my > >> > originally run, for the exactly same input. It is an interesting > point, does > >> > anyone have some idea about this? > >> > > >> > > >> > Overall, for my test case, "broadcast" join is the exactly most > >> > optimized way I should use; but somehow, I cannot make it do the same > way as > >> > "mapjoin" of Hive, even in Spark 1.6.1. > >> > > >> > As I said, this is a just test case. We have some business cases > making > >> > sense to use "broadcast" join, but until I understand exactly how to > make it > >> > work as I expect in Spark, I don't know what to do. > >> > > >> > Yong > >> > > >> > ________________________________ > >> > From: java8...@hotmail.com > >> > To: user@spark.apache.org > >> > Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data > in > >> > the last step > >> > Date: Tue, 22 Mar 2016 13:08:31 -0400 > >> > > >> > > >> > Please help me understand how the "broadcast" will work on DF in Spark > >> > 1.5.2. > >> > > >> > Below are the 2 joins I tested and the physical plan I dumped: > >> > > >> > val join1 = historyRaw.join(trialRaw, trialRaw("visid_high") === > >> > historyRaw("visid_high") && trialRaw("visid_low") === > >> > historyRaw("visid_low") && trialRaw("date_time") > > historyRaw("date_time")) > >> > val join2 = historyRaw.join(broadcast(trialRaw), > trialRaw("visid_high") > >> > === historyRaw("visid_high") && trialRaw("visid_low") === > >> > historyRaw("visid_low") && trialRaw("date_time") > > historyRaw("date_time")) > >> > > >> > join1.explain(true) > >> > == Physical Plan == > >> > Filter (date_time#25L > date_time#513L) > >> > SortMergeJoin [visid_high#948L,visid_low#949L], > >> > [visid_high#460L,visid_low#461L] > >> > ExternalSort [visid_high#948L ASC,visid_low#949L ASC], false > >> > Exchange hashpartitioning(visid_high#948L,visid_low#949L) > >> > Scan ParquetRelation[hdfs://xxxxxxxx] > >> > ExternalSort [visid_high#460L ASC,visid_low#461L ASC], false > >> > Exchange hashpartitioning(visid_high#460L,visid_low#461L) > >> > Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L] > >> > Filter (instr(event_list#105,202) > 0) > >> > Scan > >> > > ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30] > >> > > >> > join2.explain(true) > >> > == Physical Plan == > >> > Filter (date_time#25L > date_time#513L) > >> > BroadcastHashJoin [visid_high#948L,visid_low#949L], > >> > [visid_high#460L,visid_low#461L], BuildRight > >> > Scan ParquetRelation[hdfs://xxxxxxxx] > >> > Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L] > >> > Filter (instr(event_list#105,202) > 0) > >> > Scan > >> > > ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30] > >> > > >> > Obvious, the explain plans are different, but the performance and the > >> > job execution steps are almost exactly same, as shown in the original > >> > picture in the email below. > >> > Keep in mind that I have to run with "--conf > >> > spark.sql.tungsten.enabled=false", otherwise, the execution plan will > do the > >> > tungsten sort. > >> > > >> > Now what confusing me is following: > >> > When using the broadcast join, the job still generates 3 stages, same > as > >> > SortMergeJoin, but I am not sure this makes sense. > >> > Ideally, in "Broadcast", the first stage scan the "trialRaw" data, > using > >> > the filter (instr(event_list#105,202) > 0), which BTW will filter out > 99% of > >> > data, then "broadcasting" remaining data to all the nodes. Then scan > >> > "historyRaw", while filtering by join with broadcasted data. In the > end, we > >> > can say there is one more stage to save the data in the default "200" > >> > partitions. So there should be ONLY 2 stages enough for this case. > Why there > >> > are still 3 stages in this case, just same as "SortMergeJoin", it > looks like > >> > "broadcast" not taking effect at all? But the physical plan clearly > shows > >> > that "Broadcast" hint? > >> > > >> > Thanks > >> > > >> > Yong > >> > > >> > > >> > ________________________________ > >> > From: java8...@hotmail.com > >> > To: user@spark.apache.org > >> > Subject: Spark 1.5.2, why the broadcast join shuffle so much data in > the > >> > last step > >> > Date: Fri, 18 Mar 2016 16:54:16 -0400 > >> > > >> > Hi, Sparkers: > >> > > >> > I have some questions related to generate the parquet output in Spark > >> > 1.5.2. > >> > > >> > I have 2 data sets to join, and I know one is much smaller than the > >> > other one, so I have the following test code: > >> > > >> > val loadRaw = sqlContext.read.parquet("one days of data in parquet > >> > format") > >> > val historyRaw = sqlContext.read.parquet("90 days of history data in > >> > parquet format") > >> > > >> > // the trailRaw will be very small, normally only thousands of row > from > >> > 20M of one day's data > >> > val trialRaw = loadRaw.filter(instr(loadRaw("event_list"), "202") > > >> > 0).selectExpr("e1 as account_id", "visid_high", "visid_low", "ip") > >> > > >> > trialRaw.count > >> > res0: Long = 1494 > >> > > >> > // so the trailRaw data is small > >> > > >> > val join = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") > >> > === historyRaw("visid_high") && trialRaw("visid_low") === > >> > historyRaw("visid_low") && trialRaw("date_time") > > historyRaw("date_time")) > >> > > >> > val col_1 = trialRaw("visid_high") > >> > val col_2 = trialRaw("visid_low") > >> > val col_3 = trialRaw("date_time") > >> > val col_4 = trialRaw("ip") > >> > > >> > // drop the duplicate columns after join > >> > val output = join.drop(col1).drop(col2).drop(col3).drop(col4) > >> > output.write.parquet("hdfs location") > >> > > >> > First problem, I think I am facing Spark-10309 > >> > > >> > Caused by: java.io.IOException: Unable to acquire 67108864 bytes of > >> > memory > >> > at > >> > > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351) > >> > at > >> > > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138) > >> > at > >> > > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106) > >> > > >> > > >> > so I have to disable tungsten (spark.sql.tungsten.enabled=false), > >> > > >> > Now the problem is the Spark finishes this job very slow, even worse > >> > than same logic done in Hive. > >> > The explain shows the broadcast join is used: > >> > join.explain(true) > >> > > >> > ..... > >> > == Physical Plan == > >> > Filter (date_time#25L > date_time#519L) > >> > BroadcastHashJoin [visid_high#954L,visid_low#955L], > >> > [visid_high#460L,visid_low#461L], BuildRight > >> > ConvertToUnsafe > >> > Scan ParquetRelation[hdfs://xxxxxx][400+ columns shown up here] > >> > ConvertToUnsafe > >> > Project [soid_e1#30 AS > >> > account_id#488,visid_high#460L,visid_low#461L,date_time#25L,ip#127] > >> > Filter (instr(event_list#105,202) > 0) > >> > Scan > >> > > ParquetRelation[hdfs:xxx/data/event_parquet/2016/03/17][visid_high#460L,ip#127,visid_low#461L,date_time#25L,event_list#105,soid_e1#30] > >> > Code Generation: true > >> > > >> > I don't understand the statistics shown in the GUI below: > >> > > >> > > >> > > >> > It looks like the last task will shuffle read all 506.6G data, but > this > >> > DOESN'T make any sense. The final output of 200 files shown below: > >> > > >> > hadoop fs -ls hdfs://finalPath | sort -u -k5n > >> > Found 203 items > >> > -rw-r--r-- 3 biginetl biginetl 44237 2016-03-18 16:47 > >> > finalPath/_common_metadata > >> > -rw-r--r-- 3 biginetl biginetl 105534 2016-03-18 15:45 > >> > > finalPath/part-r-00069-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet > >> > -rw-r--r-- 3 biginetl biginetl 107098 2016-03-18 16:24 > >> > > finalPath/part-r-00177-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet > >> > ............. > >> > -rw-r--r-- 3 biginetl biginetl 1031400 2016-03-18 16:35 > >> > > finalPath/part-r-00187-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet > >> > -rw-r--r-- 3 biginetl biginetl 1173678 2016-03-18 16:21 > >> > > finalPath/part-r-00120-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet > >> > -rw-r--r-- 3 biginetl biginetl 12257423 2016-03-18 16:47 > >> > finalPath/_metadata > >> > > >> > As we can see, the largest file is only 1.1M, so the total output is > >> > just about 150M for all 200 files. > >> > I really don't understand why stage 5 is so slow, and why the shuffle > >> > read is so BIG. > >> > Understanding the "broadcast" join in Spark 1.5 is very important for > >> > our use case, Please tell me what could the reasons behind this. > >> > > >> > Thanks > >> > > >> > Yong > >> > > >> > >> --------------------------------------------------------------------- > >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > >> For additional commands, e-mail: user-h...@spark.apache.org > >> > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >