Re: [DISCUSS] naming policy of Spark configs
I love it, it will make configs easier to read and write. Thanks Wenchen.

R

> On 13 Feb 2020, at 00:15, Dongjoon Hyun wrote:
>
> Thank you, Wenchen.
> The new policy looks clear to me. +1 for the explicit policy.
>
> So, are we going to revise the existing conf names before the 3.0.0 release?
> Or is it applied only to new, upcoming configurations from now on?
>
> Bests,
> Dongjoon.
>
>> On Wed, Feb 12, 2020 at 7:43 AM Wenchen Fan wrote:
>> Hi all,
>>
>> I'd like to discuss the naming policy of Spark configs. For now it
>> depends on personal preference, which leads to inconsistent naming.
>>
>> In general, a config name should be a noun that describes its meaning
>> clearly.
>> Good examples:
>>   spark.sql.session.timeZone
>>   spark.sql.streaming.continuous.executorQueueSize
>>   spark.sql.statistics.histogram.numBins
>> Bad example:
>>   spark.sql.defaultSizeInBytes (default size for what?)
>>
>> Also note that a config name has many parts, joined by dots. Each part is
>> a namespace. Don't create namespaces unnecessarily.
>> Good examples:
>>   spark.sql.execution.rangeExchange.sampleSizePerPartition
>>   spark.sql.execution.arrow.maxRecordsPerBatch
>> Bad example:
>>   spark.sql.windowExec.buffer.in.memory.threshold ("in" is not a useful
>>   namespace; better as .buffer.inMemoryThreshold)
>>
>> For a big feature we usually need an umbrella config to turn it on/off,
>> plus other configs for fine-grained control. These configs should share
>> the same namespace, and the umbrella config should be named
>> featureName.enabled. For example:
>>   spark.sql.cbo.enabled
>>   spark.sql.cbo.starSchemaDetection
>>   spark.sql.cbo.starJoinFTRatio
>>   spark.sql.cbo.joinReorder.enabled
>>   spark.sql.cbo.joinReorder.dp.threshold (BTW "dp" is not a good namespace)
>>   spark.sql.cbo.joinReorder.card.weight (BTW "card" is not a good namespace)
>>
>> For boolean configs, the name should in general end with a verb, e.g.
>> spark.sql.join.preferSortMergeJoin. If the config is for a feature and
>> you can't find a good verb for it, featureName.enabled is also fine.
>>
>> I'll update https://spark.apache.org/contributing.html after we reach a
>> consensus here. Any comments are welcome!
>>
>> Thanks,
>> Wenchen
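The mechanical parts of the policy above can be sanity-checked. Below is a rough Python sketch (my own illustration, not anything Spark ships or enforces) that flags names whose dot-separated parts are not camelCase words, or that use bare prepositions like "in" as namespaces; the stop-word list is an assumption of mine:

```python
import re

# A config part should be a single camelCase word, e.g. "timeZone".
PART = re.compile(r"^[a-z][a-zA-Z0-9]*$")

# Bare prepositions as parts suggest a phrase was split into fake
# namespaces, as in "buffer.in.memory.threshold". This word list is a guess.
STOP_WORDS = {"in", "of", "on", "for"}

def follows_naming_policy(name: str) -> bool:
    """Rough mechanical check of the policy sketched in the thread above."""
    parts = name.split(".")
    if len(parts) < 3 or parts[0] != "spark":
        return False
    return all(PART.match(p) and p not in STOP_WORDS for p in parts[1:])
```

Of course, this only catches mechanical violations; whether a part like "dp" or "card" is a clear name, or whether a noun like defaultSizeInBytes is specific enough, still needs human judgment.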
Re: [DISCUSS] Expensive deterministic UDFs
That was very interesting, thanks Enrico. Sean, IIRC it also prevents push-down of the UDF in Catalyst in some cases.

Regards,

Ruben

> On 7 Nov 2019, at 11:09, Sean Owen wrote:
>
> Interesting, what does non-deterministic do except have this effect?
> Aside from the naming, it could be a fine use of this flag if that's
> all it effectively does. I'm not sure I'd introduce another flag with
> the same semantics just over naming. If anything, 'expensive' also
> isn't the right word; more like 'try not to evaluate multiple times'.
>
> Why isn't caching the answer? I realize it's big, but you can cache to
> disk. This may be faster than whatever plan reordering has to happen
> to evaluate the UDF only once.
>
> Usually I'd say: can you redesign your UDF and code to be more
> efficient, or use a bigger cluster if that's really what you need?
>
> At first look, no, I don't think this Spark-side workaround over naming
> for your use case is worthwhile. There are existing better solutions.
>
> On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack wrote:
>>
>> Hi all,
>>
>> Running expensive deterministic UDFs that return complex types, followed
>> by multiple references to those results, causes Spark to evaluate the UDF
>> multiple times per row. This has been reported and discussed before:
>> SPARK-18748, SPARK-17728
>>
>>   val f: Int => Array[Int]
>>   val udfF = udf(f)
>>   df
>>     .select($"id", udfF($"id").as("array"))
>>     .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>>
>> A common approach to make Spark evaluate the UDF only once is to cache
>> the intermediate result right after projecting the UDF:
>>
>>   df
>>     .select($"id", udfF($"id").as("array"))
>>     .cache()
>>     .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>>
>> There are scenarios where this intermediate result is too big for the
>> cluster to cache. It is also bad design.
>>
>> The best approach is to mark the UDF as non-deterministic. Then Spark
>> optimizes the query in a way that the UDF gets called only once per row,
>> which is exactly what you want.
>>
>>   val udfF = udf(f).asNondeterministic()
>>
>> However, stating that a UDF is non-deterministic though it clearly is
>> deterministic is counter-intuitive and makes your code harder to read.
>>
>> Spark should provide a better way to flag the UDF. Calling it expensive
>> would be better naming here:
>>
>>   val udfF = udf(f).asExpensive()
>>
>> I understand that deterministic is a notion that Expression provides, and
>> there is no equivalent to expensive that is understood by the optimizer.
>> However, asExpensive() could simply set ScalaUDF.udfDeterministic =
>> deterministic && !expensive, which implements the best available approach
>> behind a better name.
>>
>> What are your thoughts on asExpensive()?
>>
>> Regards,
>> Enrico

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
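The double evaluation Enrico describes is easy to reproduce outside Spark. The plain-Python sketch below is an analogy only (Spark's actual behaviour is driven by the optimizer, not by Python call semantics, and the UDF body here is made up): two references to the function's result mean two invocations per row, while materialising the intermediate array once per row, which is the effect of .cache() or .asNondeterministic() above, halves the calls.

```python
call_count = 0

def expensive_udf(x):
    """Stand-in for the expensive deterministic UDF from the thread;
    returns a 'complex type' (an array) like f: Int => Array[Int]."""
    global call_count
    call_count += 1
    return [x * 10, x * 10 + 1]

rows = range(3)

# Without caching: each reference to the result re-invokes the function,
# just as Spark re-evaluates the UDF once per projected column.
uncached = [(expensive_udf(x)[0], expensive_udf(x)[1]) for x in rows]
calls_without_cache = call_count  # two calls per row

# With the intermediate result materialised once per row, each row
# triggers exactly one invocation.
call_count = 0
cached = [(arr[0], arr[1]) for arr in (expensive_udf(x) for x in rows)]
calls_with_cache = call_count  # one call per row
```

Both variants produce the same rows; only the number of invocations differs, which is the whole point when the function is expensive.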
Re: Re: How to force sorted merge join to broadcast join
I think there is no way of doing that (at least I don't remember one right now). The closest I remember is that you can run the SQL "ANALYZE TABLE table_name COMPUTE STATISTICS" to compute the statistics regardless of having a query (it also helps the cost-based optimiser, if I remember correctly), but as for displaying them, it escapes me right now whether it can be done.

R

--
Rubén Berenguel

On 29 July 2019 at 11:03:13, zhangliyun (kelly...@126.com) wrote:

thks! after using the syntax provided in the link, select /*+ BROADCAST (A) */ ..., i got what i want.
but i want to ask: besides using queryExecution.stringWithStats (DataFrame API) to show the table statistics, is there any way to show the table statistics with "explain xxx" in the Spark SQL command line?

Best Regards
Kelly

On 2019-07-29 14:29:50, "Rubén Berenguel" wrote:

Hi, I hope this answers your question. You can hint the broadcast in SQL as detailed here:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html (thanks Jacek :) )
I'd recommend creating a temporary table with the trimming you use in the join (for clarity). Also keep in mind that using the methods is more powerful/readable than using Spark SQL directly (as happens with the broadcast case, although it depends on personal preference).

Regards, Ruben

--
Rubén Berenguel

On 29 July 2019 at 07:12:30, zhangliyun (kelly...@126.com) wrote:

Hi all:
i want to ask a question about broadcast join in spark sql.

```
select A.*, B.nsf_cards_ratio * 1.00 / A.nsf_on_entry as nsf_ratio_to_pop
from B left join A on trim(A.country) = trim(B.cntry_code);
```

here A is a small table of only 8 rows, but somehow the statistics of table A have a problem. A join B is a sort merge join, while the join key (trim(A.country) = trim(B.cntry_code)) only has several values (nearly 21 countries). is there any way i can force spark sql to use a broadcast join? (I can not simply enlarge spark.sql.autoBroadcastJoinThreshold, as i do not know the detailed size spark sql computes for the table.)
I tried to print the physical plan, but it did not show the table size, and i did not know how to enlarge spark.sql.autoBroadcastJoinThreshold to turn the sort merge join into a broadcast join.

```
== Parsed Logical Plan ==
'Project [ArrayBuffer(cc_base_part1).*, (('cc_base_part1.nsf_cards_ratio * 1.00) / 'cc_rank_agg.nsf_on_entry) AS nsf_ratio_to_pop#369]
+- 'Join LeftOuter, ('trim('cc_base_part1.country) = 'trim('cc_rank_agg.cntry_code))
   :- 'UnresolvedRelation `cc_base_part1`
   +- 'UnresolvedRelation `cc_rank_agg`

== Analyzed Logical Plan ==
cust_id: string, country: string, cc_id: decimal(38,0), bin_hmac: string, credit_card_created_date: string, card_usage: smallint, cc_category: string, cc_size: string, nsf_risk: string, nsf_cards_ratio: decimal(18,2), dt: string, nsf_ratio_to_pop: decimal(38,6)
Project [cust_id#372, country#373, cc_id#374, bin_hmac#375, credit_card_created_date#376, card_usage#377, cc_category#378, cc_size#379, nsf_risk#380, nsf_cards_ratio#381, dt#382, CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(nsf_cards_ratio#381 as decimal(18,2))) * promote_precision(cast(1.00 as decimal(18,2)))), DecimalType(22,4)) as decimal(38,16))) / promote_precision(cast(nsf_on_entry#386 as decimal(38,16)))), DecimalType(38,6)) AS nsf_ratio_to_pop#369]
+- Join LeftOuter, (trim(country#373, None) = trim(cntry_code#383, None))
   :- SubqueryAlias cc_base_part1
   :  +- HiveTableRelation `fpv365h`.`cc_base_part1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [cust_id#372, country#373, cc_id#374, bin_hmac#375, credit_card_created_date#376, card_usage#377, cc_category#378, cc_size#379, nsf_risk#380, nsf_cards_ratio#381], [dt#382]
   +- SubqueryAlias cc_rank_agg
      +- HiveTableRelation `fpv365h`.`cc_rank_agg`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [cntry_code#383, num_tot_cards#384L, num_nsf_cards#385L, nsf_on_entry#386], [dt#387]
```

Does spark have any command to show the table size when printing the physical plan?
I'd appreciate any help with my question.

Best regards
Kelly Zhang
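Putting the thread's two suggestions together for Kelly's query, the commands would look roughly like the following. Table and column names are taken from the query above; note the statement is COMPUTE STATISTICS (with an S), and this is a sketch I have not run against Kelly's tables:

```sql
-- Compute statistics for the small table so the optimiser knows its size
-- (append NOSCAN to compute only the size in bytes, without scanning rows):
ANALYZE TABLE A COMPUTE STATISTICS;

-- Or force the broadcast outright with the hint from the linked page:
SELECT /*+ BROADCAST(A) */
       A.*,
       B.nsf_cards_ratio * 1.00 / A.nsf_on_entry AS nsf_ratio_to_pop
FROM B
LEFT JOIN A
  ON trim(A.country) = trim(B.cntry_code);
```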
Re: Do you use single-quote syntax for the DataFrame API?
I favour using either $"foo" or columnar expressions, but I know of several developers who prefer the single-quote syntax and consider it the better practice.

R

On 31 March 2019 at 15:15:00, Sean Owen (sro...@apache.org) wrote:

> FWIW I use "foo" in PySpark, or col("foo") where necessary, and $"foo" in
> Scala.
>
> On Sun, Mar 31, 2019 at 1:58 AM Reynold Xin wrote:
>
>> As part of evolving the Scala language, the Scala team is considering
>> removing single-quote syntax for representing symbols. Single-quote syntax
>> is one of the ways to represent a column in Spark's DataFrame API. While I
>> personally don't use it (I prefer just using strings for column names, or
>> using the expr function), I see it used quite a lot in other people's
>> code, e.g.
>>
>>   df.select('id, 'name).show()
>>
>> I want to bring this to more people's attention, in case they are
>> depending on it. The discussion thread is:
>> https://contributors.scala-lang.org/t/proposal-to-deprecate-and-remove-symbol-literals/2953