Repository: flink
Updated Branches:
  refs/heads/master fe018921e -> bc6409d62


http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
index a388acf..fd510ff 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.functions.AggregateFunction
   *
   * @tparam T the type for the aggregation result
   */
-abstract class AvgAggFunctionTestBase[T: Numeric] extends 
AggFunctionTestBase[T] {
+abstract class AvgAggFunctionTestBase[T: Numeric, ACC] extends 
AggFunctionTestBase[T, ACC] {
 
   private val numeric: Numeric[T] = implicitly[Numeric[T]]
 
@@ -91,9 +91,11 @@ abstract class AvgAggFunctionTestBase[T: Numeric] extends 
AggFunctionTestBase[T]
     numeric.fromInt(3),
     null.asInstanceOf[T]
   )
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }
 
-class ByteAvgAggFunctionTest extends AvgAggFunctionTestBase[Byte] {
+class ByteAvgAggFunctionTest extends AvgAggFunctionTestBase[Byte, 
IntegralAvgAccumulator] {
 
   override def minVal = (Byte.MinValue + 1).toByte
 
@@ -102,7 +104,7 @@ class ByteAvgAggFunctionTest extends 
AvgAggFunctionTestBase[Byte] {
   override def aggregator = new ByteAvgAggFunction()
 }
 
-class ShortAvgAggFunctionTest extends AvgAggFunctionTestBase[Short] {
+class ShortAvgAggFunctionTest extends AvgAggFunctionTestBase[Short, 
IntegralAvgAccumulator] {
 
   override def minVal = (Short.MinValue + 1).toShort
 
@@ -111,7 +113,7 @@ class ShortAvgAggFunctionTest extends 
AvgAggFunctionTestBase[Short] {
   override def aggregator = new ShortAvgAggFunction()
 }
 
-class IntAvgAggFunctionTest extends AvgAggFunctionTestBase[Int] {
+class IntAvgAggFunctionTest extends AvgAggFunctionTestBase[Int, 
IntegralAvgAccumulator] {
 
   override def minVal = Int.MinValue + 1
 
@@ -120,7 +122,7 @@ class IntAvgAggFunctionTest extends 
AvgAggFunctionTestBase[Int] {
   override def aggregator = new IntAvgAggFunction()
 }
 
-class LongAvgAggFunctionTest extends AvgAggFunctionTestBase[Long] {
+class LongAvgAggFunctionTest extends AvgAggFunctionTestBase[Long, 
BigIntegralAvgAccumulator] {
 
   override def minVal = Long.MinValue + 1
 
@@ -129,7 +131,7 @@ class LongAvgAggFunctionTest extends 
AvgAggFunctionTestBase[Long] {
   override def aggregator = new LongAvgAggFunction()
 }
 
-class FloatAvgAggFunctionTest extends AvgAggFunctionTestBase[Float] {
+class FloatAvgAggFunctionTest extends AvgAggFunctionTestBase[Float, 
FloatingAvgAccumulator] {
 
   override def minVal = Float.MinValue
 
@@ -138,7 +140,7 @@ class FloatAvgAggFunctionTest extends 
AvgAggFunctionTestBase[Float] {
   override def aggregator = new FloatAvgAggFunction()
 }
 
-class DoubleAvgAggFunctionTest extends AvgAggFunctionTestBase[Double] {
+class DoubleAvgAggFunctionTest extends AvgAggFunctionTestBase[Double, 
FloatingAvgAccumulator] {
 
   override def minVal = Float.MinValue
 
@@ -147,7 +149,7 @@ class DoubleAvgAggFunctionTest extends 
AvgAggFunctionTestBase[Double] {
   override def aggregator = new DoubleAvgAggFunction()
 }
 
-class DecimalAvgAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
+class DecimalAvgAggFunctionTest extends AggFunctionTestBase[BigDecimal, 
DecimalAvgAccumulator] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(
@@ -182,5 +184,8 @@ class DecimalAvgAggFunctionTest extends 
AggFunctionTestBase[BigDecimal] {
     null
   )
 
-  override def aggregator: AggregateFunction[BigDecimal] = new 
DecimalAvgAggFunction()
+  override def aggregator: AggregateFunction[BigDecimal, 
DecimalAvgAccumulator] =
+    new DecimalAvgAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala
index d5f09b2..6830b8f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.functions.AggregateFunction
 /**
   * Test case for built-in count aggregate function
   */
-class CountAggFunctionTest extends AggFunctionTestBase[Long] {
+class CountAggFunctionTest extends AggFunctionTestBase[Long, CountAccumulator] 
{
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq("a", "b", null, "c", null, "d", "e", null, "f"),
@@ -32,5 +32,7 @@ class CountAggFunctionTest extends AggFunctionTestBase[Long] {
 
   override def expectedResults: Seq[Long] = Seq(6L, 0L)
 
-  override def aggregator: AggregateFunction[Long] = new CountAggFunction()
+  override def aggregator: AggregateFunction[Long, CountAccumulator] = new 
CountAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
index 38ea993..54c2a7d 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.functions.AggregateFunction
   *
   * @tparam T the type for the aggregation result
   */
-abstract class MaxAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] {
+abstract class MaxAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T, 
MaxAccumulator[T]] {
 
   private val numeric: Numeric[T] = implicitly[Numeric[T]]
 
@@ -61,8 +61,6 @@ abstract class MaxAggFunctionTest[T: Numeric] extends 
AggFunctionTestBase[T] {
     maxVal,
     null.asInstanceOf[T]
   )
-
-  override def supportRetraction: Boolean = false
 }
 
 class ByteMaxAggFunctionTest extends MaxAggFunctionTest[Byte] {
@@ -71,7 +69,8 @@ class ByteMaxAggFunctionTest extends MaxAggFunctionTest[Byte] 
{
 
   override def maxVal = (Byte.MaxValue - 1).toByte
 
-  override def aggregator: AggregateFunction[Byte] = new ByteMaxAggFunction()
+  override def aggregator: AggregateFunction[Byte, MaxAccumulator[Byte]] =
+    new ByteMaxAggFunction()
 }
 
 class ShortMaxAggFunctionTest extends MaxAggFunctionTest[Short] {
@@ -80,7 +79,8 @@ class ShortMaxAggFunctionTest extends 
MaxAggFunctionTest[Short] {
 
   override def maxVal = (Short.MaxValue - 1).toShort
 
-  override def aggregator: AggregateFunction[Short] = new ShortMaxAggFunction()
+  override def aggregator: AggregateFunction[Short, MaxAccumulator[Short]] =
+    new ShortMaxAggFunction()
 }
 
 class IntMaxAggFunctionTest extends MaxAggFunctionTest[Int] {
@@ -89,7 +89,8 @@ class IntMaxAggFunctionTest extends MaxAggFunctionTest[Int] {
 
   override def maxVal = Int.MaxValue - 1
 
-  override def aggregator: AggregateFunction[Int] = new IntMaxAggFunction()
+  override def aggregator: AggregateFunction[Int, MaxAccumulator[Int]] =
+    new IntMaxAggFunction()
 }
 
 class LongMaxAggFunctionTest extends MaxAggFunctionTest[Long] {
@@ -98,7 +99,8 @@ class LongMaxAggFunctionTest extends MaxAggFunctionTest[Long] 
{
 
   override def maxVal = Long.MaxValue - 1
 
-  override def aggregator: AggregateFunction[Long] = new LongMaxAggFunction()
+  override def aggregator: AggregateFunction[Long, MaxAccumulator[Long]] =
+    new LongMaxAggFunction()
 }
 
 class FloatMaxAggFunctionTest extends MaxAggFunctionTest[Float] {
@@ -107,7 +109,8 @@ class FloatMaxAggFunctionTest extends 
MaxAggFunctionTest[Float] {
 
   override def maxVal = Float.MaxValue / 2
 
-  override def aggregator: AggregateFunction[Float] = new FloatMaxAggFunction()
+  override def aggregator: AggregateFunction[Float, MaxAccumulator[Float]] =
+    new FloatMaxAggFunction()
 }
 
 class DoubleMaxAggFunctionTest extends MaxAggFunctionTest[Double] {
@@ -116,10 +119,11 @@ class DoubleMaxAggFunctionTest extends 
MaxAggFunctionTest[Double] {
 
   override def maxVal = Double.MaxValue / 2
 
-  override def aggregator: AggregateFunction[Double] = new 
DoubleMaxAggFunction()
+  override def aggregator: AggregateFunction[Double, MaxAccumulator[Double]] =
+    new DoubleMaxAggFunction()
 }
 
-class BooleanMaxAggFunctionTest extends AggFunctionTestBase[Boolean] {
+class BooleanMaxAggFunctionTest extends AggFunctionTestBase[Boolean, 
MaxAccumulator[Boolean]] {
 
   override def inputValueSets: Seq[Seq[Boolean]] = Seq(
     Seq(
@@ -155,12 +159,12 @@ class BooleanMaxAggFunctionTest extends 
AggFunctionTestBase[Boolean] {
     null.asInstanceOf[Boolean]
   )
 
-  override def aggregator: AggregateFunction[Boolean] = new 
BooleanMaxAggFunction()
-
-  override def supportRetraction: Boolean = false
+  override def aggregator: AggregateFunction[Boolean, MaxAccumulator[Boolean]] 
=
+    new BooleanMaxAggFunction()
 }
 
-class DecimalMaxAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
+class DecimalMaxAggFunctionTest
+  extends AggFunctionTestBase[BigDecimal, MaxAccumulator[BigDecimal]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(
@@ -188,12 +192,11 @@ class DecimalMaxAggFunctionTest extends 
AggFunctionTestBase[BigDecimal] {
     null
   )
 
-  override def aggregator: AggregateFunction[BigDecimal] = new 
DecimalMaxAggFunction()
-
-  override def supportRetraction: Boolean = false
+  override def aggregator: AggregateFunction[BigDecimal, 
MaxAccumulator[BigDecimal]] =
+    new DecimalMaxAggFunction()
 }
 
-class StringMaxAggFunctionTest extends AggFunctionTestBase[String] {
+class StringMaxAggFunctionTest extends AggFunctionTestBase[String, 
MaxAccumulator[String]] {
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(
       new String("a"),
@@ -221,7 +224,6 @@ class StringMaxAggFunctionTest extends 
AggFunctionTestBase[String] {
     new String("household")
   )
 
-  override def aggregator: AggregateFunction[String] = new 
StringMaxAggFunction()
-
-  override def supportRetraction: Boolean = false
+  override def aggregator: AggregateFunction[String, MaxAccumulator[String]] =
+    new StringMaxAggFunction()
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
index 8bd5c6f..c11ae41 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
@@ -25,7 +25,8 @@ import org.apache.flink.table.functions.AggregateFunction
   *
   * @tparam T the type for the aggregation result
   */
-abstract class MaxWithRetractAggFunctionTest[T: Numeric] extends 
AggFunctionTestBase[T] {
+abstract class MaxWithRetractAggFunctionTest[T: Numeric]
+  extends AggFunctionTestBase[T, MaxWithRetractAccumulator[T]] {
 
   private val numeric: Numeric[T] = implicitly[Numeric[T]]
 
@@ -61,6 +62,8 @@ abstract class MaxWithRetractAggFunctionTest[T: Numeric] 
extends AggFunctionTest
     maxVal,
     null.asInstanceOf[T]
   )
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }
 
 class ByteMaxWithRetractAggFunctionTest extends 
MaxWithRetractAggFunctionTest[Byte] {
@@ -69,7 +72,8 @@ class ByteMaxWithRetractAggFunctionTest extends 
MaxWithRetractAggFunctionTest[By
 
   override def maxVal = (Byte.MaxValue - 1).toByte
 
-  override def aggregator: AggregateFunction[Byte] = new 
ByteMaxWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Byte, 
MaxWithRetractAccumulator[Byte]] =
+    new ByteMaxWithRetractAggFunction()
 }
 
 class ShortMaxWithRetractAggFunctionTest extends 
MaxWithRetractAggFunctionTest[Short] {
@@ -78,7 +82,8 @@ class ShortMaxWithRetractAggFunctionTest extends 
MaxWithRetractAggFunctionTest[S
 
   override def maxVal = (Short.MaxValue - 1).toShort
 
-  override def aggregator: AggregateFunction[Short] = new 
ShortMaxWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Short, 
MaxWithRetractAccumulator[Short]] =
+    new ShortMaxWithRetractAggFunction()
 }
 
 class IntMaxWithRetractAggFunctionTest extends 
MaxWithRetractAggFunctionTest[Int] {
@@ -87,7 +92,8 @@ class IntMaxWithRetractAggFunctionTest extends 
MaxWithRetractAggFunctionTest[Int
 
   override def maxVal = Int.MaxValue - 1
 
-  override def aggregator: AggregateFunction[Int] = new 
IntMaxWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Int, 
MaxWithRetractAccumulator[Int]] =
+    new IntMaxWithRetractAggFunction()
 }
 
 class LongMaxWithRetractAggFunctionTest extends 
MaxWithRetractAggFunctionTest[Long] {
@@ -96,7 +102,8 @@ class LongMaxWithRetractAggFunctionTest extends 
MaxWithRetractAggFunctionTest[Lo
 
   override def maxVal = Long.MaxValue - 1
 
-  override def aggregator: AggregateFunction[Long] = new 
LongMaxWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Long, 
MaxWithRetractAccumulator[Long]] =
+    new LongMaxWithRetractAggFunction()
 }
 
 class FloatMaxWithRetractAggFunctionTest extends 
MaxWithRetractAggFunctionTest[Float] {
@@ -105,7 +112,8 @@ class FloatMaxWithRetractAggFunctionTest extends 
MaxWithRetractAggFunctionTest[F
 
   override def maxVal = Float.MaxValue / 2
 
-  override def aggregator: AggregateFunction[Float] = new 
FloatMaxWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Float, 
MaxWithRetractAccumulator[Float]] =
+    new FloatMaxWithRetractAggFunction()
 }
 
 class DoubleMaxWithRetractAggFunctionTest extends 
MaxWithRetractAggFunctionTest[Double] {
@@ -114,10 +122,12 @@ class DoubleMaxWithRetractAggFunctionTest extends 
MaxWithRetractAggFunctionTest[
 
   override def maxVal = Double.MaxValue / 2
 
-  override def aggregator: AggregateFunction[Double] = new 
DoubleMaxWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Double, 
MaxWithRetractAccumulator[Double]] =
+    new DoubleMaxWithRetractAggFunction()
 }
 
-class BooleanMaxWithRetractAggFunctionTest extends 
AggFunctionTestBase[Boolean] {
+class BooleanMaxWithRetractAggFunctionTest
+  extends AggFunctionTestBase[Boolean, MaxWithRetractAccumulator[Boolean]] {
 
   override def inputValueSets: Seq[Seq[Boolean]] = Seq(
     Seq(
@@ -153,10 +163,14 @@ class BooleanMaxWithRetractAggFunctionTest extends 
AggFunctionTestBase[Boolean]
     null.asInstanceOf[Boolean]
   )
 
-  override def aggregator: AggregateFunction[Boolean] = new 
BooleanMaxWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Boolean, 
MaxWithRetractAccumulator[Boolean]] =
+    new BooleanMaxWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }
 
-class DecimalMaxWithRetractAggFunctionTest extends 
AggFunctionTestBase[BigDecimal] {
+class DecimalMaxWithRetractAggFunctionTest
+  extends AggFunctionTestBase[BigDecimal, 
MaxWithRetractAccumulator[BigDecimal]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(
@@ -184,10 +198,14 @@ class DecimalMaxWithRetractAggFunctionTest extends 
AggFunctionTestBase[BigDecima
     null
   )
 
-  override def aggregator: AggregateFunction[BigDecimal] = new 
DecimalMaxWithRetractAggFunction()
+  override def aggregator: AggregateFunction[BigDecimal, 
MaxWithRetractAccumulator[BigDecimal]] =
+    new DecimalMaxWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }
 
-class StringMaxWithRetractAggFunctionTest extends AggFunctionTestBase[String] {
+class StringMaxWithRetractAggFunctionTest
+  extends AggFunctionTestBase[String, MaxWithRetractAccumulator[String]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(
@@ -216,5 +234,8 @@ class StringMaxWithRetractAggFunctionTest extends 
AggFunctionTestBase[String] {
     "x"
   )
 
-  override def aggregator: AggregateFunction[String] = new 
StringMaxWithRetractAggFunction()
+  override def aggregator: AggregateFunction[String, 
MaxWithRetractAccumulator[String]] =
+    new StringMaxWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
index 84e541a..6f41bd2 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.functions.AggregateFunction
   *
   * @tparam T the type for the aggregation result
   */
-abstract class MinAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] {
+abstract class MinAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T, 
MinAccumulator[T]] {
 
   private val numeric: Numeric[T] = implicitly[Numeric[T]]
 
@@ -61,8 +61,6 @@ abstract class MinAggFunctionTest[T: Numeric] extends 
AggFunctionTestBase[T] {
     minVal,
     null.asInstanceOf[T]
   )
-
-  override def supportRetraction: Boolean = false
 }
 
 class ByteMinAggFunctionTest extends MinAggFunctionTest[Byte] {
@@ -71,7 +69,8 @@ class ByteMinAggFunctionTest extends MinAggFunctionTest[Byte] 
{
 
   override def maxVal = (Byte.MaxValue - 1).toByte
 
-  override def aggregator: AggregateFunction[Byte] = new ByteMinAggFunction()
+  override def aggregator: AggregateFunction[Byte, MinAccumulator[Byte]] =
+    new ByteMinAggFunction()
 }
 
 class ShortMinAggFunctionTest extends MinAggFunctionTest[Short] {
@@ -80,7 +79,8 @@ class ShortMinAggFunctionTest extends 
MinAggFunctionTest[Short] {
 
   override def maxVal = (Short.MaxValue - 1).toShort
 
-  override def aggregator: AggregateFunction[Short] = new ShortMinAggFunction()
+  override def aggregator: AggregateFunction[Short, MinAccumulator[Short]] =
+    new ShortMinAggFunction()
 }
 
 class IntMinAggFunctionTest extends MinAggFunctionTest[Int] {
@@ -89,7 +89,8 @@ class IntMinAggFunctionTest extends MinAggFunctionTest[Int] {
 
   override def maxVal = Int.MaxValue - 1
 
-  override def aggregator: AggregateFunction[Int] = new IntMinAggFunction()
+  override def aggregator: AggregateFunction[Int, MinAccumulator[Int]] =
+    new IntMinAggFunction()
 }
 
 class LongMinAggFunctionTest extends MinAggFunctionTest[Long] {
@@ -98,7 +99,8 @@ class LongMinAggFunctionTest extends MinAggFunctionTest[Long] 
{
 
   override def maxVal = Long.MaxValue - 1
 
-  override def aggregator: AggregateFunction[Long] = new LongMinAggFunction()
+  override def aggregator: AggregateFunction[Long, MinAccumulator[Long]] =
+    new LongMinAggFunction()
 }
 
 class FloatMinAggFunctionTest extends MinAggFunctionTest[Float] {
@@ -107,7 +109,8 @@ class FloatMinAggFunctionTest extends 
MinAggFunctionTest[Float] {
 
   override def maxVal = Float.MaxValue / 2
 
-  override def aggregator: AggregateFunction[Float] = new FloatMinAggFunction()
+  override def aggregator: AggregateFunction[Float, MinAccumulator[Float]] =
+    new FloatMinAggFunction()
 }
 
 class DoubleMinAggFunctionTest extends MinAggFunctionTest[Double] {
@@ -116,10 +119,11 @@ class DoubleMinAggFunctionTest extends 
MinAggFunctionTest[Double] {
 
   override def maxVal = Double.MaxValue / 2
 
-  override def aggregator: AggregateFunction[Double] = new 
DoubleMinAggFunction()
+  override def aggregator: AggregateFunction[Double, MinAccumulator[Double]] =
+    new DoubleMinAggFunction()
 }
 
-class BooleanMinAggFunctionTest extends AggFunctionTestBase[Boolean] {
+class BooleanMinAggFunctionTest extends AggFunctionTestBase[Boolean, 
MinAccumulator[Boolean]] {
 
   override def inputValueSets: Seq[Seq[Boolean]] = Seq(
     Seq(
@@ -155,12 +159,12 @@ class BooleanMinAggFunctionTest extends 
AggFunctionTestBase[Boolean] {
     null.asInstanceOf[Boolean]
   )
 
-  override def aggregator: AggregateFunction[Boolean] = new 
BooleanMinAggFunction()
-
-  override def supportRetraction: Boolean = false
+  override def aggregator: AggregateFunction[Boolean, MinAccumulator[Boolean]] 
=
+    new BooleanMinAggFunction()
 }
 
-class DecimalMinAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
+class DecimalMinAggFunctionTest
+  extends AggFunctionTestBase[BigDecimal, MinAccumulator[BigDecimal]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(
@@ -188,12 +192,12 @@ class DecimalMinAggFunctionTest extends 
AggFunctionTestBase[BigDecimal] {
     null
   )
 
-  override def aggregator: AggregateFunction[BigDecimal] = new 
DecimalMinAggFunction()
-
-  override def supportRetraction: Boolean = false
+  override def aggregator: AggregateFunction[BigDecimal, 
MinAccumulator[BigDecimal]] =
+    new DecimalMinAggFunction()
 }
 
-class StringMinAggFunctionTest extends AggFunctionTestBase[String] {
+class StringMinAggFunctionTest
+  extends AggFunctionTestBase[String, MinAccumulator[String]] {
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(
       new String("a"),
@@ -221,7 +225,6 @@ class StringMinAggFunctionTest extends 
AggFunctionTestBase[String] {
     new String("1House")
   )
 
-  override def aggregator: AggregateFunction[String] = new 
StringMinAggFunction()
-
-  override def supportRetraction: Boolean = false
+  override def aggregator: AggregateFunction[String, MinAccumulator[String]] =
+    new StringMinAggFunction()
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
index 9da3828..e13e69b 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
@@ -25,7 +25,8 @@ import org.apache.flink.table.functions.AggregateFunction
   *
   * @tparam T the type for the aggregation result
   */
-abstract class MinWithRetractAggFunctionTest[T: Numeric] extends 
AggFunctionTestBase[T] {
+abstract class MinWithRetractAggFunctionTest[T: Numeric]
+  extends AggFunctionTestBase[T, MinWithRetractAccumulator[T]] {
 
   private val numeric: Numeric[T] = implicitly[Numeric[T]]
 
@@ -61,6 +62,8 @@ abstract class MinWithRetractAggFunctionTest[T: Numeric] 
extends AggFunctionTest
     minVal,
     null.asInstanceOf[T]
   )
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }
 
 class ByteMinWithRetractAggFunctionTest extends 
MinWithRetractAggFunctionTest[Byte] {
@@ -69,7 +72,8 @@ class ByteMinWithRetractAggFunctionTest extends 
MinWithRetractAggFunctionTest[By
 
   override def maxVal = (Byte.MaxValue - 1).toByte
 
-  override def aggregator: AggregateFunction[Byte] = new 
ByteMinWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Byte, 
MinWithRetractAccumulator[Byte]] =
+    new ByteMinWithRetractAggFunction()
 }
 
 class ShortMinWithRetractAggFunctionTest extends 
MinWithRetractAggFunctionTest[Short] {
@@ -78,7 +82,8 @@ class ShortMinWithRetractAggFunctionTest extends 
MinWithRetractAggFunctionTest[S
 
   override def maxVal = (Short.MaxValue - 1).toShort
 
-  override def aggregator: AggregateFunction[Short] = new 
ShortMinWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Short, 
MinWithRetractAccumulator[Short]] =
+    new ShortMinWithRetractAggFunction()
 }
 
 class IntMinWithRetractAggFunctionTest extends 
MinWithRetractAggFunctionTest[Int] {
@@ -87,7 +92,8 @@ class IntMinWithRetractAggFunctionTest extends 
MinWithRetractAggFunctionTest[Int
 
   override def maxVal = Int.MaxValue - 1
 
-  override def aggregator: AggregateFunction[Int] = new 
IntMinWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Int, 
MinWithRetractAccumulator[Int]] =
+    new IntMinWithRetractAggFunction()
 }
 
 class LongMinWithRetractAggFunctionTest extends 
MinWithRetractAggFunctionTest[Long] {
@@ -96,7 +102,8 @@ class LongMinWithRetractAggFunctionTest extends 
MinWithRetractAggFunctionTest[Lo
 
   override def maxVal = Long.MaxValue - 1
 
-  override def aggregator: AggregateFunction[Long] = new 
LongMinWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Long, 
MinWithRetractAccumulator[Long]] =
+    new LongMinWithRetractAggFunction()
 }
 
 class FloatMinWithRetractAggFunctionTest extends 
MinWithRetractAggFunctionTest[Float] {
@@ -105,7 +112,8 @@ class FloatMinWithRetractAggFunctionTest extends 
MinWithRetractAggFunctionTest[F
 
   override def maxVal = Float.MaxValue / 2
 
-  override def aggregator: AggregateFunction[Float] = new 
FloatMinWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Float, 
MinWithRetractAccumulator[Float]] =
+    new FloatMinWithRetractAggFunction()
 }
 
 class DoubleMinWithRetractAggFunctionTest extends 
MinWithRetractAggFunctionTest[Double] {
@@ -114,10 +122,12 @@ class DoubleMinWithRetractAggFunctionTest extends 
MinWithRetractAggFunctionTest[
 
   override def maxVal = Double.MaxValue / 2
 
-  override def aggregator: AggregateFunction[Double] = new 
DoubleMinWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Double, 
MinWithRetractAccumulator[Double]] =
+    new DoubleMinWithRetractAggFunction()
 }
 
-class BooleanMinWithRetractAggFunctionTest extends 
AggFunctionTestBase[Boolean] {
+class BooleanMinWithRetractAggFunctionTest
+  extends AggFunctionTestBase[Boolean, MinWithRetractAccumulator[Boolean]] {
 
   override def inputValueSets: Seq[Seq[Boolean]] = Seq(
     Seq(
@@ -153,10 +163,14 @@ class BooleanMinWithRetractAggFunctionTest extends 
AggFunctionTestBase[Boolean]
     null.asInstanceOf[Boolean]
   )
 
-  override def aggregator: AggregateFunction[Boolean] = new 
BooleanMinWithRetractAggFunction()
+  override def aggregator: AggregateFunction[Boolean, 
MinWithRetractAccumulator[Boolean]] =
+    new BooleanMinWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }
 
-class DecimalMinWithRetractAggFunctionTest extends 
AggFunctionTestBase[BigDecimal] {
+class DecimalMinWithRetractAggFunctionTest
+  extends AggFunctionTestBase[BigDecimal, 
MinWithRetractAccumulator[BigDecimal]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(
@@ -184,10 +198,14 @@ class DecimalMinWithRetractAggFunctionTest extends 
AggFunctionTestBase[BigDecima
     null
   )
 
-  override def aggregator: AggregateFunction[BigDecimal] = new 
DecimalMinWithRetractAggFunction()
+  override def aggregator: AggregateFunction[BigDecimal, 
MinWithRetractAccumulator[BigDecimal]] =
+    new DecimalMinWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }
 
-class StringMinWithRetractAggFunctionTest extends AggFunctionTestBase[String] {
+class StringMinWithRetractAggFunctionTest
+  extends AggFunctionTestBase[String, MinWithRetractAccumulator[String]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(
@@ -216,5 +234,8 @@ class StringMinWithRetractAggFunctionTest extends 
AggFunctionTestBase[String] {
     "e"
   )
 
-  override def aggregator: AggregateFunction[String] = new 
StringMinWithRetractAggFunction()
+  override def aggregator: AggregateFunction[String, 
MinWithRetractAccumulator[String]] =
+    new StringMinWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
index cd69187..6816a5e 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
@@ -26,7 +26,8 @@ import org.apache.flink.table.functions.AggregateFunction
   *
   * @tparam T the type for the aggregation result
   */
-abstract class SumAggFunctionTestBase[T: Numeric] extends 
AggFunctionTestBase[T] {
+abstract class SumAggFunctionTestBase[T: Numeric]
+  extends AggFunctionTestBase[T, SumAccumulator[T]] {
 
   private val numeric: Numeric[T] = implicitly[Numeric[T]]
 
@@ -63,54 +64,59 @@ abstract class SumAggFunctionTestBase[T: Numeric] extends 
AggFunctionTestBase[T]
     numeric.fromInt(2),
     null.asInstanceOf[T]
   )
-
-  override def supportRetraction: Boolean = false
 }
 
 class ByteSumAggFunctionTest extends SumAggFunctionTestBase[Byte] {
 
   override def maxVal = (Byte.MaxValue / 2).toByte
 
-  override def aggregator: AggregateFunction[Byte] = new ByteSumAggFunction
+  override def aggregator: AggregateFunction[Byte, SumAccumulator[Byte]] =
+    new ByteSumAggFunction
 }
 
 class ShortSumAggFunctionTest extends SumAggFunctionTestBase[Short] {
 
   override def maxVal = (Short.MaxValue / 2).toShort
 
-  override def aggregator: AggregateFunction[Short] = new ShortSumAggFunction
+  override def aggregator: AggregateFunction[Short, SumAccumulator[Short]] =
+    new ShortSumAggFunction
 }
 
 class IntSumAggFunctionTest extends SumAggFunctionTestBase[Int] {
 
   override def maxVal = Int.MaxValue / 2
 
-  override def aggregator: AggregateFunction[Int] = new IntSumAggFunction
+  override def aggregator: AggregateFunction[Int, SumAccumulator[Int]] =
+    new IntSumAggFunction
 }
 
 class LongSumAggFunctionTest extends SumAggFunctionTestBase[Long] {
 
   override def maxVal = Long.MaxValue / 2
 
-  override def aggregator: AggregateFunction[Long] = new LongSumAggFunction
+  override def aggregator: AggregateFunction[Long, SumAccumulator[Long]] =
+    new LongSumAggFunction
 }
 
 class FloatSumAggFunctionTest extends SumAggFunctionTestBase[Float] {
 
   override def maxVal = 12345.6789f
 
-  override def aggregator: AggregateFunction[Float] = new FloatSumAggFunction
+  override def aggregator: AggregateFunction[Float, SumAccumulator[Float]] =
+    new FloatSumAggFunction
 }
 
 class DoubleSumAggFunctionTest extends SumAggFunctionTestBase[Double] {
 
   override def maxVal = 12345.6789d
 
-  override def aggregator: AggregateFunction[Double] = new DoubleSumAggFunction
+  override def aggregator: AggregateFunction[Double, SumAccumulator[Double]] =
+    new DoubleSumAggFunction
 }
 
 
-class DecimalSumAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
+class DecimalSumAggFunctionTest
+  extends AggFunctionTestBase[BigDecimal, DecimalSumAccumulator] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(
@@ -143,9 +149,8 @@ class DecimalSumAggFunctionTest extends 
AggFunctionTestBase[BigDecimal] {
     null
   )
 
-  override def aggregator: AggregateFunction[BigDecimal] = new 
DecimalSumAggFunction()
-
-  override def supportRetraction: Boolean = false
+  override def aggregator: AggregateFunction[BigDecimal, 
DecimalSumAccumulator] =
+    new DecimalSumAggFunction()
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala
index 72af358..8b6daba 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala
@@ -26,7 +26,8 @@ import org.apache.flink.table.functions.AggregateFunction
   *
   * @tparam T the type for the aggregation result
   */
-abstract class SumWithRetractAggFunctionTestBase[T: Numeric] extends 
AggFunctionTestBase[T] {
+abstract class SumWithRetractAggFunctionTestBase[T: Numeric]
+  extends AggFunctionTestBase[T, SumWithRetractAccumulator[T]] {
 
   private val numeric: Numeric[T] = implicitly[Numeric[T]]
 
@@ -63,52 +64,61 @@ abstract class SumWithRetractAggFunctionTestBase[T: 
Numeric] extends AggFunction
     numeric.fromInt(2),
     null.asInstanceOf[T]
   )
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }
 
 class ByteSumWithRetractAggFunctionTest extends 
SumWithRetractAggFunctionTestBase[Byte] {
 
   override def maxVal = (Byte.MaxValue / 2).toByte
 
-  override def aggregator: AggregateFunction[Byte] = new 
ByteSumWithRetractAggFunction
+  override def aggregator: AggregateFunction[Byte, 
SumWithRetractAccumulator[Byte]] =
+    new ByteSumWithRetractAggFunction
 }
 
 class ShortSumWithRetractAggFunctionTest extends 
SumWithRetractAggFunctionTestBase[Short] {
 
   override def maxVal = (Short.MaxValue / 2).toShort
 
-  override def aggregator: AggregateFunction[Short] = new 
ShortSumWithRetractAggFunction
+  override def aggregator: AggregateFunction[Short, 
SumWithRetractAccumulator[Short]] =
+    new ShortSumWithRetractAggFunction
 }
 
 class IntSumWithRetractAggFunctionTest extends 
SumWithRetractAggFunctionTestBase[Int] {
 
   override def maxVal = Int.MaxValue / 2
 
-  override def aggregator: AggregateFunction[Int] = new 
IntSumWithRetractAggFunction
+  override def aggregator: AggregateFunction[Int, 
SumWithRetractAccumulator[Int]] =
+    new IntSumWithRetractAggFunction
 }
 
 class LongSumWithRetractAggFunctionTest extends 
SumWithRetractAggFunctionTestBase[Long] {
 
   override def maxVal = Long.MaxValue / 2
 
-  override def aggregator: AggregateFunction[Long] = new 
LongSumWithRetractAggFunction
+  override def aggregator: AggregateFunction[Long, 
SumWithRetractAccumulator[Long]] =
+    new LongSumWithRetractAggFunction
 }
 
 class FloatSumWithRetractAggFunctionTest extends 
SumWithRetractAggFunctionTestBase[Float] {
 
   override def maxVal = 12345.6789f
 
-  override def aggregator: AggregateFunction[Float] = new 
FloatSumWithRetractAggFunction
+  override def aggregator: AggregateFunction[Float, 
SumWithRetractAccumulator[Float]] =
+    new FloatSumWithRetractAggFunction
 }
 
 class DoubleSumWithRetractAggFunctionTest extends 
SumWithRetractAggFunctionTestBase[Double] {
 
   override def maxVal = 12345.6789d
 
-  override def aggregator: AggregateFunction[Double] = new 
DoubleSumWithRetractAggFunction
+  override def aggregator: AggregateFunction[Double, 
SumWithRetractAccumulator[Double]] =
+    new DoubleSumWithRetractAggFunction
 }
 
 
-class DecimalSumWithRetractAggFunctionTest extends 
AggFunctionTestBase[BigDecimal] {
+class DecimalSumWithRetractAggFunctionTest
+  extends AggFunctionTestBase[BigDecimal, DecimalSumWithRetractAccumulator] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(
@@ -141,7 +151,10 @@ class DecimalSumWithRetractAggFunctionTest extends 
AggFunctionTestBase[BigDecima
     null
   )
 
-  override def aggregator: AggregateFunction[BigDecimal] = new 
DecimalSumWithRetractAggFunction()
+  override def aggregator: AggregateFunction[BigDecimal, 
DecimalSumWithRetractAccumulator] =
+    new DecimalSumWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, 
classOf[Any])
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc6409d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
index 16c493e..5e3e995 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
@@ -51,7 +51,7 @@ class BoundedProcessingOverRangeProcessFunctionTest {
 
     val aggregates =
       Array(new LongMinWithRetractAggFunction,
-            new 
LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_]]]
+            new 
LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]]
     val aggregationStateType: RowTypeInfo = 
AggregateUtil.createAccumulatorRowType(aggregates)
 
     val funcCode =
@@ -71,9 +71,9 @@ class BoundedProcessingOverRangeProcessFunctionTest {
         |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
         |    
.deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn"
 +
         |    
"MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL"
 +
-        |    
"nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbkDcXxs1apkP"
 +
+        |    
"nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S"
 +
         |    
"AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV"
 +
-        |    
"uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uSa3YqbzCL3QCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS"
 +
+        |    
"uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS"
 +
         |    
"5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc"
 +
         |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
         |
@@ -81,9 +81,9 @@ class BoundedProcessingOverRangeProcessFunctionTest {
         |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
         |    
.deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn"
 +
         |    
"MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL"
 +
-        |    
"nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbu4_w_gPePlO"
 +
+        |    
"nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf"
 +
         |    
"AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV"
 +
-        |    
"uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uSa3YqbzCL3QCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS"
 +
+        |    
"uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS"
 +
         |    
"5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc"
 +
         |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
         |  }
@@ -95,12 +95,14 @@ class BoundedProcessingOverRangeProcessFunctionTest {
         |    org.apache.flink.table.functions.AggregateFunction baseClass0 =
         |      (org.apache.flink.table.functions.AggregateFunction) fmin;
         |    output.setField(5, baseClass0.getValue(
-        |      (org.apache.flink.table.functions.Accumulator) 
accs.getField(0)));
+        |      
(org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+        |      accs.getField(0)));
         |
         |    org.apache.flink.table.functions.AggregateFunction baseClass1 =
         |      (org.apache.flink.table.functions.AggregateFunction) fmax;
         |    output.setField(6, baseClass1.getValue(
-        |      (org.apache.flink.table.functions.Accumulator) 
accs.getField(1)));
+        |      
(org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+        |      accs.getField(1)));
         |  }
         |
         |  public void accumulate(
@@ -108,11 +110,13 @@ class BoundedProcessingOverRangeProcessFunctionTest {
         |    org.apache.flink.types.Row input) {
         |
         |    fmin.accumulate(
-        |      ((org.apache.flink.table.functions.Accumulator) 
accs.getField(0)),
+        |      
((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+        |      accs.getField(0)),
         |      (java.lang.Long) input.getField(4));
         |
         |    fmax.accumulate(
-        |      ((org.apache.flink.table.functions.Accumulator) 
accs.getField(1)),
+        |      
((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+        |      accs.getField(1)),
         |      (java.lang.Long) input.getField(4));
         |  }
         |
@@ -121,11 +125,13 @@ class BoundedProcessingOverRangeProcessFunctionTest {
         |    org.apache.flink.types.Row input) {
         |
         |    fmin.retract(
-        |      ((org.apache.flink.table.functions.Accumulator) 
accs.getField(0)),
+        |      
((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+        |      accs.getField(0)),
         |      (java.lang.Long) input.getField(4));
         |
         |    fmax.retract(
-        |      ((org.apache.flink.table.functions.Accumulator) 
accs.getField(1)),
+        |      
((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+        |      accs.getField(1)),
         |      (java.lang.Long) input.getField(4));
         |  }
         |

Reply via email to