You could also try pivot.
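
A minimal sketch of what that might look like, assuming Spark 2.x and the same input as in the original post (rebuilt here so the snippet stands alone). The local SparkSession, the app name, and the names `ranked` and `pivoted` are placeholders of mine, not from the thread. Giving pivot several aggregates skips the intermediate struct column; as far as I know, Spark then names the output columns `<pivot value>_<alias>`, for example `1_extra`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{dense_rank, first}

// Assumption: a local session for illustration; in spark-shell one already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("pivot-sketch")
  .getOrCreate()
import spark.implicits._

// Same rows as the input table in the original post.
val data = Seq(
  (1, "Fred", 8, "value1", 1),
  (1, "Fred", 8, "value8", 2),
  (1, "Fred", 8, "value5", 3),
  (2, "Amy", 9, "value3", 1),
  (2, "Amy", 9, "value5", 2)
).toDF("id", "name", "extra", "data", "priority")

// Dense rank within each (id, name) group, ordered by priority.
val ranked = data.withColumn("rank",
  dense_rank().over(Window.partitionBy("id", "name").orderBy("priority")))

// Pivot on rank with one aggregate per output field. Each (group, rank)
// cell holds at most one row when priorities are distinct within a group,
// so first() just picks that row's value.
val pivoted = ranked
  .groupBy("id", "name")
  .pivot("rank", Seq(1, 2, 3))
  .agg(
    first("extra").as("extra"),
    first("data").as("data"),
    first("priority").as("priority"))

pivoted.show()
```

The columns can then be renamed to extra1, data1, and so on with withColumnRenamed. Passing the pivot values as integers rather than strings should also avoid the cast(rank as double) comparison that shows up in the first plan quoted below.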

On 7 February 2017 at 16:13, Everett Anderson <ever...@nuna.com.invalid>
wrote:

>
>
> On Tue, Feb 7, 2017 at 2:21 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> I think the fastest way is likely to use a combination of conditionals
>> (when / otherwise), first (ignoring nulls), while grouping by the id.
>> This should get the answer with only a single shuffle.
>>
>> Here is an example
>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3908422850525880/2840265927289860/latest.html>
>> .
>>
>
> Very cool! Using the simpler aggregates feels cleaner.
>
>
>>
>> On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi Everett,
>>>
>>> That's pretty much what I'd do. Can't think of a way to beat your
>>> solution. Why do you "feel vaguely uneasy about it"?
>>>
>>
> Maybe it felt like I was unnecessarily grouping-by twice, but probably
> mostly that I hadn't used pivot before.
>
> Interestingly, the physical plans are not especially different between
> these two solutions after the rank column is added. They both have two
> SortAggregates that seem to be figuring out where to put results based on
> the rank:
>
> My original one:
>
> == Physical Plan ==
> *Project [id#279, name#280, 1#372.extra AS extra1#409, 1#372.data AS data1#435, 1#372.priority AS priority1#462, 2#374.extra AS extra2#490, 2#374.data AS data2#519, 2#374.priority AS priority2#549, 3#376.extra AS extra3#580, 3#376.data AS data3#612, 3#376.priority AS priority3#645]
> +- SortAggregate(key=[id#279,name#280], functions=[first(if ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null, true),first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312 else null, true),first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312 else null, true)])
>    +- SortAggregate(key=[id#279,name#280], functions=[partial_first(if ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null, true),partial_first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312 else null, true),partial_first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312 else null, true)])
>       +- *Project [id#279, name#280, rank#292, struct(extra#281, data#282, priority#283) AS temp_struct#312]
>          +- Window [denserank(priority#283) windowspecdefinition(id#279, name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#292], [id#279, name#280], [priority#283 ASC]
>             +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
>                +- Exchange hashpartitioning(id#279, name#280, 200)
>                   +- Scan ExistingRDD[id#279,name#280,extra#281,data#282,priority#283]
>
>
> And modifying Michael's slightly to use a rank:
>
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> // First non-null value of `column` among the rows with the given rank.
> def getColumnWithRank(column: String, rank: Int) = {
>   first(when(col("rank") === lit(rank), col(column)).otherwise(null),
>     ignoreNulls = true)
> }
>
> val withRankColumn = data.withColumn("rank",
>   dense_rank().over(Window.partitionBy("id", "name").orderBy("priority")))
>
> val modCollapsed = withRankColumn
>   .groupBy($"id", $"name")
>   .agg(
>     getColumnWithRank("data", 1) as 'data1,
>     getColumnWithRank("data", 2) as 'data2,
>     getColumnWithRank("data", 3) as 'data3,
>     getColumnWithRank("extra", 1) as 'extra1,
>     getColumnWithRank("extra", 2) as 'extra2,
>     getColumnWithRank("extra", 3) as 'extra3)
>
>
> modCollapsed.explain
>
> == Physical Plan ==
> SortAggregate(key=[id#279,name#280], functions=[first(CASE WHEN (rank#965 = 1) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 2) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 3) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 1) THEN extra#281 ELSE null END, true),first(CASE WHEN (rank#965 = 2) THEN extra#281 ELSE null END, true),first(CASE WHEN (rank#965 = 3) THEN extra#281 ELSE null END, true)])
> +- SortAggregate(key=[id#279,name#280], functions=[partial_first(CASE WHEN (rank#965 = 1) THEN data#282 ELSE null END, true),partial_first(CASE WHEN (rank#965 = 2) THEN data#282 ELSE null END, true),partial_first(CASE WHEN (rank#965 = 3) THEN data#282 ELSE null END, true),partial_first(CASE WHEN (rank#965 = 1) THEN extra#281 ELSE null END, true),partial_first(CASE WHEN (rank#965 = 2) THEN extra#281 ELSE null END, true),partial_first(CASE WHEN (rank#965 = 3) THEN extra#281 ELSE null END, true)])
>    +- *Project [id#279, name#280, extra#281, data#282, rank#965]
>       +- Window [denserank(priority#283) windowspecdefinition(id#279, name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#965], [id#279, name#280], [priority#283 ASC]
>          +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
>             +- Exchange hashpartitioning(id#279, name#280, 200)
>                +- Scan ExistingRDD[id#279,name#280,extra#281,data#282,priority#283]
>
>
>
>>
>>> I'd also check out the execution plan (with explain) to see how it's
>>> gonna work at runtime. I may have seen groupBy + join be better than
>>> window (there were more exchanges in play for windows I reckon).
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ever...@nuna.com>
>>> wrote:
>>> >
>>> >
>>> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
>>> >> help here too.
>>> >
>>> >
>>> > This seems to work, but I do feel vaguely uneasy about it. :)
>>> >
>>> > import org.apache.spark.sql.functions
>>> > import org.apache.spark.sql.expressions.Window
>>> >
>>> > // First add a 'rank' column that follows priority order, in case the
>>> > // priorities don't start at 1 or have gaps.
>>> > val temp1 = data.withColumn("rank", functions.dense_rank()
>>> >    .over(Window.partitionBy("id", "name").orderBy("priority")))
>>> >
>>> > +---+----+-----+------+--------+----+
>>> > | id|name|extra|  data|priority|rank|
>>> > +---+----+-----+------+--------+----+
>>> > |  1|Fred|    8|value1|       1|   1|
>>> > |  1|Fred|    8|value8|       2|   2|
>>> > |  1|Fred|    8|value5|       3|   3|
>>> > |  2| Amy|    9|value3|       1|   1|
>>> > |  2| Amy|    9|value5|       2|   2|
>>> > +---+----+-----+------+--------+----+
>>> >
>>> > // Now move all the columns we want to denormalize into a struct column
>>> > // to keep them together.
>>> > val temp2 = temp1.withColumn("temp_struct", functions.struct(
>>> >     temp1("extra"), temp1("data"), temp1("priority")))
>>> >   .drop("extra", "data", "priority")
>>> >
>>> > +---+----+----+------------+
>>> > | id|name|rank| temp_struct|
>>> > +---+----+----+------------+
>>> > |  1|Fred|   1|[8,value1,1]|
>>> > |  1|Fred|   2|[8,value8,2]|
>>> > |  1|Fred|   3|[8,value5,3]|
>>> > |  2| Amy|   1|[9,value3,1]|
>>> > |  2| Amy|   2|[9,value5,2]|
>>> > +---+----+----+------------+
>>> >
>>> > // groupBy again, but now pivot the rank column. We need an aggregate
>>> > // function after pivot, so use first -- there will only ever be one element.
>>> > val temp3 = temp2.groupBy("id", "name")
>>> >   .pivot("rank", Seq("1", "2", "3"))
>>> >   .agg(functions.first("temp_struct"))
>>> >
>>> > +---+----+------------+------------+------------+
>>> > | id|name|           1|           2|           3|
>>> > +---+----+------------+------------+------------+
>>> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
>>> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
>>> > +---+----+------------+------------+------------+
>>> >
>>> > // Now just move things out of the structs and clean up.
>>> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
>>> >      .withColumn("data1", temp3("1").getField("data"))
>>> >      .withColumn("priority1", temp3("1").getField("priority"))
>>> >      .withColumn("extra2", temp3("2").getField("extra"))
>>> >      .withColumn("data2", temp3("2").getField("data"))
>>> >      .withColumn("priority2", temp3("2").getField("priority"))
>>> >      .withColumn("extra3", temp3("3").getField("extra"))
>>> >      .withColumn("data3", temp3("3").getField("data"))
>>> >      .withColumn("priority3", temp3("3").getField("priority"))
>>> >      .drop("1", "2", "3")
>>> >
>>> > +---+----+------+------+---------+------+------+---------+------+------+---------+
>>> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
>>> > +---+----+------+------+---------+------+------+---------+------+------+---------+
>>> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|
>>> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|     null|
>>> > +---+----+------+------+---------+------+------+---------+------+------+---------+
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >>
>>> >>
>>> >> Jacek
>>> >>
>>> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ever...@nuna.com.invalid> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> I'm trying to un-explode or denormalize a table like
>>> >>>
>>> >>> +---+----+-----+------+--------+
>>> >>> |id |name|extra|data  |priority|
>>> >>> +---+----+-----+------+--------+
>>> >>> |1  |Fred|8    |value1|1       |
>>> >>> |1  |Fred|8    |value8|2       |
>>> >>> |1  |Fred|8    |value5|3       |
>>> >>> |2  |Amy |9    |value3|1       |
>>> >>> |2  |Amy |9    |value5|2       |
>>> >>> +---+----+-----+------+--------+
>>> >>>
>>> >>> into something that looks like
>>> >>>
>>> >>>
>>> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>>> >>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
>>> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>>> >>> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
>>> >>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
>>> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>>> >>>
>>> >>> If I were going the other direction, I'd create a new column with an
>>> >>> array of structs, each with 'extra', 'data', and 'priority' fields,
>>> >>> and then explode it.
>>> >>>
>>> >>> Going from the more normalized view, though, I'm having a harder time.
>>> >>>
>>> >>> I want to group or partition by (id, name) and order by priority, but
>>> >>> after that I can't figure out how to get multiple rows rotated into one.
>>> >>>
>>> >>> Any ideas?
>>> >>>
>>> >>> Here's the code to create the input table above:
>>> >>>
>>> >>> import org.apache.spark.sql.Row
>>> >>> import org.apache.spark.sql.Dataset
>>> >>> import org.apache.spark.sql.types._
>>> >>>
>>> >>> val rowsRDD = sc.parallelize(Seq(
>>> >>>     Row(1, "Fred", 8, "value1", 1),
>>> >>>     Row(1, "Fred", 8, "value8", 2),
>>> >>>     Row(1, "Fred", 8, "value5", 3),
>>> >>>     Row(2, "Amy", 9, "value3", 1),
>>> >>>     Row(2, "Amy", 9, "value5", 2)))
>>> >>>
>>> >>> val schema = StructType(Seq(
>>> >>>     StructField("id", IntegerType, nullable = true),
>>> >>>     StructField("name", StringType, nullable = true),
>>> >>>     StructField("extra", IntegerType, nullable = true),
>>> >>>     StructField("data", StringType, nullable = true),
>>> >>>     StructField("priority", IntegerType, nullable = true)))
>>> >>>
>>> >>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>> >>>
>>> >>>
>>> >>>
>>> >
>>>
>>>
>>>
>>
>
