Repository: flink
Updated Branches:
  refs/heads/master 2ef4900aa -> 6a0ada81e


http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
deleted file mode 100644
index 8cfd748..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, 
DataSetCalc}
-import 
org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
-import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.FilterableTableSource
-
-class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
-  operand(classOf[DataSetCalc],
-    operand(classOf[BatchTableSourceScan], none)),
-  "PushFilterIntoBatchTableSourceScanRule")
-  with PushFilterIntoTableSourceScanRuleBase {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
-    val scan: BatchTableSourceScan = 
call.rel(1).asInstanceOf[BatchTableSourceScan]
-    scan.tableSource match {
-      case source: FilterableTableSource[_] =>
-        calc.getProgram.getCondition != null && !source.isFilterPushedDown
-      case _ => false
-    }
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
-    val scan: BatchTableSourceScan = 
call.rel(1).asInstanceOf[BatchTableSourceScan]
-    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
-    val filterableSource = 
scan.tableSource.asInstanceOf[FilterableTableSource[_]]
-    pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, 
description)
-  }
-}
-
-object PushFilterIntoBatchTableSourceScanRule {
-  val INSTANCE: RelOptRule = new PushFilterIntoBatchTableSourceScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
deleted file mode 100644
index 8c83047..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.RelOptRule.{none, operand}
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, 
DataSetCalc}
-import 
org.apache.flink.table.plan.rules.common.PushProjectIntoTableSourceScanRuleBase
-import org.apache.flink.table.sources.ProjectableTableSource
-
-/**
-  * This rule tries to push projections into a BatchTableSourceScan.
-  */
-class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
-  operand(classOf[DataSetCalc],
-    operand(classOf[BatchTableSourceScan], none)),
-  "PushProjectIntoBatchTableSourceScanRule")
-  with PushProjectIntoTableSourceScanRuleBase {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val scan: BatchTableSourceScan = 
call.rel(1).asInstanceOf[BatchTableSourceScan]
-    scan.tableSource match {
-      case _: ProjectableTableSource[_] => true
-      case _ => false
-    }
-  }
-
-  override def onMatch(call: RelOptRuleCall) {
-    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
-    val scan: BatchTableSourceScan = 
call.rel(1).asInstanceOf[BatchTableSourceScan]
-    pushProjectIntoScan(call, calc, scan)
-  }
-}
-
-object PushProjectIntoBatchTableSourceScanRule {
-  val INSTANCE: RelOptRule = new PushProjectIntoBatchTableSourceScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
index 052f738..f011b66 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
@@ -18,24 +18,25 @@
 
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamAggregate, 
DataStreamConvention}
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalWindowAggregate
 
 import scala.collection.JavaConversions._
 
 class DataStreamAggregateRule
   extends ConverterRule(
-      classOf[LogicalWindowAggregate],
-      Convention.NONE,
-      DataStreamConvention.INSTANCE,
-      "DataStreamAggregateRule") {
+    classOf[FlinkLogicalWindowAggregate],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamAggregateRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
-    val agg: LogicalWindowAggregate = 
call.rel(0).asInstanceOf[LogicalWindowAggregate]
+    val agg: FlinkLogicalWindowAggregate = 
call.rel(0).asInstanceOf[FlinkLogicalWindowAggregate]
 
     // check if we have distinct aggregates
     val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
@@ -53,9 +54,9 @@ class DataStreamAggregateRule
   }
 
   override def convert(rel: RelNode): RelNode = {
-    val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(agg.getInput, 
DataStreamConvention.INSTANCE)
+    val agg: FlinkLogicalWindowAggregate = 
rel.asInstanceOf[FlinkLogicalWindowAggregate]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convInput: RelNode = RelOptRule.convert(agg.getInput, 
FlinkConventions.DATASTREAM)
 
     new DataStreamAggregate(
       agg.getWindow,

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
index 4e620c9..1777264 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
@@ -18,25 +18,25 @@
 
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamCalc
-import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc
 
 class DataStreamCalcRule
   extends ConverterRule(
-    classOf[LogicalCalc],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalCalc],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "DataStreamCalcRule")
 {
 
   def convert(rel: RelNode): RelNode = {
-    val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(calc.getInput, 
DataStreamConvention.INSTANCE)
+    val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convInput: RelNode = RelOptRule.convert(calc.getInput, 
FlinkConventions.DATASTREAM)
 
     new DataStreamCalc(
       rel.getCluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
index adce9f4..ae39d40 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
@@ -18,45 +18,40 @@
 package org.apache.flink.table.plan.rules.datastream
 
 import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, 
LogicalTableFunctionScan}
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
 
-/**
-  * Rule to convert a LogicalCorrelate into a DataStreamCorrelate.
-  */
 class DataStreamCorrelateRule
   extends ConverterRule(
-    classOf[LogicalCorrelate],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalCorrelate],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "DataStreamCorrelateRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
-    val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
+    val join: FlinkLogicalCorrelate = 
call.rel(0).asInstanceOf[FlinkLogicalCorrelate]
     val right = join.getRight.asInstanceOf[RelSubset].getOriginal
 
     right match {
       // right node is a table function
-      case scan: LogicalTableFunctionScan => true
+      case scan: FlinkLogicalTableFunctionScan => true
       // a filter is pushed above the table function
-      case filter: LogicalFilter =>
-        filter
-          .getInput.asInstanceOf[RelSubset]
-          .getOriginal
-          .isInstanceOf[LogicalTableFunctionScan]
+      case calc: FlinkLogicalCalc =>
+        calc.getInput.asInstanceOf[RelSubset]
+            .getOriginal.isInstanceOf[FlinkLogicalTableFunctionScan]
       case _ => false
     }
   }
 
   override def convert(rel: RelNode): RelNode = {
-    val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(join.getInput(0), 
DataStreamConvention.INSTANCE)
+    val join: FlinkLogicalCorrelate = rel.asInstanceOf[FlinkLogicalCorrelate]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convInput: RelNode = RelOptRule.convert(join.getInput(0), 
FlinkConventions.DATASTREAM)
     val right: RelNode = join.getInput(1)
 
     def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): 
DataStreamCorrelate = {
@@ -64,12 +59,12 @@ class DataStreamCorrelateRule
         case rel: RelSubset =>
           convertToCorrelate(rel.getRelList.get(0), condition)
 
-        case filter: LogicalFilter =>
+        case calc: FlinkLogicalCalc =>
           convertToCorrelate(
-            filter.getInput.asInstanceOf[RelSubset].getOriginal,
-            Some(filter.getCondition))
+            calc.getInput.asInstanceOf[RelSubset].getOriginal,
+            Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition)))
 
