On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl> wrote:
> Hi,
>
> Could groupBy and withColumn or UDAF work perhaps? I think window could
> help here too.

This seems to work, but I do feel vaguely uneasy about it. :)

// First add a 'rank' column, which is the priority order, just in case
// priorities aren't from 1 with no gaps.
val temp1 = data.withColumn("rank", functions.dense_rank()
  .over(Window.partitionBy("id", "name").orderBy("priority")))

+---+----+-----+------+--------+----+
| id|name|extra|  data|priority|rank|
+---+----+-----+------+--------+----+
|  1|Fred|    8|value1|       1|   1|
|  1|Fred|    8|value8|       2|   2|
|  1|Fred|    8|value5|       3|   3|
|  2| Amy|    9|value3|       1|   1|
|  2| Amy|    9|value5|       2|   2|
+---+----+-----+------+--------+----+

// Now move all the columns we want to denormalize into a struct column to
// keep them together.
val temp2 = temp1.withColumn("temp_struct",
    struct(temp1("extra"), temp1("data"), temp1("priority")))
  .drop("extra", "data", "priority")

+---+----+----+------------+
| id|name|rank| temp_struct|
+---+----+----+------------+
|  1|Fred|   1|[8,value1,1]|
|  1|Fred|   2|[8,value8,2]|
|  1|Fred|   3|[8,value5,3]|
|  2| Amy|   1|[9,value3,1]|
|  2| Amy|   2|[9,value5,2]|
+---+----+----+------------+

// groupBy again, but now pivot the rank column. We need an aggregate
// function after pivot, so use first -- there will only ever be one element.
val temp3 = temp2.groupBy("id", "name")
  .pivot("rank", Seq("1", "2", "3"))
  .agg(functions.first("temp_struct"))

+---+----+------------+------------+------------+
| id|name|           1|           2|           3|
+---+----+------------+------------+------------+
|  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
|  2| Amy|[9,value3,1]|[9,value5,2]|        null|
+---+----+------------+------------+------------+

// Finally, just move things out of the structs and clean up.
val output = temp3
  .withColumn("extra1", temp3("1").getField("extra"))
  .withColumn("data1", temp3("1").getField("data"))
  .withColumn("priority1", temp3("1").getField("priority"))
  .withColumn("extra2", temp3("2").getField("extra"))
  .withColumn("data2", temp3("2").getField("data"))
  .withColumn("priority2", temp3("2").getField("priority"))
  .withColumn("extra3", temp3("3").getField("extra"))
  .withColumn("data3", tem3("3").getField("data"))
  .withColumn("priority3", temp3("3").getField("priority"))
  .drop("1", "2", "3")

+---+----+------+------+---------+------+------+---------+------+------+---------+
| id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
+---+----+------+------+---------+------+------+---------+------+------+---------+
|  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|
|  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|     null|
+---+----+------+------+---------+------+------+---------+------+------+---------+

> Jacek
>
> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ever...@nuna.com.invalid>
> wrote:
>
>> Hi,
>>
>> I'm trying to un-explode or denormalize a table like
>>
>> +---+----+-----+------+--------+
>> |id |name|extra|data  |priority|
>> +---+----+-----+------+--------+
>> |1  |Fred|8    |value1|1       |
>> |1  |Fred|8    |value8|2       |
>> |1  |Fred|8    |value5|3       |
>> |2  |Amy |9    |value3|1       |
>> |2  |Amy |9    |value5|2       |
>> +---+----+-----+------+--------+
>>
>> into something that looks like
>>
>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>>
>> If I were going the other direction, I'd create a new column with an
>> array of
>> structs, each with 'extra', 'data', and 'priority' fields, and then
>> explode it.
>>
>> Going from the more normalized view, though, I'm having a harder time.
>>
>> I want to group or partition by (id, name) and order by priority, but
>> after that I can't figure out how to get multiple rows rotated into one.
>>
>> Any ideas?
>>
>> Here's the code to create the input table above:
>>
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.Dataset
>> import org.apache.spark.sql.types._
>>
>> val rowsRDD = sc.parallelize(Seq(
>>     Row(1, "Fred", 8, "value1", 1),
>>     Row(1, "Fred", 8, "value8", 2),
>>     Row(1, "Fred", 8, "value5", 3),
>>     Row(2, "Amy", 9, "value3", 1),
>>     Row(2, "Amy", 9, "value5", 2)))
>>
>> val schema = StructType(Seq(
>>     StructField("id", IntegerType, nullable = true),
>>     StructField("name", StringType, nullable = true),
>>     StructField("extra", IntegerType, nullable = true),
>>     StructField("data", StringType, nullable = true),
>>     StructField("priority", IntegerType, nullable = true)))
>>
>> val data = sqlContext.createDataFrame(rowsRDD, schema)
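For anyone who wants to check the shape of this rank-and-pivot transformation without a Spark shell, here's a toy, Spark-free sketch of the same idea using plain Scala collections. The names (`InRow`, `denormalize`, `maxRank`) are made up for this sketch and are not Spark API; it only mirrors the logic (group by key, rank within the group by priority, lay the triples out side by side, pad with `None` the way pivot pads with null).

```scala
// A plain-Scala analogue of the dense_rank + pivot approach above.
// Assumed/hypothetical names: InRow, denormalize, maxRank.
case class InRow(id: Int, name: String, extra: Int, data: String, priority: Int)

// For each (id, name) group: sort by priority (rank 1..n), then place the
// (extra, data, priority) triples side by side, padded with None up to maxRank.
def denormalize(rows: Seq[InRow], maxRank: Int): Map[(Int, String), Seq[Option[(Int, String, Int)]]] =
  rows.groupBy(r => (r.id, r.name)).map { case (key, group) =>
    val ranked = group.sortBy(_.priority).map(r => (r.extra, r.data, r.priority))
    key -> (0 until maxRank).map(i => ranked.lift(i))
  }

val input = Seq(
  InRow(1, "Fred", 8, "value1", 1),
  InRow(1, "Fred", 8, "value8", 2),
  InRow(1, "Fred", 8, "value5", 3),
  InRow(2, "Amy", 9, "value3", 1),
  InRow(2, "Amy", 9, "value5", 2))

val wide = denormalize(input, maxRank = 3)
// wide((2, "Amy")) == Seq(Some((9, "value3", 1)), Some((9, "value5", 2)), None)
```

Amy has only two rows, so her third slot comes out `None`, matching the nulls in the pivoted Spark output.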