(Disclaimer: the link below is my own answer on Stack Overflow.)

http://stackoverflow.com/questions/30260015/reshaping-pivoting-data-in-spark-rdd-and-or-spark-dataframes/30278605#30278605


On Sat, Oct 31, 2015 at 6:21 AM, Ali Tajeldin EDU <alitedu1...@gmail.com>
wrote:

> You can take a look at the smvPivot function in the SMV library (
> https://github.com/TresAmigosSD/SMV ).  Should look for method "smvPivot"
> in SmvDFHelper (
>
> http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvDFHelper).
> You can also perform the pivot on a group-by-group basis.  See smvPivot and
> smvPivotSum in SmvGroupedDataFunc (
> http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvGroupedDataFunc
> ).
>
> Docs from smvPivotSum are copied below.  Note that you don't have to
> specify the baseOutput columns, but if you don't, it will force an
> additional action on the input data frame to build the cross products of
> all possible values in your input pivot columns.
>
> Perform a normal SmvPivot operation followed by a sum on all the output
> pivot columns.
> For example:
>
> df.smvGroupBy("id").smvPivotSum(Seq("month", "product"))("count")("5_14_A", 
> "5_14_B", "6_14_A", "6_14_B")
>
> and the following input:
>
> Input
> | id  | month | product | count |
> | --- | ----- | ------- | ----- |
> | 1   | 5/14  |   A     |   100 |
> | 1   | 6/14  |   B     |   200 |
> | 1   | 5/14  |   B     |   300 |
>
> will produce the following output:
>
> | id  | count_5_14_A | count_5_14_B | count_6_14_A | count_6_14_B |
> | --- | ------------ | ------------ | ------------ | ------------ |
> | 1   | 100          | 300          | NULL         | 200          |
>
> pivotCols
> The sequence of column names whose values will be used as the output pivot
> column names.
> valueCols
> The columns whose value will be copied to the pivoted output columns.
> baseOutput
> The expected base output column names (without the value column prefix).
> The user is required to supply the list of expected pivot column output
> names to avoid an extra action on the input DataFrame just to extract the
> possible pivot columns. If an empty sequence is provided, then the base
> output columns will be extracted from values in the pivot columns (will
> cause an action on the entire DataFrame!)
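
To make the pivot-then-sum semantics described above concrete, here is a minimal sketch on plain Scala collections. It is not the SMV implementation and does not call the SMV API; the `Row` case class, `baseName`, and `pivotSum` are hypothetical names introduced only for illustration, and the rule for building names ("/" replaced by "_") is inferred from the example table.

```scala
// Plain-Scala illustration of pivot-then-sum; NOT the SMV implementation.
object PivotSumSketch {
  // Hypothetical row type mirroring the example input table.
  final case class Row(id: Int, month: String, product: String, count: Long)

  // Build a base output name from the pivot values, e.g. ("5/14", "A") -> "5_14_A".
  // Assumption: "/" becomes "_", as suggested by the example column names.
  def baseName(month: String, product: String): String =
    s"${month.replace('/', '_')}_$product"

  // For each id, sum `count` per base output name.
  // A name with no matching rows yields None (shown as NULL in the example).
  def pivotSum(rows: Seq[Row], baseOutput: Seq[String]): Map[Int, Seq[Option[Long]]] =
    rows.groupBy(_.id).map { case (id, group) =>
      val sums: Map[String, Long] =
        group.groupBy(r => baseName(r.month, r.product)).map {
          case (name, rs) => name -> rs.map(_.count).sum
        }
      id -> baseOutput.map(sums.get)
    }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      Row(1, "5/14", "A", 100),
      Row(1, "6/14", "B", 200),
      Row(1, "5/14", "B", 300))
    val baseOutput = Seq("5_14_A", "5_14_B", "6_14_A", "6_14_B")
    pivotSum(input, baseOutput).foreach { case (id, cols) =>
      // prints: 1 | 100 | 300 | NULL | 200
      println((id.toString +: cols.map(_.fold("NULL")(_.toString))).mkString(" | "))
    }
  }
}
```

Passing `baseOutput` explicitly is what lets the sketch avoid a separate pass over the data to discover the pivot values, which mirrors the trade-off the docs describe.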
>
> --
> Ali
> PS: shoot me an email if you run into any issues using SMV.
>
>
> On Oct 30, 2015, at 6:33 AM, Andrianasolo Fanilo <
> fanilo.andrianas...@worldline.com> wrote:
>
> Hey,
>
> The question is tricky. Here is a possible answer: define years as keys of
> a hash map per client, then merge those maps:
>
>
> import org.apache.spark.SparkContext
> import scalaz._
> import Scalaz._
>
> val sc = new SparkContext("local[*]", "sandbox")
>
> // Create RDD of your objects
> val rdd = sc.parallelize(Seq(
>   ("A", 2015, 4),
>   ("A", 2014, 12),
>   ("A", 2013, 1),
>   ("B", 2015, 24),
>   ("B", 2013, 4)
> ))
>
> // Search for all the years in the RDD
> val minYear = rdd.map(_._2).reduce(Math.min) // look for minimum year
> val maxYear = rdd.map(_._2).reduce(Math.max) // look for maximum year
> // create sequence of years from max to min
> val sequenceOfYears = maxYear to minYear by -1
>
> // Define functions to build, for each client, a Map of year -> value for
> // year, and how those maps will be merged
> def createCombiner(obj: (Int, Int)): Map[Int, String] =
>   Map(obj._1 -> obj._2.toString)
> def mergeValue(accum: Map[Int, String], obj: (Int, Int)) =
>   accum + (obj._1 -> obj._2.toString)
> // I’m lazy so I use Scalaz to merge two maps of year -> value, I assume we
> // don’t have two lines with same client and year…
> def mergeCombiners(accum1: Map[Int, String], accum2: Map[Int, String]) =
>   accum1 |+| accum2
>
> // For each client, check for each year from maxYear to minYear if it
> // exists in the computed map. If not, input a blank.
> // Here we assume that the sequence of all years is small enough to fit in
> // memory. If you had to compute for each day, it may break and you would
> // definitely need to use a specialized timeseries library…
> val result = rdd
>   .map { case obj => (obj._1, (obj._2, obj._3)) }
>   .combineByKey(createCombiner, mergeValue, mergeCombiners)
>   .map { case (name, mapOfYearsToValues) =>
>     (Seq(name) ++
>       sequenceOfYears.map(year => mapOfYearsToValues.getOrElse(year, " "))
>     ).mkString(",")
>   }
>
> result.foreach(println)
>
> sc.stop()
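
For readers who would rather not pull in Scalaz just for the map merge, the following is a rough plain-Scala sketch of the same step, together with the row rendering. It assumes that `|+|` on `Map[Int, String]` concatenates the two strings when a year appears in both maps; `mergeMaps` and `renderRow` are hypothetical helper names, not part of the code above.

```scala
// Scalaz-free sketch of the merge and rendering steps (helper names are hypothetical).
object MergeWithoutScalaz {
  // Merge two year -> value maps. On a duplicate year the two strings are
  // concatenated (left value first), which is the assumed behaviour of |+|
  // for Map[Int, String].
  def mergeMaps(left: Map[Int, String], right: Map[Int, String]): Map[Int, String] =
    right.foldLeft(left) { case (acc, (year, value)) =>
      acc.updated(year, acc.get(year).map(_ + value).getOrElse(value))
    }

  // Render one output line: client name, then one cell per year,
  // using a single blank when the client has no value for that year.
  def renderRow(name: String, years: Seq[Int], values: Map[Int, String]): String =
    (name +: years.map(y => values.getOrElse(y, " "))).mkString(",")

  def main(args: Array[String]): Unit = {
    val years = 2015 to 2013 by -1
    val merged = mergeMaps(Map(2015 -> "24"), Map(2013 -> "4"))
    println(renderRow("B", years, merged)) // prints: B,24, ,4
  }
}
```

If duplicate (client, year) pairs can occur, concatenation is probably not what you want; replacing `_ + value` with a numeric sum or a "last one wins" rule would be the place to change it.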
>
> Best regards,
> Fanilo
>
> *From:* Adrian Tanase [mailto:atan...@adobe.com]
> *Sent:* Friday, October 30, 2015 11:50
> *To:* Deng Ching-Mallete; Ascot Moss
> *Cc:* User
> *Subject:* Re: Pivot Data in Spark and Scala
>
> It’s actually a bit tougher, as you’ll first need all the years. I’m also
> not sure how you would represent your “columns”, given they are dynamic
> based on the input data.
>
> Depending on your downstream processing, I’d probably try to emulate it
> with a hash map with years as keys instead of the columns.
>
> There is probably a nicer solution using the data frames API but I’m not
> familiar with it.
>
> If you actually need vectors, I think this article I saw recently on the
> Databricks blog will highlight some options (look for gather encoder)
>
> https://databricks.com/blog/2015/10/20/audience-modeling-with-spark-ml-pipelines.html
>
> -adrian
>
> *From: *Deng Ching-Mallete
> *Date: *Friday, October 30, 2015 at 4:35 AM
> *To: *Ascot Moss
> *Cc: *User
> *Subject: *Re: Pivot Data in Spark and Scala
>
> Hi,
>
> You could transform it into a pair RDD then use the combineByKey function.
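
As a rough illustration of what the three functions passed to combineByKey are responsible for, the sketch below imitates the operation on local collections, with a sequence of sequences standing in for the partitions of a pair RDD. It is only a model of the idea, not Spark's implementation; `combineLocally` and the two-partition split of the data are assumptions made for the example.

```scala
// Local model of combineByKey; NOT Spark code. `partitions` stands in for an RDD's partitions.
object CombineByKeySketch {
  def combineLocally[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Step 1: inside each partition, fold every key's values into one combiner.
    // The first value seen for a key goes through createCombiner, later ones through mergeValue.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v)))
      }
    }
    // Step 2: merge the per-partition combiners of each key with mergeCombiners.
    perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, c)) =>
        a.updated(k, a.get(k).map(prev => mergeCombiners(prev, c)).getOrElse(c))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Pairs of (client, (year, value)), split arbitrarily into two "partitions".
    val partitions = Seq(
      Seq(("A", (2015, 4)), ("A", (2014, 12)), ("B", (2015, 24))),
      Seq(("A", (2013, 1)), ("B", (2013, 4))))
    val byClient = combineLocally[String, (Int, Int), Map[Int, Int]](partitions)(
      { case (year, value) => Map(year -> value) },
      { case (acc, (year, value)) => acc + (year -> value) },
      _ ++ _)
    byClient.toSeq.sortBy(_._1).foreach(println)
  }
}
```

The result is one year -> value map per client, which can then be laid out as a row by looking up each year in a fixed order.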
>
> HTH,
> Deng
>
> On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss <ascot.m...@gmail.com> wrote:
> Hi,
>
> I have data as follows:
>
> A, 2015, 4
> A, 2014, 12
> A, 2013, 1
> B, 2015, 24
> B, 2013, 4
>
>
> I need to convert the data to a new format:
> A ,    4,    12,    1
> B,   24,        ,    4
>
> Any idea how to make it in Spark Scala?
>
> Thanks
>
>
>


-- 
Best Regards,
Ayan Guha
