[FLINK-3596] DataSet RelNode refactoring

- remove the intermediate flink relnode layer and the dataset rules
- move code generation from rules to DataSet nodes
- remove unused DataSete nodes
- move code generation from join rule to DataSetJoin node
- merge DataSetMap and DataSetReduce into  DataSetAggregate


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22621e02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22621e02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22621e02

Branch: refs/heads/tableOnCalcite
Commit: 22621e0289ee6e2783b3220685d9aaba84476d77
Parents: e34e439
Author: vasia <va...@apache.org>
Authored: Tue Mar 8 17:29:32 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Mar 10 23:30:26 2016 +0100

----------------------------------------------------------------------
 .../api/java/table/JavaBatchTranslator.scala    |  28 +---
 .../plan/nodes/dataset/DataSetAggregate.scala   | 114 ++++++++++++++
 .../table/plan/nodes/dataset/DataSetCalc.scala  | 152 +++++++++++++++++++
 .../plan/nodes/dataset/DataSetExchange.scala    |  66 --------
 .../plan/nodes/dataset/DataSetFlatMap.scala     |  74 ---------
 .../plan/nodes/dataset/DataSetGroupReduce.scala | 105 -------------
 .../table/plan/nodes/dataset/DataSetJoin.scala  |  83 ++++++++--
 .../table/plan/nodes/dataset/DataSetMap.scala   |  90 -----------
 .../plan/nodes/dataset/DataSetReduce.scala      |  66 --------
 .../table/plan/nodes/dataset/DataSetSort.scala  |  65 --------
 .../plan/nodes/logical/FlinkAggregate.scala     |  60 --------
 .../table/plan/nodes/logical/FlinkCalc.scala    |  37 -----
 .../plan/nodes/logical/FlinkConvention.scala    |  42 -----
 .../table/plan/nodes/logical/FlinkJoin.scala    |  46 ------
 .../api/table/plan/nodes/logical/FlinkRel.scala |  25 ---
 .../table/plan/nodes/logical/FlinkScan.scala    |  31 ----
 .../table/plan/nodes/logical/FlinkUnion.scala   |  38 -----
 .../api/table/plan/rules/FlinkRuleSets.scala    |  11 --
 .../rules/dataset/DataSetAggregateRule.scala    |  71 ---------
 .../plan/rules/dataset/DataSetCalcRule.scala    | 137 -----------------
 .../plan/rules/dataset/DataSetJoinRule.scala    | 135 ----------------
 .../plan/rules/dataset/DataSetScanRule.scala    |  51 -------
 .../plan/rules/dataset/DataSetUnionRule.scala   |  53 -------
 .../plan/rules/logical/FlinkAggregateRule.scala |  22 +--
 .../plan/rules/logical/FlinkCalcRule.scala      |  15 +-
 .../plan/rules/logical/FlinkJoinRule.scala      |  37 +++--
 .../logical/FlinkJoinUnionTransposeRule.scala   |   2 +-
 .../plan/rules/logical/FlinkScanRule.scala      |  11 +-
 .../plan/rules/logical/FlinkUnionRule.scala     |  22 +--
 .../table/runtime/aggregate/AggregateUtil.scala |  49 +++---
 30 files changed, 421 insertions(+), 1317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index f238df3..14ee78e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -76,8 +76,10 @@ class JavaBatchTranslator(config: TableConfig) extends 
