Hi Everett,

That's pretty much what I'd do. Can't think of a way to beat your
solution. Why do you "feel vaguely uneasy about it"?

I'd also check out the execution plan (with explain) to see how it's
going to work at runtime. I may have seen groupBy + join perform better
than window (I reckon there were more exchanges, i.e. shuffles, in play
for windows).
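
For reference, a minimal sketch of inspecting the plan for the window step. It assumes Spark 2.x or later on the classpath; the local SparkSession, the app name, and the trimmed-down sample rows are only there to make the snippet stand alone. Counting plan lines that mention "Exchange" is a rough heuristic, not an official metric.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.dense_rank

// Assumption: a local session just for this sketch; in spark-shell this
// returns the session that already exists.
val spark = SparkSession.builder().master("local[*]").appName("explain-sketch").getOrCreate()
import spark.implicits._

val data = Seq(
  (1, "Fred", 8, "value1", 1),
  (1, "Fred", 8, "value8", 2),
  (2, "Amy", 9, "value3", 1)
).toDF("id", "name", "extra", "data", "priority")

val ranked = data.withColumn("rank",
  dense_rank().over(Window.partitionBy("id", "name").orderBy("priority")))

// extended = true prints the parsed, analyzed, and optimized logical plans
// followed by the physical plan.
ranked.explain(true)

// Rough shuffle count: lines of the physical plan that mention an Exchange.
val exchanges = ranked.queryExecution.executedPlan.toString
  .split("\n").count(_.contains("Exchange"))
println(s"Exchange operators in the physical plan: $exchanges")
```

Running the same check on a groupBy + join variant would show whether it really needs fewer exchanges for a given dataset.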

Regards,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ever...@nuna.com> wrote:
>
>
> On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi,
>>
>> Could groupBy and withColumn or UDAF work perhaps? I think window could
>> help here too.
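
A rough sketch of the groupBy idea, without pivot: collect each group's rows into an array of structs, sort it, and index into it. The names `maxItems`, `items`, and `wide` are made up for illustration, and the cap of three rows per group is an assumption taken from the sample data. An out-of-range array index yields null in Spark 2.x; newer versions with ANSI mode enabled may raise an error instead.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, sort_array, struct}

// Assumption: a local session just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("groupby-sketch").getOrCreate()
import spark.implicits._

val data = Seq(
  (1, "Fred", 8, "value1", 1),
  (1, "Fred", 8, "value8", 2),
  (1, "Fred", 8, "value5", 3),
  (2, "Amy", 9, "value3", 1),
  (2, "Amy", 9, "value5", 2)
).toDF("id", "name", "extra", "data", "priority")

// Assumption: no group has more than 3 rows.
val maxItems = 3

// priority is the first struct field, so sorting the array of structs
// orders each group's rows by priority.
val grouped = data.groupBy("id", "name")
  .agg(sort_array(collect_list(struct($"priority", $"extra", $"data"))).as("items"))

// One output column per (slot, field) pair, e.g. extra1, data1, priority1.
val flatCols = for {
  i <- 0 until maxItems
  field <- Seq("extra", "data", "priority")
} yield col("items").getItem(i).getField(field).as(s"$field${i + 1}")

val wide = grouped.select(Seq(col("id"), col("name")) ++ flatCols: _*)
wide.show()
```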
>
>
> This seems to work, but I do feel vaguely uneasy about it. :)
>
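> // Assumed imports for the snippets below.
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions
> import org.apache.spark.sql.functions.struct
>
> // First add a 'rank' column, which is priority order, just in case
> // priorities aren't numbered from 1 with no gaps.
> val temp1 = data.withColumn("rank", functions.dense_rank()
>    .over(Window.partitionBy("id", "name").orderBy("priority")))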
>
> +---+----+-----+------+--------+----+
> | id|name|extra|  data|priority|rank|
> +---+----+-----+------+--------+----+
> |  1|Fred|    8|value1|       1|   1|
> |  1|Fred|    8|value8|       2|   2|
> |  1|Fred|    8|value5|       3|   3|
> |  2| Amy|    9|value3|       1|   1|
> |  2| Amy|    9|value5|       2|   2|
> +---+----+-----+------+--------+----+
>
> // Now move all the columns we want to denormalize into a struct column to
> // keep them together.
> val temp2 = temp1
>   .withColumn("temp_struct", struct(temp1("extra"), temp1("data"), temp1("priority")))
>   .drop("extra", "data", "priority")
>
> +---+----+----+------------+
> | id|name|rank| temp_struct|
> +---+----+----+------------+
> |  1|Fred|   1|[8,value1,1]|
> |  1|Fred|   2|[8,value8,2]|
> |  1|Fred|   3|[8,value5,3]|
> |  2| Amy|   1|[9,value3,1]|
> |  2| Amy|   2|[9,value5,2]|
> +---+----+----+------------+
>
> // groupBy, again, but now pivot the rank column. We need an aggregate
> // function after pivot, so use first -- there will only ever be one element.
> val temp3 = temp2.groupBy("id", "name")
>   .pivot("rank", Seq("1", "2", "3"))
>   .agg(functions.first("temp_struct"))
>
> +---+----+------------+------------+------------+
> | id|name|           1|           2|           3|
> +---+----+------------+------------+------------+
> |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
> |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
> +---+----+------------+------------+------------+
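
One caveat on "there will only ever be one element": dense_rank gives rows with equal priority the same rank, so the claim holds only when priorities are unique within each (id, name) group. A minimal sketch of checking for ties and of how row_number differs, using a made-up tie in the data; the names `ties` and `ranked` are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{dense_rank, row_number}

// Assumption: a local session just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("rank-ties-sketch").getOrCreate()
import spark.implicits._

val data = Seq(
  (1, "Fred", 8, "value1", 1),
  (1, "Fred", 8, "value8", 1), // hypothetical tie: same priority as the row above
  (1, "Fred", 8, "value5", 3)
).toDF("id", "name", "extra", "data", "priority")

val w = Window.partitionBy("id", "name").orderBy("priority")

// Groups in which some priority value occurs more than once.
val ties = data.groupBy("id", "name", "priority").count().filter($"count" > 1)
ties.show()

// dense_rank repeats a rank for tied rows; row_number always yields 1, 2, 3, ...
// (the order among tied rows is arbitrary).
val ranked = data
  .withColumn("dense", dense_rank().over(w))
  .withColumn("rownum", row_number().over(w))
ranked.show()
```

If `ties` is non-empty, pivoting on dense_rank with first would keep only one of the tied rows, whereas row_number would give each row its own slot.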
>
> // Now just move things out of the structs and clean up.
> val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
>      .withColumn("data1", temp3("1").getField("data"))
>      .withColumn("priority1", temp3("1").getField("priority"))
>      .withColumn("extra2", temp3("2").getField("extra"))
>      .withColumn("data2", temp3("2").getField("data"))
>      .withColumn("priority2", temp3("2").getField("priority"))
>      .withColumn("extra3", temp3("3").getField("extra"))
>      .withColumn("data3", temp3("3").getField("data"))
>      .withColumn("priority3", temp3("3").getField("priority"))
>      .drop("1", "2", "3")
>
> +---+----+------+------+---------+------+------+---------+------+------+---------+
> | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
> +---+----+------+------+---------+------+------+---------+------+------+---------+
> |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|
> |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|     null|
> +---+----+------+------+---------+------+------+---------+------+------+---------+
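
The nine withColumn calls can probably be generated instead of written out. A sketch under the same assumptions as the code above (at most three ranks, struct fields extra/data/priority); the names `ranks`, `fields`, `pivoted`, and `flattened` are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank, first, struct}

// Assumption: a local session just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("flatten-sketch").getOrCreate()
import spark.implicits._

val data = Seq(
  (1, "Fred", 8, "value1", 1),
  (1, "Fred", 8, "value8", 2),
  (1, "Fred", 8, "value5", 3),
  (2, "Amy", 9, "value3", 1),
  (2, "Amy", 9, "value5", 2)
).toDF("id", "name", "extra", "data", "priority")

val ranks = Seq(1, 2, 3)
val fields = Seq("extra", "data", "priority")

// Same rank / struct / pivot steps as above, chained together.
val pivoted = data
  .withColumn("rank", dense_rank().over(Window.partitionBy("id", "name").orderBy("priority")))
  .withColumn("temp_struct", struct(fields.map(col): _*))
  .groupBy("id", "name")
  .pivot("rank", ranks)
  .agg(first("temp_struct"))

// Build extra1, data1, priority1, extra2, ... from the pivoted struct columns.
val flattened = ranks.flatMap { r =>
  fields.map(f => pivoted(r.toString).getField(f).as(s"$f$r"))
}

val output = pivoted.select(Seq(col("id"), col("name")) ++ flattened: _*)
output.show()
```

Using one select rather than a chain of withColumn calls also means there is nothing to drop afterwards.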
>
>>
>>
>> Jacek
>>
>> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ever...@nuna.com.invalid>
>> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to un-explode or denormalize a table like
>>>
>>> +---+----+-----+------+--------+
>>> |id |name|extra|data  |priority|
>>> +---+----+-----+------+--------+
>>> |1  |Fred|8    |value1|1       |
>>> |1  |Fred|8    |value8|2       |
>>> |1  |Fred|8    |value5|3       |
>>> |2  |Amy |9    |value3|1       |
>>> |2  |Amy |9    |value5|2       |
>>> +---+----+-----+------+--------+
>>>
>>> into something that looks like
>>>
>>>
>>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
>>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>>> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
>>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
>>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>>>
>>> If I were going the other direction, I'd create a new column with an
>>> array of structs, each with 'extra', 'data', and 'priority' fields and then
>>> explode it.
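
For comparison, the other direction described above might look roughly like this. It is a sketch with a made-up two-slot wide table; `wide`, `items`, and `narrow` are illustrative names. Slots that are entirely null in a real table would come out as all-null rows and would need filtering.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col, explode, struct}

// Assumption: a local session just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("explode-sketch").getOrCreate()
import spark.implicits._

// Hypothetical wide input with two slots per (id, name).
val wide = Seq(
  (1, "Fred", 8, "value1", 1, 8, "value8", 2),
  (2, "Amy", 9, "value3", 1, 9, "value5", 2)
).toDF("id", "name", "extra1", "data1", "priority1", "extra2", "data2", "priority2")

// One struct per slot, with the numeric suffix stripped from the field names.
val items = (1 to 2).map { i =>
  struct(
    col(s"extra$i").as("extra"),
    col(s"data$i").as("data"),
    col(s"priority$i").as("priority"))
}

// Pack the structs into an array, explode to one row per struct,
// then pull the fields back out as top-level columns.
val narrow = wide
  .select($"id", $"name", explode(array(items: _*)).as("item"))
  .select($"id", $"name", $"item.extra", $"item.data", $"item.priority")
narrow.show()
```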
>>>
>>> Going from the more normalized view, though, I'm having a harder time.
>>>
>>> I want to group or partition by (id, name) and order by priority, but
>>> after that I can't figure out how to get multiple rows rotated into one.
>>>
>>> Any ideas?
>>>
>>> Here's the code to create the input table above:
>>>
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.Dataset
>>> import org.apache.spark.sql.types._
>>>
>>> val rowsRDD = sc.parallelize(Seq(
>>>     Row(1, "Fred", 8, "value1", 1),
>>>     Row(1, "Fred", 8, "value8", 2),
>>>     Row(1, "Fred", 8, "value5", 3),
>>>     Row(2, "Amy", 9, "value3", 1),
>>>     Row(2, "Amy", 9, "value5", 2)))
>>>
>>> val schema = StructType(Seq(
>>>     StructField("id", IntegerType, nullable = true),
>>>     StructField("name", StringType, nullable = true),
>>>     StructField("extra", IntegerType, nullable = true),
>>>     StructField("data", StringType, nullable = true),
>>>     StructField("priority", IntegerType, nullable = true)))
>>>
>>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>>
>>>
>>>
>