-        case scan: LogicalTableFunctionScan =>
+        case scan: FlinkLogicalTableFunctionScan =>
           new DataStreamCorrelate(
             rel.getCluster,
             traitSet,

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
index dc46753..8e96970 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
@@ -19,28 +19,25 @@
 package org.apache.flink.table.plan.rules.datastream
 
 import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalWindow
-import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalOverWindow
 
-/**
-  * Rule to convert a LogicalWindow into a DataStreamOverAggregate.
-  */
 class DataStreamOverAggregateRule
   extends ConverterRule(
-    classOf[LogicalWindow],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalOverWindow],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "DataStreamOverAggregateRule") {
 
   override def convert(rel: RelNode): RelNode = {
-    val logicWindow: LogicalWindow = rel.asInstanceOf[LogicalWindow]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val logicWindow: FlinkLogicalOverWindow = 
rel.asInstanceOf[FlinkLogicalOverWindow]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
     val convertInput: RelNode =
-      RelOptRule.convert(logicWindow.getInput, DataStreamConvention.INSTANCE)
+      RelOptRule.convert(logicWindow.getInput, FlinkConventions.DATASTREAM)
 
     val inputRowType = 
convertInput.asInstanceOf[RelSubset].getOriginal.getRowType
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
index 91fd6e2..5bf60a7 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
@@ -18,25 +18,24 @@
 
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, 
RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamScan
 import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalNativeTableScan
 
 class DataStreamScanRule
   extends ConverterRule(
-    classOf[LogicalTableScan],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalNativeTableScan],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "DataStreamScanRule")
 {
 
   override def matches(call: RelOptRuleCall): Boolean = {
-    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
+    val scan: FlinkLogicalNativeTableScan = 
call.rel(0).asInstanceOf[FlinkLogicalNativeTableScan]
     val dataSetTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]])
     dataSetTable match {
       case _: DataStreamTable[Any] =>
@@ -47,8 +46,8 @@ class DataStreamScanRule
   }
 
   def convert(rel: RelNode): RelNode = {
-    val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val scan: FlinkLogicalNativeTableScan = 
rel.asInstanceOf[FlinkLogicalNativeTableScan]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
 
     new DataStreamScan(
       rel.getCluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
index 475c050..4241f53 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
@@ -18,26 +18,26 @@
 
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalUnion
-import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamUnion
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion
 
 class DataStreamUnionRule
   extends ConverterRule(
-    classOf[LogicalUnion],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalUnion],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "DataStreamUnionRule")
 {
 
   def convert(rel: RelNode): RelNode = {
-    val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), 
DataStreamConvention.INSTANCE)
-    val convRight: RelNode = RelOptRule.convert(union.getInput(1), 
DataStreamConvention.INSTANCE)
+    val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), 
FlinkConventions.DATASTREAM)
+    val convRight: RelNode = RelOptRule.convert(union.getInput(1), 
FlinkConventions.DATASTREAM)
 
     new DataStreamUnion(
       rel.getCluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
index db33842..fbad21f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
@@ -18,24 +18,24 @@
 
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalValues
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamValues, 
DataStreamConvention}
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamValues
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalValues
 
 class DataStreamValuesRule
   extends ConverterRule(
-    classOf[LogicalValues],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalValues],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "DataStreamValuesRule")
 {
 
   def convert(rel: RelNode): RelNode = {
-
-    val values: LogicalValues = rel.asInstanceOf[LogicalValues]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val values: FlinkLogicalValues = rel.asInstanceOf[FlinkLogicalValues]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
 
     new DataStreamValues(
       rel.getCluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
deleted file mode 100644
index 53a3bcd..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.datastream
-
-import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, 
StreamTableSourceScan}
-import 
org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
-import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.FilterableTableSource
-
-class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
-  operand(classOf[DataStreamCalc],
-    operand(classOf[StreamTableSourceScan], none)),
-  "PushFilterIntoStreamTableSourceScanRule")
-  with PushFilterIntoTableSourceScanRuleBase {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
-    val scan: StreamTableSourceScan = 
call.rel(1).asInstanceOf[StreamTableSourceScan]
-    scan.tableSource match {
-      case source: FilterableTableSource[_] =>
-        calc.getProgram.getCondition != null && !source.isFilterPushedDown
-      case _ => false
-    }
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
-    val scan: StreamTableSourceScan = 
call.rel(1).asInstanceOf[StreamTableSourceScan]
-    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
-    val filterableSource = 
scan.tableSource.asInstanceOf[FilterableTableSource[_]]
-    pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, 
description)
-  }
-}
-
-object PushFilterIntoStreamTableSourceScanRule {
-  val INSTANCE: RelOptRule = new PushFilterIntoStreamTableSourceScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
deleted file mode 100644
index 903162e..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.datastream
-
-import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, 
StreamTableSourceScan}
-import 
org.apache.flink.table.plan.rules.common.PushProjectIntoTableSourceScanRuleBase
-import org.apache.flink.table.sources.{ProjectableTableSource, 
StreamTableSource}
-
-/**
-  * The rule is responsible for push project into a [[StreamTableSourceScan]]
-  */
-class PushProjectIntoStreamTableSourceScanRule extends RelOptRule(
-  operand(classOf[DataStreamCalc],
-    operand(classOf[StreamTableSourceScan], none())),
-  "PushProjectIntoStreamTableSourceScanRule")
-  with PushProjectIntoTableSourceScanRuleBase {
-
-  /** Rule must only match if [[StreamTableSource]] targets a 
[[ProjectableTableSource]] */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val scan: StreamTableSourceScan = 
call.rel(1).asInstanceOf[StreamTableSourceScan]
-    scan.tableSource match {
-      case _: ProjectableTableSource[_] => true
-      case _ => false
-    }
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val calc = call.rel(0).asInstanceOf[DataStreamCalc]
-    val scan = call.rel(1).asInstanceOf[StreamTableSourceScan]
-    pushProjectIntoScan(call, calc, scan)
-  }
-}
-
-object PushProjectIntoStreamTableSourceScanRule {
-  val INSTANCE: RelOptRule = new PushProjectIntoStreamTableSourceScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index a6db084..10cb1f7 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -18,21 +18,21 @@
 
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.table.plan.nodes.datastream.{StreamTableSourceScan, 
DataStreamConvention}
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan
 import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan
 import org.apache.flink.table.sources.StreamTableSource
 
-/** Rule to convert a [[LogicalTableScan]] into a [[StreamTableSourceScan]]. */
 class StreamTableSourceScanRule
   extends ConverterRule(
-    classOf[LogicalTableScan],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalTableSourceScan],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "StreamTableSourceScanRule")
 {
 
@@ -54,18 +54,14 @@ class StreamTableSourceScanRule
   }
 
   def convert(rel: RelNode): RelNode = {
-    val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
-    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-
-    // The original registered table source
-    val table = scan.getTable.unwrap(classOf[TableSourceTable[_]])
-    val tableSource: StreamTableSource[_] = 
table.tableSource.asInstanceOf[StreamTableSource[_]]
+    val scan: FlinkLogicalTableSourceScan = 
rel.asInstanceOf[FlinkLogicalTableSourceScan]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
 
     new StreamTableSourceScan(
       rel.getCluster,
       traitSet,
       scan.getTable,
-      tableSource
+      scan.tableSource.asInstanceOf[StreamTableSource[_]]
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala
new file mode 100644
index 0000000..9d02c87
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.LogicalTableScan
+
+/**
+ * Rule that converts an EnumerableTableScan into a LogicalTableScan.
+ * We need this rule because Calcite creates an EnumerableTableScan
+ * when parsing a SQL query. We convert it into a LogicalTableScan
+ * so we can merge the optimization process with any plan that might be created
+ * by the Table API.
+ */
+class EnumerableToLogicalTableScan(
+    operand: RelOptRuleOperand,
+    description: String) extends RelOptRule(operand, description) {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan]
+    val table = oldRel.getTable
+    val newRel = LogicalTableScan.create(oldRel.getCluster, table)
+    call.transformTo(newRel)
+  }
+}
+
+object EnumerableToLogicalTableScan {
+  val INSTANCE = new EnumerableToLogicalTableScan(
+      operand(classOf[EnumerableTableScan], any),
+    "EnumerableToLogicalTableScan")
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
new file mode 100644
index 0000000..ae6129e
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import java.util
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalTableSourceScan}
+import org.apache.flink.table.sources.FilterableTableSource
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConverters._
+
+class PushFilterIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+    operand(classOf[FlinkLogicalTableSourceScan], none)),
+  "PushFilterIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val scan: FlinkLogicalTableSourceScan = 
call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+    scan.tableSource match {
+      case source: FilterableTableSource[_] =>
+        calc.getProgram.getCondition != null && !source.isFilterPushedDown
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val scan: FlinkLogicalTableSourceScan = 
call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val filterableSource = 
scan.tableSource.asInstanceOf[FilterableTableSource[_]]
+    pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, 
description)
+  }
+
+  private def pushFilterIntoScan(
+      call: RelOptRuleCall,
+      calc: FlinkLogicalCalc,
+      scan: FlinkLogicalTableSourceScan,
+      tableSourceTable: TableSourceTable[_],
+      filterableSource: FilterableTableSource[_],
+      description: String): Unit = {
+
+    Preconditions.checkArgument(!filterableSource.isFilterPushedDown)
+
+    val program = calc.getProgram
+    val functionCatalog = FunctionCatalog.withBuiltIns
+    val (predicates, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        call.builder().getRexBuilder,
+        functionCatalog)
+    if (predicates.isEmpty) {
+      // no condition can be translated to expression
+      return
+    }
+
+    val remainingPredicates = new util.LinkedList[Expression]()
+    predicates.foreach(e => remainingPredicates.add(e))
+
+    val newTableSource = filterableSource.applyPredicate(remainingPredicates)
+
+    // check whether framework still need to do a filter
+    val relBuilder = call.builder()
+    val remainingCondition = {
+      if (!remainingPredicates.isEmpty || unconvertedRexNodes.nonEmpty) {
+        relBuilder.push(scan)
+        val remainingConditions =
+          (remainingPredicates.asScala.map(expr => expr.toRexNode(relBuilder))
+              ++ unconvertedRexNodes)
+        remainingConditions.reduce((l, r) => relBuilder.and(l, r))
+      } else {
+        null
+      }
+    }
+
+    // check whether we still need a RexProgram. An RexProgram is needed when 
either
+    // projection or filter exists.
+    val newScan = scan.copy(scan.getTraitSet, newTableSource)
+    val newRexProgram = {
+      if (remainingCondition != null || !program.projectsOnlyIdentity) {
+        val expandedProjectList = program.getProjectList.asScala
+            .map(ref => program.expandLocalRef(ref)).asJava
+        RexProgram.create(
+          program.getInputRowType,
+          expandedProjectList,
+          remainingCondition,
+          program.getOutputRowType,
+          relBuilder.getRexBuilder)
+      } else {
+        null
+      }
+    }
+
+    if (newRexProgram != null) {
+      val newCalc = calc.copy(calc.getTraitSet, newScan, newRexProgram)
+      call.transformTo(newCalc)
+    } else {
+      call.transformTo(newScan)
+    }
+  }
+}
+
+object PushFilterIntoTableSourceScanRule {
+  val INSTANCE: RelOptRule = new PushFilterIntoTableSourceScanRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
new file mode 100644
index 0000000..99a6927
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.plan.util.{RexProgramExtractor, 
RexProgramRewriter}
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalTableSourceScan}
+import org.apache.flink.table.sources.{NestedFieldsProjectableTableSource, 
ProjectableTableSource}
+
+class PushProjectIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+    operand(classOf[FlinkLogicalTableSourceScan], none)),
+  "PushProjectIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan: FlinkLogicalTableSourceScan = 
call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+    scan.tableSource match {
+      case _: ProjectableTableSource[_] => true
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val scan: FlinkLogicalTableSourceScan = 
call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+    val usedFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
+
+    // if no fields can be projected, we keep the original plan.
+    val source = scan.tableSource
+    if (TableEnvironment.getFieldNames(source).length != usedFields.length) {
+
+      val newTableSource = source match {
+        case nested: NestedFieldsProjectableTableSource[_] =>
+          val nestedFields = RexProgramExtractor
+            .extractRefNestedInputFields(calc.getProgram, usedFields)
+          nested.projectNestedFields(usedFields, nestedFields)
+        case projecting: ProjectableTableSource[_] =>
+          projecting.projectFields(usedFields)
+      }
+
+      val newScan = scan.copy(scan.getTraitSet, newTableSource)
+      val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection(
+        calc.getProgram,
+        newScan.getRowType,
+        calc.getCluster.getRexBuilder,
+        usedFields)
+
+      if (newCalcProgram.isTrivial) {
+        // drop calc if the transformed program merely returns its input and 
doesn't exist filter
+        call.transformTo(newScan)
+      } else {
+        val newCalc = calc.copy(calc.getTraitSet, newScan, newCalcProgram)
+        call.transformTo(newCalc)
+      }
+    }
+  }
+}
+
+object PushProjectIntoTableSourceScanRule {
+  val INSTANCE: RelOptRule = new PushProjectIntoTableSourceScanRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index d4db13e..e165983 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -443,7 +443,10 @@ public class TableEnvironmentITCase extends 
TableProgramsCollectionTestBase {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
 
-               CalciteConfig cc = new 
CalciteConfigBuilder().replaceOptRuleSet(RuleSets.ofList()).build();
+               CalciteConfig cc = new CalciteConfigBuilder()
+                               .replaceLogicalOptRuleSet(RuleSets.ofList())
+                               .replacePhysicalOptRuleSet(RuleSets.ofList())
+                               .build();
                tableEnv.getConfig().setCalciteConfig(cc);
 
                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
index d0de8fa..ed29f0f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
@@ -37,8 +37,11 @@ class CalciteConfigBuilderTest {
     assertFalse(cc.replacesNormRuleSet)
     assertFalse(cc.getNormRuleSet.isDefined)
 
-    assertFalse(cc.replacesOptRuleSet)
-    assertFalse(cc.getOptRuleSet.isDefined)
+    assertFalse(cc.replacesLogicalOptRuleSet)
+    assertFalse(cc.getLogicalOptRuleSet.isDefined)
+
+    assertFalse(cc.replacesPhysicalOptRuleSet)
+    assertFalse(cc.getPhysicalOptRuleSet.isDefined)
 
     assertFalse(cc.replacesDecoRuleSet)
     assertFalse(cc.getDecoRuleSet.isDefined)
@@ -48,16 +51,20 @@ class CalciteConfigBuilderTest {
   def testRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-      .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      
.replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-      .build()
+        .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+        .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+        .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+        
.replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+        .build()
 
     assertFalse(cc.replacesNormRuleSet)
     assertTrue(cc.getNormRuleSet.isDefined)
 
-    assertTrue(cc.replacesOptRuleSet)
-    assertTrue(cc.getOptRuleSet.isDefined)
+    assertTrue(cc.replacesLogicalOptRuleSet)
+    assertTrue(cc.getLogicalOptRuleSet.isDefined)
+
+    assertTrue(cc.replacesPhysicalOptRuleSet)
+    assertTrue(cc.getPhysicalOptRuleSet.isDefined)
 
     assertTrue(cc.replacesDecoRuleSet)
     assertTrue(cc.getDecoRuleSet.isDefined)
@@ -126,30 +133,30 @@ class CalciteConfigBuilderTest {
   }
 
   @Test
-  def testReplaceOptimizationRules(): Unit = {
+  def testReplaceLogicalOptimizationRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .build()
+        .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+        .build()
 
-    assertEquals(true, cc.replacesOptRuleSet)
-    assertTrue(cc.getOptRuleSet.isDefined)
-    val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet
+    assertEquals(true, cc.replacesLogicalOptRuleSet)
+    assertTrue(cc.getLogicalOptRuleSet.isDefined)
+    val cSet = cc.getLogicalOptRuleSet.get.iterator().asScala.toSet
     assertEquals(1, cSet.size)
     assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
   }
 
   @Test
-  def testReplaceOptimizationAddRules(): Unit = {
+  def testReplaceLogicalOptimizationAddRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .addOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, 
CalcSplitRule.INSTANCE))
-      .build()
+        .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+        .addLogicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, 
CalcSplitRule.INSTANCE))
+        .build()
 
-    assertEquals(true, cc.replacesOptRuleSet)
-    assertTrue(cc.getOptRuleSet.isDefined)
-    val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet
+    assertEquals(true, cc.replacesLogicalOptRuleSet)
+    assertTrue(cc.getLogicalOptRuleSet.isDefined)
+    val cSet = cc.getLogicalOptRuleSet.get.iterator().asScala.toSet
     assertEquals(3, cSet.size)
     assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
     assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
@@ -157,30 +164,64 @@ class CalciteConfigBuilderTest {
   }
 
   @Test
-  def testAddOptimizationRules(): Unit = {
+  def testAddLogicalOptimizationRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .build()
+        .addLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+        .addLogicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, 
CalcSplitRule.INSTANCE))
+        .build()
 
