Re: Pivot Data in Spark and Scala

2015-10-31 Thread ayan guha
(disclaimer: this is my own answer on Stack Overflow)

http://stackoverflow.com/questions/30260015/reshaping-pivoting-data-in-spark-rdd-and-or-spark-dataframes/30278605#30278605


On Sat, Oct 31, 2015 at 6:21 AM, Ali Tajeldin EDU <alitedu1...@gmail.com>
wrote:

> You can take a look at the smvPivot function in the SMV library
> (https://github.com/TresAmigosSD/SMV). Look for the method "smvPivot" in
> SmvDFHelper
> (http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvDFHelper).
> You can also perform the pivot on a group-by-group basis; see smvPivot and
> smvPivotSum in SmvGroupedDataFunc
> (http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvGroupedDataFunc).
>
> Docs from smvPivotSum are copied below.  Note that you don't have to
> specify the baseOutput columns, but if you don't, it will force an
> additional action on the input data frame to build the cross products of
> all possible values in your input pivot columns.
>
> Perform a normal SmvPivot operation followed by a sum on all the output
> pivot columns.
> For example:
>
> df.smvGroupBy("id").smvPivotSum(Seq("month", "product"))("count")("5_14_A", 
> "5_14_B", "6_14_A", "6_14_B")
>
> and the following input:
>
> Input
> | id  | month | product | count |
> | --- | ----- | ------- | ----- |
> | 1   | 5/14  |   A |   100 |
> | 1   | 6/14  |   B |   200 |
> | 1   | 5/14  |   B |   300 |
>
> will produce the following output:
>
> | id  | count_5_14_A | count_5_14_B | count_6_14_A | count_6_14_B |
> | --- | ------------ | ------------ | ------------ | ------------ |
> | 1   | 100  | 300  | NULL | 200  |
>
> pivotCols
> The sequence of column names whose values will be used as the output pivot
> column names.
> valueCols
> The columns whose value will be copied to the pivoted output columns.
> baseOutput
> The expected base output column names (without the value column prefix).
> The user is required to supply the list of expected pivot column output
> names to avoid an extra action on the input DataFrame just to extract the
> possible pivot columns. If an empty sequence is provided, then the base
> output columns will be extracted from values in the pivot columns (will
> cause an action on the entire DataFrame!)
>
> --
> Ali
> PS: shoot me an email if you run into any issues using SMV.
>
>
> On Oct 30, 2015, at 6:33 AM, Andrianasolo Fanilo <
> fanilo.andrianas...@worldline.com> wrote:
>
> Hey,
>
> The question is tricky. Here is a possible answer: define years as keys of a
> hash map per client, then merge those maps:
>
>
> import scalaz._
> import Scalaz._
>
> val sc = new SparkContext("local[*]", "sandbox")
>
> // Create RDD of your objects
> val rdd = sc.parallelize(Seq(
>   ("A", 2015, 4),
>   ("A", 2014, 12),
>   ("A", 2013, 1),
>   ("B", 2015, 24),
>   ("B", 2013, 4)
> ))
>
> // Search for all the years in the RDD
> val minYear = rdd.map(_._2).reduce(Math.min) // look for minimum year
> val maxYear = rdd.map(_._2).reduce(Math.max) // look for maximum year
> val sequenceOfYears = maxYear to minYear by -1 // create sequence of years from max to min
>
> // Define functions to build, for each client, a Map of year -> value,
> // and how those maps will be merged
> def createCombiner(obj: (Int, Int)): Map[Int, String] =
>   Map(obj._1 -> obj._2.toString)
> def mergeValue(accum: Map[Int, String], obj: (Int, Int)) =
>   accum + (obj._1 -> obj._2.toString)
> def mergeCombiners(accum1: Map[Int, String], accum2: Map[Int, String]) =
>   accum1 |+| accum2
> // I’m lazy so I use Scalaz to merge two maps of year -> value; I assume we
> // don’t have two lines with the same client and year…
>
> // For each client, check for each year from maxYear to minYear whether it
> // exists in the computed map. If not, output a blank.
> val result = rdd
>   .map { case obj => (obj._1, (obj._2, obj._3)) }
>   .combineByKey(createCombiner, mergeValue, mergeCombiners)
>   .map { case (name, mapOfYearsToValues) =>
>     (Seq(name) ++ sequenceOfYears.map(year =>
>       mapOfYearsToValues.getOrElse(year, " "))).mkString(",")
>   }
> // Here we assume the sequence of all years is small enough to fit in memory.
> // If you had to compute for each day, it might break and you would need a
> // specialized time-series library…
>
> result.foreach(println)
>
> sc.stop()
>
> Best regards,
>

Re: Pivot Data in Spark and Scala

2015-10-30 Thread Adrian Tanase
It’s actually a bit tougher, as you’ll first need all the years. Also, I’m not
sure how you would represent your “columns” given they are dynamic based on the
input data.

Depending on your downstream processing, I’d probably try to emulate it with a 
hash map with years as keys instead of the columns.

There is probably a nicer solution using the DataFrames API, but I’m not
familiar with it.

If you actually need vectors, I think this article I saw recently on the
Databricks blog will highlight some options (look for “gather encoder”):
https://databricks.com/blog/2015/10/20/audience-modeling-with-spark-ml-pipelines.html

-adrian
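
A rough sketch of that hash-map representation, assuming an existing
SparkContext named sc (as elsewhere in this thread) and column names of my own
choosing; downstream code can then look values up by year instead of relying on
positional columns:

val rows = sc.parallelize(Seq(
  ("A", 2015, 4), ("A", 2014, 12), ("A", 2013, 1),
  ("B", 2015, 24), ("B", 2013, 4)
))

val byClient = rows
  .map { case (name, year, value) => (name, Map(year -> value)) }
  .reduceByKey(_ ++ _) // merge the single-entry maps per client; a later duplicate year would win

byClient.collect().foreach(println)
// e.g. (A,Map(2015 -> 4, 2014 -> 12, 2013 -> 1))
//      (B,Map(2015 -> 24, 2013 -> 4))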

From: Deng Ching-Mallete
Date: Friday, October 30, 2015 at 4:35 AM
To: Ascot Moss
Cc: User
Subject: Re: Pivot Data in Spark and Scala

Hi,

You could transform it into a pair RDD then use the combineByKey function.

HTH,
Deng

On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss <ascot.m...@gmail.com> wrote:
Hi,

I have data as follows:

A, 2015, 4
A, 2014, 12
A, 2013, 1
B, 2015, 24
B, 2013, 4


I need to convert the data to a new format:
A ,4,12,1
B,   24,,4

Any idea how to do this in Spark/Scala?

Thanks




RE: Pivot Data in Spark and Scala

2015-10-30 Thread Andrianasolo Fanilo
Hey,

The question is tricky. Here is a possible answer: define years as keys of a
hash map per client, then merge those maps:


import scalaz._
import Scalaz._

val sc = new SparkContext("local[*]", "sandbox")

// Create RDD of your objects
val rdd = sc.parallelize(Seq(
  ("A", 2015, 4),
  ("A", 2014, 12),
  ("A", 2013, 1),
  ("B", 2015, 24),
  ("B", 2013, 4)
))

// Search for all the years in the RDD
val minYear = rdd.map(_._2).reduce(Math.min) // look for minimum year
val maxYear = rdd.map(_._2).reduce(Math.max) // look for maximum year
val sequenceOfYears = maxYear to minYear by -1 // create sequence of years from max to min

// Define functions to build, for each client, a Map of year -> value,
// and how those maps will be merged
def createCombiner(obj: (Int, Int)): Map[Int, String] =
  Map(obj._1 -> obj._2.toString)
def mergeValue(accum: Map[Int, String], obj: (Int, Int)) =
  accum + (obj._1 -> obj._2.toString)
def mergeCombiners(accum1: Map[Int, String], accum2: Map[Int, String]) =
  accum1 |+| accum2
// I’m lazy so I use Scalaz to merge two maps of year -> value; I assume we
// don’t have two lines with the same client and year…

// For each client, check for each year from maxYear to minYear whether it
// exists in the computed map. If not, output a blank.
val result = rdd
  .map { case obj => (obj._1, (obj._2, obj._3)) }
  .combineByKey(createCombiner, mergeValue, mergeCombiners)
  .map { case (name, mapOfYearsToValues) =>
    (Seq(name) ++ sequenceOfYears.map(year =>
      mapOfYearsToValues.getOrElse(year, " "))).mkString(",")
  }
// Here we assume the sequence of all years is small enough to fit in memory.
// If you had to compute for each day, it might break and you would need a
// specialized time-series library…

result.foreach(println)

sc.stop()

Best regards,
Fanilo

From: Adrian Tanase [mailto:atan...@adobe.com]
Sent: Friday, October 30, 2015 11:50 AM
To: Deng Ching-Mallete; Ascot Moss
Cc: User
Subject: Re: Pivot Data in Spark and Scala

It’s actually a bit tougher, as you’ll first need all the years. Also, I’m not
sure how you would represent your “columns” given they are dynamic based on the
input data.

Depending on your downstream processing, I’d probably try to emulate it with a 
hash map with years as keys instead of the columns.

There is probably a nicer solution using the DataFrames API, but I’m not
familiar with it.

If you actually need vectors, I think this article I saw recently on the
Databricks blog will highlight some options (look for “gather encoder”):
https://databricks.com/blog/2015/10/20/audience-modeling-with-spark-ml-pipelines.html

-adrian

From: Deng Ching-Mallete
Date: Friday, October 30, 2015 at 4:35 AM
To: Ascot Moss
Cc: User
Subject: Re: Pivot Data in Spark and Scala

Hi,

You could transform it into a pair RDD then use the combineByKey function.

HTH,
Deng

On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss <ascot.m...@gmail.com> wrote:
Hi,

I have data as follows:

A, 2015, 4
A, 2014, 12
A, 2013, 1
B, 2015, 24
B, 2013, 4


I need to convert the data to a new format:
A ,4,12,1
B,   24,,4

Any idea how to do this in Spark/Scala?

Thanks







Re: Pivot Data in Spark and Scala

2015-10-30 Thread Ali Tajeldin EDU
You can take a look at the smvPivot function in the SMV library
(https://github.com/TresAmigosSD/SMV). Look for the method "smvPivot" in
SmvDFHelper
(http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvDFHelper).
You can also perform the pivot on a group-by-group basis; see smvPivot and
smvPivotSum in SmvGroupedDataFunc
(http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvGroupedDataFunc).

Docs from smvPivotSum are copied below.  Note that you don't have to specify 
the baseOutput columns, but if you don't, it will force an additional action on 
the input data frame to build the cross products of all possible values in your 
input pivot columns. 

Perform a normal SmvPivot operation followed by a sum on all the output pivot 
columns.
For example:
df.smvGroupBy("id").smvPivotSum(Seq("month", "product"))("count")("5_14_A", 
"5_14_B", "6_14_A", "6_14_B")
and the following input:
Input
| id  | month | product | count |
| --- | ----- | ------- | ----- |
| 1   | 5/14  |   A |   100 |
| 1   | 6/14  |   B |   200 |
| 1   | 5/14  |   B |   300 |
will produce the following output:
| id  | count_5_14_A | count_5_14_B | count_6_14_A | count_6_14_B |
| --- | ------------ | ------------ | ------------ | ------------ |
| 1   | 100  | 300  | NULL | 200  |
pivotCols
The sequence of column names whose values will be used as the output pivot 
column names.
valueCols
The columns whose value will be copied to the pivoted output columns.
baseOutput
The expected base output column names (without the value column prefix). The 
user is required to supply the list of expected pivot column output names to 
avoid an extra action on the input DataFrame just to extract the possible
pivot columns. If an empty sequence is provided, then the base output columns
will be extracted from values in the pivot columns (will cause an action on the 
entire DataFrame!)

--
Ali
PS: shoot me an email if you run into any issues using SMV.


On Oct 30, 2015, at 6:33 AM, Andrianasolo Fanilo 
<fanilo.andrianas...@worldline.com> wrote:

> Hey,
>  
> The question is tricky. Here is a possible answer: define years as keys of a
> hash map per client, then merge those maps:
>  
> import scalaz._
> import Scalaz._
>  
> val sc = new SparkContext("local[*]", "sandbox")
> 
> // Create RDD of your objects
> val rdd = sc.parallelize(Seq(
>   ("A", 2015, 4),
>   ("A", 2014, 12),
>   ("A", 2013, 1),
>   ("B", 2015, 24),
>   ("B", 2013, 4)
> ))
> 
> // Search for all the years in the RDD
> val minYear = rdd.map(_._2).reduce(Math.min) // look for minimum year
> val maxYear = rdd.map(_._2).reduce(Math.max) // look for maximum year
> val sequenceOfYears = maxYear to minYear by -1 // create sequence of years from max to min
> 
> // Define functions to build, for each client, a Map of year -> value,
> // and how those maps will be merged
> def createCombiner(obj: (Int, Int)): Map[Int, String] =
>   Map(obj._1 -> obj._2.toString)
> def mergeValue(accum: Map[Int, String], obj: (Int, Int)) =
>   accum + (obj._1 -> obj._2.toString)
> def mergeCombiners(accum1: Map[Int, String], accum2: Map[Int, String]) =
>   accum1 |+| accum2
> // I’m lazy so I use Scalaz to merge two maps of year -> value; I assume we
> // don’t have two lines with the same client and year…
> 
> // For each client, check for each year from maxYear to minYear whether it
> // exists in the computed map. If not, output a blank.
> val result = rdd
>   .map { case obj => (obj._1, (obj._2, obj._3)) }
>   .combineByKey(createCombiner, mergeValue, mergeCombiners)
>   .map { case (name, mapOfYearsToValues) =>
>     (Seq(name) ++ sequenceOfYears.map(year =>
>       mapOfYearsToValues.getOrElse(year, " "))).mkString(",")
>   }
> // Here we assume the sequence of all years is small enough to fit in memory.
> // If you had to compute for each day, it might break and you would need a
> // specialized time-series library…
> 
> result.foreach(println)
> 
> sc.stop()
>  
> Best regards,
> Fanilo
>  
> From: Adrian Tanase [mailto:atan...@adobe.com]
> Sent: Friday, October 30, 2015 11:50 AM
> To: Deng Ching-Mallete; Ascot Moss
> Cc: User
> Subject: Re: Pivot Data in Spark and Scala
>  
> It’s actually a bit tougher, as you’ll first need all the years. Also, I’m not
> sure how you would represent your “columns” given they are dynamic based on
> the input data.
>  
> Depending on your downstream processing, I’d probably try to emulate it with 
> a hash map with years as keys instead of the columns.
>  
> There is probably a nicer solution using the DataFrames API, but I’m not
> familiar with it.

Re: Pivot Data in Spark and Scala

2015-10-30 Thread Ruslan Dautkhanov
https://issues.apache.org/jira/browse/SPARK-8992

Should be in 1.6?
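
For reference, SPARK-8992 adds a pivot method on a grouped DataFrame. A rough
sketch of what that could look like on the sample data, assuming the Spark 1.6
API and column names of my own choosing:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("pivot-sketch"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// The sample data from the original question, with made-up column names
val df = sc.parallelize(Seq(
  ("A", 2015, 4),
  ("A", 2014, 12),
  ("A", 2013, 1),
  ("B", 2015, 24),
  ("B", 2013, 4)
)).toDF("name", "year", "value")

// Listing the pivot values explicitly avoids an extra pass to discover them
df.groupBy("name")
  .pivot("year", Seq(2015, 2014, 2013))
  .sum("value")
  .show()
// Expected shape (missing name/year combinations come back as null):
// A -> 4, 12, 1
// B -> 24, null, 4

sc.stop()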



-- 
Ruslan Dautkhanov

On Thu, Oct 29, 2015 at 5:29 AM, Ascot Moss  wrote:

> Hi,
>
> I have data as follows:
>
> A, 2015, 4
> A, 2014, 12
> A, 2013, 1
> B, 2015, 24
> B, 2013, 4
>
>
> I need to convert the data to a new format:
> A ,4,12,1
> B,   24,,4
>
> Any idea how to do this in Spark/Scala?
>
> Thanks
>
>


Re: Pivot Data in Spark and Scala

2015-10-29 Thread Deng Ching-Mallete
Hi,

You could transform it into a pair RDD then use the combineByKey function.

HTH,
Deng
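
A minimal sketch of that pair-RDD + combineByKey approach, assuming an existing
SparkContext named sc as elsewhere in this thread; the names are my own, missing
years become empty fields, and Fanilo's reply elsewhere in the thread shows a
fuller variant using Scalaz:

val rows = sc.parallelize(Seq(
  ("A", 2015, 4), ("A", 2014, 12), ("A", 2013, 1),
  ("B", 2015, 24), ("B", 2013, 4)
))

// Collect the distinct years once so every output row gets the same columns
val years = rows.map(_._2).distinct().collect().sorted.reverse

val pivoted = rows
  .map { case (name, year, value) => (name, (year, value)) }
  .combineByKey(
    (yv: (Int, Int)) => Map(yv._1 -> yv._2),           // createCombiner
    (acc: Map[Int, Int], yv: (Int, Int)) => acc + yv,  // mergeValue
    (a: Map[Int, Int], b: Map[Int, Int]) => a ++ b     // mergeCombiners
  )
  .map { case (name, byYear) =>
    (name +: years.map(y => byYear.get(y).map(_.toString).getOrElse(""))).mkString(",")
  }

pivoted.collect().foreach(println)
// A,4,12,1
// B,24,,4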

On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss  wrote:

> Hi,
>
> I have data as follows:
>
> A, 2015, 4
> A, 2014, 12
> A, 2013, 1
> B, 2015, 24
> B, 2013, 4
>
>
> I need to convert the data to a new format:
> A ,4,12,1
> B,   24,,4
>
> Any idea how to do this in Spark/Scala?
>
> Thanks
>
>