I think this is already fixed in branch-1.6.  If you can reproduce it there,
can you please open a JIRA and ping me?

On Fri, Jan 29, 2016 at 12:16 PM, deenar <
deenar.toras...@thinkreactive.co.uk> wrote:

> Hi Michael
>
> The Dataset aggregators do not appear to support complex Spark-SQL types. I
> wasn't sure if I was doing something wrong, or if this was a bug or a
> feature not implemented yet. Having this in would be great. See below
> (reposting this from the spark user list).
>
>
> https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html
>
> I have been converting my UDAFs to Dataset Aggregators (Datasets are cool,
> BTW). I have an ArraySum aggregator that does an element-wise sum of
> arrays. I have got the simple version working, but the generic version
> fails with the following error; I'm not sure what I am doing wrong.
>
> scala> import sqlContext.implicits._
>
> scala> def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] = new GenericArraySumAggregator(f).toColumn
>
> <console>:34: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._  Support for serializing other types will be added in future releases.
>
>          def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] = new GenericArraySumAggregator(f).toColumn
> ^
>
> object ArraySumAggregator extends Aggregator[Seq[Float], Seq[Float], Seq[Float]] with Serializable {
>   // The initial value.
>   def zero: Seq[Float] = Nil
>   def reduce(currentSum: Seq[Float], currentRow: Seq[Float]) = sumArray(currentSum, currentRow)
>   def merge(sum: Seq[Float], row: Seq[Float]) = sumArray(sum, row)
>   def finish(b: Seq[Float]) = b // Return the final result.
>   def sumArray(a: Seq[Float], b: Seq[Float]): Seq[Float] = {
>     (a, b) match {
>       case (Nil, Nil) => Nil
>       case (Nil, row) => row
>       case (sum, Nil) => sum
>       case (sum, row) => (sum, row).zipped.map { case (x, y) => x + y }
>     }
>   }
> }
> class GenericArraySumAggregator[I, N : Numeric](f: I => N) extends Aggregator[Seq[I], Seq[N], Seq[N]] with Serializable {
>   val numeric = implicitly[Numeric[N]]
>   override def zero: Seq[N] = Nil
>   override def reduce(b: Seq[N], a: Seq[I]): Seq[N] = sumArray(b, a.map(x => f(x))) // numeric.plus(b, f(a))
>   override def merge(b1: Seq[N], b2: Seq[N]): Seq[N] = sumArray(b1, b2)
>   override def finish(reduction: Seq[N]): Seq[N] = reduction
>   def sumArray(a: Seq[N], b: Seq[N]): Seq[N] = {
>     (a, b) match {
>       case (Nil, Nil) => Nil
>       case (Nil, row) => row
>       case (sum, Nil) => sum
>       case (sum, row) => (sum, row).zipped.map { case (x, y) => numeric.plus(x, y) }
>     }
>   }
> }
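
For reference, the element-wise sum used by both quoted aggregators can be
exercised on its own, without Spark or any encoders. The following is only a
minimal sketch, assuming plain Scala collections; ElementwiseSum and sumSeq
are hypothetical names chosen for illustration and are not part of Spark or of
the quoted code. Like the quoted sumArray, it treats an empty sequence as the
zero value and truncates to the shorter input when the lengths differ.

```scala
// Hypothetical standalone helper (not a Spark API): element-wise sum of two
// sequences for any element type that has a Numeric instance.
object ElementwiseSum {
  def sumSeq[N](a: Seq[N], b: Seq[N])(implicit num: Numeric[N]): Seq[N] =
    if (a.isEmpty) b        // an empty buffer acts as the zero value
    else if (b.isEmpty) a
    else a.zip(b).map { case (x, y) => num.plus(x, y) } // stops at the shorter input

  def main(args: Array[String]): Unit = {
    // Fold a few "rows" into a running sum, as reduce/merge would.
    val rows = Seq(Seq(1, 2, 3), Seq(10, 20, 30), Seq(100, 200, 300))
    println(rows.foldLeft(Seq.empty[Int])((acc, row) => sumSeq(acc, row)))
  }
}
```

This only isolates the summation logic; it does not address the encoder
lookup that the compiler error above is about.
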
>
>
>
> Regards
> Deenar
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-1-6-1-tp16009p16155.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