-    assertEquals(false, cc.replacesOptRuleSet)
-    assertTrue(cc.getOptRuleSet.isDefined)
-    val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet
+    assertEquals(false, cc.replacesLogicalOptRuleSet)
+    assertTrue(cc.getLogicalOptRuleSet.isDefined)
+    val cSet = cc.getLogicalOptRuleSet.get.iterator().asScala.toSet
+    assertEquals(3, cSet.size)
+    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
+  }
+
+  @Test
+  def testReplacePhysicalOptimizationRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+        .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+        .build()
+
+    assertEquals(true, cc.replacesPhysicalOptRuleSet)
+    assertTrue(cc.getPhysicalOptRuleSet.isDefined)
+    val cSet = cc.getPhysicalOptRuleSet.get.iterator().asScala.toSet
     assertEquals(1, cSet.size)
     assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
   }
 
   @Test
-  def testAddAddOptimizationRules(): Unit = {
+  def testReplacePhysicalOptimizationAddRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .addOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, 
CalcSplitRule.INSTANCE))
-      .build()
+        .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+        .addPhysicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, 
CalcSplitRule.INSTANCE))
+        .build()
+
+    assertEquals(true, cc.replacesPhysicalOptRuleSet)
+    assertTrue(cc.getPhysicalOptRuleSet.isDefined)
+    val cSet = cc.getPhysicalOptRuleSet.get.iterator().asScala.toSet
+    assertEquals(3, cSet.size)
+    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
+  }
+
+  @Test
+  def testAddPhysicalOptimizationRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+        .addPhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+        .addPhysicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, 
CalcSplitRule.INSTANCE))
+        .build()
 
