This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8290e5e  [SPARK-26353][SQL] Add typed aggregate functions (max/min) to the example module.
8290e5e is described below

commit 8290e5eccb22e6b865075a0fe62772ce072f0a08
Author: liuxian <liu.xi...@zte.com.cn>
AuthorDate: Mon Feb 18 17:20:58 2019 +0800

    [SPARK-26353][SQL] Add typed aggregate functions (max/min) to the example module.
    
    ## What changes were proposed in this pull request?
    
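    Add two typed aggregate functions, `TypedMin` (over `Double`) and `TypedMax` (over `Long`), to the `SimpleTypedAggregator` example, and run them alongside the existing typed average example.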
    
    ## How was this patch tested?
    
    Manual testing, by running the example and checking its output.
    
    running typed minimum:
    
    ```
    +-----+----------------------+
    |value|TypedMin(scala.Tuple2)|
    +-----+----------------------+
    |    0|                 [0.0]|
    |    2|                 [2.0]|
    |    1|                 [1.0]|
    +-----+----------------------+
    ```
    
    running typed maximum:
    
    ```
    +-----+----------------------+
    |value|TypedMax(scala.Tuple2)|
    +-----+----------------------+
    |    0|                  [18]|
    |    2|                  [17]|
    |    1|                  [19]|
    +-----+----------------------+
    ```
    
    Closes #23304 from 10110346/typedminmax.
    
    Authored-by: liuxian <liu.xi...@zte.com.cn>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/examples/sql/SimpleTypedAggregator.scala | 74 ++++++++++++++++++++++
 1 file changed, 74 insertions(+)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SimpleTypedAggregator.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SimpleTypedAggregator.scala
index f8af919..5510f00 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SimpleTypedAggregator.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SimpleTypedAggregator.scala
@@ -44,6 +44,12 @@ object SimpleTypedAggregator {
     println("running typed average:")
     ds.groupByKey(_._1).agg(new TypedAverage[(Long, 
Long)](_._2.toDouble).toColumn).show()
 
+    println("running typed minimum:")
+    ds.groupByKey(_._1).agg(new TypedMin[(Long, 
Long)](_._2.toDouble).toColumn).show()
+
+    println("running typed maximum:")
+    ds.groupByKey(_._1).agg(new TypedMax[(Long, Long)](_._2).toColumn).show()
+
     spark.stop()
   }
 }
@@ -84,3 +90,71 @@ class TypedAverage[IN](val f: IN => Double) extends 
Aggregator[IN, (Double, Long
   }
   override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
 }
+
+class TypedMin[IN](val f: IN => Double) extends Aggregator[IN, MutableDouble, 
Option[Double]] {
+  override def zero: MutableDouble = null
+  override def reduce(b: MutableDouble, a: IN): MutableDouble = {
+    if (b == null) {
+      new MutableDouble(f(a))
+    } else {
+      b.value = math.min(b.value, f(a))
+      b
+    }
+  }
+  override def merge(b1: MutableDouble, b2: MutableDouble): MutableDouble = {
+    if (b1 == null) {
+      b2
+    } else if (b2 == null) {
+      b1
+    } else {
+      b1.value = math.min(b1.value, b2.value)
+      b1
+    }
+  }
+  override def finish(reduction: MutableDouble): Option[Double] = {
+    if (reduction != null) {
+      Some(reduction.value)
+    } else {
+      None
+    }
+  }
+
+  override def bufferEncoder: Encoder[MutableDouble] = 
Encoders.kryo[MutableDouble]
+  override def outputEncoder: Encoder[Option[Double]] = 
Encoders.product[Option[Double]]
+}
+
+class TypedMax[IN](val f: IN => Long) extends Aggregator[IN, MutableLong, 
Option[Long]] {
+  override def zero: MutableLong = null
+  override def reduce(b: MutableLong, a: IN): MutableLong = {
+    if (b == null) {
+      new MutableLong(f(a))
+    } else {
+      b.value = math.max(b.value, f(a))
+      b
+    }
+  }
+  override def merge(b1: MutableLong, b2: MutableLong): MutableLong = {
+    if (b1 == null) {
+      b2
+    } else if (b2 == null) {
+      b1
+    } else {
+      b1.value = math.max(b1.value, b2.value)
+      b1
+    }
+  }
+  override def finish(reduction: MutableLong): Option[Long] = {
+    if (reduction != null) {
+      Some(reduction.value)
+    } else {
+      None
+    }
+  }
+
+  override def bufferEncoder: Encoder[MutableLong] = Encoders.kryo[MutableLong]
+  override def outputEncoder: Encoder[Option[Long]] = 
Encoders.product[Option[Long]]
+}
+
+class MutableLong(var value: Long) extends Serializable
+
+class MutableDouble(var value: Double) extends Serializable


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to