Re: High level explanation of dropDuplicates

2020-01-11 Thread Miguel Morales
I would just map to a pair RDD keyed by the id, then do a reduceByKey where you
compare the scores and keep the highest, then call .values and that should do it.
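
Something along these lines should work; a rough, untested sketch, assuming df is
the (id, score) DataFrame from the quoted example below:

// Key by id, keep the row with the highest score, then take the values.
val deduped = df.rdd
  .map(row => (row.getInt(0), row))                                 // pair by id
  .reduceByKey((a, b) => if (a.getInt(1) >= b.getInt(1)) a else b)  // keep highest score
  .values

spark.createDataFrame(deduped, df.schema).show()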

Sent from my iPhone

> On Jan 11, 2020, at 11:14 AM, Rishi Shah  wrote:
> 
> 
> Thanks everyone for your contributions on this topic. I wanted to check in to
> see if anyone has discovered a different approach, or has an opinion on a
> better way to deduplicate data using PySpark. I would really appreciate any
> further insight on this.
> 
> Thanks,
> -Rishi
> 
>> On Wed, Jun 12, 2019 at 4:21 PM Yeikel  wrote:
>> Nicholas, thank you for your explanation.
>> 
>> I am also interested in the example that Rishi is asking for. I am sure
>> mapPartitions may work, but as Vladimir suggests it may not be the best
>> option in terms of performance.
>> 
>> @Vladimir Prus, are you aware of any example of writing a "custom
>> physical exec operator"?
>> 
>> If anyone needs further explanation of the follow-up question Rishi
>> posted, please see the example below:
>> 
>> 
>> import org.apache.spark.sql.types._
>> import org.apache.spark.sql.Row
>> 
>> 
>> val someData = Seq(
>>   Row(1, 10),
>>   Row(1, 20),
>>   Row(1, 11)
>> )
>> 
>> val schema = List(
>>   StructField("id", IntegerType, true),
>>   StructField("score", IntegerType, true)
>> )
>> 
>> val df = spark.createDataFrame(
>>   spark.sparkContext.parallelize(someData),
>>   StructType(schema)
>> )
>> 
>> // Goal: drop duplicates using "id" as the primary key and keep the highest "score".
>> 
>> df.sort($"score".desc).dropDuplicates("id").show
>> 
>> == Physical Plan ==
>> *(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
>> +- Exchange hashpartitioning(id#191, 200)
>>+- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192,
>> false)])
>>   +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
>>  +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
>> +- Scan ExistingRDD[id#191,score#192]
>> 
>> This seems to work, but I don't know what the implications are if we use
>> this approach with a bigger dataset, or what the alternatives are. From the
>> explain output I can see two Exchanges, so it may not be the best
>> approach?
>> 
>> 
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> 
> -- 
> Regards,
> 
> Rishi Shah


Re: High level explanation of dropDuplicates

2020-01-11 Thread Rishi Shah
Thanks everyone for your contributions on this topic. I wanted to check in
to see if anyone has discovered a different approach, or has an opinion on a
better way to deduplicate data using PySpark. I would really appreciate any
further insight on this.

Thanks,
-Rishi

On Wed, Jun 12, 2019 at 4:21 PM Yeikel  wrote:

> Nicholas, thank you for your explanation.
>
> I am also interested in the example that Rishi is asking for. I am sure
> mapPartitions may work, but as Vladimir suggests it may not be the best
> option in terms of performance.
>
> @Vladimir Prus, are you aware of any example of writing a "custom
> physical exec operator"?
>
> If anyone needs further explanation of the follow-up question Rishi
> posted, please see the example below:
>
>
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.Row
>
>
> val someData = Seq(
>   Row(1, 10),
>   Row(1, 20),
>   Row(1, 11)
> )
>
> val schema = List(
>   StructField("id", IntegerType, true),
>   StructField("score", IntegerType, true)
> )
>
> val df = spark.createDataFrame(
>   spark.sparkContext.parallelize(someData),
>   StructType(schema)
> )
>
> // Goal: drop duplicates using "id" as the primary key and keep the highest "score".
>
> df.sort($"score".desc).dropDuplicates("id").show
>
> == Physical Plan ==
> *(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
> +- Exchange hashpartitioning(id#191, 200)
>+- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192,
> false)])
>   +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
>  +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
> +- Scan ExistingRDD[id#191,score#192]
>
> This seems to work, but I don't know what the implications are if we use
> this approach with a bigger dataset, or what the alternatives are. From the
> explain output I can see two Exchanges, so it may not be the best
> approach?
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Regards,

Rishi Shah


Re: High level explanation of dropDuplicates

2019-06-12 Thread Yeikel
Nicholas, thank you for your explanation.

I am also interested in the example that Rishi is asking for. I am sure
mapPartitions may work, but as Vladimir suggests it may not be the best
option in terms of performance.

@Vladimir Prus, are you aware of any example of writing a "custom
physical exec operator"?

If anyone needs further explanation of the follow-up question Rishi
posted, please see the example below:


import org.apache.spark.sql.types._
import org.apache.spark.sql.Row


val someData = Seq(
  Row(1, 10),
  Row(1, 20),
  Row(1, 11)
)

val schema = List(
  StructField("id", IntegerType, true),
  StructField("score", IntegerType, true)
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),
  StructType(schema)
)

// Goal: drop duplicates using "id" as the primary key and keep the highest "score".

df.sort($"score".desc).dropDuplicates("id").show

== Physical Plan ==
*(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
+- Exchange hashpartitioning(id#191, 200)
   +- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192,
false)])
  +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
 +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
+- Scan ExistingRDD[id#191,score#192]

This seems to work, but I don't know what the implications are if we use
this approach with a bigger dataset, or what the alternatives are. From the
explain output I can see two Exchanges, so it may not be the best
approach?
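
One alternative I can think of that should only need the hash-partitioning
exchange is to aggregate directly instead of sorting globally first. A rough,
untested sketch against the same df, using the common max-of-a-struct pattern;
would that be preferable?

import org.apache.spark.sql.functions.{col, max, struct}

// With only id and score, a plain max per id is enough:
df.groupBy("id").agg(max("score").as("score")).show()

// With more columns, the whole "best" row can be kept by taking the max of a
// struct whose first field is the score (struct ordering compares field by field):
df.groupBy("id")
  .agg(max(struct(col("score"), col("id"))).as("best"))
  .select(col("best.id"), col("best.score"))
  .show()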







--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: High level explanation of dropDuplicates

2019-06-12 Thread Vladimir Prus
Hi,

If your data frame is partitioned by column A, and you want deduplication
by columns A, B and C, then a faster way might be to sort each partition by
A, B and C and then do a linear scan - that is often faster than grouping by
all the columns, which requires a shuffle. Sadly, there's no standard way to do it.

One way to do it is via mapPartitions, but that involves serialisation
to/from Row. The best way is to write a custom physical exec operator, but
it's not entirely trivial.
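
A rough, untested sketch of the mapPartitions variant (column names a, b and c
are placeholders; it drops to the RDD API, which is where the Row serialisation
cost mentioned above shows up):

import org.apache.spark.sql.DataFrame

def dedupSortedPartitions(df: DataFrame): DataFrame = {
  val dedupedRdd = df
    .sortWithinPartitions("a", "b", "c")  // local sort per partition, no shuffle
    .rdd                                  // conversion to RDD[Row], i.e. the serialisation step
    .mapPartitions { rows =>
      // the partition is sorted, so duplicates are adjacent: keep a row only
      // when its (a, b, c) key differs from the previous row's key
      var prevKey: Option[(Any, Any, Any)] = None
      rows.filter { row =>
        val key = (row.getAs[Any]("a"), row.getAs[Any]("b"), row.getAs[Any]("c"))
        val keep = !prevKey.contains(key)
        prevKey = Some(key)
        keep
      }
    }
  df.sparkSession.createDataFrame(dedupedRdd, df.schema)
}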

On Mon, 10 Jun 2019, 06:00 Rishi Shah,  wrote:

> Hi All,
>
> Just wanted to check back regarding the best way to perform deduplication. Is
> using dropDuplicates the optimal way to get rid of duplicates? Would it be
> better to run operations on the RDD directly?
>
> Also, what if we want to keep the last value in the group while
> performing deduplication (based on some sorting criteria)?
>
> Thanks,
> Rishi
>
> On Mon, May 20, 2019 at 3:33 PM Nicholas Hakobian <
> nicholas.hakob...@rallyhealth.com> wrote:
>
>> From doing some searching around in the spark codebase, I found the
>> following:
>>
>>
>> https://github.com/apache/spark/blob/163a6e298213f216f74f4764e241ee6298ea30b6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1452-L1474
>>
>> So it appears there is no direct operation called dropDuplicates or
>> Deduplicate, but there is an optimizer rule that converts this logical
>> operation to a physical operation that is equivalent to grouping by all the
>> columns you want to deduplicate across (or all columns if you are doing
>> something like distinct), and taking the First() value. So (using a pySpark
>> code example):
>>
>> df = input_df.dropDuplicates(['col1', 'col2'])
>>
>> Is effectively shorthand for saying something like:
>>
>> df = input_df.groupBy('col1',
>> 'col2').agg(first(struct(input_df.columns)).alias('data')).select('data.*')
>>
>> Except I assume that it has some internal optimization so it doesn't need
>> to pack/unpack the column data, and just returns the whole Row.
>>
>> Nicholas Szandor Hakobian, Ph.D.
>> Principal Data Scientist
>> Rally Health
>> nicholas.hakob...@rallyhealth.com
>>
>>
>>
>> On Mon, May 20, 2019 at 11:38 AM Yeikel  wrote:
>>
>>> Hi ,
>>>
>>> I am looking for a high level explanation(overview) on how
>>> dropDuplicates[1]
>>> works.
>>>
>>> [1]
>>>
>>> https://github.com/apache/spark/blob/db24b04cad421ed508413d397c6beec01f723aee/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2326
>>>
>>> Could someone please explain?
>>>
>>> Thank you
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>
> --
> Regards,
>
> Rishi Shah
>


Re: High level explanation of dropDuplicates

2019-06-09 Thread Rishi Shah
Hi All,

Just wanted to check back regarding the best way to perform deduplication. Is
using dropDuplicates the optimal way to get rid of duplicates? Would it be
better to run operations on the RDD directly?

Also, what if we want to keep the last value in the group while
performing deduplication (based on some sorting criteria)?
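
For example, would something like this window-function approach be reasonable
(a rough, untested sketch, assuming an (id, score) DataFrame df), or is there a
better way?

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Keep one row per id, chosen by the sort criterion; flip to .asc to keep the
// first value instead of the last/highest.
val w = Window.partitionBy("id").orderBy(col("score").desc)

val deduped = df
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")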

Thanks,
Rishi

On Mon, May 20, 2019 at 3:33 PM Nicholas Hakobian <
nicholas.hakob...@rallyhealth.com> wrote:

> From doing some searching around in the spark codebase, I found the
> following:
>
>
> https://github.com/apache/spark/blob/163a6e298213f216f74f4764e241ee6298ea30b6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1452-L1474
>
> So it appears there is no direct operation called dropDuplicates or
> Deduplicate, but there is an optimizer rule that converts this logical
> operation to a physical operation that is equivalent to grouping by all the
> columns you want to deduplicate across (or all columns if you are doing
> something like distinct), and taking the First() value. So (using a pySpark
> code example):
>
> df = input_df.dropDuplicates(['col1', 'col2'])
>
> Is effectively shorthand for saying something like:
>
> df = input_df.groupBy('col1',
> 'col2').agg(first(struct(input_df.columns)).alias('data')).select('data.*')
>
> Except I assume that it has some internal optimization so it doesn't need
> to pack/unpack the column data, and just returns the whole Row.
>
> Nicholas Szandor Hakobian, Ph.D.
> Principal Data Scientist
> Rally Health
> nicholas.hakob...@rallyhealth.com
>
>
>
> On Mon, May 20, 2019 at 11:38 AM Yeikel  wrote:
>
>> Hi ,
>>
>> I am looking for a high level explanation(overview) on how
>> dropDuplicates[1]
>> works.
>>
>> [1]
>>
>> https://github.com/apache/spark/blob/db24b04cad421ed508413d397c6beec01f723aee/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2326
>>
>> Could someone please explain?
>>
>> Thank you
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>

-- 
Regards,

Rishi Shah


Re: High level explanation of dropDuplicates

2019-05-20 Thread Nicholas Hakobian
From doing some searching around in the spark codebase, I found the
following:

https://github.com/apache/spark/blob/163a6e298213f216f74f4764e241ee6298ea30b6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1452-L1474

So it appears there is no direct operation called dropDuplicates or
Deduplicate, but there is an optimizer rule that converts this logical
operation to a physical operation that is equivalent to grouping by all the
columns you want to deduplicate across (or all columns if you are doing
something like distinct), and taking the First() value. So (using a pySpark
code example):

df = input_df.dropDuplicates(['col1', 'col2'])

Is effectively shorthand for saying something like:

df = input_df.groupBy('col1',
'col2').agg(first(struct(input_df.columns)).alias('data')).select('data.*')

Except I assume that it has some internal optimization so it doesn't need
to pack/unpack the column data, and just returns the whole Row.
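
One quick way to see this in practice (a sketch; inputDf and the column names
are hypothetical, written here in Scala) is to compare the two physical plans:

import org.apache.spark.sql.functions.{col, first, struct}

inputDf.dropDuplicates("col1", "col2").explain()

// The plan should look much like the hand-written aggregate's:
inputDf.groupBy("col1", "col2")
  .agg(first(struct(inputDf.columns.map(col): _*)).alias("data"))
  .select("data.*")
  .explain()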

Nicholas Szandor Hakobian, Ph.D.
Principal Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com



On Mon, May 20, 2019 at 11:38 AM Yeikel  wrote:

> Hi ,
>
> I am looking for a high level explanation(overview) on how
> dropDuplicates[1]
> works.
>
> [1]
>
> https://github.com/apache/spark/blob/db24b04cad421ed508413d397c6beec01f723aee/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2326
>
> Could someone please explain?
>
> Thank you
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


High level explanation of dropDuplicates

2019-05-20 Thread Yeikel
Hi , 

I am looking for a high level explanation(overview) on how dropDuplicates[1]
works. 

[1]
https://github.com/apache/spark/blob/db24b04cad421ed508413d397c6beec01f723aee/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2326

Could someone please explain?

Thank you



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org