Re: Dataframe broadcast join hint not working

2016-11-28 Thread Yong Zhang
If your query plan has a "Project" in it, there is a bug in pre-2.0 Spark 
releases that prevents the "broadcast" hint from working.


https://issues.apache.org/jira/browse/SPARK-13383


Unfortunately, the fix was not backported to 1.x.
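
For what it's worth, a minimal sketch (Spark 1.6 spark-shell, with hypothetical 
DataFrames "large" and "small") of checking whether the hint survived planning:

  // Sketch only: apply the hint and inspect the physical plan. When the bug in
  // SPARK-13383 applies, the plan keeps SortMergeJoin even though broadcast()
  // was requested; a working hint shows up as BroadcastHashJoin.
  import org.apache.spark.sql.functions.broadcast

  val joined = large.join(broadcast(small), "id")
  joined.explain()   // look for BroadcastHashJoin vs. SortMergeJoin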


Yong



From: Anton Okolnychyi <anton.okolnyc...@gmail.com>
Sent: Saturday, November 26, 2016 4:05 PM
To: Swapnil Shinde
Cc: Benyi Wang; user@spark.apache.org
Subject: Re: Dataframe broadcast join hint not working

Hi guys,

I also experienced a situation where Spark 1.6.2 ignored my hint to do a 
broadcast join (i.e. broadcast(df)) with a small dataset. However, this 
happened only in 1 of 3 cases. Setting the 
"spark.sql.autoBroadcastJoinThreshold" property did not have any impact 
either. All 3 cases work fine in Spark 2.0.

Is there any chance that Spark can ignore a manually specified broadcast 
operation? In other words, is Spark forced to perform a broadcast if one 
specifies "df1.join(broadcast(df2), ...)"?

Best regards,
Anton



2016-11-26 21:04 GMT+01:00 Swapnil Shinde 
<swapnilushi...@gmail.com>:
I am using Spark 1.6.3, and below is the real plan (a, b, c above were just for 
illustration purposes).

== Physical Plan ==
Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN 
mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
+- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801], 
[mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
   :- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
   :  +- TungstenExchange 
hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200), None
   : +- Project [_1#3797 AS ltt#3800,_2#3798 AS mr_demo_id#3801,_3#3799 AS 
mr_demoname#3802]
   :+- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
   +- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
  +- TungstenExchange 
hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200), None
 +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
+- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804 AS 
mr_demo_id#3811,demoname#3805 AS mr_demo_value#3812,demovalue_etv_map#3806 AS 
etv_demo_id#3813]
   +- Filter ((map_type#3809 = master_roster_to_etv) && NOT 
(demogroup#3803 = gender_age_id))
  +- Scan 
ExistingRDD[demogroup#3803,demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,demovalue_old_map#3808,map_type#3809]


Thanks
Swapnil

On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang 
<bewang.t...@gmail.com> wrote:
Could you post the result of `c.explain`? If it is a broadcast join, you 
will see it in the explain output.

On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde 
<swapnilushi...@gmail.com> wrote:
Hello
I am trying a broadcast join on dataframes, but it is still doing a 
SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold higher, 
but still no luck.

Related piece of code-
  val c = a.join(broadcast(b), "id")

On a side note, SizeEstimator.estimate(b) returns a really high value 
(460956584 bytes) compared to the data it contains: b has just 85 rows and 
around 4964 bytes.
Help is very much appreciated!!

Thanks
Swapnil







Re: Dataframe broadcast join hint not working

2016-11-26 Thread Anton Okolnychyi
Hi guys,

I also experienced a situation where Spark 1.6.2 ignored my hint to do a
broadcast join (i.e. broadcast(df)) with a small dataset. However, this
happened only in 1 of 3 cases. Setting the
"spark.sql.autoBroadcastJoinThreshold" property did not have any impact
either. All 3 cases work fine in Spark 2.0.
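
For reference, a minimal sketch (Spark 1.6 spark-shell, hypothetical df1/df2) of
the kind of setup described above:

  // Sketch, assuming the spark-shell sqlContext and hypothetical df1 (large)
  // and df2 (small). Raise the auto-broadcast threshold to 100 MB and also
  // give an explicit broadcast hint on the small side.
  import org.apache.spark.sql.functions.broadcast

  sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)
  val joined = df1.join(broadcast(df2), "id")
  joined.explain()   // in the failing case the plan still shows SortMergeJoin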

Is there any chance that Spark can ignore a manually specified broadcast
operation? In other words, is Spark forced to perform a broadcast if one
specifies "df1.join(broadcast(df2), ...)"?

Best regards,
Anton



2016-11-26 21:04 GMT+01:00 Swapnil Shinde :

> I am using Spark 1.6.3, and below is the real plan (a, b, c above were
> just for illustration purposes).
>
> == Physical Plan ==
> Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN
> mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
> +- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801],
> [mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
>:- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
>:  +- TungstenExchange 
> hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200),
> None
>: +- Project [_1#3797 AS ltt#3800,_2#3798 AS
> mr_demo_id#3801,_3#3799 AS mr_demoname#3802]
>:+- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
>+- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
>   +- TungstenExchange 
> hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200),
> None
>  +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
> +- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804
> AS mr_demo_id#3811,demoname#3805 AS mr_demo_value#3812,demovalue_etv_map#3806
> AS etv_demo_id#3813]
>+- Filter ((map_type#3809 = master_roster_to_etv) && NOT
> (demogroup#3803 = gender_age_id))
>   +- Scan ExistingRDD[demogroup#3803,
> demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,
> demovalue_old_map#3808,map_type#3809]
>
>
> Thanks
> Swapnil
>
> On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang  wrote:
>
>> Could you post the result of `c.explain`? If it is a broadcast join,
>> you will see it in the explain output.
>>
>> On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde <
>> swapnilushi...@gmail.com> wrote:
>>
>>> Hello
>>> I am trying a broadcast join on dataframes, but it is still doing a
>>> SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold
>>> higher, but still no luck.
>>>
>>> Related piece of code-
>>>   val c = a.join(broadcast(b), "id")
>>>
>>> On a side note, SizeEstimator.estimate(b) returns a really high value
>>> (460956584 bytes) compared to the data it contains: b has just 85 rows and
>>> around 4964 bytes.
>>> Help is very much appreciated!!
>>>
>>> Thanks
>>> Swapnil
>>>
>>>
>>>
>>
>


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Benyi Wang
I think your dataframes are converted from RDDs. Are those RDDs computed or
read from files directly? I guess it might affect how Spark computes the
execution plan.

Try this: save the dataframe that will be broadcast to HDFS, read it back
into a dataframe, then do the join and check the explain plan.
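
A minimal sketch of that experiment (Spark 1.6 API, hypothetical HDFS path and
the a/b DataFrames from the earlier mail):

  // Sketch only: persist the small side, read it back so the plan starts from
  // a file scan instead of an ExistingRDD, then re-check the join plan.
  import org.apache.spark.sql.functions.broadcast

  b.write.mode("overwrite").parquet("hdfs:///tmp/broadcast_side")   // hypothetical path
  val bFromFile = sqlContext.read.parquet("hdfs:///tmp/broadcast_side")

  val c = a.join(broadcast(bFromFile), "id")
  c.explain()   // check whether the plan now shows BroadcastHashJoin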

On Sat, Nov 26, 2016 at 12:04 PM, Swapnil Shinde 
wrote:

> I am using Spark 1.6.3, and below is the real plan (a, b, c above were
> just for illustration purposes).
>
> == Physical Plan ==
> Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN
> mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
> +- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801],
> [mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
>:- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
>:  +- TungstenExchange 
> hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200),
> None
>: +- Project [_1#3797 AS ltt#3800,_2#3798 AS
> mr_demo_id#3801,_3#3799 AS mr_demoname#3802]
>:+- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
>+- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
>   +- TungstenExchange 
> hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200),
> None
>  +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
> +- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804
> AS mr_demo_id#3811,demoname#3805 AS mr_demo_value#3812,demovalue_etv_map#3806
> AS etv_demo_id#3813]
>+- Filter ((map_type#3809 = master_roster_to_etv) && NOT
> (demogroup#3803 = gender_age_id))
>   +- Scan ExistingRDD[demogroup#3803,
> demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,
> demovalue_old_map#3808,map_type#3809]
>
>
> Thanks
> Swapnil
>
> On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang  wrote:
>
>> Could you post the result of `c.explain`? If it is a broadcast join,
>> you will see it in the explain output.
>>
>> On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde <
>> swapnilushi...@gmail.com> wrote:
>>
>>> Hello
>>> I am trying a broadcast join on dataframes, but it is still doing a
>>> SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold
>>> higher, but still no luck.
>>>
>>> Related piece of code-
>>>   val c = a.join(broadcast(b), "id")
>>>
>>> On a side note, SizeEstimator.estimate(b) returns a really high value
>>> (460956584 bytes) compared to the data it contains: b has just 85 rows and
>>> around 4964 bytes.
>>> Help is very much appreciated!!
>>>
>>> Thanks
>>> Swapnil
>>>
>>>
>>>
>>
>


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Swapnil Shinde
I am using Spark 1.6.3, and below is the real plan (a, b, c above were just
for illustration purposes).

== Physical Plan ==
Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN
mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
+- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801],
[mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
   :- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
   :  +- TungstenExchange
hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200), None
   : +- Project [_1#3797 AS ltt#3800,_2#3798 AS mr_demo_id#3801,_3#3799
AS mr_demoname#3802]
   :+- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
   +- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
  +- TungstenExchange
hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200), None
 +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
+- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804
AS mr_demo_id#3811,demoname#3805 AS
mr_demo_value#3812,demovalue_etv_map#3806 AS etv_demo_id#3813]
   +- Filter ((map_type#3809 = master_roster_to_etv) && NOT
(demogroup#3803 = gender_age_id))
  +- Scan
ExistingRDD[demogroup#3803,demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,demovalue_old_map#3808,map_type#3809]


Thanks
Swapnil

On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang  wrote:

> Could you post the result of `c.explain`? If it is a broadcast join,
> you will see it in the explain output.
>
> On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde wrote:
>
>> Hello
>> I am trying a broadcast join on dataframes, but it is still doing a
>> SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold
>> higher, but still no luck.
>>
>> Related piece of code-
>>   val c = a.join(broadcast(b), "id")
>>
>> On a side note, SizeEstimator.estimate(b) returns a really high value
>> (460956584 bytes) compared to the data it contains: b has just 85 rows and
>> around 4964 bytes.
>> Help is very much appreciated!!
>>
>> Thanks
>> Swapnil
>>
>>
>>
>


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Benyi Wang
Could you post the result of `c.explain`? If it is a broadcast join,
you will see it in the explain output.

On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde 
wrote:

> Hello
> I am trying a broadcast join on dataframes, but it is still doing a
> SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold
> higher, but still no luck.
>
> Related piece of code-
>   val c = a.join(broadcast(b), "id")
>
> On a side note, SizeEstimator.estimate(b) returns a really high value
> (460956584 bytes) compared to the data it contains: b has just 85 rows and
> around 4964 bytes.
> Help is very much appreciated!!
>
> Thanks
> Swapnil
>
>
>


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Selvam Raman
Hi,

Which version of Spark are you using?

Tables smaller than 10 MB (the default spark.sql.autoBroadcastJoinThreshold) are
automatically converted to a broadcast join in Spark.
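
A quick sketch (spark-shell sqlContext assumed) of checking the threshold that
controls this behaviour:

  // The default is 10485760 bytes (10 MB); setting it to -1 disables
  // automatic broadcast joins entirely.
  val threshold = sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold", "10485760")
  println(s"autoBroadcastJoinThreshold = $threshold bytes")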

Thanks,
Selvam R

On Sat, Nov 26, 2016 at 6:51 PM, Swapnil Shinde 
wrote:

> Hello
> I am trying a broadcast join on dataframes, but it is still doing a
> SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold
> higher, but still no luck.
>
> Related piece of code-
>   val c = a.join(broadcast(b), "id")
>
> On a side note, SizeEstimator.estimate(b) returns a really high value
> (460956584 bytes) compared to the data it contains: b has just 85 rows and
> around 4964 bytes.
> Help is very much appreciated!!
>
> Thanks
> Swapnil
>
>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Dataframe broadcast join hint not working

2016-11-26 Thread Swapnil Shinde
Hello
I am trying a broadcast join on dataframes, but it is still doing a
SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold
higher, but still no luck.

Related piece of code-
  val c = a.join(broadcast(b), "id")

On a side note, SizeEstimator.estimate(b) returns a really high value
(460956584 bytes) compared to the data it contains: b has just 85 rows and
around 4964 bytes.
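
For concreteness, a sketch of the measurement described above (hypothetical;
it compares the estimate for the DataFrame object against the estimate for its
collected rows):

  // Sketch only. SizeEstimator.estimate walks the JVM object graph of whatever
  // it is given, which may explain why the DataFrame object measures far
  // larger than its row data.
  import org.apache.spark.util.SizeEstimator

  val dfObjectSize = SizeEstimator.estimate(b)            // the DataFrame object graph
  val rowDataSize  = SizeEstimator.estimate(b.collect())  // the 85 materialized rows
  println(s"DataFrame object: $dfObjectSize bytes, collected rows: $rowDataSize bytes")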
Help is very much appreciated!!

Thanks
Swapnil