[FLINK-6124] [table] Add min/max string aggregation with retraction

This closes #3593.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5282cbc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5282cbc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5282cbc

Branch: refs/heads/table-retraction
Commit: c5282cbcf898b99593ca753fce7557bae1ae09aa
Parents: 05ceec0
Author: twalthr <twal...@apache.org>
Authored: Wed Mar 22 10:03:56 2017 +0100
Committer: twalthr <twal...@apache.org>
Committed: Mon Apr 3 18:21:53 2017 +0200

----------------------------------------------------------------------
 .../functions/aggfunctions/MaxAggFunction.scala |  2 +-
 .../MaxAggFunctionWithRetract.scala             | 12 ++++++--
 .../functions/aggfunctions/MinAggFunction.scala |  2 +-
 .../MinAggFunctionWithRetract.scala             |  8 +++++
 .../table/runtime/aggregate/AggregateUtil.scala |  4 +++
 .../aggfunctions/AggFunctionTestBase.scala      |  4 +--
 .../MaxWithRetractAggFunctionTest.scala         | 32 ++++++++++++++++++++
 .../MinWithRetractAggFunctionTest.scala         | 32 ++++++++++++++++++++
 8 files changed, 90 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
index 3793434..2e666fa 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
@@ -160,6 +160,6 @@ class DecimalMaxAggFunction extends 
MaxAggFunction[BigDecimal] {
   * Built-in String Max aggregate function
   */
 class StringMaxAggFunction extends MaxAggFunction[String] {
-  override def getInitValue = "".toString
+  override def getInitValue = ""
   override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
index eb6e7dc..14ceba2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
@@ -201,7 +201,7 @@ class DoubleMaxWithRetractAggFunction extends 
MaxWithRetractAggFunction[Double]
   * Built-in Boolean Max with retraction aggregate function
   */
 class BooleanMaxWithRetractAggFunction extends 
MaxWithRetractAggFunction[Boolean] {
-  override def getInitValue = false
+  override def getInitValue: Boolean = false
   override def getValueTypeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
 }
 
@@ -209,6 +209,14 @@ class BooleanMaxWithRetractAggFunction extends 
MaxWithRetractAggFunction[Boolean
   * Built-in Big Decimal Max with retraction aggregate function
   */
 class DecimalMaxWithRetractAggFunction extends 
MaxWithRetractAggFunction[BigDecimal] {
-  override def getInitValue = BigDecimal.ZERO
+  override def getInitValue: BigDecimal = BigDecimal.ZERO
   override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
 }
+
+/**
+  * Built-in String Max with retraction aggregate function
+  */
+class StringMaxWithRetractAggFunction extends MaxWithRetractAggFunction[String] {
+  override def getInitValue: String = ""
+  override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
index 41361fd..75a8ebc 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
@@ -160,6 +160,6 @@ class DecimalMinAggFunction extends 
MinAggFunction[BigDecimal] {
   * Built-in String Min aggregate function
   */
 class StringMinAggFunction extends MinAggFunction[String] {
-  override def getInitValue = "".toString
+  override def getInitValue = ""
   override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
index c953286..6f2c3a1 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
@@ -212,3 +212,11 @@ class DecimalMinWithRetractAggFunction extends 
MinWithRetractAggFunction[BigDeci
   override def getInitValue: BigDecimal = BigDecimal.ZERO
   override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
 }
+
+/**
+  * Built-in String Min with retraction aggregate function
+  */
+class StringMinWithRetractAggFunction extends MinWithRetractAggFunction[String] {
+  override def getInitValue: String = ""
+  override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index caa2818..40a1f8a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1003,6 +1003,8 @@ object AggregateUtil {
                   new DecimalMinWithRetractAggFunction
                 case BOOLEAN =>
                   new BooleanMinWithRetractAggFunction
+                case VARCHAR | CHAR =>
+                  new StringMinWithRetractAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException("Min with retract aggregate does no 
support type:" +
                                              sqlType)
@@ -1050,6 +1052,8 @@ object AggregateUtil {
                   new DecimalMaxWithRetractAggFunction
                 case BOOLEAN =>
                   new BooleanMaxWithRetractAggFunction
+                case VARCHAR | CHAR =>
+                  new StringMaxWithRetractAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException("Max with retract aggregate does no 
support type:" +
                                              sqlType)

http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
index 80fc947..cb1137f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
@@ -44,7 +44,7 @@ abstract class AggFunctionTestBase[T] {
     // iterate over input sets
     for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
       val accumulator = accumulateVals(vals)
-      var result = aggregator.getValue(accumulator)
+      val result = aggregator.getValue(accumulator)
       validateResult[T](expected, result)
 
       if (supportRetraction) {
@@ -71,7 +71,7 @@ abstract class AggFunctionTestBase[T] {
         accumulators.add(accumulateVals(firstVals))
         accumulators.add(accumulateVals(secondVals))
 
-        var accumulator = aggregator.merge(accumulators)
+        val accumulator = aggregator.merge(accumulators)
         val result = aggregator.getValue(accumulator)
         validateResult[T](expected, result)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
index c2329a4..8bd5c6f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
@@ -186,3 +186,35 @@ class DecimalMaxWithRetractAggFunctionTest extends 
AggFunctionTestBase[BigDecima
 
   override def aggregator: AggregateFunction[BigDecimal] = new 
DecimalMaxWithRetractAggFunction()
 }
+
+class StringMaxWithRetractAggFunctionTest extends AggFunctionTestBase[String] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      "abc",
+      "def",
+      "ghi",
+      null,
+      "jkl",
+      null,
+      "zzz"
+    ),
+    Seq(
+      null,
+      null
+    ),
+    Seq(
+      "x",
+      null,
+      "e"
+    )
+  )
+
+  override def expectedResults: Seq[String] = Seq(
+    "zzz",
+    null,
+    "x"
+  )
+
+  override def aggregator: AggregateFunction[String] = new StringMaxWithRetractAggFunction()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
index 2ef6b67..9da3828 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
@@ -186,3 +186,35 @@ class DecimalMinWithRetractAggFunctionTest extends 
AggFunctionTestBase[BigDecima
 
   override def aggregator: AggregateFunction[BigDecimal] = new 
DecimalMinWithRetractAggFunction()
 }
+
+class StringMinWithRetractAggFunctionTest extends AggFunctionTestBase[String] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      "abc",
+      "def",
+      "ghi",
+      null,
+      "jkl",
+      null,
+      "zzz"
+    ),
+    Seq(
+      null,
+      null
+    ),
+    Seq(
+      "x",
+      null,
+      "e"
+    )
+  )
+
+  override def expectedResults: Seq[String] = Seq(
+    "abc",
+    null,
+    "e"
+  )
+
+  override def aggregator: AggregateFunction[String] = new StringMinWithRetractAggFunction()
+}

Reply via email to