I think the fastest way is likely a combination of conditionals (when / otherwise) and first (ignoring nulls) while grouping by the id. This should get the answer with only a single shuffle.
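For illustration, a minimal sketch of that approach (not from the linked notebook below; it assumes the data frame defined at the bottom of the thread, and that priorities are exactly 1 to 3 -- Everett's dense_rank step guards against gaps):

import org.apache.spark.sql.functions._

// One aggregate per (priority, column) pair: keep a value only when the
// row's priority matches, then take the first non-null match per group.
// A single groupBy/agg needs only a single shuffle.
val aggs = for {
  p <- 1 to 3
  c <- Seq("extra", "data", "priority")
} yield first(when(col("priority") === p, col(c)), ignoreNulls = true).as(s"$c$p")

val denormalized = data.groupBy("id", "name").agg(aggs.head, aggs.tail: _*)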
Here is an example:
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3908422850525880/2840265927289860/latest.html

On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Everett,
>
> That's pretty much what I'd do. I can't think of a way to beat your
> solution. Why do you "feel vaguely uneasy about it"?
>
> I'd also check out the execution plan (with explain) to see how it's
> going to work at runtime. I may have seen groupBy + join do better than
> window (there were more exchanges in play for windows, I reckon).
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ever...@nuna.com> wrote:
> >
> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl> wrote:
> >>
> >> Hi,
> >>
> >> Could groupBy and withColumn or a UDAF work, perhaps? I think a window
> >> could help here too.
> >
> > This seems to work, but I do feel vaguely uneasy about it. :)
> >
> > // First add a 'rank' column, which is the priority order, just in case
> > // priorities aren't numbered from 1 with no gaps.
> > val temp1 = data.withColumn("rank", functions.dense_rank()
> >   .over(Window.partitionBy("id", "name").orderBy("priority")))
> >
> > +---+----+-----+------+--------+----+
> > | id|name|extra|  data|priority|rank|
> > +---+----+-----+------+--------+----+
> > |  1|Fred|    8|value1|       1|   1|
> > |  1|Fred|    8|value8|       2|   2|
> > |  1|Fred|    8|value5|       3|   3|
> > |  2| Amy|    9|value3|       1|   1|
> > |  2| Amy|    9|value5|       2|   2|
> > +---+----+-----+------+--------+----+
> >
> > // Now move all the columns we want to denormalize into a struct column
> > // to keep them together.
> > val temp2 = temp1.withColumn("temp_struct",
> >     struct(temp1("extra"), temp1("data"), temp1("priority")))
> >   .drop("extra", "data", "priority")
> >
> > +---+----+----+------------+
> > | id|name|rank| temp_struct|
> > +---+----+----+------------+
> > |  1|Fred|   1|[8,value1,1]|
> > |  1|Fred|   2|[8,value8,2]|
> > |  1|Fred|   3|[8,value5,3]|
> > |  2| Amy|   1|[9,value3,1]|
> > |  2| Amy|   2|[9,value5,2]|
> > +---+----+----+------------+
> >
> > // groupBy again, but now pivot on the rank column. We need an aggregate
> > // function after pivot, so use first -- there will only ever be one
> > // element per (id, name, rank) group.
> > val temp3 = temp2.groupBy("id", "name")
> >   .pivot("rank", Seq("1", "2", "3"))
> >   .agg(functions.first("temp_struct"))
> >
> > +---+----+------------+------------+------------+
> > | id|name|           1|           2|           3|
> > +---+----+------------+------------+------------+
> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
> > +---+----+------------+------------+------------+
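An aside on the cleanup step that follows: the nine withColumn calls can also be generated in a single select; a sketch (not from the original mail), using the same temp3 and column names as the thread:

import org.apache.spark.sql.functions.col

// Flatten the pivoted struct columns "1", "2", "3" in one projection,
// naming each field extra1/data1/priority1, extra2/data2/priority2, etc.
val output2 = temp3.select(
  Seq(col("id"), col("name")) ++
    Seq("1", "2", "3").flatMap { r =>
      Seq("extra", "data", "priority").map(f => col(r).getField(f).as(s"$f$r"))
    }: _*
)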
> > // Now just move things out of the structs and clean up.
> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
> >   .withColumn("data1", temp3("1").getField("data"))
> >   .withColumn("priority1", temp3("1").getField("priority"))
> >   .withColumn("extra2", temp3("2").getField("extra"))
> >   .withColumn("data2", temp3("2").getField("data"))
> >   .withColumn("priority2", temp3("2").getField("priority"))
> >   .withColumn("extra3", temp3("3").getField("extra"))
> >   .withColumn("data3", temp3("3").getField("data"))
> >   .withColumn("priority3", temp3("3").getField("priority"))
> >   .drop("1", "2", "3")
> >
> > +---+----+------+------+---------+------+------+---------+------+------+---------+
> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
> > +---+----+------+------+---------+------+------+---------+------+------+---------+
> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|
> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|     null|
> > +---+----+------+------+---------+------+------+---------+------+------+---------+
>
> >> Jacek
> >>
> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ever...@nuna.com.invalid> wrote:
> >>
> >>> Hi,
> >>>
> >>> I'm trying to un-explode or denormalize a table like
> >>>
> >>> +---+----+-----+------+--------+
> >>> |id |name|extra|data  |priority|
> >>> +---+----+-----+------+--------+
> >>> |1  |Fred|8    |value1|1       |
> >>> |1  |Fred|8    |value8|2       |
> >>> |1  |Fred|8    |value5|3       |
> >>> |2  |Amy |9    |value3|1       |
> >>> |2  |Amy |9    |value5|2       |
> >>> +---+----+-----+------+--------+
> >>>
> >>> into something that looks like
> >>>
> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >>> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
> >>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >>>
> >>> If I were going the other direction, I'd create a new column with an
> >>> array of structs, each with 'extra', 'data', and 'priority' fields, and
> >>> then explode it.
> >>>
> >>> Going from the more normalized view, though, I'm having a harder time.
> >>>
> >>> I want to group or partition by (id, name) and order by priority, but
> >>> after that I can't figure out how to get multiple rows rotated into one.
> >>>
> >>> Any ideas?
> >>>
> >>> Here's the code to create the input table above:
> >>>
> >>> import org.apache.spark.sql.Row
> >>> import org.apache.spark.sql.Dataset
> >>> import org.apache.spark.sql.types._
> >>>
> >>> val rowsRDD = sc.parallelize(Seq(
> >>>     Row(1, "Fred", 8, "value1", 1),
> >>>     Row(1, "Fred", 8, "value8", 2),
> >>>     Row(1, "Fred", 8, "value5", 3),
> >>>     Row(2, "Amy", 9, "value3", 1),
> >>>     Row(2, "Amy", 9, "value5", 2)))
> >>>
> >>> val schema = StructType(Seq(
> >>>     StructField("id", IntegerType, nullable = true),
> >>>     StructField("name", StringType, nullable = true),
> >>>     StructField("extra", IntegerType, nullable = true),
> >>>     StructField("data", StringType, nullable = true),
> >>>     StructField("priority", IntegerType, nullable = true)))
> >>>
> >>> val data = sqlContext.createDataFrame(rowsRDD, schema)
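Finally, Everett's note about the other direction suggests a related alternative: the exact inverse of explode is to re-nest each group into one ordered array column. A sketch, assuming the data frame above and a Spark version whose sort_array accepts arrays of structs (structs are orderable in Spark SQL):

import org.apache.spark.sql.functions._

// collect_list gathers each group's rows into an array (element order is
// not guaranteed), so sort afterwards; with priority as the struct's first
// field, sort_array orders the structs by priority.
val nested = data
  .groupBy("id", "name")
  .agg(sort_array(collect_list(struct(col("priority"), col("extra"), col("data"))))
    .as("items"))

// Fixed columns could then be pulled out positionally, e.g.
//   nested.select(col("items")(0).getField("extra").as("extra1"), ...)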