-    assertEquals(false, cc.replacesOptRuleSet)
-    assertTrue(cc.getOptRuleSet.isDefined)
-    val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet
+    assertEquals(false, cc.replacesPhysicalOptRuleSet)
+    assertTrue(cc.getPhysicalOptRuleSet.isDefined)
+    val cSet = cc.getPhysicalOptRuleSet.get.iterator().asScala.toSet
     assertEquals(3, cSet.size)
     assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
     assertTrue(cSet.contains(CalcMergeRule.INSTANCE))

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
index 696468d..d801644 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
@@ -116,7 +116,7 @@ class ExternalCatalogTest extends TableTestBase {
         sourceStreamTableNode(table1Path, table1ProjectedFields),
         term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
       ),
-      term("union", "_c0", "e", "_c2")
+      term("union all", "_c0", "e", "_c2")
     )
 
     util.verifyTable(result, expected)
@@ -143,7 +143,7 @@ class ExternalCatalogTest extends TableTestBase {
         sourceStreamTableNode(table1Path, table1ProjectedFields),
         term("select", "*(a, 2) AS EXPR$0", "b", "c")
       ),
-      term("union", "EXPR$0", "e", "g"))
+      term("union all", "EXPR$0", "e", "g"))
 
     util.verifySql(sqlQuery, expected)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 50fafbe..3d93f45 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -333,13 +333,14 @@ class TableEnvironmentTest extends TableTestBase {
 
     val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-    val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT 
a, b, c FROM $table")
+    val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+        s"UNION ALL SELECT a, b, c FROM $table")
 
     val expected2 = binaryNode(
       "DataStreamUnion",
       streamTableNode(1),
       streamTableNode(0),
-      term("union", "d, e, f"))
+      term("union all", "d, e, f"))
 
     util.verifyTable(sqlTable2, expected2)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 6f03bec..324b4d6 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -516,7 +516,7 @@ class WindowAggregateTest extends TableTestBase {
       )
     streamUtil.verifySql(sql, expected)
   }
