[FLINK-3226] implement GroupReduce translation; enable tests for supported 
operations

Squashes the following commits:
- Compute average as sum and count for byte, short and int type to avoid 
rounding errors
- Move aggregation functions to org.apache.flink.table.runtime
- Remove join-related changes
- Change integer average aggregations to maintain sum and count
- Long average uses a BigInteger sum


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/18e7f2f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/18e7f2f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/18e7f2f4

Branch: refs/heads/tableOnCalcite
Commit: 18e7f2f4e0654ad2f00dcf7d1351846345accb13
Parents: 6dd2d77
Author: vasia <va...@apache.org>
Authored: Thu Feb 4 15:53:52 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Feb 12 11:34:09 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/plan/TypeConverter.scala    |   2 +-
 .../plan/functions/AggregateFunction.scala      |  71 ---------
 .../table/plan/functions/FunctionUtils.scala    |  37 -----
 .../plan/functions/aggregate/Aggregate.scala    |  42 ------
 .../functions/aggregate/AggregateFactory.scala  | 135 -----------------
 .../plan/functions/aggregate/AvgAggregate.scala | 148 -------------------
 .../functions/aggregate/CountAggregate.scala    |  34 -----
 .../plan/functions/aggregate/MaxAggregate.scala | 136 -----------------
 .../plan/functions/aggregate/MinAggregate.scala | 136 -----------------
 .../plan/functions/aggregate/SumAggregate.scala | 130 ----------------
 .../plan/nodes/dataset/DataSetGroupReduce.scala |  30 +++-
 .../table/plan/nodes/dataset/DataSetJoin.scala  |   6 +-
 .../plan/nodes/logical/FlinkAggregate.scala     |  16 --
 .../api/table/plan/rules/FlinkRuleSets.scala    |   3 +-
 .../rules/dataset/DataSetAggregateRule.scala    |  13 +-
 .../plan/rules/dataset/DataSetJoinRule.scala    | 102 +------------
 .../api/table/runtime/AggregateFunction.scala   |  76 ++++++++++
 .../api/table/runtime/aggregate/Aggregate.scala |  42 ++++++
 .../runtime/aggregate/AggregateFactory.scala    | 136 +++++++++++++++++
 .../table/runtime/aggregate/AvgAggregate.scala  | 131 ++++++++++++++++
 .../runtime/aggregate/CountAggregate.scala      |  34 +++++
 .../table/runtime/aggregate/MaxAggregate.scala  |  84 +++++++++++
 .../table/runtime/aggregate/MinAggregate.scala  |  86 +++++++++++
 .../table/runtime/aggregate/SumAggregate.scala  |  50 +++++++
 .../api/java/table/test/AggregationsITCase.java |  13 +-
 .../api/java/table/test/ExpressionsITCase.java  |   2 -
 .../flink/api/java/table/test/FilterITCase.java |   2 -
 .../table/test/GroupedAggregationsITCase.java   |   6 +-
 .../flink/api/java/table/test/SelectITCase.java |   2 -
 .../flink/api/java/table/test/UnionITCase.java  |   1 -
 .../scala/table/test/AggregationsITCase.scala   |  11 +-
 .../scala/table/test/ExpressionsITCase.scala    |   1 -
 .../table/test/GroupedAggregationsITCase.scala  |   6 +-
 33 files changed, 704 insertions(+), 1020 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
index b7cb200..1fc482a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
@@ -135,7 +135,7 @@ object TypeConverter {
             logicalFieldTypes.head
           }
           else {
-            new TupleTypeInfo[Any](logicalFieldTypes.toArray:_*)
+            new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*)
           }
         }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
