Re: [DISCUSS] naming policy of Spark configs

2020-02-12 Thread Rubén Berenguel
I love it; it will make configs easier to read and write. Thanks, Wenchen. 

R

> On 13 Feb 2020, at 00:15, Dongjoon Hyun  wrote:
> 
> 
> Thank you, Wenchen.
> 
> The new policy looks clear to me. +1 for the explicit policy.
> 
> So, are we going to revise the existing conf names before the 3.0.0 release?
> 
> Or does it apply only to new, upcoming configurations from now on?
> 
> Bests,
> Dongjoon.
> 
>> On Wed, Feb 12, 2020 at 7:43 AM Wenchen Fan  wrote:
>> Hi all,
>> 
>> I'd like to discuss the naming policy of Spark configs, as for now it 
>> depends on personal preference, which leads to inconsistent naming.
>> 
>> In general, the config name should be a noun that describes its meaning 
>> clearly.
>> Good examples:
>> spark.sql.session.timeZone
>> spark.sql.streaming.continuous.executorQueueSize
>> spark.sql.statistics.histogram.numBins
>> Bad example:
>> spark.sql.defaultSizeInBytes (default size for what?)
>> 
>> Also note that a config name has multiple parts, joined by dots. Each part 
>> is a namespace. Don't create namespaces unnecessarily.
>> Good examples:
>> spark.sql.execution.rangeExchange.sampleSizePerPartition
>> spark.sql.execution.arrow.maxRecordsPerBatch
>> Bad example:
>> spark.sql.windowExec.buffer.in.memory.threshold ("in" is not a useful 
>> namespace, better to be .buffer.inMemoryThreshold)
>> 
>> For a big feature, we usually need to create an umbrella config to turn it 
>> on/off, and other configs for fine-grained controls. These configs should 
>> share the same namespace, and the umbrella config should be named like 
>> featureName.enabled. For example:
>> spark.sql.cbo.enabled
>> spark.sql.cbo.starSchemaDetection
>> spark.sql.cbo.starJoinFTRatio
>> spark.sql.cbo.joinReorder.enabled
>> spark.sql.cbo.joinReorder.dp.threshold (BTW "dp" is not a good namespace)
>> spark.sql.cbo.joinReorder.card.weight (BTW "card" is not a good namespace)
>> 
>> For boolean configs, in general the name should end with a verb, e.g. 
>> spark.sql.join.preferSortMergeJoin. If the config is for a feature and you 
>> can't find a good verb for the feature, featureName.enabled is also good.
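>> 
>> For illustration, a minimal sketch of how configs named this way would be 
>> set at runtime (the values are made up; only the names matter here):
>> 
>>    // umbrella config that turns the whole feature on or off
>>    spark.conf.set("spark.sql.cbo.enabled", "true")
>>    // fine-grained control sharing the same namespace
>>    spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")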
>> 
>> I'll update https://spark.apache.org/contributing.html after we reach a 
>> consensus here. Any comments are welcome!
>> 
>> Thanks,
>> Wenchen
>> 
>> 


Re: [DISCUSS] Expensive deterministic UDFs

2019-11-07 Thread Rubén Berenguel
That was very interesting, thanks Enrico.

Sean, IIRC it also prevents pushdown of the UDF in Catalyst in some cases.

Regards, 

Ruben

> On 7 Nov 2019, at 11:09, Sean Owen  wrote:
> 
> Interesting: what does non-deterministic do except have this effect?
> Aside from the naming, it could be a fine use of this flag if that's
> all it effectively does. I'm not sure I'd introduce another flag with
> the same semantics just over naming. If anything, 'expensive' also
> isn't the right word; more like 'try not to evaluate multiple times'.
> 
> Why isn't caching the answer? I realize it's big, but you can cache to
> disk. This may be faster than whatever plan reordering has to happen
> to evaluate once.
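> 
> (For reference, a disk-only cache of the intermediate DataFrame, here
> called df as an assumption, is roughly:
> 
>   import org.apache.spark.storage.StorageLevel
>   df.persist(StorageLevel.DISK_ONLY)  // keep the cached data on disk only
> )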
> 
> Usually I'd say: can you redesign your UDF and code to be more
> efficient too? Or use a bigger cluster if that's really what you need.
> 
> At first look, no, I don't think this Spark-side naming workaround for
> your use case is worthwhile. There are better existing solutions.
> 
> On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack  wrote:
>> 
>> Hi all,
>> 
>> Running expensive deterministic UDFs that return complex types, followed by 
>> multiple references to those results, causes Spark to evaluate the UDF 
>> multiple times per row. This has been reported and discussed before: 
>> SPARK-18748, SPARK-17728.
>> 
>>    val f: Int => Array[Int] = i => Array(i, i + 1)  // body is illustrative
>>    val udfF = udf(f)
>>    df
>>      .select($"id", udfF($"id").as("array"))
>>      // each reference to "array" below triggers its own UDF evaluation
>>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>> 
>> A common approach to make Spark evaluate the UDF only once is to cache the 
>> intermediate result right after projecting the UDF:
>> 
>>df
>>  .select($"id", udfF($"id").as("array"))
>>  .cache()
>>  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>> 
>> There are scenarios where this intermediate result is too big for the 
>> cluster to cache. Also, this is bad design.
>> 
>> The best approach is to mark the UDF as non-deterministic. Then Spark 
>> optimizes the query so that the UDF gets called only once per row, which is 
>> exactly what you want.
>> 
>>val udfF = udf(f).asNondeterministic()
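>> 
>> For completeness, a self-contained sketch of this workaround (the function 
>> body, column names and the SparkSession in scope are assumptions for 
>> illustration):
>> 
>>    import org.apache.spark.sql.functions.udf
>>    import spark.implicits._
>> 
>>    val f: Int => Array[Int] = i => Array(i, i + 1)  // illustrative body
>>    val udfF = udf(f).asNondeterministic()
>>    val result = df
>>      .select($"id", udfF($"id").as("array"))
>>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>>    // inspect the plan to see how often the UDF appears
>>    result.explain()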
>> 
>> However, declaring a UDF non-deterministic when it clearly is deterministic 
>> is counter-intuitive and makes your code harder to read.
>> 
>> Spark should provide a better way to flag the UDF. Calling it expensive 
>> would be better naming here.
>> 
>>val udfF = udf(f).asExpensive()
>> 
>> I understand that deterministic is a notion that Expression provides, and 
>> there is no equivalent to expensive that is understood by the optimizer. 
>> However, asExpensive() could just set ScalaUDF.udfDeterministic = 
>> deterministic && !expensive, which implements the best available approach 
>> behind better naming.
>> 
>> What are your thoughts on asExpensive()?
>> 
>> Regards,
>> Enrico
> 


Re:Re: How to force sorted merge join to broadcast join

2019-07-29 Thread Rubén Berenguel
I think there is no way of doing that (at least I don't remember one right
now). The closest I remember is that you can run the SQL "ANALYZE TABLE
table_name COMPUTE STATISTICS" to compute them regardless of having a query
(it also helps the cost-based optimiser, if I remember correctly), but as for
displaying them, it escapes me right now whether it can be done.
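
For a quick check, a hedged sketch (table_name is a placeholder; whether the
statistics show up depends on them having been computed for that table):

```
spark.sql("ANALYZE TABLE table_name COMPUTE STATISTICS")
// DESCRIBE EXTENDED should list a Statistics row in the detailed table info
spark.sql("DESCRIBE EXTENDED table_name").show(false)  // false = don't truncate
```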

R

-- 
Rubén Berenguel

On 29 July 2019 at 11:03:13, zhangliyun (kelly...@126.com) wrote:

Thanks! After using the syntax provided in the link, select /*+ BROADCAST (A)
*/ ..., I got what I want.
But I want to ask: besides using queryExecution.stringWithStats (DataFrame
API) to show the table statistics, is there any way to show the table
statistics in the output of EXPLAIN in the Spark SQL command line?

Best Regards
Kelly



On 2019-07-29 14:29:50, "Rubén Berenguel" wrote:

Hi, I hope this answers your question.

You can hint the broadcast in SQL as detailed here:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html
(thanks Jacek :) )
I'd recommend creating a temporary table with the trimming you use in the
join (for clarity). Also keep in mind that using the DataFrame methods is
more powerful/readable than using Spark SQL directly (as happens with the
broadcast case, although it depends on personal preference).
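
As a sketch on the DataFrame side (dfA and dfB stand for the small and large
tables from the question below; those names are my assumption):

```
import org.apache.spark.sql.functions.{broadcast, trim}

// B left join A, hinting that A (the small 8-row table) should be broadcast
val joined = dfB.join(
  broadcast(dfA),
  trim(dfA("country")) === trim(dfB("cntry_code")),
  "left")
```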

Regards,

Ruben

-- 
Rubén Berenguel

On 29 July 2019 at 07:12:30, zhangliyun (kelly...@126.com) wrote:

Hi all:
I want to ask a question about broadcast join in Spark SQL.


```
   select A.*,B.nsf_cards_ratio * 1.00 / A.nsf_on_entry as nsf_ratio_to_pop
from B
left join A
on trim(A.country) = trim(B.cntry_code);
```
Here A is a small table with only 8 rows, but somehow the statistics of
table A have a problem.

A join B is a sort merge join, while the join key (trim(A.country) =
trim(B.cntry_code)) only has several values (nearly 21 countries). Is there
any way I can force Spark SQL to use a broadcast join? (I cannot simply
enlarge spark.sql.autoBroadcastJoinThreshold, as I do not know the exact
table size that Spark SQL computes for it.)

I tried to print the physical plan, but it did not show the table size, and
I did not know how much to enlarge spark.sql.autoBroadcastJoinThreshold to
turn the sort merge join into a broadcast join.


```
== Parsed Logical Plan ==
'Project [ArrayBuffer(cc_base_part1).*, (('cc_base_part1.nsf_cards_ratio *
1.00) / 'cc_rank_agg.nsf_on_entry) AS nsf_ratio_to_pop#369]
+- 'Join LeftOuter, ('trim('cc_base_part1.country) =
'trim('cc_rank_agg.cntry_code))
   :- 'UnresolvedRelation `cc_base_part1`
   +- 'UnresolvedRelation `cc_rank_agg`

== Analyzed Logical Plan ==
cust_id: string, country: string, cc_id: decimal(38,0), bin_hmac: string,
credit_card_created_date: string, card_usage: smallint, cc_category:
string, cc_size: string, nsf_risk: string, nsf_cards_ratio: decimal(18,2),
dt: string, nsf_ratio_to_pop: decimal(38,6)
Project [cust_id#372, country#373, cc_id#374, bin_hmac#375,
credit_card_created_date#376, card_usage#377, cc_category#378, cc_size#379,
nsf_risk#380, nsf_cards_ratio#381, dt#382,
CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(nsf_cards_ratio#381
as decimal(18,2))) * promote_precision(cast(1.00 as decimal(18,2,
DecimalType(22,4)) as decimal(38,16))) /
promote_precision(cast(nsf_on_entry#386 as decimal(38,16,
DecimalType(38,6)) AS nsf_ratio_to_pop#369]
+- Join LeftOuter, (trim(country#373, None) = trim(cntry_code#383, None))
   :- SubqueryAlias cc_base_part1
   :  +- HiveTableRelation `fpv365h`.`cc_base_part1`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [cust_id#372,
country#373, cc_id#374, bin_hmac#375, credit_card_created_date#376,
card_usage#377, cc_category#378, cc_size#379, nsf_risk#380,
nsf_cards_ratio#381], [dt#382]
   +- SubqueryAlias cc_rank_agg
  +- HiveTableRelation `fpv365h`.`cc_rank_agg`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [cntry_code#383,
num_tot_cards#384L, num_nsf_cards#385L, nsf_on_entry#386], [dt#387]



```

Does Spark have any command to show the table size when printing the
physical plan? I would appreciate any help with this question.


Best regards

Kelly Zhang


Re: Do you use single-quote syntax for the DataFrame API?

2019-03-31 Thread Rubén Berenguel
I favour using either $"foo" or columnar expressions, but I know of several
developers who prefer the single-quote syntax and consider it a better
practice.
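
For reference, a minimal sketch of the equivalent ways to refer to a column
(assuming a SparkSession named spark and a DataFrame df with these columns):

    import org.apache.spark.sql.functions.{col, expr}
    import spark.implicits._   // enables both $"..." and the 'symbol syntax

    df.select('id, 'name)                   // symbol literals, under discussion
    df.select($"id", $"name")               // $-interpolator
    df.select(col("id"), expr("name"))      // explicit functions / expressions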

R

On 31 March 2019 at 15:15:00, Sean Owen (sro...@apache.org) wrote:

> FWIW I use "foo" in PySpark or col("foo") where necessary, and $"foo" in
> Scala.
>
> On Sun, Mar 31, 2019 at 1:58 AM Reynold Xin  wrote:
>
>> As part of evolving the Scala language, the Scala team is considering
>> removing single-quote syntax for representing symbols. Single-quote syntax
>> is one of the ways to represent a column in Spark's DataFrame API. While I
>> personally don't use them (I prefer just using strings for column names, or
>> using the expr function), I see them used quite a lot in other people's code,
>> e.g.
>>
>> df.select('id, 'name).show()
>>
>> I want to bring this to more people's attention, in case they are
>> depending on this. The discussion thread is:
>> https://contributors.scala-lang.org/t/proposal-to-deprecate-and-remove-symbol-literals/2953
>>