Folks,

I wrote the following wrapper on top of combineByKey. The RDD holds
Array[Any] rows, and I extract the field at a given index for combining.
I tried this in two ways:

Option A: leave colIndex abstract in the Aggregator class and define it in
the derived object Aggtor with value -1. It is set later in the function
myAggregate. This works fine, but I want to keep the API user unaware of
colIndex.

Option B (shown in the code below): set colIndex to -1 in the abstract
class. Aggtor does not mention it at all. It is set later in myAggregate.

Option B works from scalatest in Eclipse but runs into a closure mishap in
scala-shell. I am looking for an explanation and a possible solution or
workaround. I'd appreciate any help!

Thanks,

Mohit.

---------- API helper -----

abstract class Aggregator[U] {
    var colIndex: Int = -1

    def convert(a: Array[Any]): U = {
        a(colIndex).asInstanceOf[U]
    }

    def mergeValue(a: U, b: Array[Any]): U = {
        aggregate(a, convert(b))
    }

    def mergeCombiners(x: U, y: U): U = {
        aggregate(x, y)
    }

    def aggregate(p: U, q: U): U
}

------ API handler -----

def myAggregate[U: ClassTag](...aggtor: Aggregator[U]) = {
    aggtor.colIndex = <something>
    keyBy(aggByCol).combineByKey(aggtor.convert, aggtor.mergeValue,
        aggtor.mergeCombiners)
}


-------- call the API ----

case object Aggtor extends Aggregator[List[String]] {
    //var colIndex = -1
    def aggregate = ....
}

myAggregate(...Aggtor)
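
One possible workaround, sketched without Spark so that it runs on its own: stop mutating colIndex on the aggregator and instead copy the index into a local val that the three functions close over. This assumes the shell problem comes from the closures reading a var on the singleton object, which I have not confirmed. SketchAggregator, SketchApi.combinersFor, Concat and SketchDemo are made-up names for illustration, and a plain Seq stands in for the RDD.

```scala
// Sketch only: no Spark dependency. All names here are hypothetical.
abstract class SketchAggregator[U] extends Serializable {
  // The only thing an API user has to supply.
  def aggregate(p: U, q: U): U
}

object SketchApi {
  // Builds the three functions that combineByKey expects. The column index is
  // copied into a local val, so the closures capture an immutable Int instead
  // of reading a var on the aggregator.
  def combinersFor[U](
      colIndex: Int,
      aggtor: SketchAggregator[U]
  ): (Array[Any] => U, (U, Array[Any]) => U, (U, U) => U) = {
    val idx = colIndex
    val convert: Array[Any] => U = row => row(idx).asInstanceOf[U]
    val mergeValue: (U, Array[Any]) => U =
      (acc, row) => aggtor.aggregate(acc, convert(row))
    val mergeCombiners: (U, U) => U = (x, y) => aggtor.aggregate(x, y)
    (convert, mergeValue, mergeCombiners)
  }
}

// Hypothetical user-side aggregator: concatenates lists, never sees colIndex.
case object Concat extends SketchAggregator[List[String]] {
  def aggregate(p: List[String], q: List[String]): List[String] = p ++ q
}

object SketchDemo {
  // Folds two rows by hand, roughly what combineByKey does within a partition.
  def run(): List[String] = {
    val (convert, mergeValue, _) = SketchApi.combinersFor(1, Concat)
    val rows = Seq(Array[Any]("k", List("a")), Array[Any]("k", List("b")))
    rows.tail.foldLeft(convert(rows.head))(mergeValue)
  }
}
```

In myAggregate, the three returned functions would be passed to combineByKey in place of aggtor.convert, aggtor.mergeValue and aggtor.mergeCombiners. The aggregator itself would still travel with the closures, but it would no longer carry any mutable state.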
