This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8290e5e  [SPARK-26353][SQL] Add typed aggregate functions (max/min) to the example module.
8290e5e is described below

commit 8290e5eccb22e6b865075a0fe62772ce072f0a08
Author: liuxian <liu.xi...@zte.com.cn>
AuthorDate: Mon Feb 18 17:20:58 2019 +0800

    [SPARK-26353][SQL] Add typed aggregate functions (max/min) to the example module.

    ## What changes were proposed in this pull request?

    Add typed aggregate functions (max/min) to the example module.

    ## How was this patch tested?

    Manual testing.

    running typed minimum:
    ```
    +-----+----------------------+
    |value|TypedMin(scala.Tuple2)|
    +-----+----------------------+
    |    0|                 [0.0]|
    |    2|                 [2.0]|
    |    1|                 [1.0]|
    +-----+----------------------+
    ```

    running typed maximum:
    ```
    +-----+----------------------+
    |value|TypedMax(scala.Tuple2)|
    +-----+----------------------+
    |    0|                  [18]|
    |    2|                  [17]|
    |    1|                  [19]|
    +-----+----------------------+
    ```

    Closes #23304 from 10110346/typedminmax.

    Authored-by: liuxian <liu.xi...@zte.com.cn>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
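For quick reference before the diff: a minimal, self-contained sketch of how the new aggregators are exercised. The dataset construction below is an assumption chosen to reproduce the output shown above (ids 0..19 keyed by id % 3); the actual example wires these calls into the existing SimpleTypedAggregator object, as the diff shows.

```
// Hypothetical standalone driver, not part of the commit. It assumes the
// TypedMin/TypedMax classes added by the diff below are on the classpath.
import org.apache.spark.sql.SparkSession

object TypedMinMaxSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("typed min/max sketch")
      .getOrCreate()
    import spark.implicits._

    // Assumed input: (key, value) pairs with key = id % 3 and value = id,
    // which matches the grouped min/max values in the tables above.
    val ds = spark.range(20).map(id => (id % 3, id.toLong))

    println("running typed minimum:")
    ds.groupByKey(_._1).agg(new TypedMin[(Long, Long)](_._2.toDouble).toColumn).show()

    println("running typed maximum:")
    ds.groupByKey(_._1).agg(new TypedMax[(Long, Long)](_._2).toColumn).show()

    spark.stop()
  }
}
```

Note the design visible in the diff: `zero` returns null because min/max has no identity element, the buffer uses `Encoders.kryo` (presumably so a null buffer is representable), and `finish` maps a never-updated buffer to `None`.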
---
 .../spark/examples/sql/SimpleTypedAggregator.scala | 74 ++++++++++++++++++++++
 1 file changed, 74 insertions(+)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SimpleTypedAggregator.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SimpleTypedAggregator.scala
index f8af919..5510f00 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SimpleTypedAggregator.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SimpleTypedAggregator.scala
@@ -44,6 +44,12 @@ object SimpleTypedAggregator {
     println("running typed average:")
     ds.groupByKey(_._1).agg(new TypedAverage[(Long, Long)](_._2.toDouble).toColumn).show()
 
+    println("running typed minimum:")
+    ds.groupByKey(_._1).agg(new TypedMin[(Long, Long)](_._2.toDouble).toColumn).show()
+
+    println("running typed maximum:")
+    ds.groupByKey(_._1).agg(new TypedMax[(Long, Long)](_._2).toColumn).show()
+
     spark.stop()
   }
 }
@@ -84,3 +90,71 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
   }
   override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
 }
+
+class TypedMin[IN](val f: IN => Double) extends Aggregator[IN, MutableDouble, Option[Double]] {
+  override def zero: MutableDouble = null
+  override def reduce(b: MutableDouble, a: IN): MutableDouble = {
+    if (b == null) {
+      new MutableDouble(f(a))
+    } else {
+      b.value = math.min(b.value, f(a))
+      b
+    }
+  }
+  override def merge(b1: MutableDouble, b2: MutableDouble): MutableDouble = {
+    if (b1 == null) {
+      b2
+    } else if (b2 == null) {
+      b1
+    } else {
+      b1.value = math.min(b1.value, b2.value)
+      b1
+    }
+  }
+  override def finish(reduction: MutableDouble): Option[Double] = {
+    if (reduction != null) {
+      Some(reduction.value)
+    } else {
+      None
+    }
+  }
+
+  override def bufferEncoder: Encoder[MutableDouble] = Encoders.kryo[MutableDouble]
+  override def outputEncoder: Encoder[Option[Double]] = Encoders.product[Option[Double]]
+}
+
+class TypedMax[IN](val f: IN => Long) extends Aggregator[IN, MutableLong, Option[Long]] {
+  override def zero: MutableLong = null
+  override def reduce(b: MutableLong, a: IN): MutableLong = {
+    if (b == null) {
+      new MutableLong(f(a))
+    } else {
+      b.value = math.max(b.value, f(a))
+      b
+    }
+  }
+  override def merge(b1: MutableLong, b2: MutableLong): MutableLong = {
+    if (b1 == null) {
+      b2
+    } else if (b2 == null) {
+      b1
+    } else {
+      b1.value = math.max(b1.value, b2.value)
+      b1
+    }
+  }
+  override def finish(reduction: MutableLong): Option[Long] = {
+    if (reduction != null) {
+      Some(reduction.value)
+    } else {
+      None
+    }
+  }
+
+  override def bufferEncoder: Encoder[MutableLong] = Encoders.kryo[MutableLong]
+  override def outputEncoder: Encoder[Option[Long]] = Encoders.product[Option[Long]]
+}
+
+class MutableLong(var value: Long) extends Serializable
+
+class MutableDouble(var value: Double) extends Serializable

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org