Re: High level explanation of dropDuplicates
I would just map to pairs using the id, then do a reduceByKey where you compare the scores and keep the highest, then take .values, and that should do it.
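The RDD recipe above (map to (id, row) pairs, reduceByKey keeping the higher score, then .values) can be sketched locally in plain Python. This is a stand-in that models the semantics of those Spark calls, not actual PySpark API usage:

```python
def keep_highest(rows):
    # Local model of:
    #   rdd.map(lambda r: (r["id"], r))
    #      .reduceByKey(lambda a, b: a if a["score"] >= b["score"] else b)
    #      .values()
    reduced = {}
    for row in rows:
        key = row["id"]                       # map to (id, row) pairs
        if key not in reduced or row["score"] > reduced[key]["score"]:
            reduced[key] = row                # reduceByKey: keep higher score
    return list(reduced.values())             # .values()

rows = [{"id": 1, "score": 10}, {"id": 1, "score": 20}, {"id": 1, "score": 11}]
print(keep_highest(rows))  # only the row with score 20 survives
```

Note that the reduce function here is commutative and associative (max by score), which is what makes reduceByKey give the same answer regardless of how the data is partitioned.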
Re: High level explanation of dropDuplicates
Thanks everyone for your contributions on this topic. I wanted to check in to see if anyone has discovered a different approach, or has an opinion on a better approach, to deduplicating data using PySpark. Would really appreciate any further insight on this.

Thanks,
-Rishi
Re: High level explanation of dropDuplicates
Nicholas, thank you for your explanation.

I am also interested in the example that Rishi is asking for. I am sure mapPartitions may work, but as Vladimir suggests it may not be the best option in terms of performance.

@Vladimir Prus, are you aware of any example of writing a "custom physical exec operator"?

If anyone needs a further explanation of the follow-up question Rishi posted, please see the example below:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row

    val someData = Seq(
      Row(1, 10),
      Row(1, 20),
      Row(1, 11)
    )

    val schema = List(
      StructField("id", IntegerType, true),
      StructField("score", IntegerType, true)
    )

    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(someData),
      StructType(schema)
    )

    // Goal: drop duplicates using "id" as the primary key and keep the highest "score".
    df.sort($"score".desc).dropDuplicates("id").show

    == Physical Plan ==
    *(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
    +- Exchange hashpartitioning(id#191, 200)
       +- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192, false)])
          +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
             +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
                +- Scan ExistingRDD[id#191,score#192]

This seems to work, but I don't know what the implications are if we use this approach with a bigger dataset, or what the alternatives are. From the explain output I can see the two Exchanges, so it may not be the best approach?
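One caveat with the sort-then-dropDuplicates pattern above is that dropDuplicates does not document which row of each group it keeps, so relying on the preceding sort surviving the exchanges is fragile. A window function (row_number over a partition ordered by score descending, keeping row 1) has well-defined semantics. The pure-Python sketch below only models that window logic locally; it is illustrative, not Spark code:

```python
from itertools import groupby

def top_per_key(rows, key="id", order="score"):
    # Mimics Window.partitionBy(key).orderBy(order desc) + row_number() == 1:
    # within each key, keep exactly the row with the highest `order` value.
    ordered = sorted(rows, key=lambda r: (r[key], -r[order]))
    return [next(group) for _, group in groupby(ordered, key=lambda r: r[key])]

data = [{"id": 1, "score": 10}, {"id": 1, "score": 20},
        {"id": 1, "score": 11}, {"id": 2, "score": 7}]
print(top_per_key(data))  # one row per id: score 20 for id 1, score 7 for id 2
```

In real PySpark this would be a `row_number().over(Window.partitionBy(...).orderBy(...))` column followed by a filter on row number 1, which still shuffles once by the key but does not depend on an earlier global sort.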
Re: High level explanation of dropDuplicates
Hi,

If your data frame is partitioned by column A, and you want deduplication by columns A, B and C, then a faster way might be to sort each partition by A, B and C and then do a linear scan; it is often faster than a group-by over all the columns, which requires a shuffle.

Sadly, there's no standard way to do it. One way is via mapPartitions, but that involves serialisation to/from Row. The best way is to write a custom physical exec operator, but it's not entirely trivial.
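The sorted-partition linear scan described above can be sketched as follows. This plain-Python function models what a mapPartitions body (or a custom exec operator) would do on one partition that is already sorted by the dedup columns; the column names a, b, c are illustrative:

```python
def dedup_sorted_partition(rows, keys=("a", "b", "c")):
    # Linear scan over one partition already sorted by `keys`:
    # emit a row only when its key tuple differs from the previous row's.
    # No shuffle is needed because duplicates are adjacent after the sort.
    prev = object()  # sentinel that never equals a real key tuple
    out = []
    for row in rows:
        key = tuple(row[k] for k in keys)
        if key != prev:
            out.append(row)
            prev = key
    return out

part = [{"a": 1, "b": 1, "c": 1}, {"a": 1, "b": 1, "c": 1}, {"a": 1, "b": 2, "c": 1}]
print(dedup_sorted_partition(part))  # the adjacent duplicate is dropped
```

The scan is O(n) per partition and needs O(1) extra state, which is why it beats a hash aggregation when the data is already partitioned and sortable by the dedup key.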
Re: High level explanation of dropDuplicates
Hi All,

Just wanted to check back regarding the best way to perform deduplication. Is using dropDuplicates the optimal way to get rid of duplicates? Would it be better if we ran the operations on the RDD directly?

Also, what about if we want to keep the last value of the group while performing deduplication (based on some sorting criteria)?

Thanks,
Rishi
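The keep-the-last-value variant asked about above can be modelled the same way. The ts column below is a hypothetical ordering column; in real PySpark this would typically be a window ordered descending on that column with row_number() == 1:

```python
def keep_last(rows, key="id", order="ts"):
    # For each key, keep the row that comes last under the sort criteria:
    # sort ascending by `order`, then let later rows overwrite earlier ones
    # in a dict keyed by `key` (dicts preserve insertion order in Python 3.7+).
    last = {}
    for row in sorted(rows, key=lambda r: r[order]):
        last[row[key]] = row
    return list(last.values())

events = [{"id": 1, "ts": 2}, {"id": 1, "ts": 5}, {"id": 2, "ts": 1}]
print(keep_last(events))  # for each id, the row with the largest ts
```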
Re: High level explanation of dropDuplicates
From doing some searching around in the Spark codebase, I found the following:

https://github.com/apache/spark/blob/163a6e298213f216f74f4764e241ee6298ea30b6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1452-L1474

So it appears there is no direct physical operation called dropDuplicates or Deduplicate; instead there is an optimizer rule that converts this logical operation into a physical operation equivalent to grouping by all the columns you want to deduplicate across (or all columns, if you are doing something like distinct) and taking the First() value. So (using a PySpark code example):

    df = input_df.dropDuplicates(['col1', 'col2'])

is effectively shorthand for something like:

    df = input_df.groupBy('col1', 'col2').agg(first(struct(input_df.columns)).alias('data')).select('data.*')

except I assume that it has some internal optimization, so it doesn't need to pack/unpack the column data and just returns the whole Row.

Nicholas Szandor Hakobian, Ph.D.
Principal Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com
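The optimizer rewrite described above (group by the subset columns, keep the first full row per group) can be modelled locally in plain Python. This is only an illustration of the semantics; in a real cluster, which row counts as "first" depends on partitioning and physical ordering, so the result is not deterministic unless you control the order:

```python
def drop_duplicates(rows, subset):
    # Local model of dropDuplicates(subset): group rows by the `subset`
    # columns and keep the first full row encountered for each group,
    # mirroring the first() aggregate produced by the optimizer rule.
    seen = {}
    for row in rows:
        key = tuple(row[c] for c in subset)
        seen.setdefault(key, row)  # first row per key wins; later ones dropped
    return list(seen.values())

rows = [{"col1": 1, "col2": "a", "v": 10},
        {"col1": 1, "col2": "a", "v": 99},
        {"col1": 2, "col2": "b", "v": 7}]
print(drop_duplicates(rows, ["col1", "col2"]))  # the v=99 duplicate is dropped
```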
High level explanation of dropDuplicates
Hi,

I am looking for a high-level explanation (overview) of how dropDuplicates [1] works.

[1] https://github.com/apache/spark/blob/db24b04cad421ed508413d397c6beec01f723aee/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2326

Could someone please explain?

Thank you

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/