Hi all, (reference: Spark 1.6 Dataset Aggregator example — https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html)
I have been converting my UDAFs to Dataset Aggregators (Datasets are cool, BTW). I have an ArraySum aggregator that does an element-wise sum of arrays. I have the simple version working, but the generic version fails with the following error, and I am not sure what I am doing wrong. scala> import sqlContext.implicits._ scala> def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] = new GenericArraySumAggregator(f).toColumn <console>:34: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases. def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] = new GenericArraySumAggregator(f).toColumn ^ object ArraySumAggregator extends Aggregator[Seq[Float], Seq[Float], Seq[Float]] with Serializable { def zero: Seq[Float] = Nil // The initial value. def reduce(currentSum: Seq[Float], currentRow: Seq[Float]) = sumArray(currentSum, currentRow) def merge(sum: Seq[Float], row: Seq[Float]) = sumArray(sum, row) def finish(b: Seq[Float]) = b // Return the final result. 
def sumArray(a: Seq[Float], b: Seq[Float]): Seq[Float] = { (a, b) match { case (Nil, Nil) => Nil case (Nil, row) => row case (sum, Nil) => sum case (sum, row) => (a, b).zipped.map { case (a, b) => a + b } } } } class GenericArraySumAggregator[I, N : Numeric](f: I => N) extends Aggregator[Seq[I], Seq[N], Seq[N]] with Serializable { val numeric = implicitly[Numeric[N]] override def zero: Seq[N] = Nil override def reduce(b: Seq[N], a: Seq[I]): Seq[N] = sumArray(b, a.map( x => f(x))) //numeric.plus(b, f(a)) override def merge(b1: Seq[N],b2: Seq[N]): Seq[N] = sumArray(b1, b2) override def finish(reduction: Seq[N]): Seq[N] = reduction def sumArray(a: Seq[N], b: Seq[N]): Seq[N] = { (a, b) match { case (Nil, Nil) => Nil case (Nil, row) => row case (sum, Nil) => sum case (sum, row) => (a, b).zipped.map { case (a, b) => numeric.plus(a, b) } } } } Regards Deenar