I will get the information and will share it with you.

On Thu, Apr 4, 2019 at 5:03 PM Abdeali Kothari <abdealikoth...@gmail.com> wrote:
> How long does it take to do the window solution? (Also, mention how many
> executors your Spark application was using, on average, during that time.)
> I am not aware of anything that is faster. When I ran it on my data (~8-9 GB)
> I think it took less than 5 mins (I don't remember the exact time).
>
> On Thu, Apr 4, 2019 at 1:09 PM Chetan Khatri <chetan.opensou...@gmail.com>
> wrote:
>
>> Thanks for the awesome clarification / explanation.
>>
>> I have cases where update_time can be the same.
>> I need suggestions: with very large data, around 5 GB, the window-based
>> solution I mentioned is taking a very long time.
>>
>> Thanks again.
>>
>> On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari <abdealikoth...@gmail.com>
>> wrote:
>>
>>> So, the above code using min() worked fine for me in general, but there
>>> was one corner case where it failed.
>>> That was when I had something like:
>>> invoice_id=1, update_time=2018-01-01 15:00:00.000
>>> invoice_id=1, update_time=2018-01-01 15:00:00.000
>>> invoice_id=1, update_time=2018-02-03 14:00:00.000
>>>
>>> In this example, the update_time for two records is exactly the same. So,
>>> filtering for the min() will result in two records for invoice_id=1.
>>> This is avoided in your row_number snippet, because two rows will never
>>> both have row_num = 1.
>>>
>>> But note that there, row_num=1 and row_num=2 will be randomly ordered
>>> (because the orderBy is on update_time and both rows have the same value
>>> of update_time).
>>> Hence dropDuplicates can be used there, because either one of those rows
>>> is acceptable.
>>>
>>> Overall, dropDuplicates seems to be meant for cases where you literally
>>> have redundant duplicated data, not for filtering to get the first/last
>>> record, etc.
>>>
>>> On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Hello Abdeali, thank you for your response.
>>>>
>>>> Can you please explain this line: "And the dropDuplicates at the end
>>>> ensures records with two values for the same 'update_time' don't cause
>>>> issues."
>>>>
>>>> Sorry, I didn't get it right away. :)
>>>>
>>>> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <
>>>> abdealikoth...@gmail.com> wrote:
>>>>
>>>>> I've faced this issue too, and a colleague pointed me to the
>>>>> documentation:
>>>>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>>>> The dropDuplicates docs do not say that it guarantees returning the
>>>>> "first" record (even if you sort your DataFrame).
>>>>> It gives you whichever record it finds and just ensures that
>>>>> duplicates are not present.
>>>>>
>>>>> The only way I know of to do this is what you did, but you can avoid
>>>>> the sorting inside the partition with something like (in PySpark):
>>>>>
>>>>> from pyspark.sql import Window, functions as F
>>>>>
>>>>> df = df.withColumn(
>>>>>     'wanted_time',
>>>>>     F.min('update_time').over(Window.partitionBy('invoice_id')))
>>>>> out_df = (df.filter(df['update_time'] == df['wanted_time'])
>>>>>             .drop('wanted_time')
>>>>>             .dropDuplicates(['invoice_id', 'update_time']))
>>>>>
>>>>> The min() is faster than doing an orderBy() and a row_number().
>>>>> And the dropDuplicates at the end ensures records with two values for
>>>>> the same 'update_time' don't cause issues.
>>>>>
>>>>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <
>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>
>>>>>> Hello Dear Spark Users,
>>>>>>
>>>>>> I am using dropDuplicates on a DataFrame generated from a large
>>>>>> parquet file (from HDFS), de-duplicating on a timestamp-based column.
>>>>>> Every time I run it, it drops different rows among those that share
>>>>>> the same timestamp.
>>>>>>
>>>>>> What I tried, and what worked:
>>>>>>
>>>>>> val wSpec = Window.partitionBy($"invoice_id")
>>>>>>   .orderBy($"update_time".desc)
>>>>>>
>>>>>> val irqDistinctDF = irqFilteredDF
>>>>>>   .withColumn("rn", row_number.over(wSpec))
>>>>>>   .where($"rn" === 1)
>>>>>>   .drop("rn")
>>>>>>   .drop("update_time")
>>>>>>
>>>>>> But this is damn slow...
>>>>>>
>>>>>> Can someone please shed some light on this?
>>>>>>
>>>>>> Thanks
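
A minimal runnable sketch of the min()-over-window approach suggested in the
thread, assuming a local SparkSession and hypothetical toy data that
reproduce the duplicate-update_time corner case discussed above (column names
follow the thread; note that dropDuplicates takes a list of column names):

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Hypothetical toy data: two rows share the exact same update_time
# for invoice_id=1, reproducing the corner case from the thread.
df = spark.createDataFrame(
    [(1, "2018-01-01 15:00:00.000"),
     (1, "2018-01-01 15:00:00.000"),
     (1, "2018-02-03 14:00:00.000")],
    ["invoice_id", "update_time"])

# min() over an unordered partition avoids the per-partition sort that
# row_number() over an ordered window would require.
with_min = df.withColumn(
    "wanted_time",
    F.min("update_time").over(Window.partitionBy("invoice_id")))

# The filter keeps BOTH rows carrying the minimum update_time;
# dropDuplicates then collapses them to a single row.
out_df = (with_min.filter(with_min["update_time"] == with_min["wanted_time"])
                  .drop("wanted_time")
                  .dropDuplicates(["invoice_id", "update_time"]))

out_df.show()  # one row: invoice_id=1, update_time=2018-01-01 15:00:00.000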
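
For comparison, a PySpark rendering of the row_number() approach from the
original question, continuing from the sketch above (a hypothetical
illustration, not code from the thread). The sort inside each partition is
what makes this slower on large data; also note that, mirroring the original
Scala snippet, it keeps the latest update_time per invoice, while the min()
sketch keeps the earliest:

# row_number() needs an ordered window, which forces a sort within each
# partition before the first row can be picked.
w_spec = Window.partitionBy("invoice_id").orderBy(F.col("update_time").desc())

irq_distinct_df = (df.withColumn("rn", F.row_number().over(w_spec))
                     .where(F.col("rn") == 1)
                     .drop("rn")
                     .drop("update_time"))

# Ties on update_time are broken arbitrarily: exactly one row gets rn == 1,
# but which of two identical-timestamp rows wins is non-deterministic,
# which explains the run-to-run differences described in the question.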