[FLINK-6124] [table] Add min/max string aggregation with retraction This closes #3593.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5282cbc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5282cbc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5282cbc Branch: refs/heads/table-retraction Commit: c5282cbcf898b99593ca753fce7557bae1ae09aa Parents: 05ceec0 Author: twalthr <twal...@apache.org> Authored: Wed Mar 22 10:03:56 2017 +0100 Committer: twalthr <twal...@apache.org> Committed: Mon Apr 3 18:21:53 2017 +0200 ---------------------------------------------------------------------- .../functions/aggfunctions/MaxAggFunction.scala | 2 +- .../MaxAggFunctionWithRetract.scala | 12 ++++++-- .../functions/aggfunctions/MinAggFunction.scala | 2 +- .../MinAggFunctionWithRetract.scala | 8 +++++ .../table/runtime/aggregate/AggregateUtil.scala | 4 +++ .../aggfunctions/AggFunctionTestBase.scala | 4 +-- .../MaxWithRetractAggFunctionTest.scala | 32 ++++++++++++++++++++ .../MinWithRetractAggFunctionTest.scala | 32 ++++++++++++++++++++ 8 files changed, 90 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala index 3793434..2e666fa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala @@ -160,6 +160,6 @@ class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] { * Built-in String Max 
aggregate function */ class StringMaxAggFunction extends MaxAggFunction[String] { - override def getInitValue = "".toString + override def getInitValue = "" override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO } http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala index eb6e7dc..14ceba2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala @@ -201,7 +201,7 @@ class DoubleMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Double] * Built-in Boolean Max with retraction aggregate function */ class BooleanMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Boolean] { - override def getInitValue = false + override def getInitValue: Boolean = false override def getValueTypeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO } @@ -209,6 +209,14 @@ class BooleanMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Boolean * Built-in Big Decimal Max with retraction aggregate function */ class DecimalMaxWithRetractAggFunction extends MaxWithRetractAggFunction[BigDecimal] { - override def getInitValue = BigDecimal.ZERO + override def getInitValue: BigDecimal = BigDecimal.ZERO override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO } + +/** + * Built-in String Max with retraction aggregate function + */ +class StringMaxWithRetractAggFunction extends MaxWithRetractAggFunction[String] { + override def 
getInitValue: String = "" + override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO +} http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala index 41361fd..75a8ebc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala @@ -160,6 +160,6 @@ class DecimalMinAggFunction extends MinAggFunction[BigDecimal] { * Built-in String Min aggregate function */ class StringMinAggFunction extends MinAggFunction[String] { - override def getInitValue = "".toString + override def getInitValue = "" override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO } http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala index c953286..6f2c3a1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala @@ -212,3 +212,11 @@ class 
DecimalMinWithRetractAggFunction extends MinWithRetractAggFunction[BigDeci override def getInitValue: BigDecimal = BigDecimal.ZERO override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO } + +/** + * Built-in String Min with retraction aggregate function + */ +class StringMinWithRetractAggFunction extends MinWithRetractAggFunction[String] { + override def getInitValue: String = "" + override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO +} http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index caa2818..40a1f8a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -1003,6 +1003,8 @@ object AggregateUtil { new DecimalMinWithRetractAggFunction case BOOLEAN => new BooleanMinWithRetractAggFunction + case VARCHAR | CHAR => + new StringMinWithRetractAggFunction case sqlType: SqlTypeName => throw new TableException("Min with retract aggregate does no support type:" + sqlType) @@ -1050,6 +1052,8 @@ object AggregateUtil { new DecimalMaxWithRetractAggFunction case BOOLEAN => new BooleanMaxWithRetractAggFunction + case VARCHAR | CHAR => + new StringMaxWithRetractAggFunction case sqlType: SqlTypeName => throw new TableException("Max with retract aggregate does no support type:" + sqlType) http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala index 80fc947..cb1137f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala @@ -44,7 +44,7 @@ abstract class AggFunctionTestBase[T] { // iterate over input sets for ((vals, expected) <- inputValueSets.zip(expectedResults)) { val accumulator = accumulateVals(vals) - var result = aggregator.getValue(accumulator) + val result = aggregator.getValue(accumulator) validateResult[T](expected, result) if (supportRetraction) { @@ -71,7 +71,7 @@ abstract class AggFunctionTestBase[T] { accumulators.add(accumulateVals(firstVals)) accumulators.add(accumulateVals(secondVals)) - var accumulator = aggregator.merge(accumulators) + val accumulator = aggregator.merge(accumulators) val result = aggregator.getValue(accumulator) validateResult[T](expected, result) http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala index c2329a4..8bd5c6f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala @@ -186,3 +186,35 @@ class DecimalMaxWithRetractAggFunctionTest extends AggFunctionTestBase[BigDecima override def aggregator: AggregateFunction[BigDecimal] = new DecimalMaxWithRetractAggFunction() } + +class StringMaxWithRetractAggFunctionTest extends AggFunctionTestBase[String] { + + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + "abc", + "def", + "ghi", + null, + "jkl", + null, + "zzz" + ), + Seq( + null, + null + ), + Seq( + "x", + null, + "e" + ) + ) + + override def expectedResults: Seq[String] = Seq( + "zzz", + null, + "x" + ) + + override def aggregator: AggregateFunction[String] = new StringMaxWithRetractAggFunction() +} http://git-wip-us.apache.org/repos/asf/flink/blob/c5282cbc/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala index 2ef6b67..9da3828 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala @@ -186,3 +186,35 @@ class DecimalMinWithRetractAggFunctionTest extends AggFunctionTestBase[BigDecima override def aggregator: AggregateFunction[BigDecimal] = new DecimalMinWithRetractAggFunction() } + +class StringMinWithRetractAggFunctionTest extends AggFunctionTestBase[String] { + + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + "abc", + "def", + "ghi", + null, + "jkl", + null, + "zzz" + ), + 
Seq( + null, + null + ), + Seq( + "x", + null, + "e" + ) + ) + + override def expectedResults: Seq[String] = Seq( + "abc", + null, + "e" + ) + + override def aggregator: AggregateFunction[String] = new StringMinWithRetractAggFunction() +}