Re: Implementing FIRST_VALUE, LEAD, LAG in Spark

2015-02-17 Thread Dmitry Tolpeko
I ended up with the following:

// pair each value in a group with the group's first value
def firstValue(items: Iterable[String]) = for { i <- items } yield (i, items.head)

data.groupByKey().map { case (a, b) => (a, firstValue(b)) }.collect
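
For LAG the same pattern seems workable. Here is a minimal LAG(1) sketch
(lagValue and its default argument are my naming; it also assumes the order
of the grouped iterable is the window order, which groupByKey alone does not
guarantee):

// LAG(1) sketch: pair every value with the previous value in its group,
// falling back to a default for the first row of each group
def lagValue(items: Iterable[String], default: String = "") = {
  val list = items.toList
  (default :: list.init).zip(list).map { case (prev, cur) => (cur, prev) }
}

data.groupByKey().map { case (a, b) => (a, lagValue(b)) }.collect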

More details:
http://dmtolpeko.com/2015/02/17/first_value-last_value-lead-and-lag-in-spark/

I would appreciate any feedback.

Dmitry

On Fri, Feb 13, 2015 at 11:54 AM, Dmitry Tolpeko dmtolp...@gmail.com
wrote:

 Hello,

 To convert existing Map Reduce jobs to Spark, I need to implement window
 functions such as FIRST_VALUE, LEAD, LAG and so on. For example,
 FIRST_VALUE function:

 Source (1st column is key):

 A, A1
 A, A2
 A, A3
 B, B1
 B, B2
 C, C1

 and the result should be

 A, A1, A1
 A, A2, A1
 A, A3, A1
 B, B1, B1
 B, B2, B1
 C, C1, C1

 You can see that the first value in a group is repeated in each row.

 My current Spark/Scala code:

 def firstValue(b: Iterable[String]): List[(String, String)] = {
   val c = scala.collection.mutable.MutableList[(String, String)]()
   var f = ""
   b.foreach(d => { if (f.isEmpty()) f = d; c += d -> f })
   c.toList
 }

 val data = sc.parallelize(List(
   ("A", "A1"),
   ("A", "A2"),
   ("A", "A3"),
   ("B", "B1"),
   ("B", "B2"),
   ("C", "C1")))

 data.groupByKey().map { case (a, b) => (a, firstValue(b)) }.collect

 So I create a new list after groupByKey. Is this the right approach in
 Spark? Are there any other options? Please point me to any drawbacks.

 Thanks,

 Dmitry




Implementing FIRST_VALUE, LEAD, LAG in Spark

2015-02-13 Thread Dmitry Tolpeko
Hello,

To convert existing Map Reduce jobs to Spark, I need to implement window
functions such as FIRST_VALUE, LEAD, LAG and so on. For example,
FIRST_VALUE function:

Source (1st column is key):

A, A1
A, A2
A, A3
B, B1
B, B2
C, C1

and the result should be

A, A1, A1
A, A2, A1
A, A3, A1
B, B1, B1
B, B2, B1
C, C1, C1

You can see that the first value in a group is repeated in each row.

My current Spark/Scala code:

def firstValue(b: Iterable[String]): List[(String, String)] = {
  // remember the group's first value in f and pair every value with it
  val c = scala.collection.mutable.MutableList[(String, String)]()
  var f = ""
  b.foreach(d => { if (f.isEmpty()) f = d; c += d -> f })
  c.toList
}

val data = sc.parallelize(List(
  ("A", "A1"),
  ("A", "A2"),
  ("A", "A3"),
  ("B", "B1"),
  ("B", "B2"),
  ("C", "C1")))

data.groupByKey().map { case (a, b) => (a, firstValue(b)) }.collect
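
For comparison, mapValues might be a slightly cleaner way to write the same
call, since the key is passed through untouched (just a sketch; the result
should be identical):

// same computation via mapValues: the key is kept as-is,
// only the grouped values go through firstValue
data.groupByKey().mapValues(b => firstValue(b)).collect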

So I create a new list after groupByKey. Is this the right approach in
Spark? Are there any other options? Please point me to any drawbacks.

Thanks,

Dmitry