Folks, I wrote the following wrapper on top of combineByKey. The RDD is of Array[Any], and I extract the field at a given index for combining. I tried this in two ways:
Option A: Leave colIndex abstract in the Aggregator class and define it in the derived object Aggtor with value -1. It is set later in the function myAggregate. This works fine, but I want to keep the API user unaware of colIndex.

Option B (shown in the code below): Set colIndex to -1 in the abstract class. Aggtor does not mention it at all. It is set later in myAggregate.

Option B works from scalatest in Eclipse but runs into a closure mishap in scala-shell. I am looking for an explanation and a possible solution/workaround. Appreciate any help!

Thanks,
Mohit.

---------- API helper -----
abstract class Aggregator[U] {
  var colIndex: Int = -1

  def convert(a: Array[Any]): U = {
    a(colIndex).asInstanceOf[U]
  }

  def mergeValue(a: U, b: Array[Any]): U = {
    aggregate(a, convert(b))
  }

  def mergeCombiners(x: U, y: U): U = {
    aggregate(x, y)
  }

  def aggregate(p: U, q: U): U
}

------ API handler -----
def myAggregate[U: ClassTag](...aggtor: Aggregator[U]) = {
  aggtor.colIndex = <something>
  keyBy(aggByCol).combineByKey(aggtor.convert, aggtor.mergeValue, aggtor.mergeCombiners)
}

-------- call the API ----
case object Aggtor extends Aggregator[List[String]] {
  //var colIndex = -1
  def aggregate = ....
}

myAggregate(...Aggtor)
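
One possible workaround, sketched here under assumptions and not tested against the shell setup described above: keep colIndex out of the aggregator entirely and copy it into a local immutable val, so the functions handed to combineByKey capture the index by value instead of reading a mutable field on a singleton object. The names Combiners, AggregateSketch, combinersFor and Demo are hypothetical and not part of the original code, and the aggregate body (list concatenation) is only a stand-in. The demo uses plain Scala collections so it runs without Spark.

```scala
// Hypothetical sketch: Combiners, AggregateSketch, combinersFor and Demo
// are illustrative names, not part of the original wrapper.

// The user-facing API only asks for the combining logic; no colIndex here.
trait Aggregator[U] extends Serializable {
  def aggregate(p: U, q: U): U
}

// Bundles the three functions that combineByKey expects.
final case class Combiners[U](
  createCombiner: Array[Any] => U,
  mergeValue: (U, Array[Any]) => U,
  mergeCombiners: (U, U) => U
)

object AggregateSketch {
  def combinersFor[U](colIndex: Int, aggtor: Aggregator[U]): Combiners[U] = {
    // Local immutable copy: the closures below capture this value,
    // not a mutable field on the aggregator object.
    val idx = colIndex
    val convert: Array[Any] => U = row => row(idx).asInstanceOf[U]
    Combiners[U](
      convert,
      (acc, row) => aggtor.aggregate(acc, convert(row)),
      (x, y) => aggtor.aggregate(x, y)
    )
  }
}

// Stand-in aggregator: concatenation is an assumption for illustration.
case object Aggtor extends Aggregator[List[String]] {
  def aggregate(p: List[String], q: List[String]): List[String] = p ++ q
}

object Demo {
  def main(args: Array[String]): Unit = {
    val c = AggregateSketch.combinersFor(1, Aggtor)
    val rows = Seq[Array[Any]](Array("k", List("a")), Array("k", List("b")))
    // Simulates what combineByKey does for one key within one partition.
    val merged = rows.tail.foldLeft(c.createCombiner(rows.head))(c.mergeValue)
    println(merged) // prints List(a, b)
  }
}
```

Inside myAggregate, the same three functions could then be passed along as combineByKey(c.createCombiner, c.mergeValue, c.mergeCombiners). Whether this removes the closure problem in the shell depends on what is actually going wrong there, which this sketch does not establish.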