I think the fastest way is likely a combination of conditionals (when /
otherwise) and first (ignoring nulls) while grouping by the id.  This
should get the answer with only a single shuffle.

Here is an example:
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3908422850525880/2840265927289860/latest.html>
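
A rough sketch of that idea (my own reconstruction, not the notebook's exact
code; it assumes the schema from Everett's example below and at most three
priorities):

import org.apache.spark.sql.functions._

// A single aggregation per (id, name): each when() picks out the row for one
// priority slot, and first(..., ignoreNulls = true) keeps the matching value.
val output = data.groupBy("id", "name")
  .agg(
    first(when(col("priority") === 1, col("extra")), ignoreNulls = true).as("extra1"),
    first(when(col("priority") === 1, col("data")), ignoreNulls = true).as("data1"),
    first(when(col("priority") === 1, col("priority")), ignoreNulls = true).as("priority1"),
    first(when(col("priority") === 2, col("extra")), ignoreNulls = true).as("extra2"),
    first(when(col("priority") === 2, col("data")), ignoreNulls = true).as("data2"),
    first(when(col("priority") === 2, col("priority")), ignoreNulls = true).as("priority2"),
    first(when(col("priority") === 3, col("extra")), ignoreNulls = true).as("extra3"),
    first(when(col("priority") === 3, col("data")), ignoreNulls = true).as("data3"),
    first(when(col("priority") === 3, col("priority")), ignoreNulls = true).as("priority3"))

Because it is all one groupBy aggregation, only a single shuffle (for the
grouping) should be needed.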

On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Everett,
>
> That's pretty much what I'd do. Can't think of a way to beat your
> solution. Why do you "feel vaguely uneasy about it"?
>
> I'd also check out the execution plan (with explain) to see how it's
> gonna work at runtime. I may have seen groupBy + join be better than
> window (there were more exchanges in play for windows I reckon).
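>
> For instance, on whatever DataFrame you end up with (say, output):
>
>   output.explain(true)   // extended = true also prints the logical plans
>
> Counting the Exchange operators in the physical plan is a quick way to
> compare how many shuffles each approach needs.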
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ever...@nuna.com>
> wrote:
> >
> >
> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl>
> > wrote:
> >>
> >> Hi,
> >>
> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
> >> help here too.
> >
> >
> > This seems to work, but I do feel vaguely uneasy about it. :)
> >
> > // First add a 'rank' column which is priority order, just in case
> > // priorities aren't from 1 with no gaps.
> > val temp1 = data.withColumn("rank", functions.dense_rank()
> >    .over(Window.partitionBy("id", "name").orderBy("priority")))
> >
> > +---+----+-----+------+--------+----+
> > | id|name|extra|  data|priority|rank|
> > +---+----+-----+------+--------+----+
> > |  1|Fred|    8|value1|       1|   1|
> > |  1|Fred|    8|value8|       2|   2|
> > |  1|Fred|    8|value5|       3|   3|
> > |  2| Amy|    9|value3|       1|   1|
> > |  2| Amy|    9|value5|       2|   2|
> > +---+----+-----+------+--------+----+
> >
> > // Now move all the columns we want to denormalize into a struct column
> > // to keep them together.
> > val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
> > temp1("data"), temp1("priority")))
> >   .drop("extra", "data", "priority")
> >
> > +---+----+----+------------+
> > | id|name|rank| temp_struct|
> > +---+----+----+------------+
> > |  1|Fred|   1|[8,value1,1]|
> > |  1|Fred|   2|[8,value8,2]|
> > |  1|Fred|   3|[8,value5,3]|
> > |  2| Amy|   1|[9,value3,1]|
> > |  2| Amy|   2|[9,value5,2]|
> > +---+----+----+------------+
> >
> > // groupBy, again, but now pivot the rank column. We need an aggregate
> > // function after pivot, so use first -- there will only ever be one element.
> > val temp3 = temp2.groupBy("id", "name")
> >   .pivot("rank", Seq("1", "2", "3"))
> >   .agg(functions.first("temp_struct"))
> >
> > +---+----+------------+------------+------------+
> > | id|name|           1|           2|           3|
> > +---+----+------------+------------+------------+
> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
> > +---+----+------------+------------+------------+
> >
> > // Now just move things out of the structs and clean up.
> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
> >      .withColumn("data1", temp3("1").getField("data"))
> >      .withColumn("priority1", temp3("1").getField("priority"))
> >      .withColumn("extra2", temp3("2").getField("extra"))
> >      .withColumn("data2", temp3("2").getField("data"))
> >      .withColumn("priority2", temp3("2").getField("priority"))
> >      .withColumn("extra3", temp3("3").getField("extra"))
> >      .withColumn("data3", temp3("3").getField("data"))
> >      .withColumn("priority3", temp3("3").getField("priority"))
> >      .drop("1", "2", "3")
> >
> > +---+----+------+------+---------+------+------+---------+------+------+---------+
> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
> > +---+----+------+------+---------+------+------+---------+------+------+---------+
> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|
> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|     null|
> > +---+----+------+------+---------+------+------+---------+------+------+---------+
> >
> >>
> >>
> >> Jacek
> >>
> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ever...@nuna.com.invalid>
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm trying to un-explode or denormalize a table like
> >>>
> >>> +---+----+-----+------+--------+
> >>> |id |name|extra|data  |priority|
> >>> +---+----+-----+------+--------+
> >>> |1  |Fred|8    |value1|1       |
> >>> |1  |Fred|8    |value8|2       |
> >>> |1  |Fred|8    |value5|3       |
> >>> |2  |Amy |9    |value3|1       |
> >>> |2  |Amy |9    |value5|2       |
> >>> +---+----+-----+------+--------+
> >>>
> >>> into something that looks like
> >>>
> >>>
> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >>> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
> >>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >>>
> >>> If I were going the other direction, I'd create a new column with an
> >>> array of structs, each with 'extra', 'data', and 'priority' fields, and
> >>> then explode it.
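> >>>
> >>> (Roughly, that reverse direction might look like this, where wide is
> >>> just an illustrative name for the already-denormalized table, with
> >>> org.apache.spark.sql.functions._ imported:
> >>>
> >>> val renested = wide
> >>>   .withColumn("entry", explode(array(
> >>>     struct(col("extra1").as("extra"), col("data1").as("data"), col("priority1").as("priority")),
> >>>     struct(col("extra2").as("extra"), col("data2").as("data"), col("priority2").as("priority")),
> >>>     struct(col("extra3").as("extra"), col("data3").as("data"), col("priority3").as("priority")))))
> >>>   .select(col("id"), col("name"), col("entry.*"))
> >>>   .where(col("data").isNotNull)  // drop the empty third slot for rows like Amy's
> >>> )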
> >>>
> >>> Going from the more normalized view, though, I'm having a harder time.
> >>>
> >>> I want to group or partition by (id, name) and order by priority, but
> >>> after that I can't figure out how to get multiple rows rotated into one.
> >>>
> >>> Any ideas?
> >>>
> >>> Here's the code to create the input table above:
> >>>
> >>> import org.apache.spark.sql.Row
> >>> import org.apache.spark.sql.Dataset
> >>> import org.apache.spark.sql.types._
> >>>
> >>> val rowsRDD = sc.parallelize(Seq(
> >>>     Row(1, "Fred", 8, "value1", 1),
> >>>     Row(1, "Fred", 8, "value8", 2),
> >>>     Row(1, "Fred", 8, "value5", 3),
> >>>     Row(2, "Amy", 9, "value3", 1),
> >>>     Row(2, "Amy", 9, "value5", 2)))
> >>>
> >>> val schema = StructType(Seq(
> >>>     StructField("id", IntegerType, nullable = true),
> >>>     StructField("name", StringType, nullable = true),
> >>>     StructField("extra", IntegerType, nullable = true),
> >>>     StructField("data", StringType, nullable = true),
> >>>     StructField("priority", IntegerType, nullable = true)))
> >>>
> >>> val data = sqlContext.createDataFrame(rowsRDD, schema)
> >>>
> >>>
> >>>
> >
>
