Abdeali, Jason: I submitted the Spark job with num-executors 8, num-cores 8, driver-memory 14g and executor-memory 14g. The total data processed was about 5 GB, with 100+ aggregations and 50+ different joins at various DataFrame levels.
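
For reference, a minimal PySpark sketch of those resource settings expressed programmatically; the application name is a made-up placeholder, and in practice these values are normally passed as spark-submit flags (notably, driver memory only takes effect when set at launch, not from an already-running application):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("invoice-dedup")                 # placeholder name, not from the thread
         .config("spark.executor.instances", "8")  # --num-executors 8
         .config("spark.executor.cores", "8")      # --executor-cores 8
         .config("spark.executor.memory", "14g")   # --executor-memory 14g
         .config("spark.driver.memory", "14g")     # --driver-memory 14g; effective only at launch
         .getOrCreate())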
So it is really hard to tell a specific number of partitions. But I have not done a
repartition / coalesce, so I guess the default of 200 would be used. I read, a long
time ago, that the Window function is a killer, so I wanted to clarify my doubt. Thanks

On Thu, Apr 4, 2019 at 10:43 PM Jason Nerothin <jasonnerot...@gmail.com> wrote:

> My thinking is that if you run everything in one partition - say 12 GB -
> then you don't experience the partitioning problem - one partition will
> have all duplicates.
>
> If that's not the case, there are other options, but they would probably
> require a design change.
>
> On Thu, Apr 4, 2019 at 8:46 AM Jason Nerothin <jasonnerot...@gmail.com> wrote:
>
>> How much memory do you have per partition?
>>
>> On Thu, Apr 4, 2019 at 7:49 AM Chetan Khatri <chetan.opensou...@gmail.com> wrote:
>>
>>> I will get the information and will share it with you.
>>>
>>> On Thu, Apr 4, 2019 at 5:03 PM Abdeali Kothari <abdealikoth...@gmail.com> wrote:
>>>
>>>> How long does the window solution take? (Also mention how many
>>>> executors your Spark application was using on average during that time.)
>>>> I am not aware of anything that is faster. When I ran it on my data,
>>>> ~8-9 GB, I think it took less than 5 minutes (I don't remember the exact time).
>>>>
>>>> On Thu, Apr 4, 2019 at 1:09 PM Chetan Khatri <chetan.opensou...@gmail.com> wrote:
>>>>
>>>>> Thanks for the awesome clarification / explanation.
>>>>>
>>>>> I have cases where update_time can be the same.
>>>>> I am in need of suggestions: with very large data, like 5 GB, the
>>>>> window-based solution which I mentioned is taking a very long time.
>>>>>
>>>>> Thanks again.
>>>>>
>>>>> On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari <abdealikoth...@gmail.com> wrote:
>>>>>
>>>>>> So, the above code for min() worked fine for me in general, but there
>>>>>> was one corner case where it failed.
>>>>>> That was when I had something like:
>>>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>>>> invoice_id=1, update_time=2018-02-03 14:00:00.000
>>>>>>
>>>>>> In this example, the update_time for 2 records is exactly the same. So,
>>>>>> doing a filter for the min() will result in 2 records for invoice_id=1.
>>>>>> This is avoided in your row_number snippet - because 2 rows will never
>>>>>> have row_num = 1.
>>>>>>
>>>>>> But note that here row_num=1 and row_num=2 will be randomly ordered
>>>>>> (because orderBy is on update_time and they have the same value of
>>>>>> update_time).
>>>>>> Hence dropDuplicates can be used there, because it can be either one of
>>>>>> those rows.
>>>>>>
>>>>>> Overall, dropDuplicates seems like it's meant for cases where you
>>>>>> literally have redundant duplicated data, and not for filtering to get
>>>>>> the first/last record etc.
>>>>>>
>>>>>> On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <chetan.opensou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Abdeali, thank you for your response.
>>>>>>>
>>>>>>> Can you please explain this line to me: "And the dropDuplicates at the
>>>>>>> end ensures records with two values for the same 'update_time' don't
>>>>>>> cause issues."
>>>>>>>
>>>>>>> Sorry, I didn't get it quickly. :)
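
To make that corner case concrete, here is a minimal, self-contained PySpark sketch. The invoice_id / update_time values are the ones quoted in the thread; the SparkSession setup and the use of plain strings for the timestamps (ISO-formatted, so min() orders them correctly) are assumptions for illustration:

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.appName("dedup-corner-case").getOrCreate()  # assumed session

df = spark.createDataFrame(
    [(1, "2018-01-01 15:00:00.000"),
     (1, "2018-01-01 15:00:00.000"),   # exact duplicate timestamp of the row above
     (1, "2018-02-03 14:00:00.000")],
    ["invoice_id", "update_time"])

# min() over the invoice partition keeps BOTH tied rows after the filter...
df = df.withColumn("wanted_time",
                   F.min("update_time").over(Window.partitionBy("invoice_id")))
filtered = df.filter(df["update_time"] == df["wanted_time"]).drop("wanted_time")
print(filtered.count())   # 2 - the tie on update_time is not resolved yet

# ...and the trailing dropDuplicates collapses the tie to a single (arbitrary) row.
deduped = filtered.dropDuplicates(["invoice_id", "update_time"])
print(deduped.count())    # 1

The row_number approach discussed below avoids the extra dropDuplicates step, because exactly one row per partition gets rn = 1, at the cost of a sort inside each partition.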
>>>>>>> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <abdealikoth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I've faced this issue too - and a colleague pointed me to the
>>>>>>>> documentation:
>>>>>>>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>>>>>>> The dropDuplicates docs do not say that it guarantees returning the
>>>>>>>> "first" record (even if you sort your dataframe).
>>>>>>>> It will give you any record it finds and just ensure that duplicates
>>>>>>>> are not present.
>>>>>>>>
>>>>>>>> The only way I know of to do this is what you did, but you can avoid
>>>>>>>> the sorting inside the partition with something like this (in pyspark):
>>>>>>>>
>>>>>>>> from pyspark.sql import Window, functions as F
>>>>>>>>
>>>>>>>> df = df.withColumn('wanted_time',
>>>>>>>>                    F.min('update_time').over(Window.partitionBy('invoice_id')))
>>>>>>>> out_df = (df.filter(df['update_time'] == df['wanted_time'])
>>>>>>>>             .drop('wanted_time')
>>>>>>>>             .dropDuplicates(['invoice_id', 'update_time']))
>>>>>>>>
>>>>>>>> The min() is faster than doing an orderBy() and a row_number().
>>>>>>>> And the dropDuplicates at the end ensures records with two values for
>>>>>>>> the same 'update_time' don't cause issues.
>>>>>>>>
>>>>>>>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <chetan.opensou...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello Dear Spark Users,
>>>>>>>>>
>>>>>>>>> I am using dropDuplicates on a DataFrame generated from a large
>>>>>>>>> Parquet file (from HDFS), dropping duplicates based on a timestamp
>>>>>>>>> column, and every time I run it, it drops different rows for the
>>>>>>>>> same timestamp.
>>>>>>>>>
>>>>>>>>> What I tried, and what worked:
>>>>>>>>>
>>>>>>>>> val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".desc)
>>>>>>>>>
>>>>>>>>> val irqDistinctDF = irqFilteredDF
>>>>>>>>>   .withColumn("rn", row_number().over(wSpec))
>>>>>>>>>   .where($"rn" === 1)
>>>>>>>>>   .drop("rn").drop("update_time")
>>>>>>>>>
>>>>>>>>> But this is damn slow...
>>>>>>>>>
>>>>>>>>> Can someone please throw some light on this?
>>>>>>>>>
>>>>>>>>> Thanks
>>
>> --
>> Thanks,
>> Jason
>
>
> --
> Thanks,
> Jason
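
As a footnote to the partition count mentioned earlier in the thread: the window's partitionBy already shuffles all rows with the same invoice_id into the same partition, and the number of those shuffle partitions is controlled by spark.sql.shuffle.partitions (200 by default when no repartition / coalesce is done). A minimal sketch of inspecting and adjusting it, where the value 64 is purely an illustrative assumption and not a recommendation for this workload:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Spark SQL uses 200 shuffle partitions by default; window functions, joins
# and aggregations all shuffle into this many partitions.
print(spark.conf.get("spark.sql.shuffle.partitions"))   # "200" unless overridden

# Illustrative override only - the right value depends on data size and cluster.
spark.conf.set("spark.sql.shuffle.partitions", "64")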