Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Chetan Khatri
Abdeali, Jason:

While submitting the Spark job I used num-executors 8, num-cores 8,
driver-memory 14g and executor-memory 14g. The total data processed was
about 5 GB, with 100+ aggregations and 50+ different joins at various
DataFrame levels.

So it is really hard to give a specific number of partitions. But I have
not done any repartition / coalesce, so the default of 200 shuffle
partitions would be used, I guess.
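(A minimal sketch of how that default can be checked or overridden at
runtime, assuming a Spark 2.x SparkSession; the 400 below is only an
illustrative value, not a recommendation from this thread:)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("shuffle-partition-check").getOrCreate()
// spark.sql.shuffle.partitions drives the 200-partition default mentioned above.
println(spark.conf.get("spark.sql.shuffle.partitions"))   // "200" unless overridden
spark.conf.set("spark.sql.shuffle.partitions", "400")     // illustrative override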

I read, a long time ago, that window functions can be performance killers,
so I wanted to clarify my doubt.

Thanks




Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Jason Nerothin
My thinking is that if you run everything in one partition - say 12 GB -
then you don't experience the partitioning problem: one partition will have
all the duplicates.

If that's not the case, there are other options, but they would probably
require a design change.
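A minimal sketch of what that could look like, reusing the irqFilteredDF
name from the original post (collapsing to a single partition only makes
sense if the whole dataset fits in one task's memory; the repartition
variant is an alternative not taken from this thread):

import spark.implicits._   // assumes a SparkSession named `spark` is in scope

// Option 1: everything in one partition, as suggested above.
val singlePartitionDF = irqFilteredDF.coalesce(1)

// Option 2: hash-partition by the key instead, so all duplicates of an
// invoice_id land in the same partition without one giant task.
val byInvoiceDF = irqFilteredDF.repartition($"invoice_id")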


-- 
Thanks,
Jason


Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Jason Nerothin
How much memory do you have per partition?
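A rough way to answer that, as a sketch only (irqFilteredDF is the
DataFrame name from the original post): check the partition count and rows
per partition, then divide the known input size (~5 GB here) accordingly.

// Number of partitions the DataFrame currently has.
println(irqFilteredDF.rdd.getNumPartitions)

// Approximate row count per partition (collect is fine here because only
// one small (index, count) pair per partition comes back to the driver).
irqFilteredDF.rdd
  .mapPartitionsWithIndex((idx, rows) => Iterator((idx, rows.size)))
  .collect()
  .foreach { case (idx, n) => println(s"partition $idx: $n rows") }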

-- 
Thanks,
Jason


Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Chetan Khatri
I will get the information and share it with you.



Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Abdeali Kothari
How long does the window solution take? (Also mention how many executors
your Spark application was using on average during that time.)
I am not aware of anything that is faster. When I ran it on my data
(~8-9 GB) I think it took less than 5 minutes (I don't remember the exact
time).



Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Chetan Khatri
Thanks for the awesome clarification / explanation.

I have cases where update_time can be the same.
I am in need of suggestions: with very large data, around 5 GB, the
window-based solution I mentioned is taking a very long time.

Thanks again.




Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Abdeali Kothari
So, the above code for min() worked fine for me in general, but there was
one corner case where it failed.
It was when I had something like:
invoice_id=1, update_time=*2018-01-01 15:00:00.000*
invoice_id=1, update_time=*2018-01-01 15:00:00.000*
invoice_id=1, update_time=2018-02-03 14:00:00.000

In this example, the update_time for two records is exactly the same, so
filtering for the min() results in 2 records for invoice_id=1.
This is avoided in your row_number snippet, because two rows will never
both have row_num = 1.

But note that here, row_num=1 and row_num=2 will be randomly ordered
(because the orderBy is on update_time and they have the same value of
update_time).
Hence dropDuplicates can be used there, because either one of those rows
is acceptable.

Overall, dropDuplicates seems to be meant for cases where you literally
have redundant duplicated data, not for filtering to get the first/last
record etc.
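To make that corner case concrete, here is a small sketch in Scala (the
language of the original snippet), building the three rows from the
example above:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.min

val spark = SparkSession.builder().appName("tie-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Two of the three rows tie on the minimum update_time.
val sample = Seq(
  (1, "2018-01-01 15:00:00.000"),
  (1, "2018-01-01 15:00:00.000"),
  (1, "2018-02-03 14:00:00.000")
).toDF("invoice_id", "update_time")

val filtered = sample
  .withColumn("wanted_time", min($"update_time").over(Window.partitionBy($"invoice_id")))
  .filter($"update_time" === $"wanted_time")
  .drop("wanted_time")

filtered.count()                                              // 2: both tied rows survive the min() filter
filtered.dropDuplicates("invoice_id", "update_time").count()  // 1: the trailing dropDuplicates keeps either one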




Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Chetan Khatri
Hello Abdeali, Thank you for your response.

Can you please explain this line to me: "And the dropDuplicates at the end
ensures records with two values for the same 'update_time' don't cause
issues."

Sorry, I didn't get it right away. :)



Re: dropDuplicate on timestamp based column unexpected output

2019-04-03 Thread Abdeali Kothari
I've faced this issue too, and a colleague pointed me to the documentation:
https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
The dropDuplicates docs do not guarantee that it will return the "first"
record (even if you sort your DataFrame).
It gives you whatever record it finds and just ensures that duplicates are
not present.

The only way I know of to do this is what you did, but you can avoid the
sort inside the partition with something like this (in PySpark):

from pyspark.sql import Window, functions as F

# Tag each row with its invoice's earliest update_time (no sort needed),
# keep matching rows, then collapse exact-timestamp ties.
df = df.withColumn('wanted_time',
                   F.min('update_time').over(Window.partitionBy('invoice_id')))
out_df = (df.filter(df['update_time'] == df['wanted_time'])
            .drop('wanted_time')
            .dropDuplicates(['invoice_id', 'update_time']))

The min() is faster than doing an orderBy() and a row_number().
And the dropDuplicates at the end ensures records with two values for the
same 'update_time' don't cause issues.
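Since the snippet in the original question is Scala, a rough Scala
equivalent of the same idea might look like this (a sketch only, reusing
the irqFilteredDF name from the question and assuming a SparkSession named
`spark` is in scope):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.min
import spark.implicits._

// min() over an unordered window avoids the per-partition sort that
// row_number() needs; the trailing dropDuplicates resolves exact ties.
val wSpec = Window.partitionBy($"invoice_id")

val irqDistinctDF = irqFilteredDF
  .withColumn("wanted_time", min($"update_time").over(wSpec))
  .filter($"update_time" === $"wanted_time")
  .drop("wanted_time")
  .dropDuplicates("invoice_id", "update_time")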




dropDuplicate on timestamp based column unexpected output

2019-04-03 Thread Chetan Khatri
Hello Dear Spark Users,

I am using dropDuplicates on a DataFrame generated from a large Parquet
file (on HDFS), deduplicating on a timestamp-based column. Every time I run
it, it drops different rows for the same timestamp.

What I tried, and what worked:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".desc)

val irqDistinctDF = irqFilteredDF.withColumn("rn", row_number.over(wSpec))
  .where($"rn" === 1).drop("rn").drop("update_time")

But this is damn slow...

Can someone please throw some light on this?

Thanks