PlanTranslator {
     // optimize the logical Flink plan
     val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
     val flinkOutputProps = RelTraitSet.createEmpty()
+      .plus(DataSetConvention.INSTANCE)
+      .plus(RelCollations.of()).simplify()
 
-    val optPlan = try {
+    val dataSetPlan = try {
       optProgram.run(planner, decorPlan, flinkOutputProps)
     }
     catch {
@@ -89,30 +91,8 @@ class JavaBatchTranslator(config: TableConfig) extends 
PlanTranslator {
     }
 
     println("---------------")
-    println("Optimized Plan:")
-    println("---------------")
-    println(RelOptUtil.toString(optPlan))
-
-    // optimize the logical Flink plan
-    val dataSetProgram = Programs.ofRules(FlinkRuleSets.DATASET_TRANS_RULES)
-    val dataSetOutputProps = RelTraitSet.createEmpty()
-      .plus(DataSetConvention.INSTANCE)
-      .plus(RelCollations.of()).simplify()
-
-    val dataSetPlan = try {
-      dataSetProgram.run(planner, optPlan, dataSetOutputProps)
-    }
-    catch {
-      case e: CannotPlanException =>
-        throw new PlanGenException(
-          s"Cannot generate a valid execution plan for the given query: \n\n" +
-            s"${RelOptUtil.toString(lPlan)}\n" +
-            "Please consider filing a bug report.", e)
-    }
-
-    println("-------------")
     println("DataSet Plan:")
-    println("-------------")
+    println("---------------")
     println(RelOptUtil.toString(dataSetPlan))
 
     dataSetPlan match {

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
new file mode 100644
index 0000000..3856c5f
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.plan.TypeConverter._
+import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter}
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.{Row, TableConfig}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with a LogicalAggregate.
+  */
+class DataSetAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    rowType: RelDataType,
+    inputType: RelDataType,
+    opName: String,
+    grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, input)
+  with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataSetAggregate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      namedAggregates,
+      rowType,
+      inputType,
+      opName,
+      grouping)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("name", opName)
+  }
+
+  override def translateToPlan(
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    expectedType match {
+      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+        throw new PlanGenException("Aggregate operations currently only 
support returning Rows.")
+      case _ => // ok
+    }
+
+    val groupingKeys = (0 until grouping.length).toArray
+    // add grouping fields, position keys in the input, and input type
+    val aggregateResult = 
AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
+      inputType, rowType, grouping, config)
+
+    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
+      config,
+      // tell the input operator that this operator currently only supports 
Rows as input
+      Some(TypeConverter.DEFAULT_ROW_TYPE))
+
+    // get the output types
+    val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
+    .map(f => f.getType.getSqlTypeName)
+    .map(n => TypeConverter.sqlTypeToTypeInfo(n))
+    .toArray
+
+    val rowTypeInfo = new RowTypeInfo(fieldTypes)
+    val mappedInput = inputDS.map(aggregateResult.mapFunc)
+    val groupReduceFunction = aggregateResult.reduceGroupFunc
+
+    if (groupingKeys.length > 0) {
+      mappedInput.asInstanceOf[DataSet[Row]]
+        .groupBy(groupingKeys: _*)
+        .reduceGroup(groupReduceFunction)
+        .returns(rowTypeInfo)
+        .asInstanceOf[DataSet[Any]]
+    }
+    else {
+      // global aggregation
+      mappedInput.asInstanceOf[DataSet[Row]]
+        .reduceGroup(groupReduceFunction)
+        .returns(rowTypeInfo)
+        .asInstanceOf[DataSet[Any]]
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
new file mode 100644
index 0000000..d7c71cc
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.plan.TypeConverter._
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.TableConfig
+import org.apache.calcite.rex.RexProgram
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode which matches along with LogicalCalc.
+  *
+  */
+class DataSetCalc(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rowType: RelDataType,
+    calcProgram: RexProgram,
+    opName: String,
+    ruleDescription: String)
+  extends SingleRel(cluster, traitSet, input)
+  with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataSetCalc(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowType,
+      calcProgram,
+      opName,
+      ruleDescription)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("name", opName)
+  }
+
+  override def toString = opName
+
+  override def translateToPlan(config: TableConfig,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config)
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val generator = new CodeGenerator(config, inputDS.getType)
+
+    val condition = calcProgram.getCondition
+    val expandedExpressions = calcProgram.getProjectList.map(
+       expr => calcProgram.expandLocalRef(expr))
+    val projection = generator.generateResultExpression(
+      returnType,
+      rowType.getFieldNames,
+      expandedExpressions)
+
+    val body = {
+      // only projection
+      if (condition == null) {
+        s"""
+          |${projection.code}
+          |${generator.collectorTerm}.collect(${projection.resultTerm});
+          |""".stripMargin
+      }
+      else {
+        val filterCondition = generator.generateExpression(
+          calcProgram.expandLocalRef(calcProgram.getCondition))
+        // only filter
+        if (projection == null) {
+          // conversion
+          if (inputDS.getType != returnType) {
+            val conversion = generator.generateConverterResultExpression(
+              returnType,
+              rowType.getFieldNames)
+
+            s"""
+              |${filterCondition.code}
+              |if (${filterCondition.resultTerm}) {
+              |  ${conversion.code}
+              |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
+              |}
+              |""".stripMargin
+          }
+          // no conversion
+          else {
+            s"""
+              |${filterCondition.code}
+              |if (${filterCondition.resultTerm}) {
+              |  ${generator.collectorTerm}.collect(${generator.input1Term});
+              |}
+              |""".stripMargin
+          }
+        }
+        // both filter and projection
+        else {
+          s"""
+            |${filterCondition.code}
+            |if (${filterCondition.resultTerm}) {
+            |  ${projection.code}
+            |  ${generator.collectorTerm}.collect(${projection.resultTerm});
+            |}
+            |""".stripMargin
+        }
+      }
+    }
+
+    val genFunction = generator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Any, Any]],
+      body,
+      returnType)
+
+    val mapFunc = new FlatMapRunner[Any, Any](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+
+    inputDS.flatMap(mapFunc)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala
deleted file mode 100644
index 00cf899..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import 
org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.TableConfig
-
-/**
-  * Flink RelNode which matches along with PartitionOperator.
-  */
-class DataSetExchange(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    rowType: RelDataType,
-    opName: String,
-    partitionKey: Array[Int],
-    partitionMethod: PartitionMethod)
-  extends SingleRel(cluster, traitSet, input)
-  with DataSetRel {
-
-  override def deriveRowType() = rowType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
-    new DataSetExchange(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      rowType,
-      opName,
-      partitionKey,
-      partitionMethod
-    )
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw).item("name", opName)
-  }
-
-  override def translateToPlan(
-      config: TableConfig, 
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-    ???
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
deleted file mode 100644
index 9744792..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.plan.TypeConverter._
-
-/**
-  * Flink RelNode which matches along with FlatMapOperator.
-  *
-  */
-class DataSetFlatMap(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    rowType: RelDataType,
-    opName: String,
-    func: (TableConfig, TypeInformation[Any], TypeInformation[Any]) => 
FlatMapFunction[Any, Any])
-  extends SingleRel(cluster, traitSet, input)
-  with DataSetRel {
-
-  override def deriveRowType() = rowType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
-    new DataSetFlatMap(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      rowType,
-      opName,
-      func
-    )
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw).item("name", opName)
-  }
-
-  override def toString = opName
-
-  override def translateToPlan(config: TableConfig,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-    val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config)
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-    val flatMapFunc = func.apply(config, inputDataSet.getType, returnType)
-    inputDataSet.flatMap(flatMapFunc)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
deleted file mode 100644
index b87a092..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter}
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-import org.apache.flink.api.table.{Row, TableConfig}
-
-import scala.collection.JavaConverters._
-
-/**
-  * Flink RelNode which matches along with ReduceGroupOperator.
-  */
-class DataSetGroupReduce(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    rowType: RelDataType,
-    opName: String,
-    groupingKeys: Array[Int],
-    func: (TableConfig, TypeInformation[Row], TypeInformation[Row]) =>
-        GroupReduceFunction[Row, Row])
-  extends SingleRel(cluster, traitSet, input)
-  with DataSetRel {
-
-  override def deriveRowType() = rowType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
-    new DataSetGroupReduce(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      rowType,
-      opName,
-      groupingKeys,
-      func
-    )
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw).item("name", opName)
-  }
-
-  override def translateToPlan(
-      config: TableConfig,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    expectedType match {
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        throw new PlanGenException("GroupReduce operations currently only 
support returning Rows.")
-      case _ => // ok
-    }
-
-    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
-      config,
-      // tell the input operator that this operator currently only supports 
Rows as input
-      Some(TypeConverter.DEFAULT_ROW_TYPE))
-
-    // get the output types
-    val fieldsNames = rowType.getFieldNames
-    val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
-    .map(f => f.getType.getSqlTypeName)
-    .map(n => TypeConverter.sqlTypeToTypeInfo(n))
-    .toArray
-
-    val rowTypeInfo = new RowTypeInfo(fieldTypes)
-    val groupReduceFunction =
-      func.apply(config, inputDS.getType.asInstanceOf[RowTypeInfo], 
rowTypeInfo)
-
-    if (groupingKeys.length > 0) {
-      inputDS.asInstanceOf[DataSet[Row]]
-          .groupBy(groupingKeys: _*)
-          .reduceGroup(groupReduceFunction)
-          .returns(rowTypeInfo)
-          .asInstanceOf[DataSet[Any]]
-    }
-    else {
-      // global aggregation
-      inputDS.asInstanceOf[DataSet[Row]].reduceGroup(groupReduceFunction)
-      .returns(rowTypeInfo).asInstanceOf[DataSet[Any]]
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index c32853d..1d293d2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -20,21 +20,21 @@ package org.apache.flink.api.table.plan.nodes.dataset
 
 import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinInfo
 import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
-import org.apache.flink.api.common.functions.JoinFunction
+import org.apache.calcite.util.mapping.IntPair
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.table.{TableConfig, Row}
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.FlatJoinRunner
+import org.apache.flink.api.table.{TableException, TableConfig}
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.table.plan.TypeConverter._
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
-import org.apache.flink.api.table.plan.TypeConverter
+import org.apache.calcite.rex.RexNode
 
 /**
   * Flink RelNode which matches along with JoinOperator and its related 
operations.
@@ -46,12 +46,13 @@ class DataSetJoin(
     right: RelNode,
     rowType: RelDataType,
     opName: String,
-    joinKeysLeft: Array[Int],
-    joinKeysRight: Array[Int],
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    joinInfo: JoinInfo,
+    keyPairs: List[IntPair],
     joinType: JoinType,
     joinHint: JoinHint,
-    func: (TableConfig, TypeInformation[Any], TypeInformation[Any], 
TypeInformation[Any]) =>
-      FlatJoinFunction[Any, Any, Any])
+    ruleDescription: String)
   extends BiRel(cluster, traitSet, left, right)
   with DataSetRel {
 
@@ -65,12 +66,13 @@ class DataSetJoin(
       inputs.get(1),
       rowType,
       opName,
-      joinKeysLeft,
-      joinKeysRight,
+      joinCondition,
+      joinRowType,
+      joinInfo,
+      keyPairs,
       joinType,
       joinHint,
-      func
-    )
+      ruleDescription)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -90,8 +92,57 @@ class DataSetJoin(
       config.getNullCheck,
       config.getEfficientTypeUsage)
 
-    val joinFun = func.apply(config, leftDataSet.getType, 
rightDataSet.getType, returnType)
-      leftDataSet.join(rightDataSet).where(joinKeysLeft: 
_*).equalTo(joinKeysRight: _*)
+    // get the equality keys
+    val leftKeys = ArrayBuffer.empty[Int]
+    val rightKeys = ArrayBuffer.empty[Int]
+    if (keyPairs.isEmpty) {
+      // if no equality keys => not supported
+      throw new TableException("Joins should have at least one equality 
condition")
+    }
+    else {
+      // at least one equality expression => generate a join function
+      keyPairs.foreach(pair => {
+        leftKeys.add(pair.source)
+        rightKeys.add(pair.target)
+      })
+    }
+
+    val generator = new CodeGenerator(config, leftDataSet.getType, 
Some(rightDataSet.getType))
+    val conversion = generator.generateConverterResultExpression(
+      returnType,
+      joinRowType.getFieldNames)
+
+    var body = ""
+
+    if (joinInfo.isEqui) {
+      // only equality condition
+      body = s"""
+           |${conversion.code}
+           |${generator.collectorTerm}.collect(${conversion.resultTerm});
+           |""".stripMargin
+    }
+    else {
+      val condition = generator.generateExpression(joinCondition)
+      body = s"""
+           |${condition.code}
+           |if (${condition.resultTerm}) {
+           |  ${conversion.code}
+           |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
+           |}
+           |""".stripMargin
+    }
+    val genFunction = generator.generateFunction(
+      ruleDescription,
+      classOf[FlatJoinFunction[Any, Any, Any]],
+      body,
+      returnType)
+
+    val joinFun = new FlatJoinRunner[Any, Any, Any](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+
+    leftDataSet.join(rightDataSet).where(leftKeys.toArray: 
_*).equalTo(rightKeys.toArray: _*)
       .`with`(joinFun).asInstanceOf[DataSet[Any]]
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
deleted file mode 100644
index d87e047..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.functions.{MapFunction, 
MapPartitionFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter}
-import org.apache.flink.api.table.plan.TypeConverter._
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-import org.apache.flink.api.table.{Row, TableConfig}
-
-import scala.collection.JavaConverters._
-
-/**
-  * Flink RelNode which matches along with MapOperator.
-  *
-  */
-class DataSetMap(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    rowType: RelDataType,
-    opName: String,
-    func: (TableConfig, TypeInformation[Any], TypeInformation[Any]) => 
MapFunction[Any, Any])
-  extends SingleRel(cluster, traitSet, input)
-  with DataSetRel {
-
-  override def deriveRowType() = rowType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
-    new DataSetMap(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      rowType,
-      opName,
-      func
-    )
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw).item("name", opName)
-  }
-
-  override def toString = opName
-
-  override def translateToPlan(config: TableConfig,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    expectedType match {
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        throw new PlanGenException("GroupReduce operations " +
-            "currently only support returning Rows.")
-      case _ => // ok
-    }
-
-    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
-      config,
-      // tell the input operator that this operator currently only supports 
Rows as input
-      Some(TypeConverter.DEFAULT_ROW_TYPE))
-
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-    val mapFunc = func.apply(config, inputDS.getType, returnType)
-    inputDS.map(mapFunc)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
deleted file mode 100644
index 361f869..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.{TableConfig, Row}
-
-/**
-  * Flink RelNode which matches along with ReduceOperator.
-  */
-class DataSetReduce(
-    cluster: RelOptCluster,
-    traits: RelTraitSet,
-    input: RelNode,
-    rowType: RelDataType,
-    opName: String,
-    groupingKeys: Array[Int],
-    func: ReduceFunction[Any])
-  extends SingleRel(cluster, traits, input)
-  with DataSetRel {
-
-  override def deriveRowType() = rowType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
-    new DataSetReduce(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      rowType,
-      opName,
-      groupingKeys,
-      func
-    )
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw).item("name", opName)
-  }
-
-  override def translateToPlan(
-      config: TableConfig,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-    ???
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
deleted file mode 100644
index 033711b..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.TableConfig
-
-/**
-  * Flink RelNode which matches along with SortPartitionOperator.
-  */
-class DataSetSort(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    rowType: RelDataType,
-    opName: String,
-    sortKey: Array[Int],
-    sortOrder: Array[Boolean])
-  extends SingleRel(cluster, traitSet, input)
-  with DataSetRel {
-
-  override def deriveRowType() = rowType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
-    new DataSetSort(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      rowType,
-      opName,
-      sortKey,
-      sortOrder
-    )
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw).item("name", opName)
-  }
-
-  override def translateToPlan(
-      config: TableConfig,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-    ???
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
deleted file mode 100644
index 1fca03a..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.logical
-
-import java.util
-
-import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelOptCluster, 
RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{AggregateCall, Aggregate}
-import org.apache.calcite.sql.fun.SqlAvgAggFunction
-import org.apache.calcite.util.ImmutableBitSet
-
-import scala.collection.JavaConversions._
-
-class FlinkAggregate(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    indicator: Boolean,
-    groupSet: ImmutableBitSet,
-    groupSets: java.util.List[ImmutableBitSet],
-    aggCalls: java.util.List[AggregateCall])
-  extends Aggregate(cluster, traitSet, input, indicator, groupSet, groupSets, 
aggCalls)
-  with FlinkRel {
-
-  override def copy(
-      traitSet: RelTraitSet,
-      input: RelNode,
-      indicator: Boolean,
-      groupSet: ImmutableBitSet,
-      groupSets: util.List[ImmutableBitSet],
-      aggCalls: util.List[AggregateCall]): Aggregate = {
-
-    new FlinkAggregate(
-      cluster,
-      traitSet,
-      input,
-      indicator,
-      groupSet,
-      groupSets,
-      aggCalls
-    )
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala
deleted file mode 100644
index bcfe8d7..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.logical
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.Calc
-import org.apache.calcite.rex.RexProgram
-
-class FlinkCalc(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    program: RexProgram)
-  extends Calc(cluster, traitSet, input, program)
-  with FlinkRel {
-
-  override def copy(traitSet: RelTraitSet, child: RelNode, program: 
RexProgram): Calc = {
-    new FlinkCalc(cluster, traitSet, child, program)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala
deleted file mode 100644
index 80137f2..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.logical
-
-import org.apache.calcite.plan._
-
-class FlinkConvention extends Convention {
-
-  override def toString: String = getName
-
-  def getInterface: Class[_] = classOf[FlinkRel]
-
-  def getName: String = "FLINK"
-
-  def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
-
-  def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
-
-  def register(planner: RelOptPlanner): Unit = { }
-
-}
-
-object FlinkConvention {
-
-  val INSTANCE = new FlinkConvention
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala
deleted file mode 100644
index 8b04b50..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.logical
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{JoinRelType, Join}
-import org.apache.calcite.rex.RexNode
-
-class FlinkJoin(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    left: RelNode,
-    right: RelNode,
-    condition: RexNode,
-    joinType: JoinRelType,
-    variablesStopped: java.util.Set[String])
-  extends Join(cluster, traitSet, left, right, condition, joinType, 
variablesStopped)
-  with FlinkRel {
-
-  override def copy(
-      traitSet: RelTraitSet,
-      condition: RexNode,
-      left: RelNode,
-      right: RelNode,
-      joinType: JoinRelType,
-      semiJoinDone: Boolean): Join = {
-    new FlinkJoin(cluster, traitSet, left, right, condition, joinType, 
getVariablesStopped)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala
deleted file mode 100644
index 9ebd7e4..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.logical
-
-import org.apache.calcite.rel.RelNode
-
-trait FlinkRel extends RelNode {
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala
deleted file mode 100644
index 6d53a75..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.logical
-
-import org.apache.calcite.plan.{RelOptTable, RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.core.TableScan
-
-class FlinkScan(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    table: RelOptTable)
-  extends TableScan(cluster, traitSet, table)
-  with FlinkRel {
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala
deleted file mode 100644
index fd791d3..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.logical
-
-import java.util
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{SetOp, Union}
-
-class FlinkUnion(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputs: java.util.List[RelNode],
-    all: Boolean)
-  extends Union(cluster, traitSet, inputs, all)
-  with FlinkRel {
-
-  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode], all: 
Boolean): SetOp = {
-    new FlinkUnion(cluster, traitSet, inputs, all)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index b5c3800..bd128b2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -21,7 +21,6 @@ package org.apache.flink.api.table.plan.rules
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.tools.{RuleSets, RuleSet}
 import org.apache.flink.api.table.plan.rules.logical._
-import org.apache.flink.api.table.plan.rules.dataset._
 
 object FlinkRuleSets {
 
@@ -102,14 +101,4 @@ object FlinkRuleSets {
     FlinkUnionRule.INSTANCE
   )
 
-  val DATASET_TRANS_RULES: RuleSet = RuleSets.ofList(
-  
-    // translate to DataSet nodes
-    DataSetAggregateRule.INSTANCE,
-    DataSetCalcRule.INSTANCE,
-    DataSetJoinRule.INSTANCE,
-    DataSetScanRule.INSTANCE,
-    DataSetUnionRule.INSTANCE
-  )
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
deleted file mode 100644
index ba77fea..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataset
-
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetGroupReduce, DataSetMap}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkAggregate, 
FlinkConvention}
-import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
-
-import scala.collection.JavaConversions._
-
-class DataSetAggregateRule
-  extends ConverterRule(
-    classOf[FlinkAggregate],
-    FlinkConvention.INSTANCE,
-    DataSetConvention.INSTANCE,
-    "DataSetAggregateRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-    val agg: FlinkAggregate = rel.asInstanceOf[FlinkAggregate]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(agg.getInput, 
DataSetConvention.INSTANCE)
-
-    val grouping = agg.getGroupSet.toArray
-
-    val inputType = agg.getInput.getRowType()
-
-    // add grouping fields, position keys in the input, and input type
-    val aggregateResult = 
AggregateUtil.createOperatorFunctionsForAggregates(agg.getNamedAggCalls,
-        inputType, rel.getRowType, grouping)
-
-    val mapNode = new DataSetMap(rel.getCluster,
-      traitSet,
-      convInput,
-      aggregateResult.intermediateDataType,
-      agg.toString,
-      aggregateResult.mapFunc)
-
-    new DataSetGroupReduce(
-      rel.getCluster,
-      traitSet,
-      mapNode,
-      rel.getRowType,
-      agg.toString,
-      (0 until grouping.length).toArray,
-      aggregateResult.reduceGroupFunc)
-  }
-}
-
-object DataSetAggregateRule {
-  val INSTANCE: RelOptRule = new DataSetAggregateRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala
deleted file mode 100644
index 256a085..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataset
-
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetFlatMap}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkCalc, 
FlinkConvention}
-import org.apache.flink.api.table.runtime.FlatMapRunner
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.common.functions.FlatMapFunction
-import scala.collection.JavaConversions._
-import org.apache.calcite.rex.RexLocalRef
-import org.apache.flink.api.table.codegen.GeneratedExpression
-
-class DataSetCalcRule
-  extends ConverterRule(
-    classOf[FlinkCalc],
-    FlinkConvention.INSTANCE,
-    DataSetConvention.INSTANCE,
-    "DataSetCalcRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-    val calc: FlinkCalc = rel.asInstanceOf[FlinkCalc]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(calc.getInput, 
DataSetConvention.INSTANCE)
-
-    val calcFunc = (
-        config: TableConfig,
-        inputType: TypeInformation[Any],
-        returnType: TypeInformation[Any]) => {
-      val generator = new CodeGenerator(config, inputType)
-
-      val calcProgram = calc.getProgram
-      val condition = calcProgram.getCondition
-      val expandedExpressions = calcProgram.getProjectList.map(
-          expr => calcProgram.expandLocalRef(expr.asInstanceOf[RexLocalRef]))
-      val projection = generator.generateResultExpression(
-        returnType,
-        calc.getRowType.getFieldNames,
-        expandedExpressions)
-      
-      val body = {
-        // only projection
-        if (condition == null) {
-          s"""
-            |${projection.code}
-            |${generator.collectorTerm}.collect(${projection.resultTerm});
-            |""".stripMargin
-        }
-        else {
-          val filterCondition = generator.generateExpression(
-              calcProgram.expandLocalRef(calcProgram.getCondition))
-          // only filter
-          if (projection == null) {
-            // conversion
-            if (inputType != returnType) {
-              val conversion = generator.generateConverterResultExpression(
-                  returnType,
-                  calc.getRowType.getFieldNames)
-
-                  s"""
-                    |${filterCondition.code}
-                    |if (${filterCondition.resultTerm}) {
-                    |  ${conversion.code}
-                    |  
${generator.collectorTerm}.collect(${conversion.resultTerm});
-                    |}
-                    |""".stripMargin
-            }
-            // no conversion
-            else {
-              s"""
-                |${filterCondition.code}
-                |if (${filterCondition.resultTerm}) {
-                |  ${generator.collectorTerm}.collect(${generator.input1Term});
-                |}
-                |""".stripMargin
-            }
-          }
-          // both filter and projection
-          else {
-            s"""
-              |${filterCondition.code}
-              |if (${filterCondition.resultTerm}) {
-              |  ${projection.code}
-              |  ${generator.collectorTerm}.collect(${projection.resultTerm});
-              |}
-              |""".stripMargin
-          }
-        }
-      }
-
-      val genFunction = generator.generateFunction(
-        description,
-        classOf[FlatMapFunction[Any, Any]],
-        body,
-        returnType)
-
-      new FlatMapRunner[Any, Any](
-        genFunction.name,
-        genFunction.code,
-        genFunction.returnType)
-    }
-
-    new DataSetFlatMap(
-      rel.getCluster,
-      traitSet,
-      convInput,
-      rel.getRowType,
-      calc.toString,
-      calcFunc)
-  }
-}
-
-object DataSetCalcRule {
-  val INSTANCE: RelOptRule = new DataSetCalcRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
deleted file mode 100644
index c045471..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataset
-
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetJoin}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, 
FlinkConvention}
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import org.apache.flink.api.table.plan.TypeConverter._
-import org.apache.flink.api.table.runtime.FlatJoinRunner
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.calcite.rel.core.JoinInfo
-import org.apache.flink.api.table.TableException
-
-class DataSetJoinRule
-  extends ConverterRule(
-    classOf[FlinkJoin],
-    FlinkConvention.INSTANCE,
-    DataSetConvention.INSTANCE,
-    "DataSetJoinRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-    val join: FlinkJoin = rel.asInstanceOf[FlinkJoin]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val convLeft: RelNode = RelOptRule.convert(join.getInput(0), 
DataSetConvention.INSTANCE)
-    val convRight: RelNode = RelOptRule.convert(join.getInput(1), 
DataSetConvention.INSTANCE)
-
-    // get the equality keys
-    val joinInfo = join.analyzeCondition
-    val keyPairs = joinInfo.pairs
-
-    if (keyPairs.isEmpty) { // if no equality keys => not supported
-      throw new TableException("Joins should have at least one equality 
condition")
-    }
-    else { // at least one equality expression => generate a join function
-      val conditionType = join.getCondition.getType
-      val func = getJoinFunction(join, joinInfo)
-      val leftKeys = ArrayBuffer.empty[Int]
-      val rightKeys = ArrayBuffer.empty[Int]
-
-      keyPairs.foreach(pair => {
-        leftKeys.add(pair.source)
-        rightKeys.add(pair.target)}
-      )
-
-      new DataSetJoin(
-        rel.getCluster,
-        traitSet,
-        convLeft,
-        convRight,
-        rel.getRowType,
-        join.toString,
-        leftKeys.toArray,
-        rightKeys.toArray,
-        JoinType.INNER,
-        null,
-        func)
-    }
-  }
-
-  def getJoinFunction(join: FlinkJoin, joinInfo: JoinInfo):
-      ((TableConfig, TypeInformation[Any], TypeInformation[Any], 
TypeInformation[Any]) =>
-      FlatJoinFunction[Any, Any, Any]) = {
-
-    val func = (
-        config: TableConfig,
-        leftInputType: TypeInformation[Any],
-        rightInputType: TypeInformation[Any],
-        returnType: TypeInformation[Any]) => {
-
-      val generator = new CodeGenerator(config, leftInputType, 
Some(rightInputType))
-      val conversion = generator.generateConverterResultExpression(
-          returnType,
-          join.getRowType.getFieldNames)
-      var body = ""
-
-      if (joinInfo.isEqui) {
-        // only equality condition
-        body = s"""
-            |${conversion.code}
-            |${generator.collectorTerm}.collect(${conversion.resultTerm});
-            |""".stripMargin
-      }
-      else {
-        val condition = generator.generateExpression(join.getCondition)
-        body = s"""
-            |${condition.code}
-            |if (${condition.resultTerm}) {
-            |  ${conversion.code}
-            |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
-            |}
-            |""".stripMargin
-      }
-      val genFunction = generator.generateFunction(
-        description,
-        classOf[FlatJoinFunction[Any, Any, Any]],
-        body,
-        returnType)
-
-      new FlatJoinRunner[Any, Any, Any](
-        genFunction.name,
-        genFunction.code,
-        genFunction.returnType)
-    }
-    func
-  }
-}
-
-object DataSetJoinRule {
-  val INSTANCE: RelOptRule = new DataSetJoinRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala
deleted file mode 100644
index f995201..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataset
-
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSource}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkScan, 
FlinkConvention}
-
-class DataSetScanRule
-  extends ConverterRule(
-    classOf[FlinkScan],
-    FlinkConvention.INSTANCE,
-    DataSetConvention.INSTANCE,
-    "DataSetScanRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-    val scan: FlinkScan = rel.asInstanceOf[FlinkScan]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-
-
-    new DataSetSource(
-      rel.getCluster,
-      traitSet,
-      scan.getTable,
-      rel.getRowType
-    )
-  }
-}
-
-object DataSetScanRule {
-  val INSTANCE: RelOptRule = new DataSetScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala
deleted file mode 100644
index a390374..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataset
-
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetUnion}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkUnion, 
FlinkConvention}
-
-class DataSetUnionRule
-  extends ConverterRule(
-    classOf[FlinkUnion],
-    FlinkConvention.INSTANCE,
-    DataSetConvention.INSTANCE,
-    "DataSetUnionRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-    val union: FlinkUnion = rel.asInstanceOf[FlinkUnion]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), 
DataSetConvention.INSTANCE)
-    val convRight: RelNode = RelOptRule.convert(union.getInput(1), 
DataSetConvention.INSTANCE)
-
-    new DataSetUnion(
-      rel.getCluster,
-      traitSet,
-      convLeft,
-      convRight,
-      rel.getRowType,
-      union.toString)
-  }
-}
-
-object DataSetUnionRule {
-  val INSTANCE: RelOptRule = new DataSetUnionRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala
index a5bfeb6..01a5130 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala
@@ -22,30 +22,32 @@ import org.apache.calcite.plan.{Convention, RelOptRule, 
RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.LogicalAggregate
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkAggregate, 
FlinkConvention}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, 
DataSetConvention}
+import scala.collection.JavaConversions._
 
 class FlinkAggregateRule
   extends ConverterRule(
       classOf[LogicalAggregate],
       Convention.NONE,
-      FlinkConvention.INSTANCE,
+      DataSetConvention.INSTANCE,
       "FlinkAggregateRule")
   {
 
     def convert(rel: RelNode): RelNode = {
       val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
-      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConvention.INSTANCE)
-      val convInput: RelNode = RelOptRule.convert(agg.getInput, 
FlinkConvention.INSTANCE)
+      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+      val convInput: RelNode = RelOptRule.convert(agg.getInput, 
DataSetConvention.INSTANCE)
 
-      new FlinkAggregate(
+      new DataSetAggregate(
         rel.getCluster,
         traitSet,
         convInput,
-        agg.indicator,
-        agg.getGroupSet,
-        agg.getGroupSets,
-        agg.getAggCallList)
-    }
+        agg.getNamedAggCalls,
+        rel.getRowType,
+        agg.getInput.getRowType,
+        agg.toString,
+        agg.getGroupSet.toArray)
+      }
   }
 
 object FlinkAggregateRule {

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala
index f40b04d..f5e9c68 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala
@@ -22,26 +22,29 @@ import org.apache.calcite.plan.{Convention, RelOptRule, 
RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.LogicalCalc
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkCalc, 
FlinkConvention}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, 
DataSetConvention}
 
 class FlinkCalcRule
   extends ConverterRule(
       classOf[LogicalCalc],
       Convention.NONE,
-      FlinkConvention.INSTANCE,
+      DataSetConvention.INSTANCE,
       "FlinkCalcRule")
   {
 
     def convert(rel: RelNode): RelNode = {
       val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
-      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConvention.INSTANCE)
-      val convInput: RelNode = RelOptRule.convert(calc.getInput, 
FlinkConvention.INSTANCE)
+      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+      val convInput: RelNode = RelOptRule.convert(calc.getInput, 
DataSetConvention.INSTANCE)
 
-      new FlinkCalc(
+      new DataSetCalc(
         rel.getCluster,
         traitSet,
         convInput,
-        calc.getProgram)
+        rel.getRowType,
+        calc.getProgram,
+        calc.toString,
+        description)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
index 82f3eaa..c8ce944 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
@@ -24,15 +24,16 @@ import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.LogicalJoin
 import org.apache.calcite.rex.{RexInputRef, RexCall}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, 
FlinkConvention}
-
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetJoin, 
DataSetConvention}
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 class FlinkJoinRule
   extends ConverterRule(
       classOf[LogicalJoin],
       Convention.NONE,
-      FlinkConvention.INSTANCE,
+      DataSetConvention.INSTANCE,
       "FlinkJoinRule")
   {
 
@@ -85,19 +86,27 @@ class FlinkJoinRule
     }
 
     def convert(rel: RelNode): RelNode = {
+
       val join: LogicalJoin = rel.asInstanceOf[LogicalJoin]
-      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConvention.INSTANCE)
-      val convLeft: RelNode = RelOptRule.convert(join.getInput(0), 
FlinkConvention.INSTANCE)
-      val convRight: RelNode = RelOptRule.convert(join.getInput(1), 
FlinkConvention.INSTANCE)
+      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+      val convLeft: RelNode = RelOptRule.convert(join.getInput(0), 
DataSetConvention.INSTANCE)
+      val convRight: RelNode = RelOptRule.convert(join.getInput(1), 
DataSetConvention.INSTANCE)
+      val joinInfo = join.analyzeCondition
 
