Hi,

Could groupBy combined with withColumn, or a UDAF, work here? I think a window function could help too.
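To make the window + groupBy idea concrete, here is a minimal sketch, not a tested production solution. It assumes Spark 2.x with a SparkSession and a known upper bound on rows per (id, name) group. The `slot` column, the `maxSlots` parameter, and the `Denormalize` object are hypothetical names introduced only for illustration. It numbers the rows of each group by priority with `row_number`, pivots on that number, and then renames the pivoted columns.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first, row_number}

object Denormalize {
  // Pivot with several aggregates names its columns "<slot>_<alias>", e.g. "1_extra".
  private val SlotColumn = "(\\d+)_(\\w+)".r

  // maxSlots is an assumption: the largest number of rows expected per (id, name).
  def denormalize(data: DataFrame, maxSlots: Int): DataFrame = {
    // Number the rows of each (id, name) group in priority order: 1, 2, 3, ...
    val byPriority = Window.partitionBy(col("id"), col("name")).orderBy(col("priority"))
    val ranked = data.withColumn("slot", row_number().over(byPriority))

    // One output row per group; each slot value becomes its own set of columns.
    // Slots that a group does not have come out as null.
    val wide = ranked
      .groupBy(col("id"), col("name"))
      .pivot("slot", 1 to maxSlots)
      .agg(
        first(col("extra")).as("extra"),
        first(col("data")).as("data"),
        first(col("priority")).as("priority"))

    // Rename "1_extra" to "extra1" and so on; id and name are left untouched.
    wide.columns.foldLeft(wide) { (df, c) =>
      c match {
        case SlotColumn(slot, field) => df.withColumnRenamed(c, s"$field$slot")
        case _                       => df
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Local session for the sketch only; in spark-shell, `spark` already exists.
    val spark = SparkSession.builder().master("local[*]").appName("denormalize-sketch").getOrCreate()
    import spark.implicits._

    val data = Seq(
      (1, "Fred", 8, "value1", 1),
      (1, "Fred", 8, "value8", 2),
      (1, "Fred", 8, "value5", 3),
      (2, "Amy", 9, "value3", 1),
      (2, "Amy", 9, "value5", 2)
    ).toDF("id", "name", "extra", "data", "priority")

    denormalize(data, maxSlots = 3).orderBy(col("id")).show(false)
    spark.stop()
  }
}
```

Passing the slot values to `pivot` explicitly keeps the output schema fixed and avoids an extra pass over the data to discover them. If the maximum group size is not known up front, it would have to be computed first (for example with a count per group), which this sketch does not do.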
Jacek

On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ever...@nuna.com.invalid> wrote:

> Hi,
>
> I'm trying to un-explode or denormalize a table like
>
> +---+----+-----+------+--------+
> |id |name|extra|data  |priority|
> +---+----+-----+------+--------+
> |1  |Fred|8    |value1|1       |
> |1  |Fred|8    |value8|2       |
> |1  |Fred|8    |value5|3       |
> |2  |Amy |9    |value3|1       |
> |2  |Amy |9    |value5|2       |
> +---+----+-----+------+--------+
>
> into something that looks like
>
> +---+----+------+------+---------+------+------+---------+------+------+---------+
> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
> +---+----+------+------+---------+------+------+---------+------+------+---------+
> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
> +---+----+------+------+---------+------+------+---------+------+------+---------+
>
> If I were going the other direction, I'd create a new column with an array
> of structs, each with 'extra', 'data', and 'priority' fields and then
> explode it.
>
> Going from the more normalized view, though, I'm having a harder time.
>
> I want to group or partition by (id, name) and order by priority, but
> after that I can't figure out how to get multiple rows rotated into one.
>
> Any ideas?
>
> Here's the code to create the input table above:
>
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
>
> val rowsRDD = sc.parallelize(Seq(
>     Row(1, "Fred", 8, "value1", 1),
>     Row(1, "Fred", 8, "value8", 2),
>     Row(1, "Fred", 8, "value5", 3),
>     Row(2, "Amy", 9, "value3", 1),
>     Row(2, "Amy", 9, "value5", 2)))
>
> val schema = StructType(Seq(
>     StructField("id", IntegerType, nullable = true),
>     StructField("name", StringType, nullable = true),
>     StructField("extra", IntegerType, nullable = true),
>     StructField("data", StringType, nullable = true),
>     StructField("priority", IntegerType, nullable = true)))
>
> val data = sqlContext.createDataFrame(rowsRDD, schema)