On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> Could groupBy and withColumn or UDAF work perhaps? I think window could
> help here too.
>

This seems to work, but I do feel vaguely uneasy about it. :)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.struct

// First add a 'rank' column ordered by priority, in case priorities
// don't start at 1 or have gaps.
val temp1 = data.withColumn("rank", functions.dense_rank()
  .over(Window.partitionBy("id", "name").orderBy("priority")))

+---+----+-----+------+--------+----+
| id|name|extra|  data|priority|rank|
+---+----+-----+------+--------+----+
|  1|Fred|    8|value1|       1|   1|
|  1|Fred|    8|value8|       2|   2|
|  1|Fred|    8|value5|       3|   3|
|  2| Amy|    9|value3|       1|   1|
|  2| Amy|    9|value5|       2|   2|
+---+----+-----+------+--------+----+
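
One caveat: if two rows in the same (id, name) group could share a
priority, dense_rank gives the tied rows the same rank, and the pivot +
first below would then silently keep only one of them. row_number should
be safer in that case, since it always produces distinct ranks (temp1Safe
is just an illustrative name):

// row_number never ties, so every row in a group survives the pivot.
val temp1Safe = data.withColumn("rank", functions.row_number()
  .over(Window.partitionBy("id", "name").orderBy("priority")))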

// Now move all the columns we want to denormalize into a struct column
// to keep them together.
val temp2 = temp1.withColumn("temp_struct",
    struct(temp1("extra"), temp1("data"), temp1("priority")))
  .drop("extra", "data", "priority")

+---+----+----+------------+
| id|name|rank| temp_struct|
+---+----+----+------------+
|  1|Fred|   1|[8,value1,1]|
|  1|Fred|   2|[8,value8,2]|
|  1|Fred|   3|[8,value5,3]|
|  2| Amy|   1|[9,value3,1]|
|  2| Amy|   2|[9,value5,2]|
+---+----+----+------------+

// groupBy again, but now pivot the rank column. We need an aggregate
// function after pivot, so use first -- there will only ever be one
// element per group.
val temp3 = temp2.groupBy("id", "name")
  .pivot("rank", Seq("1", "2", "3"))
  .agg(functions.first("temp_struct"))

+---+----+------------+------------+------------+
| id|name|           1|           2|           3|
+---+----+------------+------------+------------+
|  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
|  2| Amy|[9,value3,1]|[9,value5,2]|        null|
+---+----+------------+------------+------------+
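
Passing Seq("1", "2", "3") explicitly saves Spark an extra pass over the
data to discover the pivot values. If the maximum number of rows per
group isn't known up front, it could be computed first -- a sketch
(maxRank and temp3Dynamic are just illustrative names):

// Derive the pivot values from the data: dense_rank starts at 1, so
// the distinct ranks are exactly 1..max(rank).
val maxRank = temp2.agg(functions.max("rank")).head().getInt(0)
val temp3Dynamic = temp2.groupBy("id", "name")
  .pivot("rank", (1 to maxRank).map(_.toString))
  .agg(functions.first("temp_struct"))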

// Now just move things out of the structs and clean up.
val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
     .withColumn("data1", temp3("1").getField("data"))
     .withColumn("priority1", temp3("1").getField("priority"))
     .withColumn("extra2", temp3("2").getField("extra"))
     .withColumn("data2", temp3("2").getField("data"))
     .withColumn("priority2", temp3("2").getField("priority"))
     .withColumn("extra3", temp3("3").getField("extra"))
     .withColumn("data3", temp3("3").getField("data"))
     .withColumn("priority3", temp3("3").getField("priority"))
     .drop("1", "2", "3")

+---+----+------+------+---------+------+------+---------+------+------+---------+
| id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
+---+----+------+------+---------+------+------+---------+------+------+---------+
|  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|
|  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|     null|
+---+----+------+------+---------+------+------+---------+------+------+---------+
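
The last step could also be written more compactly by generating the
columns in a loop -- same result, just less repetition (flattened and
output2 are illustrative names):

// Build the nine flattened columns, then select them in one pass.
val flattened = Seq("1", "2", "3").flatMap { r =>
  Seq(temp3(r).getField("extra").as(s"extra$r"),
      temp3(r).getField("data").as(s"data$r"),
      temp3(r).getField("priority").as(s"priority$r"))
}
val output2 = temp3.select(Seq(temp3("id"), temp3("name")) ++ flattened: _*)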

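For what it's worth, it might be possible to skip the rank and pivot
steps entirely by collecting each group into a sorted array of structs
and indexing into it. A sketch I haven't verified -- it assumes
sort_array orders an array of structs by the first struct field
(priority here) and that getItem past the end of an array yields null:

import org.apache.spark.sql.functions.{col, collect_list, sort_array}

// Gather each (id, name) group's rows into a single array of structs,
// sorted by priority (the struct's first field).
val grouped = data.groupBy("id", "name")
  .agg(sort_array(collect_list(
    struct(data("priority"), data("extra"), data("data")))).as("rows"))

// Index into the array; element 2 is missing for Amy, so those fields
// come out null, matching the nulls in the pivot version.
val alt = grouped.select(col("id"), col("name"),
  col("rows").getItem(0).getField("extra").as("extra1"),
  col("rows").getItem(0).getField("data").as("data1"),
  col("rows").getItem(0).getField("priority").as("priority1"),
  col("rows").getItem(1).getField("extra").as("extra2"),
  col("rows").getItem(1).getField("data").as("data2"),
  col("rows").getItem(1).getField("priority").as("priority2"),
  col("rows").getItem(2).getField("extra").as("extra3"),
  col("rows").getItem(2).getField("data").as("data3"),
  col("rows").getItem(2).getField("priority").as("priority3"))
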
>
> Jacek
>
> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ever...@nuna.com.invalid>
> wrote:
>
>> Hi,
>>
>> I'm trying to un-explode or denormalize a table like
>>
>> +---+----+-----+------+--------+
>> |id |name|extra|data  |priority|
>> +---+----+-----+------+--------+
>> |1  |Fred|8    |value1|1       |
>> |1  |Fred|8    |value8|2       |
>> |1  |Fred|8    |value5|3       |
>> |2  |Amy |9    |value3|1       |
>> |2  |Amy |9    |value5|2       |
>> +---+----+-----+------+--------+
>>
>> into something that looks like
>>
>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>>
>> If I were going the other direction, I'd create a new column with an
>> array of structs, each with 'extra', 'data', and 'priority' fields and then
>> explode it.
>>
>> Going from the more normalized view, though, I'm having a harder time.
>>
>> I want to group or partition by (id, name) and order by priority, but
>> after that I can't figure out how to get multiple rows rotated into one.
>>
>> Any ideas?
>>
>> Here's the code to create the input table above:
>>
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.Dataset
>> import org.apache.spark.sql.types._
>>
>> val rowsRDD = sc.parallelize(Seq(
>>     Row(1, "Fred", 8, "value1", 1),
>>     Row(1, "Fred", 8, "value8", 2),
>>     Row(1, "Fred", 8, "value5", 3),
>>     Row(2, "Amy", 9, "value3", 1),
>>     Row(2, "Amy", 9, "value5", 2)))
>>
>> val schema = StructType(Seq(
>>     StructField("id", IntegerType, nullable = true),
>>     StructField("name", StringType, nullable = true),
>>     StructField("extra", IntegerType, nullable = true),
>>     StructField("data", StringType, nullable = true),
>>     StructField("priority", IntegerType, nullable = true)))
>>
>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>