Repository: flink Updated Branches: refs/heads/master fe018921e -> bc6409d62
http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala index a388acf..fd510ff 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala @@ -25,7 +25,7 @@ import org.apache.flink.table.functions.AggregateFunction * * @tparam T the type for the aggregation result */ -abstract class AvgAggFunctionTestBase[T: Numeric] extends AggFunctionTestBase[T] { +abstract class AvgAggFunctionTestBase[T: Numeric, ACC] extends AggFunctionTestBase[T, ACC] { private val numeric: Numeric[T] = implicitly[Numeric[T]] @@ -91,9 +91,11 @@ abstract class AvgAggFunctionTestBase[T: Numeric] extends AggFunctionTestBase[T] numeric.fromInt(3), null.asInstanceOf[T] ) + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } -class ByteAvgAggFunctionTest extends AvgAggFunctionTestBase[Byte] { +class ByteAvgAggFunctionTest extends AvgAggFunctionTestBase[Byte, IntegralAvgAccumulator] { override def minVal = (Byte.MinValue + 1).toByte @@ -102,7 +104,7 @@ class ByteAvgAggFunctionTest extends AvgAggFunctionTestBase[Byte] { override def aggregator = new ByteAvgAggFunction() } -class ShortAvgAggFunctionTest extends AvgAggFunctionTestBase[Short] { +class ShortAvgAggFunctionTest extends AvgAggFunctionTestBase[Short, IntegralAvgAccumulator] { override def minVal = (Short.MinValue + 1).toShort @@ -111,7 +113,7 @@ class ShortAvgAggFunctionTest extends AvgAggFunctionTestBase[Short] { override def aggregator = new ShortAvgAggFunction() } -class IntAvgAggFunctionTest extends AvgAggFunctionTestBase[Int] { +class IntAvgAggFunctionTest extends AvgAggFunctionTestBase[Int, IntegralAvgAccumulator] { override def minVal = Int.MinValue + 1 @@ -120,7 +122,7 @@ class IntAvgAggFunctionTest extends AvgAggFunctionTestBase[Int] { override def aggregator = new IntAvgAggFunction() } -class LongAvgAggFunctionTest extends AvgAggFunctionTestBase[Long] { +class LongAvgAggFunctionTest extends AvgAggFunctionTestBase[Long, BigIntegralAvgAccumulator] { override def minVal = Long.MinValue + 1 @@ -129,7 +131,7 @@ class LongAvgAggFunctionTest extends AvgAggFunctionTestBase[Long] { override def aggregator = new LongAvgAggFunction() } -class FloatAvgAggFunctionTest extends AvgAggFunctionTestBase[Float] { +class FloatAvgAggFunctionTest extends AvgAggFunctionTestBase[Float, FloatingAvgAccumulator] { override def minVal = Float.MinValue @@ -138,7 +140,7 @@ class FloatAvgAggFunctionTest extends AvgAggFunctionTestBase[Float] { override def aggregator = new FloatAvgAggFunction() } -class DoubleAvgAggFunctionTest extends AvgAggFunctionTestBase[Double] { +class DoubleAvgAggFunctionTest extends AvgAggFunctionTestBase[Double, FloatingAvgAccumulator] { override def minVal = Float.MinValue @@ -147,7 +149,7 @@ class DoubleAvgAggFunctionTest extends AvgAggFunctionTestBase[Double] { override def aggregator = new DoubleAvgAggFunction() } -class DecimalAvgAggFunctionTest extends AggFunctionTestBase[BigDecimal] { +class DecimalAvgAggFunctionTest extends AggFunctionTestBase[BigDecimal, DecimalAvgAccumulator] { override def inputValueSets: Seq[Seq[_]] = Seq( Seq( @@ -182,5 +184,8 @@ class DecimalAvgAggFunctionTest extends AggFunctionTestBase[BigDecimal] { null ) - override def aggregator: AggregateFunction[BigDecimal] = new DecimalAvgAggFunction() + override def aggregator: AggregateFunction[BigDecimal, DecimalAvgAccumulator] = + new DecimalAvgAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala index d5f09b2..6830b8f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.functions.AggregateFunction /** * Test case for built-in count aggregate function */ -class CountAggFunctionTest extends AggFunctionTestBase[Long] { +class CountAggFunctionTest extends AggFunctionTestBase[Long, CountAccumulator] { override def inputValueSets: Seq[Seq[_]] = Seq( Seq("a", "b", null, "c", null, "d", "e", null, "f"), @@ -32,5 +32,7 @@ class CountAggFunctionTest extends AggFunctionTestBase[Long] { override def expectedResults: Seq[Long] = Seq(6L, 0L) - override def aggregator: AggregateFunction[Long] = new CountAggFunction() + override def aggregator: AggregateFunction[Long, CountAccumulator] = new CountAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala index 38ea993..54c2a7d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala @@ -25,7 +25,7 @@ import org.apache.flink.table.functions.AggregateFunction * * @tparam T the type for the aggregation result */ -abstract class MaxAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] { +abstract class MaxAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T, MaxAccumulator[T]] { private val numeric: Numeric[T] = implicitly[Numeric[T]] @@ -61,8 +61,6 @@ abstract class MaxAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] { maxVal, null.asInstanceOf[T] ) - - override def supportRetraction: Boolean = false } class ByteMaxAggFunctionTest extends MaxAggFunctionTest[Byte] { @@ -71,7 +69,8 @@ class ByteMaxAggFunctionTest extends MaxAggFunctionTest[Byte] { override def maxVal = (Byte.MaxValue - 1).toByte - override def aggregator: AggregateFunction[Byte] = new ByteMaxAggFunction() + override def aggregator: AggregateFunction[Byte, MaxAccumulator[Byte]] = + new ByteMaxAggFunction() } class ShortMaxAggFunctionTest extends MaxAggFunctionTest[Short] { @@ -80,7 +79,8 @@ class ShortMaxAggFunctionTest extends MaxAggFunctionTest[Short] { override def maxVal = (Short.MaxValue - 1).toShort - override def aggregator: AggregateFunction[Short] = new ShortMaxAggFunction() + override def aggregator: AggregateFunction[Short, MaxAccumulator[Short]] = + new ShortMaxAggFunction() } class IntMaxAggFunctionTest extends MaxAggFunctionTest[Int] { @@ -89,7 +89,8 @@ class IntMaxAggFunctionTest extends MaxAggFunctionTest[Int] { override def maxVal = Int.MaxValue - 1 - override def aggregator: AggregateFunction[Int] = new IntMaxAggFunction() + override def aggregator: AggregateFunction[Int, MaxAccumulator[Int]] = + new IntMaxAggFunction() } class LongMaxAggFunctionTest extends MaxAggFunctionTest[Long] { @@ -98,7 +99,8 @@ class LongMaxAggFunctionTest extends MaxAggFunctionTest[Long] { override def maxVal = Long.MaxValue - 1 - override def aggregator: AggregateFunction[Long] = new LongMaxAggFunction() + override def aggregator: AggregateFunction[Long, MaxAccumulator[Long]] = + new LongMaxAggFunction() } class FloatMaxAggFunctionTest extends MaxAggFunctionTest[Float] { @@ -107,7 +109,8 @@ class FloatMaxAggFunctionTest extends MaxAggFunctionTest[Float] { override def maxVal = Float.MaxValue / 2 - override def aggregator: AggregateFunction[Float] = new FloatMaxAggFunction() + override def aggregator: AggregateFunction[Float, MaxAccumulator[Float]] = + new FloatMaxAggFunction() } class DoubleMaxAggFunctionTest extends MaxAggFunctionTest[Double] { @@ -116,10 +119,11 @@ class DoubleMaxAggFunctionTest extends MaxAggFunctionTest[Double] { override def maxVal = Double.MaxValue / 2 - override def aggregator: AggregateFunction[Double] = new DoubleMaxAggFunction() + override def aggregator: AggregateFunction[Double, MaxAccumulator[Double]] = + new DoubleMaxAggFunction() } -class BooleanMaxAggFunctionTest extends AggFunctionTestBase[Boolean] { +class BooleanMaxAggFunctionTest extends AggFunctionTestBase[Boolean, MaxAccumulator[Boolean]] { override def inputValueSets: Seq[Seq[Boolean]] = Seq( Seq( @@ -155,12 +159,12 @@ class BooleanMaxAggFunctionTest extends AggFunctionTestBase[Boolean] { null.asInstanceOf[Boolean] ) - override def aggregator: AggregateFunction[Boolean] = new BooleanMaxAggFunction() - - override def supportRetraction: Boolean = false + override def aggregator: AggregateFunction[Boolean, MaxAccumulator[Boolean]] = + new BooleanMaxAggFunction() } -class DecimalMaxAggFunctionTest extends AggFunctionTestBase[BigDecimal] { +class DecimalMaxAggFunctionTest + extends AggFunctionTestBase[BigDecimal, MaxAccumulator[BigDecimal]] { override def inputValueSets: Seq[Seq[_]] = Seq( Seq( @@ -188,12 +192,11 @@ class DecimalMaxAggFunctionTest extends AggFunctionTestBase[BigDecimal] { null ) - override def aggregator: AggregateFunction[BigDecimal] = new DecimalMaxAggFunction() - - override def supportRetraction: Boolean = false + override def aggregator: AggregateFunction[BigDecimal, MaxAccumulator[BigDecimal]] = + new DecimalMaxAggFunction() } -class StringMaxAggFunctionTest extends AggFunctionTestBase[String] { +class StringMaxAggFunctionTest extends AggFunctionTestBase[String, MaxAccumulator[String]] { override def inputValueSets: Seq[Seq[_]] = Seq( Seq( new String("a"), @@ -221,7 +224,6 @@ class StringMaxAggFunctionTest extends AggFunctionTestBase[String] { new String("household") ) - override def aggregator: AggregateFunction[String] = new StringMaxAggFunction() - - override def supportRetraction: Boolean = false + override def aggregator: AggregateFunction[String, MaxAccumulator[String]] = + new StringMaxAggFunction() } http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala index 8bd5c6f..c11ae41 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala @@ -25,7 +25,8 @@ import org.apache.flink.table.functions.AggregateFunction * * @tparam T the type for the aggregation result */ -abstract class MaxWithRetractAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] { +abstract class MaxWithRetractAggFunctionTest[T: Numeric] + extends AggFunctionTestBase[T, MaxWithRetractAccumulator[T]] { private val numeric: Numeric[T] = implicitly[Numeric[T]] @@ -61,6 +62,8 @@ abstract class MaxWithRetractAggFunctionTest[T: Numeric] extends AggFunctionTest maxVal, null.asInstanceOf[T] ) + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } class ByteMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Byte] { @@ -69,7 +72,8 @@ class ByteMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[By override def maxVal = (Byte.MaxValue - 1).toByte - override def aggregator: AggregateFunction[Byte] = new ByteMaxWithRetractAggFunction() + override def aggregator: AggregateFunction[Byte, MaxWithRetractAccumulator[Byte]] = + new ByteMaxWithRetractAggFunction() } class ShortMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Short] { @@ -78,7 +82,8 @@ class ShortMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[S override def maxVal = (Short.MaxValue - 1).toShort - override def aggregator: AggregateFunction[Short] = new ShortMaxWithRetractAggFunction() + override def aggregator: AggregateFunction[Short, MaxWithRetractAccumulator[Short]] = + new ShortMaxWithRetractAggFunction() } class IntMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Int] { @@ -87,7 +92,8 @@ class IntMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Int override def maxVal = Int.MaxValue - 1 - override def aggregator: AggregateFunction[Int] = new IntMaxWithRetractAggFunction() + override def aggregator: AggregateFunction[Int, MaxWithRetractAccumulator[Int]] = + new IntMaxWithRetractAggFunction() } class LongMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Long] { @@ -96,7 +102,8 @@ class LongMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Lo override def maxVal = Long.MaxValue - 1 - override def aggregator: AggregateFunction[Long] = new LongMaxWithRetractAggFunction() + override def aggregator: AggregateFunction[Long, MaxWithRetractAccumulator[Long]] = + new LongMaxWithRetractAggFunction() } class FloatMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Float] { @@ -105,7 +112,8 @@ class FloatMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[F override def maxVal = Float.MaxValue / 2 - override def aggregator: AggregateFunction[Float] = new FloatMaxWithRetractAggFunction() + override def aggregator: AggregateFunction[Float, MaxWithRetractAccumulator[Float]] = + new FloatMaxWithRetractAggFunction() } class DoubleMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Double] { @@ -114,10 +122,12 @@ class DoubleMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[ override def maxVal = Double.MaxValue / 2 - override def aggregator: AggregateFunction[Double] = new DoubleMaxWithRetractAggFunction() + override def aggregator: AggregateFunction[Double, MaxWithRetractAccumulator[Double]] = + new DoubleMaxWithRetractAggFunction() } -class BooleanMaxWithRetractAggFunctionTest extends AggFunctionTestBase[Boolean] { +class BooleanMaxWithRetractAggFunctionTest + extends AggFunctionTestBase[Boolean, MaxWithRetractAccumulator[Boolean]] { override def inputValueSets: Seq[Seq[Boolean]] = Seq( Seq( @@ -153,10 +163,14 @@ class BooleanMaxWithRetractAggFunctionTest extends AggFunctionTestBase[Boolean] null.asInstanceOf[Boolean] ) - override def aggregator: AggregateFunction[Boolean] = new BooleanMaxWithRetractAggFunction() + override def aggregator: AggregateFunction[Boolean, MaxWithRetractAccumulator[Boolean]] = + new BooleanMaxWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } -class DecimalMaxWithRetractAggFunctionTest extends AggFunctionTestBase[BigDecimal] { +class DecimalMaxWithRetractAggFunctionTest + extends AggFunctionTestBase[BigDecimal, MaxWithRetractAccumulator[BigDecimal]] { override def inputValueSets: Seq[Seq[_]] = Seq( Seq( @@ -184,10 +198,14 @@ class DecimalMaxWithRetractAggFunctionTest extends AggFunctionTestBase[BigDecima null ) - override def aggregator: AggregateFunction[BigDecimal] = new DecimalMaxWithRetractAggFunction() + override def aggregator: AggregateFunction[BigDecimal, MaxWithRetractAccumulator[BigDecimal]] = + new DecimalMaxWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } -class StringMaxWithRetractAggFunctionTest extends AggFunctionTestBase[String] { +class StringMaxWithRetractAggFunctionTest + extends AggFunctionTestBase[String, MaxWithRetractAccumulator[String]] { override def inputValueSets: Seq[Seq[_]] = Seq( Seq( @@ -216,5 +234,8 @@ class StringMaxWithRetractAggFunctionTest extends AggFunctionTestBase[String] { "x" ) - override def aggregator: AggregateFunction[String] = new StringMaxWithRetractAggFunction() + override def aggregator: AggregateFunction[String, MaxWithRetractAccumulator[String]] = + new StringMaxWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala index 84e541a..6f41bd2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala @@ -25,7 +25,7 @@ import org.apache.flink.table.functions.AggregateFunction * * @tparam T the type for the aggregation result */ -abstract class MinAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] { +abstract class MinAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T, MinAccumulator[T]] { private val numeric: Numeric[T] = implicitly[Numeric[T]] @@ -61,8 +61,6 @@ abstract class MinAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] { minVal, null.asInstanceOf[T] ) - - override def supportRetraction: Boolean = false } class ByteMinAggFunctionTest extends MinAggFunctionTest[Byte] { @@ -71,7 +69,8 @@ class ByteMinAggFunctionTest extends MinAggFunctionTest[Byte] { override def maxVal = (Byte.MaxValue - 1).toByte - override def aggregator: AggregateFunction[Byte] = new ByteMinAggFunction() + override def aggregator: AggregateFunction[Byte, MinAccumulator[Byte]] = + new ByteMinAggFunction() } class ShortMinAggFunctionTest extends MinAggFunctionTest[Short] { @@ -80,7 +79,8 @@ class ShortMinAggFunctionTest extends MinAggFunctionTest[Short] { override def maxVal = (Short.MaxValue - 1).toShort - override def aggregator: AggregateFunction[Short] = new ShortMinAggFunction() + override def aggregator: AggregateFunction[Short, MinAccumulator[Short]] = + new ShortMinAggFunction() } class IntMinAggFunctionTest extends MinAggFunctionTest[Int] { @@ -89,7 +89,8 @@ class IntMinAggFunctionTest extends MinAggFunctionTest[Int] { override def maxVal = Int.MaxValue - 1 - override def aggregator: AggregateFunction[Int] = new IntMinAggFunction() + override def aggregator: AggregateFunction[Int, MinAccumulator[Int]] = + new IntMinAggFunction() } class LongMinAggFunctionTest extends MinAggFunctionTest[Long] { @@ -98,7 +99,8 @@ class LongMinAggFunctionTest extends MinAggFunctionTest[Long] { override def maxVal = Long.MaxValue - 1 - override def aggregator: AggregateFunction[Long] = new LongMinAggFunction() + override def aggregator: AggregateFunction[Long, MinAccumulator[Long]] = + new LongMinAggFunction() } class FloatMinAggFunctionTest extends MinAggFunctionTest[Float] { @@ -107,7 +109,8 @@ class FloatMinAggFunctionTest extends MinAggFunctionTest[Float] { override def maxVal = Float.MaxValue / 2 - override def aggregator: AggregateFunction[Float] = new FloatMinAggFunction() + override def aggregator: AggregateFunction[Float, MinAccumulator[Float]] = + new FloatMinAggFunction() } class DoubleMinAggFunctionTest extends MinAggFunctionTest[Double] { @@ -116,10 +119,11 @@ class DoubleMinAggFunctionTest extends MinAggFunctionTest[Double] { override def maxVal = Double.MaxValue / 2 - override def aggregator: AggregateFunction[Double] = new DoubleMinAggFunction() + override def aggregator: AggregateFunction[Double, MinAccumulator[Double]] = + new DoubleMinAggFunction() } -class BooleanMinAggFunctionTest extends AggFunctionTestBase[Boolean] { +class BooleanMinAggFunctionTest extends AggFunctionTestBase[Boolean, MinAccumulator[Boolean]] { override def inputValueSets: Seq[Seq[Boolean]] = Seq( Seq( @@ -155,12 +159,12 @@ class BooleanMinAggFunctionTest extends AggFunctionTestBase[Boolean] { null.asInstanceOf[Boolean] ) - override def aggregator: AggregateFunction[Boolean] = new BooleanMinAggFunction() - - override def supportRetraction: Boolean = false + override def aggregator: AggregateFunction[Boolean, MinAccumulator[Boolean]] = + new BooleanMinAggFunction() } -class DecimalMinAggFunctionTest extends AggFunctionTestBase[BigDecimal] { +class DecimalMinAggFunctionTest + extends AggFunctionTestBase[BigDecimal, MinAccumulator[BigDecimal]] { override def inputValueSets: Seq[Seq[_]] = Seq( Seq( @@ -188,12 +192,12 @@ class DecimalMinAggFunctionTest extends AggFunctionTestBase[BigDecimal] { null ) - override def aggregator: AggregateFunction[BigDecimal] = new DecimalMinAggFunction() - - override def supportRetraction: Boolean = false + override def aggregator: AggregateFunction[BigDecimal, MinAccumulator[BigDecimal]] = + new DecimalMinAggFunction() } -class StringMinAggFunctionTest extends AggFunctionTestBase[String] { +class StringMinAggFunctionTest + extends AggFunctionTestBase[String, MinAccumulator[String]] { override def inputValueSets: Seq[Seq[_]] = Seq( Seq( new String("a"), @@ -221,7 +225,6 @@ class StringMinAggFunctionTest extends AggFunctionTestBase[String] { new String("1House") ) - override def aggregator: AggregateFunction[String] = new StringMinAggFunction() - - override def supportRetraction: Boolean = false + override def aggregator: AggregateFunction[String, MinAccumulator[String]] = + new StringMinAggFunction() } http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala index 9da3828..e13e69b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala @@ -25,7 +25,8 @@ import org.apache.flink.table.functions.AggregateFunction * * @tparam T the type for the aggregation result */ -abstract class MinWithRetractAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] { +abstract class MinWithRetractAggFunctionTest[T: Numeric] + extends AggFunctionTestBase[T, MinWithRetractAccumulator[T]] { private val numeric: Numeric[T] = implicitly[Numeric[T]] @@ -61,6 +62,8 @@ abstract class MinWithRetractAggFunctionTest[T: Numeric] extends AggFunctionTest minVal, null.asInstanceOf[T] ) + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } class ByteMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Byte] { @@ -69,7 +72,8 @@ class ByteMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[By override def maxVal = (Byte.MaxValue - 1).toByte - override def aggregator: AggregateFunction[Byte] = new ByteMinWithRetractAggFunction() + override def aggregator: AggregateFunction[Byte, MinWithRetractAccumulator[Byte]] = + new ByteMinWithRetractAggFunction() } class ShortMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Short] { @@ -78,7 +82,8 @@ class ShortMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[S override def maxVal = (Short.MaxValue - 1).toShort - override def aggregator: AggregateFunction[Short] = new ShortMinWithRetractAggFunction() + override def aggregator: AggregateFunction[Short, MinWithRetractAccumulator[Short]] = + new ShortMinWithRetractAggFunction() } class IntMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Int] { @@ -87,7 +92,8 @@ class IntMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Int override def maxVal = Int.MaxValue - 1 - override def aggregator: AggregateFunction[Int] = new IntMinWithRetractAggFunction() + override def aggregator: AggregateFunction[Int, MinWithRetractAccumulator[Int]] = + new IntMinWithRetractAggFunction() } class LongMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Long] { @@ -96,7 +102,8 @@ class LongMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Lo override def maxVal = Long.MaxValue - 1 - override def aggregator: AggregateFunction[Long] = new LongMinWithRetractAggFunction() + override def aggregator: AggregateFunction[Long, MinWithRetractAccumulator[Long]] = + new LongMinWithRetractAggFunction() } class FloatMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Float] { @@ -105,7 +112,8 @@ class FloatMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[F override def maxVal = Float.MaxValue / 2 - override def aggregator: AggregateFunction[Float] = new FloatMinWithRetractAggFunction() + override def aggregator: AggregateFunction[Float, MinWithRetractAccumulator[Float]] = + new FloatMinWithRetractAggFunction() } class DoubleMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Double] { @@ -114,10 +122,12 @@ class DoubleMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[ override def maxVal = Double.MaxValue / 2 - override def aggregator: AggregateFunction[Double] = new DoubleMinWithRetractAggFunction() + override def aggregator: AggregateFunction[Double, MinWithRetractAccumulator[Double]] = + new DoubleMinWithRetractAggFunction() } -class BooleanMinWithRetractAggFunctionTest extends AggFunctionTestBase[Boolean] { +class BooleanMinWithRetractAggFunctionTest + extends AggFunctionTestBase[Boolean, MinWithRetractAccumulator[Boolean]] { override def inputValueSets: Seq[Seq[Boolean]] = Seq( Seq( @@ -153,10 +163,14 @@ class BooleanMinWithRetractAggFunctionTest extends AggFunctionTestBase[Boolean] null.asInstanceOf[Boolean] ) - override def aggregator: AggregateFunction[Boolean] = new BooleanMinWithRetractAggFunction() + override def aggregator: AggregateFunction[Boolean, MinWithRetractAccumulator[Boolean]] = + new BooleanMinWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } -class DecimalMinWithRetractAggFunctionTest extends AggFunctionTestBase[BigDecimal] { +class DecimalMinWithRetractAggFunctionTest + extends AggFunctionTestBase[BigDecimal, MinWithRetractAccumulator[BigDecimal]] { override def inputValueSets: Seq[Seq[_]] = Seq( Seq( @@ -184,10 +198,14 @@ class DecimalMinWithRetractAggFunctionTest extends AggFunctionTestBase[BigDecima null ) - override def aggregator: AggregateFunction[BigDecimal] = new DecimalMinWithRetractAggFunction() + override def aggregator: AggregateFunction[BigDecimal, MinWithRetractAccumulator[BigDecimal]] = + new DecimalMinWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } -class StringMinWithRetractAggFunctionTest extends AggFunctionTestBase[String] { +class StringMinWithRetractAggFunctionTest + extends AggFunctionTestBase[String, MinWithRetractAccumulator[String]] { override def inputValueSets: Seq[Seq[_]] = Seq( Seq( @@ -216,5 +234,8 @@ class StringMinWithRetractAggFunctionTest extends AggFunctionTestBase[String] { "e" ) - override def aggregator: AggregateFunction[String] = new StringMinWithRetractAggFunction() + override def aggregator: AggregateFunction[String, MinWithRetractAccumulator[String]] = + new StringMinWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala index cd69187..6816a5e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala @@ -26,7 +26,8 @@ import org.apache.flink.table.functions.AggregateFunction * * @tparam T the type for the aggregation result */ -abstract class SumAggFunctionTestBase[T: Numeric] extends AggFunctionTestBase[T] { +abstract class SumAggFunctionTestBase[T: Numeric] + extends AggFunctionTestBase[T, SumAccumulator[T]] { private val numeric: Numeric[T] = implicitly[Numeric[T]] @@ -63,54 +64,59 @@ abstract class SumAggFunctionTestBase[T: Numeric] extends AggFunctionTestBase[T] numeric.fromInt(2), null.asInstanceOf[T] ) - - override def supportRetraction: Boolean = false } class ByteSumAggFunctionTest extends SumAggFunctionTestBase[Byte] { override def maxVal = (Byte.MaxValue / 2).toByte - override def aggregator: AggregateFunction[Byte] = new ByteSumAggFunction + override def aggregator: AggregateFunction[Byte, SumAccumulator[Byte]] = + new ByteSumAggFunction } class ShortSumAggFunctionTest extends SumAggFunctionTestBase[Short] { override def maxVal = (Short.MaxValue / 2).toShort - override def aggregator: AggregateFunction[Short] = new ShortSumAggFunction + override def aggregator: AggregateFunction[Short, SumAccumulator[Short]] = + new ShortSumAggFunction } class IntSumAggFunctionTest extends SumAggFunctionTestBase[Int] { override def maxVal = Int.MaxValue / 2 - override def aggregator: AggregateFunction[Int] = new IntSumAggFunction + override def aggregator: AggregateFunction[Int, SumAccumulator[Int]] = + new IntSumAggFunction } class LongSumAggFunctionTest extends SumAggFunctionTestBase[Long] { override def maxVal = Long.MaxValue / 2 - override def aggregator: AggregateFunction[Long] = new LongSumAggFunction + override def aggregator: AggregateFunction[Long, SumAccumulator[Long]] = + new LongSumAggFunction } class FloatSumAggFunctionTest extends SumAggFunctionTestBase[Float] { override def maxVal = 12345.6789f - override def aggregator: AggregateFunction[Float] = new FloatSumAggFunction + override def aggregator: AggregateFunction[Float, SumAccumulator[Float]] = + new FloatSumAggFunction } class DoubleSumAggFunctionTest extends SumAggFunctionTestBase[Double] { override def maxVal = 12345.6789d - override def aggregator: AggregateFunction[Double] = new DoubleSumAggFunction + override def aggregator: AggregateFunction[Double, SumAccumulator[Double]] = + new DoubleSumAggFunction } -class DecimalSumAggFunctionTest extends AggFunctionTestBase[BigDecimal] { +class DecimalSumAggFunctionTest + extends AggFunctionTestBase[BigDecimal, DecimalSumAccumulator] { override def inputValueSets: Seq[Seq[_]] = Seq( Seq( @@ -143,9 +149,8 @@ class DecimalSumAggFunctionTest extends AggFunctionTestBase[BigDecimal] { null ) - override def aggregator: AggregateFunction[BigDecimal] = new DecimalSumAggFunction() - - override def supportRetraction: Boolean = false + override def aggregator: AggregateFunction[BigDecimal, DecimalSumAccumulator] = + new DecimalSumAggFunction() } http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala index 72af358..8b6daba 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala @@ -26,7 +26,8 @@ import org.apache.flink.table.functions.AggregateFunction * * @tparam T the type for the aggregation result */ -abstract class SumWithRetractAggFunctionTestBase[T: Numeric] extends AggFunctionTestBase[T] { +abstract class SumWithRetractAggFunctionTestBase[T: Numeric] + extends AggFunctionTestBase[T, SumWithRetractAccumulator[T]] { private val numeric: Numeric[T] = implicitly[Numeric[T]] @@ -63,52 +64,61 @@ abstract class SumWithRetractAggFunctionTestBase[T: Numeric] extends AggFunction numeric.fromInt(2), null.asInstanceOf[T] ) + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } class ByteSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Byte] { override def maxVal = (Byte.MaxValue / 2).toByte - override def aggregator: AggregateFunction[Byte] = new ByteSumWithRetractAggFunction + override def aggregator: AggregateFunction[Byte, SumWithRetractAccumulator[Byte]] = + new ByteSumWithRetractAggFunction } class ShortSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Short] { override def maxVal = (Short.MaxValue / 2).toShort - override def aggregator: AggregateFunction[Short] = new ShortSumWithRetractAggFunction + override def aggregator: AggregateFunction[Short, SumWithRetractAccumulator[Short]] = + new ShortSumWithRetractAggFunction } class IntSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Int] { override def maxVal = Int.MaxValue / 2 - override def aggregator: AggregateFunction[Int] = new IntSumWithRetractAggFunction + override def aggregator: AggregateFunction[Int, SumWithRetractAccumulator[Int]] = + new IntSumWithRetractAggFunction } class LongSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Long] { override def maxVal = Long.MaxValue / 2 - override def aggregator: AggregateFunction[Long] = new LongSumWithRetractAggFunction + override def aggregator: AggregateFunction[Long, SumWithRetractAccumulator[Long]] = + new LongSumWithRetractAggFunction } class FloatSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Float] { override def maxVal = 12345.6789f - override def aggregator: AggregateFunction[Float] = new FloatSumWithRetractAggFunction + override def aggregator: AggregateFunction[Float, SumWithRetractAccumulator[Float]] = + new FloatSumWithRetractAggFunction } class DoubleSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Double] { override def maxVal = 12345.6789d - override def aggregator: AggregateFunction[Double] = new DoubleSumWithRetractAggFunction + override def aggregator: AggregateFunction[Double, SumWithRetractAccumulator[Double]] = + new DoubleSumWithRetractAggFunction } -class DecimalSumWithRetractAggFunctionTest extends AggFunctionTestBase[BigDecimal] { +class DecimalSumWithRetractAggFunctionTest + extends AggFunctionTestBase[BigDecimal, DecimalSumWithRetractAccumulator] { override def inputValueSets: Seq[Seq[_]] = Seq( Seq( @@ -141,7 +151,10 @@ class DecimalSumWithRetractAggFunctionTest extends AggFunctionTestBase[BigDecima null ) - override def aggregator: AggregateFunction[BigDecimal] = new DecimalSumWithRetractAggFunction() + override def aggregator: AggregateFunction[BigDecimal, DecimalSumWithRetractAccumulator] = + new DecimalSumWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala index 16c493e..5e3e995 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala @@ -51,7 +51,7 @@ class BoundedProcessingOverRangeProcessFunctionTest { val aggregates = Array(new LongMinWithRetractAggFunction, - new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_]]] + new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]] val aggregationStateType: RowTypeInfo = AggregateUtil.createAccumulatorRowType(aggregates) val funcCode = @@ -71,9 +71,9 @@ class BoundedProcessingOverRangeProcessFunctionTest { | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" + | "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" + - | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbkDcXxs1apkP" + + | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" + | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" + - | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uSa3YqbzCL3QCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" + + | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" + | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" + | "mluZyRMb25nJOda0iCPo2ukAgAAeHA"); | @@ -81,9 +81,9 @@ class BoundedProcessingOverRangeProcessFunctionTest { | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" + | "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" + - | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbu4_w_gPePlO" + + | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" + | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" + - | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uSa3YqbzCL3QCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" + + | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" + | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" + | "mluZyRMb25nJOda0iCPo2ukAgAAeHA"); | } @@ -95,12 +95,14 @@ class BoundedProcessingOverRangeProcessFunctionTest { | org.apache.flink.table.functions.AggregateFunction baseClass0 = | (org.apache.flink.table.functions.AggregateFunction) fmin; | output.setField(5, baseClass0.getValue( - | (org.apache.flink.table.functions.Accumulator) accs.getField(0))); + | (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) + | accs.getField(0))); | | org.apache.flink.table.functions.AggregateFunction baseClass1 = | (org.apache.flink.table.functions.AggregateFunction) fmax; | output.setField(6, baseClass1.getValue( - | (org.apache.flink.table.functions.Accumulator) accs.getField(1))); + | (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) + | accs.getField(1))); | } | | public void accumulate( @@ -108,11 +110,13 @@ class BoundedProcessingOverRangeProcessFunctionTest { | org.apache.flink.types.Row input) { | | fmin.accumulate( - | ((org.apache.flink.table.functions.Accumulator) accs.getField(0)), + | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) + | accs.getField(0)), | (java.lang.Long) input.getField(4)); | | fmax.accumulate( - | ((org.apache.flink.table.functions.Accumulator) accs.getField(1)), + | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) + | accs.getField(1)), | (java.lang.Long) input.getField(4)); | } | @@ -121,11 +125,13 @@ class BoundedProcessingOverRangeProcessFunctionTest { | org.apache.flink.types.Row input) { | | fmin.retract( - | ((org.apache.flink.table.functions.Accumulator) accs.getField(0)), + | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) + | accs.getField(0)), | (java.lang.Long) input.getField(4)); | | fmax.retract( - | ((org.apache.flink.table.functions.Accumulator) accs.getField(1)), + | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) + | accs.getField(1)), | (java.lang.Long) input.getField(4)); | } |