-  
+
   @Test
   def testBoundPartitionedProcTimeWindowWithRowRange() = {
     val sql = "SELECT " +

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index 9a6562a..e3e292e 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -18,11 +18,11 @@
 
 package org.apache.flink.table.expressions.utils
 
-import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, 
HepProgramBuilder}
 import java.util
 import java.util.concurrent.Future
 
 import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, 
HepProgramBuilder}
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql2rel.RelDecorrelator
@@ -43,7 +43,8 @@ import org.apache.flink.table.calcite.FlinkPlannerImpl
 import org.apache.flink.table.codegen.{CodeGenerator, Compiler, 
GeneratedFunction}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, 
DataSetConvention}
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.dataset.DataSetCalc
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.types.Row
 import org.junit.Assert._
@@ -66,7 +67,8 @@ abstract class ExpressionTestBase {
     context._2.getFrameworkConfig,
     context._2.getPlanner,
     context._2.getTypeFactory)
-  private val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
+  private val logicalOptProgram = 
Programs.ofRules(FlinkRuleSets.LOGICAL_OPT_RULES)
+  private val dataSetOptProgram = 
Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
 
   private def hepPlanner = {
     val builder = new HepProgramBuilder
@@ -194,9 +196,14 @@ abstract class ExpressionTestBase {
       decorPlan
     }
 
-    // create DataSetCalc
-    val flinkOutputProps = 
converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-    val dataSetCalc = optProgram.run(context._2.getPlanner, normalizedPlan, 
flinkOutputProps,
+    // convert to logical plan
+    val logicalProps = 
converted.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+    val logicalCalc = logicalOptProgram.run(context._2.getPlanner, 
normalizedPlan, logicalProps,
+      ImmutableList.of(), ImmutableList.of())
+
+    // convert to dataset plan
+    val physicalProps = 
converted.getTraitSet.replace(FlinkConventions.DATASET).simplify()
+    val dataSetCalc = dataSetOptProgram.run(context._2.getPlanner, 
logicalCalc, physicalProps,
       ImmutableList.of(), ImmutableList.of())
 
     // extract RexNode
@@ -219,8 +226,15 @@ abstract class ExpressionTestBase {
 
     // create DataSetCalc
     val decorPlan = RelDecorrelator.decorrelateQuery(converted)
-    val flinkOutputProps = 
converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-    val dataSetCalc = optProgram.run(context._2.getPlanner, decorPlan, 
flinkOutputProps,
+
+    // convert to logical plan
+    val flinkLogicalProps = 
converted.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+    val logicalCalc = logicalOptProgram.run(context._2.getPlanner, decorPlan, 
flinkLogicalProps,
+      ImmutableList.of(), ImmutableList.of())
+
+    // convert to dataset plan
+    val flinkPhysicalProps = 
converted.getTraitSet.replace(FlinkConventions.DATASET).simplify()
+    val dataSetCalc = dataSetOptProgram.run(context._2.getPlanner, 
logicalCalc, flinkPhysicalProps,
       ImmutableList.of(), ImmutableList.of())
 
     // extract RexNode

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
index 8b6d6cf..b563a8b 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
@@ -29,14 +29,15 @@ import org.junit.Test
 class NormalizationRulesTest extends TableTestBase {
 
   @Test
-  def testApplyNormalizationRuleForForBatchSQL(): Unit = {
+  def testApplyNormalizationRuleForBatchSQL(): Unit = {
     val util = batchTestUtil()
 
     // rewrite distinct aggregate
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      
.replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
-      .replaceOptRuleSet(RuleSets.ofList())
-      .build()
+        
.replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
+        .replaceLogicalOptRuleSet(RuleSets.ofList())
+        .replacePhysicalOptRuleSet(RuleSets.ofList())
+        .build()
     util.tEnv.getConfig.setCalciteConfig(cc)
 
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -62,14 +63,15 @@ class NormalizationRulesTest extends TableTestBase {
   }
 
   @Test
-  def testApplyNormalizationRuleForForStreamSQL(): Unit = {
+  def testApplyNormalizationRuleForStreamSQL(): Unit = {
     val util = streamTestUtil()
 
     // rewrite distinct aggregate
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      
.replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
-      .replaceOptRuleSet(RuleSets.ofList())
-      .build()
+        
.replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
+        .replaceLogicalOptRuleSet(RuleSets.ofList())
+        .replacePhysicalOptRuleSet(RuleSets.ofList())
+        .build()
     util.tEnv.getConfig.setCalciteConfig(cc)
 
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 6a86ace..c5e13a1 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -35,5 +35,5 @@ class MockTableEnvironment extends TableEnvironment(new 
TableConfig) {
 
   override protected def getBuiltInNormRuleSet: RuleSet = ???
 
-  override protected def getBuiltInOptRuleSet: RuleSet = ???
+  override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out 
b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
index fc83c0d..2d19bdc 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -4,7 +4,7 @@ LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataStreamTable_1]])
 
 == Optimized Logical Plan ==
-DataStreamUnion(union=[count, word])
+DataStreamUnion(union all=[count, word])
   DataStreamScan(table=[[_DataStreamTable_0]])
   DataStreamScan(table=[[_DataStreamTable_1]])
 

Reply via email to