Hi Deenar,

You just need to wrap the Array in a case class (note that you cannot define the case class inside the Spark shell, because classes defined in the REPL become inner classes, and encoders do not support inner classes). For example:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, TypedColumn}

import sqlContext.implicits._

// Wrapping the array in a case class gives toColumn the Encoder it needs
// (supplied by sqlContext.implicits._ for Product types).
case class ResultSmallA(tradeId: String, tradeVersion: String, values: Array[Double])

class AggregateResults extends Aggregator[ResultSmallA, ResultSmallA, ResultSmallA]
    with Serializable {
  // The initial value; the empty array is the identity for sumArrays below.
  def zero: ResultSmallA = ResultSmallA("", "", Array.empty[Double])
  // Add an element to the running total; take the key fields from the
  // incoming row, since the zero buffer has empty keys.
  def reduce(b: ResultSmallA, a: ResultSmallA) =
    ResultSmallA(a.tradeId, a.tradeVersion, sumArrays(a.values, b.values))
  // Merge intermediate values; again prefer whichever side has its keys set.
  def merge(b: ResultSmallA, a: ResultSmallA) =
    ResultSmallA(
      if (b.tradeId.nonEmpty) b.tradeId else a.tradeId,
      if (b.tradeVersion.nonEmpty) b.tradeVersion else a.tradeVersion,
      sumArrays(a.values, b.values))
  // Return the final result.
  def finish(b: ResultSmallA) = b
  // Element-wise sum; an empty side (the zero value) passes the other through.
  private def sumArrays(a: Array[Double], b: Array[Double]): Array[Double] =
    if (a.isEmpty) b
    else if (b.isEmpty) a
    else (a, b).zipped.map(_ + _)
}

def sumRes: TypedColumn[ResultSmallA, ResultSmallA] = new AggregateResults().toColumn

val dsResults = Seq(
  ResultSmallA("1", "1", Array[Double](1.0, 2.0)),
  ResultSmallA("1", "1", Array[Double](1.0, 2.0))
).toDS()

dsResults.groupBy(_.tradeId).agg(sumRes)
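Your generic version fails for a similar reason, I believe: GenericArraySumAggregator[I, N] is an Aggregator[Seq[I], Seq[N], Seq[N]], so toColumn needs an implicit Encoder[Seq[N]] for both the buffer and the output, and the N : Numeric : Encoder context bound only gives it an Encoder[N]. A sketch of what I mean (untested, assuming the 1.6 toColumn signature; it also corrects the column type to match the aggregator's input type):

// Untested sketch: require the encoder toColumn actually needs, and pass it
// explicitly for both the buffer (Seq[N]) and the output (Seq[N]).
def arraySum[I, N : Numeric](f: I => N)(
    implicit enc: Encoder[Seq[N]]): TypedColumn[Seq[I], Seq[N]] =
  new GenericArraySumAggregator(f).toColumn(enc, enc)

You still have to provide an Encoder[Seq[N]] at the call site, and as far as I know sqlContext.implicits._ will not derive one for an arbitrary N, which is why I prefer the case-class wrapper shown above.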
Best Regards,
Arkadiusz Bicz
https://uk.linkedin.com/in/arkadiuszbicz

On Mon, Jan 25, 2016 at 10:36 PM, Deenar Toraskar
<deenar.toras...@gmail.com> wrote:
> Hi All
>
> https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html
>
> I have been converting my UDAFs to Dataset Aggregators (Datasets are cool,
> BTW). I have an ArraySum aggregator that does an element-wise sum of
> arrays. I have got the simple version working, but the generic version
> fails with the following error; I am not sure what I am doing wrong.
>
> scala> import sqlContext.implicits._
>
> scala> def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] =
>      |   new GenericArraySumAggregator(f).toColumn
> <console>:34: error: Unable to find encoder for type stored in a Dataset.
> Primitive types (Int, String, etc) and Product types (case classes) are
> supported by importing sqlContext.implicits._ Support for serializing other
> types will be added in future releases.
>        def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] = new GenericArraySumAggregator(f).toColumn
>                                                                                                               ^
>
> object ArraySumAggregator extends
>     Aggregator[Seq[Float], Seq[Float], Seq[Float]] with Serializable {
>   def zero: Seq[Float] = Nil // The initial value.
>   def reduce(currentSum: Seq[Float], currentRow: Seq[Float]) =
>     sumArray(currentSum, currentRow) // Add an element to the running total.
>   def merge(sum: Seq[Float], row: Seq[Float]) = sumArray(sum, row)
>   def finish(b: Seq[Float]) = b // Return the final result.
>   def sumArray(a: Seq[Float], b: Seq[Float]): Seq[Float] = {
>     (a, b) match {
>       case (Nil, Nil) => Nil
>       case (Nil, row) => row
>       case (sum, Nil) => sum
>       case (sum, row) => (a, b).zipped.map { case (a, b) => a + b }
>     }
>   }
> }
>
> class GenericArraySumAggregator[I, N : Numeric](f: I => N) extends
>     Aggregator[Seq[I], Seq[N], Seq[N]] with Serializable {
>   val numeric = implicitly[Numeric[N]]
>   override def zero: Seq[N] = Nil
>   override def reduce(b: Seq[N], a: Seq[I]): Seq[N] =
>     sumArray(b, a.map(x => f(x))) // numeric.plus(b, f(a))
>   override def merge(b1: Seq[N], b2: Seq[N]): Seq[N] = sumArray(b1, b2)
>   override def finish(reduction: Seq[N]): Seq[N] = reduction
>   def sumArray(a: Seq[N], b: Seq[N]): Seq[N] = {
>     (a, b) match {
>       case (Nil, Nil) => Nil
>       case (Nil, row) => row
>       case (sum, Nil) => sum
>       case (sum, row) => (a, b).zipped.map { case (a, b) => numeric.plus(a, b) }
>     }
>   }
> }
>
> Regards
>
> Deenar