deleted file mode 100644
index 4abf2d2..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.functions
-
-import java.lang.Iterable
-
-import com.google.common.base.Preconditions
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-
-import scala.collection.JavaConversions._
-
-/**
- * A wrapper Flink GroupReduceOperator UDF of aggregates, it takes the grouped 
data as input,
- * feed to the aggregates, and collect the record with aggregated value.
- *
- * @param aggregates Sql aggregate functions.
- * @param fields  The grouped keys' index.
- */
-class AggregateFunction(
-    private val aggregates: Array[Aggregate[_ <: Any]],
-    private val fields: Array[Int]) extends RichGroupReduceFunction[Any, Any] {
-
-  override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(fields)
-    Preconditions.checkArgument(aggregates.size == fields.size)
-
-    aggregates.foreach(_.initiateAggregate)
-  }
-
-  override def reduce(records: Iterable[Any], out: Collector[Any]): Unit = {
-    var currentValue: Any = null
-
-    // iterate all input records, feed to each aggregate.
-    val aggregateAndField = aggregates.zip(fields)
-    records.foreach {
-      value =>
-        currentValue = value
-        aggregateAndField.foreach {
-          case (aggregate, field) =>
-            aggregate.aggregate(FunctionUtils.getFieldValue(value, field))
-        }
-    }
-
-    // reuse the latest record, and set all the aggregated values.
-    aggregateAndField.foreach {
-      case (aggregate, field) =>
-        FunctionUtils.putFieldValue(currentValue, field, 
aggregate.getAggregated())
-    }
-
-    out.collect(currentValue)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala
deleted file mode 100644
index 9d62b7c..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.functions
-
-import org.apache.flink.api.table.Row
-
-object FunctionUtils {
-
-  def getFieldValue(record: Any, fieldIndex: Int): Any = {
-    record match {
-      case row: Row => row.productElement(fieldIndex)
-      case _ => throw new UnsupportedOperationException("Do not support types 
other than Row now.")
-    }
-  }
-
-  def putFieldValue(record: Any, fieldIndex: Int, fieldValue: Any): Unit = {
-    record match {
-      case row: Row => row.setField(fieldIndex, fieldValue)
-      case _ => throw new UnsupportedOperationException("Do not support types 
other than Row now.")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala
deleted file mode 100644
index 5800d5f..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.functions.aggregate
-
-/**
- * Represent a Sql aggregate function, user should initiate the aggregate at 
first, then feed it
- * with grouped aggregate field values, and get aggregated value finally.
- * @tparam T
- */
-trait Aggregate[T] {
-  /**
-   * Initiate current aggregate state.
-   */
-  def initiateAggregate
-
-  /**
-   * Feed the aggregate field value.
-   * @param value
-   */
-  def aggregate(value: Any)
-
-  /**
-   * Return final aggregated value.
-   * @return
-   */
-  def getAggregated(): T
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala
deleted file mode 100644
index a95a163..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.functions.aggregate
-
-import java.util
-
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.sql.SqlAggFunction
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun._
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.plan.functions.AggregateFunction
-
-object AggregateFactory {
-
-  def createAggregateInstance(aggregateCalls: Seq[AggregateCall]):
-    RichGroupReduceFunction[Any, Any] = {
-
-    val fieldIndexes = new Array[Int](aggregateCalls.size)
-    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
-    aggregateCalls.zipWithIndex.map { case (aggregateCall, index) =>
-      val sqlType = aggregateCall.getType
-      val argList: util.List[Integer] = aggregateCall.getArgList
-      // currently assume only aggregate on singleton field.
-      if (argList.isEmpty) {
-        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
-          fieldIndexes(index) = 0
-        } else {
-          throw new PlanGenException("Aggregate fields should not be empty.")
-        }
-      } else {
-        fieldIndexes(index) = argList.get(0);
-      }
-      aggregateCall.getAggregation match {
-        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
-          sqlType.getSqlTypeName match {
-            case TINYINT =>
-              aggregates(index) = new TinyIntSumAggregate
-            case SMALLINT =>
-              aggregates(index) = new SmallIntSumAggregate
-            case INTEGER =>
-              aggregates(index) = new IntSumAggregate
-            case BIGINT =>
-              aggregates(index) = new LongSumAggregate
-            case FLOAT =>
-              aggregates(index) = new FloatSumAggregate
-            case DOUBLE =>
-              aggregates(index) = new DoubleSumAggregate
-            case sqlType: SqlTypeName =>
-              throw new PlanGenException("Sum aggregate does no support type:" 
+ sqlType)
-          }
-        }
-        case _: SqlAvgAggFunction => {
-          sqlType.getSqlTypeName match {
-            case TINYINT =>
-              aggregates(index) = new TinyIntAvgAggregate
-            case SMALLINT =>
-              aggregates(index) = new SmallIntAvgAggregate
-            case INTEGER =>
-              aggregates(index) = new IntAvgAggregate
-            case BIGINT =>
-              aggregates(index) = new LongAvgAggregate
-            case FLOAT =>
-              aggregates(index) = new FloatAvgAggregate
-            case DOUBLE =>
-              aggregates(index) = new DoubleAvgAggregate
-            case sqlType: SqlTypeName =>
-              throw new PlanGenException("Avg aggregate does no support type:" 
+ sqlType)
-          }
-        }
-        case sqlMinMaxFunction: SqlMinMaxAggFunction => {
-          if (sqlMinMaxFunction.isMin) {
-            sqlType.getSqlTypeName match {
-              case TINYINT =>
-                aggregates(index) = new TinyIntMinAggregate
-              case SMALLINT =>
-                aggregates(index) = new SmallIntMinAggregate
-              case INTEGER =>
-                aggregates(index) = new IntMinAggregate
-              case BIGINT =>
-                aggregates(index) = new LongMinAggregate
-              case FLOAT =>
-                aggregates(index) = new FloatMinAggregate
-              case DOUBLE =>
-                aggregates(index) = new DoubleMinAggregate
-              case sqlType: SqlTypeName =>
-                throw new PlanGenException("Min aggregate does no support 
type:" + sqlType)
-            }
-          } else {
-            sqlType.getSqlTypeName match {
-              case TINYINT =>
-                aggregates(index) = new TinyIntMaxAggregate
-              case SMALLINT =>
-                aggregates(index) = new SmallIntMaxAggregate
-              case INTEGER =>
-                aggregates(index) = new IntMaxAggregate
-              case BIGINT =>
-                aggregates(index) = new LongMaxAggregate
-              case FLOAT =>
-                aggregates(index) = new FloatMaxAggregate
-              case DOUBLE =>
-                aggregates(index) = new DoubleMaxAggregate
-              case sqlType: SqlTypeName =>
-                throw new PlanGenException("Max aggregate does no support 
type:" + sqlType)
-            }
-          }
-        }
-        case _: SqlCountAggFunction =>
-          aggregates(index) = new CountAggregate
-        case unSupported: SqlAggFunction =>
-          throw new PlanGenException("unsupported Function: " + 
unSupported.getName)
-      }
-    }
-
-    new AggregateFunction(aggregates, fieldIndexes)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala
deleted file mode 100644
index e9c5f8f..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.functions.aggregate
-
-abstract class AvgAggregate[T] extends Aggregate[T] {
-
-}
-
-// TinyInt average aggregate return Int as aggregated value.
-class TinyIntAvgAggregate extends AvgAggregate[Int] {
-  private var avgValue: Double = 0
-  private var count: Int = 0
-
-  override def initiateAggregate: Unit = {
-    avgValue = 0
-    count = 0
-  }
-
-  override def aggregate(value: Any): Unit = {
-    count += 1
-    val current = value.asInstanceOf[Byte]
-    avgValue += (current - avgValue) / count
-  }
-
-  override def getAggregated(): Int = {
-    avgValue.toInt
-  }
-}
-
-// SmallInt average aggregate return Int as aggregated value.
-class SmallIntAvgAggregate extends AvgAggregate[Int] {
-  private var avgValue: Double = 0
-  private var count: Int = 0
-
-  override def initiateAggregate: Unit = {
-    avgValue = 0
-    count = 0
-  }
-
-  override def aggregate(value: Any): Unit = {
-    count += 1
-    val current = value.asInstanceOf[Short]
-    avgValue += (current - avgValue) / count
-  }
-
-  override def getAggregated(): Int = {
-    avgValue.toInt
-  }
-}
-
-// Int average aggregate return Int as aggregated value.
-class IntAvgAggregate extends AvgAggregate[Int] {
-  private var avgValue: Double = 0
-  private var count: Int = 0
-
-  override def initiateAggregate: Unit = {
-    avgValue = 0
-    count = 0
-  }
-
-  override def aggregate(value: Any): Unit = {
-    count += 1
-    val current = value.asInstanceOf[Int]
-    avgValue += (current - avgValue) / count
-  }
-
-  override def getAggregated(): Int = {
-    avgValue.toInt
-  }
-}
-
-// Long average aggregate return Long as aggregated value.
-class LongAvgAggregate extends AvgAggregate[Long] {
-  private var avgValue: Double = 0
-  private var count: Int = 0
-
-  override def initiateAggregate: Unit = {
-    avgValue = 0
-    count = 0
-  }
-
-  override def aggregate(value: Any): Unit = {
-    count += 1
-    val current = value.asInstanceOf[Long]
-    avgValue += (current - avgValue) / count
-  }
-
-  override def getAggregated(): Long = {
-    avgValue.toLong
-  }
-}
-
-// Float average aggregate return Float as aggregated value.
-class FloatAvgAggregate extends AvgAggregate[Float] {
-  private var avgValue: Double = 0
-  private var count: Int = 0
-
-  override def initiateAggregate: Unit = {
-    avgValue = 0
-    count = 0
-  }
-
-  override def aggregate(value: Any): Unit = {
-    count += 1
-    val current = value.asInstanceOf[Float]
-    avgValue += (current - avgValue) / count
-  }
-
-  override def getAggregated(): Float = {
-    avgValue.toFloat
-  }
-}
-
-// Double average aggregate return Double as aggregated value.
-class DoubleAvgAggregate extends AvgAggregate[Double] {
-  private var avgValue: Double = 0
-  private var count: Int = 0
-
-  override def initiateAggregate: Unit = {
-    avgValue = 0
-    count = 0
-  }
-
-  override def aggregate(value: Any): Unit = {
-    count += 1
-    val current = value.asInstanceOf[Double]
-    avgValue += (current - avgValue) / count
-  }
-
-  override def getAggregated(): Double = {
-    avgValue
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala
deleted file mode 100644
index ab6b170..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.functions.aggregate
-
-class CountAggregate extends Aggregate[Long] {
-  private var count: Long = 0L
-
-  override def initiateAggregate: Unit = {
-    count = 0
-  }
-
-  override def aggregate(value: Any): Unit = {
-    count += 1
-  }
-
-  override def getAggregated(): Long = {
-    count
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
deleted file mode 100644
index 072eb3f..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.functions.aggregate
-
-abstract class MaxAggregate[T] extends Aggregate[T]{
-
-}
-
-class TinyIntMaxAggregate extends MaxAggregate[Byte] {
-  private var max = Byte.MaxValue
-
-  override def initiateAggregate: Unit = {
-    max = Byte.MaxValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    val current = value.asInstanceOf[Byte]
-    if (current < max) {
-      max = current
-    }
-  }
-
-  override def getAggregated(): Byte = {
-    max
-  }
-}
-
-class SmallIntMaxAggregate extends MaxAggregate[Short] {
-  private var max = Short.MaxValue
-
-  override def initiateAggregate: Unit = {
-    max = Short.MaxValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    val current = value.asInstanceOf[Short]
-    if (current < max) {
-      max = current
-    }
-  }
-
-  override def getAggregated(): Short = {
-    max
-  }
-}
-
-class IntMaxAggregate extends MaxAggregate[Int] {
-  private var max = Int.MaxValue
-
-  override def initiateAggregate: Unit = {
-    max = Int.MaxValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    val current = value.asInstanceOf[Int]
-    if (current < max) {
-      max = current
-    }
-  }
-
-  override def getAggregated(): Int = {
-    max
-  }
-}
-
-class LongMaxAggregate extends MaxAggregate[Long] {
-  private var max = Long.MaxValue
-
-  override def initiateAggregate: Unit = {
-    max = Int.MaxValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    val current = value.asInstanceOf[Long]
-    if (current < max) {
-      max = current
-    }
-  }
-
-  override def getAggregated(): Long = {
-    max
-  }
-}
-
-class FloatMaxAggregate extends MaxAggregate[Float] {
-  private var max = Float.MaxValue
-
-  override def initiateAggregate: Unit = {
-    max = Int.MaxValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    val current = value.asInstanceOf[Float]
-    if (current < max) {
-      max = current
-    }
-  }
-
-  override def getAggregated(): Float = {
-    max
-  }
-}
-
-class DoubleMaxAggregate extends MaxAggregate[Double] {
-  private var max = Double.MaxValue
-
-  override def initiateAggregate: Unit = {
-    max = Int.MaxValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    val current = value.asInstanceOf[Double]
-    if (current < max) {
-      max = current
-    }
-  }
-
-  override def getAggregated(): Double = {
-    max
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala
deleted file mode 100644
index c233b8e..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.functions.aggregate
-
-abstract class MinAggregate[T] extends Aggregate[T]{
-
-}
-
-class TinyIntMinAggregate extends MinAggregate[Byte] {
-  private var min = Byte.MaxValue
-
-  override def initiateAggregate: Unit = {
-    min = Byte.MaxValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    val current = value.asInstanceOf[Byte]
-    if (current < min) {
-      min = current
-    }
-  }
-
-  override def getAggregated(): Byte = {
-    min
-  }
-}
-
-class SmallIntMinAggregate extends MinAggregate[Short] {
-  private var min = Short.MaxValue
-
-  override def initiateAggregate: Unit = {
-    min = Short.MaxValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    val current = value.asInstanceOf[Short]
-    if (current < min) {
-      min = current
-    }
-  }
-
-  override def getAggregated(): Short = {
-    min
-  }
-}
-
-class IntMinAggregate extends MinAggregate[Int] {
-  private var min = Int.MaxValue
-
-  override def initiateAggregate: Unit = {
-    min = Int.MaxValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    val current = value.asInstanceOf[Int]
-    if (current < min) {
-      min = current
-    }
-  }
-
-  override def getAggregated(): Int = {
-    min
-  }
-}
-
-class LongMinAggregate extends MinAggregate[Long] {
-  private var min = Long.MaxValue
-
-  override def initiateAggregate: Unit = {
-    min = Int.MaxValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    val current = value.asInstanceOf[Long]
-    if (current < min) {
-      min = current
-    }
-  }
-
-  override def getAggregated(): Long = {
-    min
-  }
-}
-
-class FloatMinAggregate extends MinAggregate[Float] {
-  private var min = Float.MaxValue
-
-  override def initiateAggregate: Unit = {
-    min = Int.MaxValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    val current = value.asInstanceOf[Float]
-    if (current < min) {
-      min = current
-    }
-  }
-
-  override def getAggregated(): Float = {
-    min
-  }
-}
-
-class DoubleMinAggregate extends MinAggregate[Double] {
-  private var min = Double.MaxValue
-
-  override def initiateAggregate: Unit = {
-    min = Int.MaxValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    val current = value.asInstanceOf[Double]
-    if (current < min) {
-      min = current
-    }
-  }
-
-  override def getAggregated(): Double = {
-    min
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
deleted file mode 100644
index 14d1a73..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.functions.aggregate
-
-abstract class SumAggregate[T] extends Aggregate[T]{
-
-}
-
-// TinyInt sum aggregate return Int as aggregated value.
-class TinyIntSumAggregate extends SumAggregate[Int] {
-
-  private var sumValue: Int = 0
-
-  override def initiateAggregate: Unit = {
-    sumValue = 0
-  }
-
-
-  override def getAggregated(): Int = {
-    sumValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    sumValue += value.asInstanceOf[Byte]
-  }
-}
-
-// SmallInt sum aggregate return Int as aggregated value.
-class SmallIntSumAggregate extends SumAggregate[Int] {
-
-  private var sumValue: Int = 0
-
-  override def initiateAggregate: Unit = {
-    sumValue = 0
-  }
-
-  override def getAggregated(): Int = {
-    sumValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    sumValue += value.asInstanceOf[Short]
-  }
-}
-
-// Int sum aggregate return Int as aggregated value.
-class IntSumAggregate extends SumAggregate[Int] {
-
-  private var sumValue: Int = 0
-
-  override def initiateAggregate: Unit = {
-    sumValue = 0
-  }
-
-
-  override def getAggregated(): Int = {
-    sumValue
-  }
-
-  override def aggregate(value: Any): Unit = {
-    sumValue += value.asInstanceOf[Int]
-  }
-}
-
-// Long sum aggregate return Long as aggregated value.
-class LongSumAggregate extends SumAggregate[Long] {
-
-  private var sumValue: Long = 0L
-
-  override def initiateAggregate: Unit = {
-    sumValue = 0
-  }
-
-  override def aggregate(value: Any): Unit = {
-    sumValue += value.asInstanceOf[Long]
-  }
-
-  override def getAggregated(): Long = {
-    sumValue
-  }
-}
-
-// Float sum aggregate return Float as aggregated value.
-class FloatSumAggregate extends SumAggregate[Float] {
-  private var sumValue: Float = 0
-
-  override def initiateAggregate: Unit = {
-    sumValue = 0
-  }
-
-  override def aggregate(value: Any): Unit = {
-    sumValue += value.asInstanceOf[Float]
-  }
-
-  override def getAggregated(): Float = {
-    sumValue
-  }
-}
-
-// Double sum aggregate return Double as aggregated value.
-class DoubleSumAggregate extends SumAggregate[Double] {
-  private var sumValue: Double = 0
-
-  override def initiateAggregate: Unit = {
-    sumValue = 0
-  }
-
-  override def aggregate(value: Any): Unit = {
-    sumValue += value.asInstanceOf[Double]
-  }
-
-  override def getAggregated(): Double = {
-    sumValue
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
index 70810c8..ad7e0e9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
@@ -25,6 +25,11 @@ import 
org.apache.flink.api.common.functions.GroupReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.{TableConfig, Row}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import scala.collection.JavaConverters._
+import org.apache.flink.api.table.plan.TypeConverter
 
 /**
   * Flink RelNode which matches along with ReduceGroupOperator.
@@ -36,7 +41,7 @@ class DataSetGroupReduce(
     rowType: RelDataType,
     opName: String,
     groupingKeys: Array[Int],
-    func: GroupReduceFunction[Any, Any])
+    func: GroupReduceFunction[Row, Row])
   extends SingleRel(cluster, traitSet, input)
   with DataSetRel {
 
@@ -61,6 +66,27 @@ class DataSetGroupReduce(
   override def translateToPlan(
       config: TableConfig,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-    ???
+
+    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config)
+
+    // get the output types
+    val fieldsNames = rowType.getFieldNames
+    val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
+    .map(f => f.getType.getSqlTypeName)
+    .map(n => TypeConverter.sqlTypeToTypeInfo(n))
+    .toArray
+
+    val rowTypeInfo = new RowTypeInfo(fieldTypes)
+
+    if (groupingKeys.length > 0) {
+      inputDS.asInstanceOf[DataSet[Row]].groupBy(groupingKeys: 
_*).reduceGroup(func)
+      .returns(rowTypeInfo)
+      .asInstanceOf[DataSet[Any]]
+    }
+    else {
+      // global aggregation
+      inputDS.asInstanceOf[DataSet[Row]].reduceGroup(func)
+      .returns(rowTypeInfo).asInstanceOf[DataSet[Any]]
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index 6f988be..de436be 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
 import org.apache.flink.api.common.functions.JoinFunction
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -42,7 +42,7 @@ class DataSetJoin(
     joinKeysRight: Array[Int],
     joinType: JoinType,
     joinHint: JoinHint,
-    func: JoinFunction[Any, Any, Any])
+    func: JoinFunction[Row, Row, Row])
   extends BiRel(cluster, traitSet, left, right)
   with DataSetRel {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
index f66cb71..1fca03a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
@@ -57,20 +57,4 @@ class FlinkAggregate(
       aggCalls
     )
   }
-
-  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
-
-    val origCosts = super.computeSelfCost(planner)
-    val deltaCost = planner.getCostFactory.makeHugeCost()
-
-    // only prefer aggregations with transformed Avg
-    aggCalls.toList.foldLeft[RelOptCost](origCosts){
-      (c: RelOptCost, a: AggregateCall) =>
-        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
-          c.plus(deltaCost)
-        } else {
-          c
-        }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 97e8b32..ac52b48 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -60,7 +60,8 @@ object FlinkRuleSets {
     AggregateRemoveRule.INSTANCE,
     AggregateJoinTransposeRule.EXTENDED,
     AggregateUnionAggregateRule.INSTANCE,
-    AggregateReduceFunctionsRule.INSTANCE,
+    // deactivate this rule temporarily
+    // AggregateReduceFunctionsRule.INSTANCE,
     AggregateExpandDistinctAggregatesRule.INSTANCE,
 
     // remove unnecessary sort rule

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
index 9ecd9d0..c6afb8a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
@@ -21,11 +21,10 @@ package org.apache.flink.api.table.plan.rules.dataset
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.table.plan.functions.aggregate.AggregateFactory
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetGroupReduce}
 import org.apache.flink.api.table.plan.nodes.logical.{FlinkAggregate, 
FlinkConvention}
-
 import scala.collection.JavaConversions._
+import org.apache.flink.api.table.runtime.aggregate.AggregateFactory
 
 class DataSetAggregateRule
   extends ConverterRule(
@@ -40,11 +39,13 @@ class DataSetAggregateRule
     val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
     val convInput: RelNode = RelOptRule.convert(agg.getInput, 
DataSetConvention.INSTANCE)
 
-    val grouping = agg.getGroupSet.asList().map {
-      case a: java.lang.Integer => a.intValue
-    }.toArray
+    val grouping = agg.getGroupSet.toArray
+
+    val inputType = agg.getInput.getRowType()
 
-    val aggregateFunction = 
AggregateFactory.createAggregateInstance(agg.getAggCallList)
+    // add grouping fields, position keys in the input, and input type
+    val aggregateFunction = 
AggregateFactory.createAggregateInstance(agg.getAggCallList,
+        inputType, grouping)
 
     new DataSetGroupReduce(
       rel.getCluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
index 69c86c8..3d2117d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
@@ -20,17 +20,10 @@ package org.apache.flink.api.table.plan.rules.dataset
 
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataType}
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rex.{RexCall, RexInputRef}
-import org.apache.calcite.sql.SqlKind
-import org.apache.flink.api.table.plan.PlanGenException
+import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetJoin}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkConvention, 
FlinkJoin}
-import org.apache.flink.api.table.plan.TypeConverter._
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, 
FlinkConvention}
 
 class DataSetJoinRule
   extends ConverterRule(
@@ -46,10 +39,6 @@ class DataSetJoinRule
     val convLeft: RelNode = RelOptRule.convert(join.getInput(0), 
DataSetConvention.INSTANCE)
     val convRight: RelNode = RelOptRule.convert(join.getInput(1), 
DataSetConvention.INSTANCE)
 
-    val joinKeys = getJoinKeys(join)
-
-    // There would be a FlinkProject after FlinkJoin to handle the output 
fields afterward join,
-    // so we do not need JoinFunction here by now.
     new DataSetJoin(
       rel.getCluster,
       traitSet,
@@ -57,93 +46,12 @@ class DataSetJoinRule
       convRight,
       rel.getRowType,
       join.toString,
-      joinKeys._1,
-      joinKeys._2,
-      sqlJoinTypeToFlinkJoinType(join.getJoinType),
+      Array[Int](),
+      Array[Int](),
+      JoinType.INNER,
       null,
       null)
   }
-
-  private def getJoinKeys(join: FlinkJoin): (Array[Int], Array[Int]) = {
-    val joinKeys = ArrayBuffer.empty[(Int, Int)]
-    parseJoinRexNode(join.getCondition.asInstanceOf[RexCall], joinKeys)
-
-    val joinedRowType= join.getRowType
-    val leftRowType = join.getLeft.getRowType
-    val rightRowType = join.getRight.getRowType
-
-    // The fetched join key index from Calcite is based on joined row type, we 
need
-    // the join key index based on left/right input row type.
-    val joinKeyPairs: ArrayBuffer[(Int, Int)] = joinKeys.map {
-      case (first, second) =>
-        var leftIndex = findIndexInSingleInput(first, joinedRowType, 
leftRowType)
-        if (leftIndex == -1) {
-          leftIndex = findIndexInSingleInput(second, joinedRowType, 
leftRowType)
-          if (leftIndex == -1) {
-            throw new PlanGenException("Invalid join condition, could not find 
" +
-              joinedRowType.getFieldNames.get(first) + " and " +
-              joinedRowType.getFieldNames.get(second) + " in left table")
-          }
-          val rightIndex = findIndexInSingleInput(first, joinedRowType, 
rightRowType)
-          if (rightIndex == -1) {
-            throw new PlanGenException("Invalid join condition could not find 
" +
-              joinedRowType.getFieldNames.get(first) + " in right table")
-          }
-          (leftIndex, rightIndex)
-        } else {
-          val rightIndex = findIndexInSingleInput(second, joinedRowType, 
rightRowType)
-          if (rightIndex == -1) {
-            throw new PlanGenException("Invalid join condition could not find 
" +
-              joinedRowType.getFieldNames.get(second) + " in right table")
-          }
-          (leftIndex, rightIndex)
-        }
-    }
-
-    val joinKeysPair = joinKeyPairs.unzip
-
-    (joinKeysPair._1.toArray, joinKeysPair._2.toArray)
-  }
-
-  // Parse the join condition recursively, find all the join keys' index.
-  private def parseJoinRexNode(condition: RexCall, joinKeys: ArrayBuffer[(Int, 
Int)]): Unit = {
-    condition.getOperator.getKind match {
-      case SqlKind.AND =>
-        condition.getOperands.foreach {
-          operand => parseJoinRexNode(operand.asInstanceOf[RexCall], joinKeys)
-        }
-      case SqlKind.EQUALS =>
-        val operands = condition.getOperands
-        val leftIndex = operands(0).asInstanceOf[RexInputRef].getIndex
-        val rightIndex = operands(1).asInstanceOf[RexInputRef].getIndex
-        joinKeys += (leftIndex -> rightIndex)
-      case _ =>
-        // Do not support operands like OR in join condition due to the 
limitation
-        // of current Flink JoinOperator implementation.
-        throw new PlanGenException("Do not support operands other than " +
-          "AND and Equals in join condition now.")
-    }
-  }
-
-  // Find the field index of input row type.
-  private def findIndexInSingleInput(
-      globalIndex: Int,
-      joinedRowType: RelDataType,
-      inputRowType: RelDataType): Int = {
-
-    val fieldType: RelDataTypeField = 
joinedRowType.getFieldList.get(globalIndex)
-    inputRowType.getFieldList.zipWithIndex.foreach {
-      case (inputFieldType, index) =>
-        if (inputFieldType.getName.equals(fieldType.getName)
-          && inputFieldType.getType.equals(fieldType.getType)) {
-
-          return index
-        }
-    }
-
-    // return -1 if match none field of input row type.
-    -1
-  }
 }
 
 object DataSetJoinRule {

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/AggregateFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/AggregateFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/AggregateFunction.scala
new file mode 100644
index 0000000..47f903f
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/AggregateFunction.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime
+
+import java.lang.Iterable
+import com.google.common.base.Preconditions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import scala.collection.JavaConversions._
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.runtime.aggregate.Aggregate
+
+/**
+ * A wrapper Flink GroupReduceOperator UDF of aggregates. It takes the grouped 
data as input,
+ * feed to the aggregates, and collect the record with aggregated value.
+ *
+ * @param aggregates SQL aggregate functions.
+ * @param fields The grouped keys' indices in the input.
+ * @param groupingKeys The grouping keys' positions.
+ */
+class AggregateFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val fields: Array[Int],
+    private val groupingKeys: Array[Int]) extends RichGroupReduceFunction[Row, 
Row] {
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(fields)
+    Preconditions.checkNotNull(groupingKeys)
+    Preconditions.checkArgument(aggregates.size == fields.size)
+  }
+
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+    aggregates.foreach(_.initiateAggregate)
+
+    var currentRecord: Row = null
+
+    // iterate all input records, feed to each aggregate.
+    val recordIterator = records.iterator
+    while (recordIterator.hasNext) {
+      currentRecord = recordIterator.next()
+      for (i <- 0 until aggregates.length) {
+        aggregates(i).aggregate(currentRecord.productElement(fields(i)))
+      }
+    }
+
+    // output a new Row type that contains the grouping keys and aggregates
+    var outValue: Row = new Row(groupingKeys.length + aggregates.length)
+
+    // copy the grouping fields from the last input row to the output row
+    for (i <- 0 until groupingKeys.length) {
+      outValue.setField(i, currentRecord.productElement(groupingKeys(i)))
+    }
+    // copy the results of the aggregate functions to the output row
+    for (i <- groupingKeys.length until groupingKeys.length + 
aggregates.length) {
+      outValue.setField(i, aggregates(i - groupingKeys.length).getAggregated)
+    }
+    out.collect(outValue)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
new file mode 100644
index 0000000..5bc744a
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+/**
+ * Represents a SQL aggregate function. The user should first initialize the 
aggregate, then feed it
+ * with grouped aggregate field values, and finally get the aggregated value.
+ * @tparam T the output type
+ */
+trait Aggregate[T] extends Serializable {
+  /**
+   * Initialize the aggregate state.
+   */
+  def initiateAggregate
+
+  /**
+   * Feed the aggregate field value.
+   * @param value
+   */
+  def aggregate(value: Any)
+
+  /**
+   * Return final aggregated value.
+   * @return
+   */
+  def getAggregated(): T
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateFactory.scala
new file mode 100644
index 0000000..bb045fe
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateFactory.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.util
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.fun._
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.table.plan.PlanGenException
+import org.apache.flink.api.table.runtime.AggregateFunction
+import org.apache.flink.api.table.Row
+import org.apache.calcite.rel.`type`.RelDataType
+
+object AggregateFactory {
+
+  def createAggregateInstance(aggregateCalls: Seq[AggregateCall],
+      inputType: RelDataType, groupings: Array[Int]): 
RichGroupReduceFunction[Row, Row] = {
+
+    val fieldIndexes = new Array[Int](aggregateCalls.size)
+    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
+    aggregateCalls.zipWithIndex.map { case (aggregateCall, index) =>
+      val argList: util.List[Integer] = aggregateCall.getArgList
+      // currently assume only aggregate on singleton field.
+      if (argList.isEmpty) {
+        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+          fieldIndexes(index) = 0
+        } else {
+          throw new PlanGenException("Aggregate fields should not be empty.")
+        }
+      } else {
+          fieldIndexes(index) = argList.get(0)
+      }
+      val sqlTypeName = 
inputType.getFieldList.get(fieldIndexes(index)).getType.getSqlTypeName
+      aggregateCall.getAggregation match {
+        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
+          sqlTypeName match {
+            case TINYINT =>
+              aggregates(index) = new SumAggregate[Byte]
+            case SMALLINT =>
+              aggregates(index) = new SumAggregate[Short]
+            case INTEGER =>
+              aggregates(index) = new SumAggregate[Int]
+            case BIGINT =>
+              aggregates(index) = new SumAggregate[Long]
+            case FLOAT =>
+              aggregates(index) = new SumAggregate[Float]
+            case DOUBLE =>
+              aggregates(index) = new SumAggregate[Double]
+            case sqlType: SqlTypeName =>
+              throw new PlanGenException("Sum aggregate does no support type:" 
+ sqlType)
+          }
+        }
+        case _: SqlAvgAggFunction => {
+          sqlTypeName match {
+            case TINYINT =>
+              aggregates(index) = new ByteAvgAggregate
+            case SMALLINT =>
+              aggregates(index) = new ShortAvgAggregate
+            case INTEGER =>
+              aggregates(index) = new IntAvgAggregate
+            case BIGINT =>
+              aggregates(index) = new LongAvgAggregate
+            case FLOAT =>
+              aggregates(index) = new FloatAvgAggregate
+            case DOUBLE =>
+              aggregates(index) = new DoubleAvgAggregate
+            case sqlType: SqlTypeName =>
+              throw new PlanGenException("Avg aggregate does no support type:" 
+ sqlType)
+          }
+        }
+        case sqlMinMaxFunction: SqlMinMaxAggFunction => {
+          if (sqlMinMaxFunction.isMin) {
+            sqlTypeName match {
+              case TINYINT =>
+                aggregates(index) = new TinyMinAggregate
+              case SMALLINT =>
+                aggregates(index) = new SmallMinAggregate
+              case INTEGER =>
+                aggregates(index) = new IntMinAggregate
+              case BIGINT =>
+                aggregates(index) = new LongMinAggregate
+              case FLOAT =>
+                aggregates(index) = new FloatMinAggregate
+              case DOUBLE =>
+                aggregates(index) = new DoubleMinAggregate
+              case sqlType: SqlTypeName =>
+                throw new PlanGenException("Min aggregate does no support 
type:" + sqlType)
+            }
+          } else {
+            sqlTypeName match {
+              case TINYINT =>
+                aggregates(index) = new TinyIntMaxAggregate
+              case SMALLINT =>
+                aggregates(index) = new SmallIntMaxAggregate
+              case INTEGER =>
+                aggregates(index) = new IntMaxAggregate
+              case BIGINT =>
+                aggregates(index) = new LongMaxAggregate
+              case FLOAT =>
+                aggregates(index) = new FloatMaxAggregate
+              case DOUBLE =>
+                aggregates(index) = new DoubleMaxAggregate
+              case sqlType: SqlTypeName =>
+                throw new PlanGenException("Max aggregate does no support 
type:" + sqlType)
+            }
+          }
+        }
+        case _: SqlCountAggFunction =>
+          aggregates(index) = new CountAggregate
+        case unSupported: SqlAggFunction =>
+          throw new PlanGenException("unsupported Function: " + 
unSupported.getName)
+      }
+    }
+
+    new AggregateFunction(aggregates, fieldIndexes, groupings)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
new file mode 100644
index 0000000..6a5a5a3
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.math.BigInteger
+import com.google.common.math.LongMath
+
+// for byte, short, int we return int
+abstract class IntegralAvgAggregate[T: Numeric] extends Aggregate[T] {
+  
+  var sum: Long = 0
+  var count: Long = 0
+
+  override def initiateAggregate: Unit = {
+    sum = 0
+    count = 0
+  }
+
+}
+
+class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Byte])
+  }
+
+  override def getAggregated(): Byte = {
+    (sum / count).toByte
+  }
+}
+
+class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Short])
+  }
+
+  override def getAggregated(): Short = {
+    (sum / count).toShort
+  }
+}
+
+class IntAvgAggregate extends IntegralAvgAggregate[Int] {
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Int])
+  }
+
+  override def getAggregated(): Int = {
+    (sum / count).toInt
+  }
+}
+
+// Long average aggregate return Long as aggregated value.
+class LongAvgAggregate extends Aggregate[Long] {
+
+  var sum: BigInteger = BigInteger.ZERO
+  var count: Long = 0
+
+  override def initiateAggregate: Unit = {
+    sum = BigInteger.ZERO
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    sum = sum.add(BigInteger.valueOf(value.asInstanceOf[Long]))
+  }
+
+  override def getAggregated(): Long = {
+    sum.divide(BigInteger.valueOf(count)).longValue
+  }
+}
+
+// Float average aggregate return Float as aggregated value.
+abstract class FloatingPointAvgAggregate[T: Numeric] extends Aggregate[T] {
+
+  var avgValue: Double = 0
+  var count: Long = 0
+
+  override def initiateAggregate: Unit = {
+    avgValue = 0
+    count = 0
+  }
+}
+
+// Double average aggregate return Double as aggregated value.
+class FloatAvgAggregate extends FloatingPointAvgAggregate[Float] {
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    val current = value.asInstanceOf[Float]
+    avgValue += (current - avgValue) / count
+  }
+
+  override def getAggregated(): Float = {
+    avgValue.toFloat
+  }
+}
+
+// Double average aggregate return Double as aggregated value.
+class DoubleAvgAggregate extends FloatingPointAvgAggregate[Double] {
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    val current = value.asInstanceOf[Double]
+    avgValue += (current - avgValue) / count
+  }
+
+  override def getAggregated(): Double = {
+    avgValue
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
new file mode 100644
index 0000000..b2dd434
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+class CountAggregate extends Aggregate[Long] {
+  private var count: Long = 0L
+
+  override def initiateAggregate: Unit = {
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+  }
+
+  override def getAggregated(): Long = {
+    count
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
new file mode 100644
index 0000000..3cf0ba9
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+abstract class MaxAggregate[T: Numeric] extends Aggregate[T] {
+
+  var result: T = _
+  val numericResult = implicitly[Numeric[T]]
+
+  override def aggregate(value: Any): Unit = {
+    val input: T = value.asInstanceOf[T]
+
+    result = numericResult.max(result, input)
+  }
+
+  override def getAggregated(): T = {
+    result
+  }
+
+}
+
+// Numeric doesn't have min value
+class TinyIntMaxAggregate extends MaxAggregate[Byte] {
+
+  override def initiateAggregate: Unit = {
+    result = Byte.MinValue
+  }
+
+}
+
+class SmallIntMaxAggregate extends MaxAggregate[Short] {
+
+  override def initiateAggregate: Unit = {
+    result = Short.MinValue
+  }
+
+}
+
+class IntMaxAggregate extends MaxAggregate[Int] {
+
+  override def initiateAggregate: Unit = {
+    result = Int.MinValue
+  }
+
+}
+
+class LongMaxAggregate extends MaxAggregate[Long] {
+
+  override def initiateAggregate: Unit = {
+    result = Long.MinValue
+  }
+
+}
+
+class FloatMaxAggregate extends MaxAggregate[Float] {
+
+  override def initiateAggregate: Unit = {
+    result = Float.MinValue
+  }
+
+}
+
+class DoubleMaxAggregate extends MaxAggregate[Double] {
+
+  override def initiateAggregate: Unit = {
+    result = Double.MinValue
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
new file mode 100644
index 0000000..e024bb4
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import scala.reflect.runtime.universe._
+
+abstract class MinAggregate[T: Numeric] extends Aggregate[T] {
+
+  var result: T = _
+  val numericResult = implicitly[Numeric[T]]
+
+  override def aggregate(value: Any): Unit = {
+    val input: T = value.asInstanceOf[T]
+
+    result = numericResult.min(result, input)
+  }
+
+  override def getAggregated(): T = {
+    result
+  }
+
+}
+
+// Numeric doesn't have max value
+class TinyMinAggregate extends MinAggregate[Byte] {
+
+  override def initiateAggregate: Unit = {
+    result = Byte.MaxValue
+  }
+
+}
+
+class SmallMinAggregate extends MinAggregate[Short] {
+
+  override def initiateAggregate: Unit = {
+    result = Short.MaxValue
+  }
+
+}
+
+class IntMinAggregate extends MinAggregate[Int] {
+
+    override def initiateAggregate: Unit = {
+    result = Int.MaxValue
+  }
+
+}
+
+class LongMinAggregate extends MinAggregate[Long] {
+
+  override def initiateAggregate: Unit = {
+    result = Long.MaxValue
+  }
+
+}
+
+class FloatMinAggregate extends MinAggregate[Float] {
+
+  override def initiateAggregate: Unit = {
+    result = Float.MaxValue
+  }
+
+}
+
+class DoubleMinAggregate extends MinAggregate[Double] {
+
+  override def initiateAggregate: Unit = {
+    result = Double.MaxValue
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
new file mode 100644
index 0000000..84e1ae7
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+class SumAggregate[T: Numeric] extends Aggregate[T] {
+
+  private var result: T = _
+  val numericResult = implicitly[Numeric[T]]
+  /**
+    * Initialize the aggregate state.
+    */
+  override def initiateAggregate: Unit = {
+    result = implicitly[Numeric[T]].zero
+  }
+
+  /**
+    * Feed the aggregate field value.
+    *
+    * @param value
+    */
+  override def aggregate(value: Any): Unit = {
+    val input: T = value.asInstanceOf[T]
+
+    result = numericResult.plus(result, input.asInstanceOf[T])
+  }
+
+  /**
+    * Return final aggregated value.
+    *
+    * @return
+    */
+  override def getAggregated(): T = {
+    result
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index dd51b14..8e81893 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -50,6 +50,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+
 import scala.NotImplementedError;
 
 import java.util.List;
@@ -61,7 +62,8 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Ignore //DataSetMap needs to be implemented
+       @Test
        public void testAggregationTypes() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -93,7 +95,7 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testWorkingAggregationDataTypes() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -103,8 +105,7 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                                                new Tuple7<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, "Hello"),
                                                new Tuple7<>((byte) 2, (short) 
2, 2, 2L, 2.0f, 2.0d, "Ciao"));
 
-               Table table =
-                               tableEnv.fromDataSet(input);
+               Table table = tableEnv.fromDataSet(input);
 
                Table result =
                                table.select("f0.avg, f1.avg, f2.avg, f3.avg, 
f4.avg, f5.avg, f6.count");
@@ -115,6 +116,7 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
+       @Ignore // it seems like the arithmetic expression is added to the 
field position
        @Test(expected = NotImplementedError.class)
        public void testAggregationWithArithmetic() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -138,7 +140,7 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testAggregationWithTwoCount() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -199,6 +201,5 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                String expected = "";
                compareResultAsText(results, expected);
        }
-
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
index 51f666e..222f161 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
@@ -27,11 +27,9 @@ import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.table.test.TableProgramsTestBase;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.NotImplementedError;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
index f48be48..b8ca4cd 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -26,11 +26,9 @@ import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.table.test.TableProgramsTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.NotImplementedError;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
index 524dd4e..910f601 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
@@ -59,7 +59,7 @@ public class GroupedAggregationsITCase extends 
MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testGroupedAggregate() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -78,7 +78,7 @@ public class GroupedAggregationsITCase extends 
MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testGroupingKeyForwardIfNotUsed() throws Exception {
 
                // the grouping key needs to be forwarded to the intermediate 
DataSet, even
@@ -101,7 +101,7 @@ public class GroupedAggregationsITCase extends 
MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testGroupNoAggregation() throws Exception {
 
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
index ada0e06..c4ac138 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -26,11 +26,9 @@ import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.table.test.TableProgramsTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.NotImplementedError;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
index ec4cd1c..8876dc8 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.ExpressionException;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 76bdcba..64f6757 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -27,13 +27,13 @@ import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class AggregationsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) {
 
-  @Test(expected = classOf[NotImplementedError])
+  @Ignore //DataSetMap needs to be implemented
+  @Test
   def testAggregationTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -57,7 +57,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBa
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -71,6 +71,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBa
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Ignore // it seems like the arithmetic expression is added to the field 
position
   @Test(expected = classOf[NotImplementedError])
   def testAggregationWithArithmetic(): Unit = {
 
@@ -83,7 +84,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBa
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testAggregationWithTwoCount(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -120,7 +121,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBa
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testSQLStyleAggregations(): Unit = {
 
     // the grouping key needs to be forwarded to the intermediate DataSet, even

Reply via email to