-      new FlinkJoin(
-        rel.getCluster,
-        traitSet,
-        convLeft,
-        convRight,
-        join.getCondition,
-        join.getJoinType,
-        join.getVariablesStopped)
+        new DataSetJoin(
+          rel.getCluster,
+          traitSet,
+          convLeft,
+          convRight,
+          rel.getRowType,
+          join.toString,
+          join.getCondition,
+          join.getRowType,
+          joinInfo,
+          joinInfo.pairs.toList,
+          JoinType.INNER,
+          null,
+          description)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala
index af54f37..ff3ee8f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala
@@ -33,7 +33,7 @@ import org.apache.calcite.rel.core.Union
 /**
  * This rule is a copy of Calcite's JoinUnionTransposeRule.
  * Calcite's implementation checks whether one of the operands is a 
LogicalUnion,
- * which fails in our case, when it matches with a FlinkUnion.
+ * which fails in our case, when it matches with a DataSetUnion.
  * This rule changes this check to match Union, instead of LogicalUnion only.
  * The rest of the rule's logic has not been changed.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala
index d789770..21da504 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala
@@ -25,25 +25,24 @@ import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSource}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkScan, 
FlinkConvention}
 import org.apache.flink.api.table.plan.schema.DataSetTable
 
 class FlinkScanRule
   extends ConverterRule(
       classOf[LogicalTableScan],
       Convention.NONE,
-      FlinkConvention.INSTANCE,
+      DataSetConvention.INSTANCE,
       "FlinkScanRule")
   {
     def convert(rel: RelNode): RelNode = {
       val scan: TableScan = rel.asInstanceOf[TableScan]
-      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConvention.INSTANCE)
-      val dataSet: DataSet[_] = 
scan.getTable().unwrap(classOf[DataSetTable[_]]).dataSet
+      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
 
-      new FlinkScan(
+      new DataSetSource(
         rel.getCluster,
         traitSet,
-        scan.getTable
+        scan.getTable,
+        rel.getRowType
       )
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala
index d9869f8..11600a2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala
@@ -22,30 +22,30 @@ import org.apache.calcite.plan.{Convention, RelOptRule, 
RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.LogicalUnion
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkUnion, 
FlinkConvention}
-
-import scala.collection.JavaConversions._
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetUnion}
 
 class FlinkUnionRule
   extends ConverterRule(
       classOf[LogicalUnion],
       Convention.NONE,
-      FlinkConvention.INSTANCE,
+      DataSetConvention.INSTANCE,
       "FlinkUnionRule")
   {
 
     def convert(rel: RelNode): RelNode = {
+
       val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
-      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConvention.INSTANCE)
-      val convInputs = union.getInputs.toList.map(
-        RelOptRule.convert(_, FlinkConvention.INSTANCE)
-      )
+      val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+      val convLeft: RelNode = RelOptRule.convert(union.getInput(0), 
DataSetConvention.INSTANCE)
+      val convRight: RelNode = RelOptRule.convert(union.getInput(1), 
DataSetConvention.INSTANCE)
 
-      new FlinkUnion(
+      new DataSetUnion(
         rel.getCluster,
         traitSet,
-        convInputs,
-        union.all)
+        convLeft,
+        convRight,
+        rel.getRowType,
+        union.toString)
     }
   }
 

Reply via email to