May I ask, for the DataFrame API and the SQL API, which is better on performance? Thanks
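One quick way to compare the two for any given query: both the DataFrame API and SQL compile through the same Catalyst optimizer, so equivalent queries typically produce identical physical plans and hence the same performance. A minimal sketch, assuming the usual spark session from the pyspark shell:

========================================================================
>>> spark.range(100).createOrReplaceTempView("t")
>>> df = spark.table("t")
>>> # DataFrame API version of the query
>>> df.filter(df.id > 10).explain()
>>> # SQL version of the same query
>>> spark.sql("SELECT * FROM t WHERE id > 10").explain()
========================================================================

If the two explain() outputs match, the engine executes the same plan either way; differences usually come from how a query is written, not from which API was used.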
On Sun, Jan 2, 2022 at 8:06 PM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi Mich,

Your notes are really great; they really brought back the old days again :) Thanks.

Just to note a few points from them that I found useful related to this question:
1. cores and threads - page 5
2. executor cores and number settings - page 6

I think that the following example may be of use. Note that I have one driver with 8 cores, as I am running PySpark 3.1.2 in local mode, but this should give a way to find out a bit more:

========================================================================
>>> import pyspark
>>> from pyspark.sql.types import *
>>> # create the filter dataframe (there are easier ways to do the below)
>>> spark.createDataFrame(list(map(lambda v: pyspark.sql.Row(v), [0, 1, 2, 4, 7, 9])), StructType([StructField("filter_value", IntegerType())])).createOrReplaceTempView("filters")
>>> # create the main table
>>> spark.range(10000000000).createOrReplaceTempView("test_base")
>>> spark.sql("SELECT id, FLOOR(RAND() * 10) rand FROM test_base").createOrReplaceTempView("test")
>>> # see the partitions in the filters and the main table
>>> spark.sql("SELECT * FROM filters").rdd.getNumPartitions()
8
>>> spark.sql("SELECT * FROM test").rdd.getNumPartitions()
8
>>> # see the number of partitions in the filtered join output
>>> # (I am assuming implicit casting here)
>>> spark.sql("SELECT * FROM test WHERE rand IN (SELECT filter_value FROM filters)").rdd.getNumPartitions()
200
>>> spark.sql("SET spark.sql.shuffle.partitions=10")
DataFrame[key: string, value: string]
>>> spark.sql("SELECT * FROM test WHERE rand IN (SELECT filter_value FROM filters)").rdd.getNumPartitions()
10
========================================================================

Please do refer to the following page on adaptive query execution in Spark 3; it will be of massive help, particularly if you are handling skewed joins:
https://spark.apache.org/docs/latest/sql-performance-tuning.html

Thanks and Regards,
Gourav Sengupta

On Sun, Jan 2, 2022 at 11:24 AM Bitfox <bit...@bitfox.top> wrote:

Thanks Mich. That looks good.

On Sun, Jan 2, 2022 at 7:10 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

LOL.

You asking these questions takes me back to summer 2016 when I started writing notes on Spark. Obviously earlier versions, but the notions of RDD, local, standalone, YARN etc. are still valid. Those days there were no k8s and the public cloud was not widely adopted. I browsed the notes and they were refreshing for me. Anyway, you may find some points in them addressing the questions that you tend to ask.

HTH

On Sun, 2 Jan 2022 at 00:20, Bitfox <bit...@bitfox.top> wrote:

One more question: for this big filter, given my server has 4 cores, will Spark (standalone mode) split the RDD into 4 partitions automatically?

Thanks
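One way to check this directly from the shell: in local and standalone modes sc.defaultParallelism typically follows the total number of cores available to Spark, and APIs such as spark.range() and sc.parallelize() use it as their default partition count. A minimal sketch, assuming a 4-core machine and the usual spark/sc bindings from the pyspark shell:

========================================================================
>>> # default parallelism typically equals the total available cores
>>> spark.sparkContext.defaultParallelism
4
>>> # range() and parallelize() pick up that default partition count
>>> spark.range(1000).rdd.getNumPartitions()
4
>>> sc.parallelize(range(1000)).getNumPartitions()
4
========================================================================

Note that this default only covers how input RDDs are split; partition counts after a shuffle are governed separately by spark.sql.shuffle.partitions, as Gourav's example above shows.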
On Sun, Jan 2, 2022 at 6:30 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Create a list of the values that you don't want and filter on those:

>>> DF = spark.range(10)
>>> DF
DataFrame[id: bigint]
>>>
>>> array = [1, 2, 3, 8]  # don't want these
>>> DF.filter(DF.id.isin(array) == False).show()
+---+
| id|
+---+
|  0|
|  4|
|  5|
|  6|
|  7|
|  9|
+---+

or use the NOT (~) operator:

>>> DF.filter(~DF.id.isin(array)).show()
+---+
| id|
+---+
|  0|
|  4|
|  5|
|  6|
|  7|
|  9|
+---+

HTH

On Sat, 1 Jan 2022 at 20:59, Bitfox <bit...@bitfox.top> wrote:

Using the DataFrame API I need to implement a batch filter:

DF.select(..).where(col(..) != 'a' and col(..) != 'b' and ...)

There are a lot of keywords that should be filtered out for the same column in the where statement.

How can I make it smarter? UDF or others?

Thanks & Happy New Year!
Bitfox
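When the list of unwanted keywords is itself large, a common alternative to isin() is a left anti-join against a small DataFrame of keywords, much like the filters table in Gourav's example further up the thread. A minimal sketch, where DF, the column name "word" and the keyword values are placeholders:

========================================================================
>>> bad = spark.createDataFrame([("a",), ("b",), ("c",)], ["word"])
>>> # left_anti keeps only the rows of DF with no match in bad,
>>> # i.e. a NOT IN style filter without a huge boolean expression
>>> DF.join(bad, on="word", how="left_anti").show()
========================================================================

Spark will normally broadcast the small keyword table, so this avoids shuffling the large side of the join.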