(disclaimer: my reply on SO)
http://stackoverflow.com/questions/30260015/reshaping-pivoting-data-in-spark-rdd-and-or-spark-dataframes/30278605#30278605
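In the same spirit, here is a minimal RDD-only sketch for the sample data further down this thread (this is not the code from that answer, just one common shape of the approach; it assumes the set of pivot values is small enough to collect to the driver):

    import org.apache.spark.rdd.RDD

    // Minimal RDD-only pivot sketch (not the exact code from the SO answer):
    // collect the distinct years once, then build one CSV row per key.
    def pivotRows(rdd: RDD[(String, Int, Int)]): RDD[String] = {
      val years = rdd.map(_._2).distinct().collect().sorted.reverse // small set, safe to collect
      rdd
        .map { case (id, year, value) => (id, Map(year -> value)) }
        .reduceByKey(_ ++ _) // one Map of year -> value per id; assumes one value per (id, year)
        .map { case (id, byYear) =>
          (id +: years.map(y => byYear.get(y).map(_.toString).getOrElse(""))).mkString(",")
        }
    }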
On Sat, Oct 31, 2015 at 6:21 AM, Ali Tajeldin EDU <alitedu1...@gmail.com> wrote:

You can take a look at the smvPivot function in the SMV library
(https://github.com/TresAmigosSD/SMV). Look for the method "smvPivot" in
SmvDFHelper
(http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvDFHelper).
You can also perform the pivot on a group-by-group basis. See smvPivot and
smvPivotSum in SmvGroupedDataFunc
(http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvGroupedDataFunc).

Docs for smvPivotSum are copied below. Note that you don't have to specify
the baseOutput columns, but if you don't, it will force an additional
action on the input data frame to build the cross product of all possible
values in your input pivot columns.

smvPivotSum performs a normal SmvPivot operation followed by a sum on all
the output pivot columns. For example:

    df.smvGroupBy("id").smvPivotSum(Seq("month", "product"))("count")("5_14_A", "5_14_B", "6_14_A", "6_14_B")

given the following input:

    | id  | month | product | count |
    | --- | ----- | ------- | ----- |
    | 1   | 5/14  | A       | 100   |
    | 1   | 6/14  | B       | 200   |
    | 1   | 5/14  | B       | 300   |

will produce the following output:

    | id  | count_5_14_A | count_5_14_B | count_6_14_A | count_6_14_B |
    | --- | ------------ | ------------ | ------------ | ------------ |
    | 1   | 100          | 300          | NULL         | 200          |

pivotCols: the sequence of column names whose values will be used as the
output pivot column names.

valueCols: the columns whose values will be copied to the pivoted output
columns.

baseOutput: the expected base output column names (without the value
column prefix). The user is required to supply the list of expected pivot
column output names to avoid an extra action on the input DataFrame just
to extract the possible pivot columns. If an empty sequence is provided,
the base output columns will be extracted from values in the pivot columns
(this will cause an action on the entire DataFrame!).

--
Ali
PS: shoot me an email if you run into any issues using SMV.
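For readers who would rather not take on the SMV dependency: the same pivot-sum can be approximated with plain Spark SQL conditional aggregation. A minimal sketch under the assumption that the pivot values are known up front (mirroring baseOutput above); the function name pivotSum and the hard-coded cell list are illustrative only:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._

    // Sketch of the same pivot-sum using plain conditional aggregation:
    // one sum(when(...)) column per expected (month, product) cell.
    def pivotSum(df: DataFrame): DataFrame = {
      val cells = Seq(("5/14", "A"), ("5/14", "B"), ("6/14", "A"), ("6/14", "B"))
      val aggs = cells.map { case (m, p) =>
        sum(when(col("month") === m && col("product") === p, col("count")))
          .as(s"count_${m.replace("/", "_")}_$p")
      }
      df.groupBy("id").agg(aggs.head, aggs.tail: _*)
    }

Cells with no matching rows come out as NULL, matching the smvPivotSum output above.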
On Oct 30, 2015, at 6:33 AM, Andrianasolo Fanilo <fanilo.andrianas...@worldline.com> wrote:

Hey,

The question is tricky. Here is a possible answer: define years as the keys
of a hashmap per client, then merge those maps:

    import org.apache.spark.SparkContext
    import scalaz._
    import Scalaz._

    val sc = new SparkContext("local[*]", "sandbox")

    // Create RDD of your objects
    val rdd = sc.parallelize(Seq(
      ("A", 2015, 4),
      ("A", 2014, 12),
      ("A", 2013, 1),
      ("B", 2015, 24),
      ("B", 2013, 4)
    ))

    // Search for all the years in the RDD
    val minYear = rdd.map(_._2).reduce(Math.min)   // look for minimum year
    val maxYear = rdd.map(_._2).reduce(Math.max)   // look for maximum year
    val sequenceOfYears = maxYear to minYear by -1 // sequence of years from max to min

    // Define functions to build, for each client, a Map of year -> value,
    // and how those maps will be merged
    def createCombiner(obj: (Int, Int)): Map[Int, String] = Map(obj._1 -> obj._2.toString)
    def mergeValue(accum: Map[Int, String], obj: (Int, Int)) = accum + (obj._1 -> obj._2.toString)
    def mergeCombiners(accum1: Map[Int, String], accum2: Map[Int, String]) = accum1 |+| accum2
    // I'm lazy, so I use Scalaz to merge two maps of year -> value; I assume
    // we don't have two lines with the same client and year...

    // For each client, check for each year from maxYear to minYear whether it
    // exists in the computed map. If not, output a blank.
    val result = rdd
      .map { case obj => (obj._1, (obj._2, obj._3)) }
      .combineByKey(createCombiner, mergeValue, mergeCombiners)
      .map { case (name, mapOfYearsToValues) =>
        (Seq(name) ++ sequenceOfYears.map(year => mapOfYearsToValues.getOrElse(year, " "))).mkString(",")
      }
    // Here we assume the sequence of all years is small enough to fit in
    // memory. If you had to compute for each day, it may break and you would
    // definitely need a specialized time-series library...

    result.foreach(println)

    sc.stop()

Best regards,
Fanilo

From: Adrian Tanase [mailto:atan...@adobe.com]
Sent: Friday, October 30, 2015 11:50 AM
To: Deng Ching-Mallete; Ascot Moss
Cc: User
Subject: Re: Pivot Data in Spark and Scala

It's actually a bit tougher, as you'll first need all the years. I'm also
not sure how you would represent your "columns", given that they are
dynamic based on the input data.

Depending on your downstream processing, I'd probably try to emulate it
with a hash map with years as keys instead of columns.

There is probably a nicer solution using the DataFrames API, but I'm not
familiar with it (see the sketch at the end of this thread).

If you actually need vectors, I think this article I saw recently on the
Databricks blog will highlight some options (look for "gather encoder"):

https://databricks.com/blog/2015/10/20/audience-modeling-with-spark-ml-pipelines.html

-adrian

From: Deng Ching-Mallete
Date: Friday, October 30, 2015 at 4:35 AM
To: Ascot Moss
Cc: User
Subject: Re: Pivot Data in Spark and Scala

Hi,

You could transform it into a pair RDD and then use the combineByKey
function.

HTH,
Deng

On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss <ascot.m...@gmail.com> wrote:

Hi,

I have data as follows:

    A, 2015, 4
    A, 2014, 12
    A, 2013, 1
    B, 2015, 24
    B, 2013, 4

I need to convert the data to a new format:

    A, 4, 12, 1
    B, 24, , 4

Any idea how to make it in Spark and Scala?
Thanks

--
Best Regards,
Ayan Guha
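Following up on Adrian's point above that there is probably a nicer solution using the DataFrames API: Spark 1.6 added a built-in pivot on grouped DataFrames. A minimal sketch for the original question's data (Spark 2.x session API shown; listing the pivot values explicitly avoids an extra job to discover them, mirroring SMV's baseOutput):

    import org.apache.spark.sql.SparkSession

    // Sketch: built-in DataFrame pivot (available since Spark 1.6),
    // applied to the data from the original question.
    val spark = SparkSession.builder().master("local[*]").appName("pivot").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("A", 2015, 4),
      ("A", 2014, 12),
      ("A", 2013, 1),
      ("B", 2015, 24),
      ("B", 2013, 4)
    ).toDF("id", "year", "value")

    df.groupBy("id")
      .pivot("year", Seq(2015, 2014, 2013)) // explicit values: no extra pass over the data
      .sum("value")
      .show()
    // +---+----+----+----+
    // | id|2015|2014|2013|
    // +---+----+----+----+
    // |  A|   4|  12|   1|
    // |  B|  24|null|   4|
    // +---+----+----+----+

Missing (id, year) combinations come out as null rather than blank, so a replacement step would be needed to reproduce the exact CSV output asked for above.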