Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18113#discussion_r154889195 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala --- @@ -99,3 +96,165 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]] } } + +class TypedMinDouble[IN](val f: IN => Double) + extends Aggregator[IN, MutableDouble, java.lang.Double] { + override def zero: MutableDouble = null + override def reduce(b: MutableDouble, a: IN): MutableDouble = { + if (b == null) { + new MutableDouble(f(a)) + } else { + b.value = math.min(b.value, f(a)) + b + } + } + override def merge(b1: MutableDouble, b2: MutableDouble): MutableDouble = { + if (b1 == null) { + b2 + } else if (b2 == null) { + b1 + } else { + b1.value = math.min(b1.value, b2.value) + b1 + } + } + override def finish(reduction: MutableDouble): java.lang.Double = { + if (reduction == null) { + null + } else { + reduction.toJavaDouble + } + } + + override def bufferEncoder: Encoder[MutableDouble] = Encoders.kryo[MutableDouble] + override def outputEncoder: Encoder[java.lang.Double] = ExpressionEncoder[java.lang.Double]() + + // Java api support + def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x)) + def toColumnScala: TypedColumn[IN, Double] = { --- End diff -- seems it's hard to achieve, how about ``` trait TypedMinDouble[IN, OUT](val f: IN => Double) extends Aggregator[IN, MutableDouble, OUT] { def zero = ... def reduce = ... } class JavaTypedMinDouble[IN](val f: IN => Double) extends TypedMinDouble[IN, JDouble](f) { def finish: JDouble } class ScalaTypedMinDouble[IN](val f: IN => Double) extends TypedMinDouble[IN, Option[Double]](f) { def finish: Option[Double] } ```
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org