Folks,
I wrote the following wrapper on top of combineByKey. The RDD is of
Array[Any], and I extract the field at a given index for combining.
I tried this in two ways:
Option A: Leave colIndex abstract in the Aggregator class and define it in
the derived object Aggtor with the value -1. It is set later in the function
myAggregate. This works fine, but I want to keep the API user unaware of
colIndex.
Option B (shown in the code below): Set colIndex to -1 in the abstract class.
Aggtor does not mention it at all. It is set later in myAggregate.
Option B works from scalatest in Eclipse but runs into a closure problem in
the Scala shell. I am looking for an explanation and a possible
solution/workaround. Appreciate any help!
Thanks,
Mohit.
---------- API helper -----
abstract class Aggregator[U] {
  var colIndex: Int = -1

  def convert(a: Array[Any]): U = {
    a(colIndex).asInstanceOf[U]
  }

  def mergeValue(a: U, b: Array[Any]): U = {
    aggregate(a, convert(b))
  }

  def mergeCombiners(x: U, y: U): U = {
    aggregate(x, y)
  }

  def aggregate(p: U, q: U): U
}
------ API handler -----
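import scala.reflect.ClassTag

def myAggregate[U: ClassTag](...aggtor: Aggregator[U]) = {
  // colIndex is mutated here, before the methods are handed to combineByKey
  aggtor.colIndex = <something>
  keyBy(aggByCol).combineByKey(aggtor.convert, aggtor.mergeValue,
    aggtor.mergeCombiners)
}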
-------- call the API ----
case object Aggtor extends Aggregator[List[String]] {
  //var colIndex = -1
  def aggregate = ....
}
myAggregate(...Aggtor)
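
-------- possible workaround (sketch) ----
If the cause is that the value assigned to colIndex does not travel with the
serialized closure (an assumption, not confirmed here), one possible
workaround is to stop storing colIndex as mutable state and pass it as a
plain Int that the function values capture. The sketch below is not Spark
code: LocalSketch and its myAggregate signature (rows, keyCol, valueCol) are
hypothetical stand-ins that run on a local Seq, and the body of aggregate
(list concatenation) is only an example.

```scala
// Sketch only: colIndex is a method parameter instead of a mutable field,
// so the aggregator object itself carries no state that has to be shipped.
abstract class Aggregator[U] extends Serializable {
  def aggregate(p: U, q: U): U

  // Extract the field at colIndex; the cast is unchecked, as in the original.
  def convert(colIndex: Int)(a: Array[Any]): U =
    a(colIndex).asInstanceOf[U]

  def mergeValue(colIndex: Int)(a: U, b: Array[Any]): U =
    aggregate(a, convert(colIndex)(b))

  def mergeCombiners(x: U, y: U): U = aggregate(x, y)
}

// The API user still never mentions colIndex.
case object Aggtor extends Aggregator[List[String]] {
  // Example aggregation (an assumption): concatenate the two lists.
  def aggregate(p: List[String], q: List[String]): List[String] = p ++ q
}

object LocalSketch {
  // Hypothetical local stand-in for keyBy(...).combineByKey(...).
  // It works on a Seq, so mergeCombiners is not exercised here.
  def myAggregate[U](rows: Seq[Array[Any]], keyCol: Int, valueCol: Int,
                     aggtor: Aggregator[U]): Map[Any, U] = {
    val idx = valueCol // local val: the lambdas below capture this Int
    val createCombiner = (a: Array[Any]) => aggtor.convert(idx)(a)
    val mergeValue = (u: U, a: Array[Any]) => aggtor.mergeValue(idx)(u, a)
    // With an RDD, these two functions plus aggtor.mergeCombiners would be
    // the three arguments to combineByKey.
    rows.groupBy(r => r(keyCol)).map { case (k, rs) =>
      k -> rs.tail.foldLeft(createCombiner(rs.head))(mergeValue)
    }
  }
}
```

Called as LocalSketch.myAggregate(rows, 0, 1, Aggtor), this groups rows by
column 0 and concatenates the lists found in column 1. Whether the same
change resolves the shell behaviour with a real RDD would still need to be
checked.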