This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fd04309  [FLINK-11834] [table-planner-blink] Introduce flink logical relational nodes (#7910)
fd04309 is described below

commit fd04309c8828637de895cbb8470bc210a3f33b66
Author: godfrey he <godfre...@163.com>
AuthorDate: Thu Mar 7 08:54:43 2019 +0800

    [FLINK-11834] [table-planner-blink] Introduce flink logical relational nodes (#7910)
    
    * [FLINK-11834] [table-planner-blink] Introduce flink logical relational nodes
    
    This commit includes most of the Flink logical relational nodes; the rest will be introduced in other commits.
    
    * add FlinkLogicalTableFunctionScan
    * add FlinkLogicalTableSourceScan
    * move RankRange into Rank.scala file
---
 .../flink/table/calcite/FlinkTypeFactory.scala     |  67 ++++-
 .../flink/table/plan/nodes/FlinkRelNode.scala      |  89 +++++++
 .../flink/table/plan/nodes/calcite/Expand.scala    |  96 ++++++++
 .../table/plan/nodes/calcite/LogicalExpand.scala   |  72 ++++++
 .../table/plan/nodes/calcite/LogicalRank.scala     |  84 +++++++
 .../table/plan/nodes/calcite/LogicalSink.scala     |  57 +++++
 .../LogicalWatermarkAssigner.scala}                |  21 +-
 .../flink/table/plan/nodes/calcite/Rank.scala      | 190 ++++++++++++++
 .../flink/table/plan/nodes/calcite/Sink.scala      |  59 +++++
 .../plan/nodes/calcite/WatermarkAssigner.scala     |  64 +++++
 .../plan/nodes/logical/FlinkLogicalAggregate.scala | 160 ++++++++++++
 .../plan/nodes/logical/FlinkLogicalCalc.scala      | 108 ++++++++
 .../plan/nodes/logical/FlinkLogicalCorrelate.scala | 106 ++++++++
 .../logical/FlinkLogicalDataStreamTableScan.scala  |   4 +
 .../plan/nodes/logical/FlinkLogicalExpand.scala    | 103 ++++++++
 .../plan/nodes/logical/FlinkLogicalIntersect.scala |  89 +++++++
 .../plan/nodes/logical/FlinkLogicalJoin.scala      | 102 ++++++++
 .../plan/nodes/logical/FlinkLogicalMinus.scala     |  87 +++++++
 .../nodes/logical/FlinkLogicalOverWindow.scala     |  98 ++++++++
 .../plan/nodes/logical/FlinkLogicalRank.scala      | 116 +++++++++
 .../plan/nodes/logical/FlinkLogicalSink.scala      |  80 ++++++
 .../plan/nodes/logical/FlinkLogicalSort.scala      | 122 +++++++++
 .../logical/FlinkLogicalTableFunctionScan.scala    | 155 ++++++++++++
 .../logical/FlinkLogicalTableSourceScan.scala      | 116 +++++++++
 .../plan/nodes/logical/FlinkLogicalUnion.scala     |  93 +++++++
 .../plan/nodes/logical/FlinkLogicalValues.scala    |  84 +++++++
 .../logical/FlinkLogicalWatermarkAssigner.scala    |  81 ++++++
 .../flink/table/plan/schema/DataStreamTable.scala  |   3 +
 .../flink/table/plan/schema/FlinkRelOptTable.scala | 272 +++++++++++++++++++++
 .../TableSourceTable.scala}                        |  25 +-
 .../plan/schema/TimeIndicatorRelDataType.scala     |  57 +++++
 .../flink/table/plan/util/AggregateUtil.scala}     |  42 ++--
 .../apache/flink/table/plan/util/CalcUtil.scala    |  64 +++++
 .../FlinkRelMdUtil.scala}                          |  15 +-
 .../flink/table/plan/util/FlinkRelOptUtil.scala}   |  41 ++--
 .../FlinkRelNode.scala => util/SortUtil.scala}     |  17 +-
 .../BatchTableSource.scala}                        |  18 +-
 .../StreamTableSource.scala}                       |  18 +-
 .../flink/table/sources/TableSourceUtil.scala      |  65 +++++
 .../org/apache/flink/table/type/InternalTypes.java |   4 +
 .../org/apache/flink/table/type/TimestampType.java |  30 ++-
 41 files changed, 3099 insertions(+), 75 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 0ba2252..219c1f4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -19,13 +19,13 @@
 package org.apache.flink.table.calcite
 
 import org.apache.flink.table.`type`.{ArrayType, DecimalType, InternalType, InternalTypes, MapType, RowType}
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.plan.schema.{ArrayRelDataType, MapRelDataType, RowRelDataType, RowSchema}
+import org.apache.flink.table.api.{TableException, TableSchema}
+import org.apache.flink.table.plan.schema.{ArrayRelDataType, MapRelDataType, RowRelDataType, RowSchema, TimeIndicatorRelDataType}
 
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`._
-import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
 
 import java.util
 
@@ -92,6 +92,19 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   }
 
   /**
+    * Creates an indicator type for event-time, but with similar properties as a SQL timestamp.
+    */
+  def createRowtimeIndicatorType(): RelDataType = {
+    val originalType = createTypeFromInternalType(InternalTypes.TIMESTAMP, isNullable = false)
+    canonize(
+      new TimeIndicatorRelDataType(
+        getTypeSystem,
+        originalType.asInstanceOf[BasicSqlType],
+        isEventTime = true)
+    )
+  }
+
+  /**
     * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
     *
     * @param fieldNames field names
@@ -129,6 +142,49 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     logicalRowTypeBuilder.build
   }
 
+  /**
+    * Creates a struct type with the input table schema using FlinkTypeFactory
+    *
+    * @param tableSchema  the table schema
+    * @param isStreaming  whether the table is a streaming table; in batch mode (Some(false))
+    *                     time indicator types are converted to regular timestamps
+    * @return a struct type with the input fieldNames, input fieldTypes, and system fields
+    */
+  def buildLogicalRowType(tableSchema: TableSchema, isStreaming: Option[Boolean]): RelDataType = {
+    buildRelDataType(
+      tableSchema.getFieldNames.toSeq,
+      tableSchema.getFieldTypes map {
+        case InternalTypes.PROCTIME_INDICATOR if isStreaming.isDefined && !isStreaming.get =>
+          InternalTypes.TIMESTAMP
+        case InternalTypes.ROWTIME_INDICATOR if isStreaming.isDefined && !isStreaming.get =>
+          InternalTypes.TIMESTAMP
+        case tpe: InternalType => tpe
+      })
+  }
+
+  def buildRelDataType(
+      fieldNames: Seq[String],
+      fieldTypes: Seq[InternalType]): RelDataType = {
+    buildRelDataType(
+      fieldNames,
+      fieldTypes,
+      fieldTypes.map(!FlinkTypeFactory.isTimeIndicatorType(_)))
+  }
+
+  def buildRelDataType(
+      fieldNames: Seq[String],
+      fieldTypes: Seq[InternalType],
+      fieldNullables: Seq[Boolean]): RelDataType = {
+    val b = builder
+    val fields = fieldNames.zip(fieldTypes).zip(fieldNullables)
+    fields foreach {
+      case ((fieldName, fieldType), fieldNullable) =>
+        if (FlinkTypeFactory.isTimeIndicatorType(fieldType) && fieldNullable) {
+          throw new TableException(
+            s"$fieldName can not be nullable because it is TimeIndicatorType!")
+        }
+        b.add(fieldName, createTypeFromInternalType(fieldType, fieldNullable))
+    }
+    b.build
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   override def getJavaClass(`type`: RelDataType): java.lang.reflect.Type = {
@@ -221,6 +277,11 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
 
 object FlinkTypeFactory {
 
+  def isTimeIndicatorType(t: InternalType): Boolean = t match {
+    case InternalTypes.ROWTIME_INDICATOR | InternalTypes.PROCTIME_INDICATOR => true
+    case _ => false
+  }
+
   def toInternalType(relDataType: RelDataType): InternalType = relDataType.getSqlTypeName match {
     case BOOLEAN => InternalTypes.BOOLEAN
     case TINYINT => InternalTypes.BYTE
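
A usage sketch of the new row-type builders (not part of the commit; it assumes a
FlinkTypeFactory instance named typeFactory is in scope, and the field names are
illustrative):

    // The two-argument buildRelDataType marks every field nullable except time
    // indicator fields, whose nullability is derived via isTimeIndicatorType.
    val rowType = typeFactory.buildRelDataType(
      Seq("flag", "rowtime"),
      Seq(InternalTypes.BOOLEAN, InternalTypes.ROWTIME_INDICATOR))
    // Passing fieldNullables = Seq(true, true) to the three-argument overload would
    // throw a TableException, because a time indicator field must not be nullable.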
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
index 48b0bc6..fee8b3e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -18,11 +18,100 @@
 
 package org.apache.flink.table.plan.nodes
 
+import org.apache.flink.table.plan.nodes.ExpressionFormat.ExpressionFormat
+
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlAsOperator
+import org.apache.calcite.sql.SqlKind._
+
+import scala.collection.JavaConversions._
 
 /**
   * Base class for flink relational expression.
   */
 trait FlinkRelNode extends RelNode {
 
+  private[flink] def getExpressionString(
+      expr: RexNode,
+      inFields: List[String],
+      localExprsTable: Option[List[RexNode]]): String = {
+    getExpressionString(expr, inFields, localExprsTable, ExpressionFormat.Prefix)
+  }
+
+  private[flink] def getExpressionString(
+      expr: RexNode,
+      inFields: List[String],
+      localExprsTable: Option[List[RexNode]],
+      expressionFormat: ExpressionFormat): String = {
+
+    expr match {
+      case pr: RexPatternFieldRef =>
+        val alpha = pr.getAlpha
+        val field = inFields.get(pr.getIndex)
+        s"$alpha.$field"
+
+      case i: RexInputRef =>
+        inFields.get(i.getIndex)
+
+      case l: RexLiteral =>
+        l.toString
+
+      case l: RexLocalRef if localExprsTable.isEmpty =>
+        throw new IllegalArgumentException("Encountered RexLocalRef without " +
+          "local expression table")
+
+      case l: RexLocalRef =>
+        val lExpr = localExprsTable.get(l.getIndex)
+        getExpressionString(lExpr, inFields, localExprsTable, expressionFormat)
+
+      case c: RexCall =>
+        val op = c.getOperator.toString
+        val ops = c.getOperands.map(
+          getExpressionString(_, inFields, localExprsTable, expressionFormat))
+        c.getOperator match {
+          case _ : SqlAsOperator => ops.head
+          case _ =>
+            expressionFormat match {
+              case ExpressionFormat.Infix if ops.size() == 1 =>
+                val operand = ops.head
+                c.getKind match {
+                  case IS_FALSE | IS_NOT_FALSE | IS_TRUE | IS_NOT_TRUE | IS_UNKNOWN | IS_NULL |
+                       IS_NOT_NULL => s"$operand $op"
+                  case _ => s"$op($operand)"
+                }
+              case ExpressionFormat.Infix => s"(${ops.mkString(s" $op ")})"
+              case ExpressionFormat.PostFix => s"(${ops.mkString(", ")})$op"
+              case ExpressionFormat.Prefix => s"$op(${ops.mkString(", ")})"
+            }
+        }
+
+      case fa: RexFieldAccess =>
+        val referenceExpr = getExpressionString(
+          fa.getReferenceExpr,
+          inFields,
+          localExprsTable,
+          expressionFormat)
+        val field = fa.getField.getName
+        s"$referenceExpr.$field"
+      case cv: RexCorrelVariable =>
+        cv.toString
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown expression type 
'${expr.getClass}': $expr")
+    }
+  }
+
+}
+
+/**
+  * Infix, Postfix and Prefix notations are three different but equivalent ways of writing
+  * expressions. It is easiest to demonstrate the differences by looking at examples of operators
+  * that take two operands.
+  * Infix notation: (X + Y)
+  * Postfix notation: (X Y) +
+  * Prefix notation: + (X Y)
+  */
+object ExpressionFormat extends Enumeration {
+  type ExpressionFormat = Value
+  val Infix, PostFix, Prefix = Value
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Expand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Expand.scala
new file mode 100644
index 0000000..6a3fc2a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Expand.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.calcite
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.util.Litmus
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Relational expression that applies a number of projects to every input row,
+  * so that multiple output rows are produced for each input row.
+  *
+  * <p>Values of expand_id should be unique.
+  *
+  * @param cluster       cluster that this relational expression belongs to
+  * @param traits        the traits of this rel
+  * @param input         input relational expression
+  * @param outputRowType output row type
+  * @param projects      all projects; each project contains a list of expressions for
+  *                      the output columns
+  * @param expandIdIndex expand_id('$e') field index
+  */
+abstract class Expand(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    outputRowType: RelDataType,
+    val projects: util.List[util.List[RexNode]],
+    val expandIdIndex: Int)
+  extends SingleRel(cluster, traits, input) {
+
+  isValid(Litmus.THROW, null)
+
+  override def isValid(litmus: Litmus, context: RelNode.Context): Boolean = {
+    if (projects.size() <= 1) {
+      return litmus.fail("Expand should output more than one rows, otherwise 
use Project.")
+    }
+    if (projects.exists(_.size != outputRowType.getFieldCount)) {
+      return litmus.fail("project filed count is not equal to output field 
count.")
+    }
+    if (expandIdIndex < 0 || expandIdIndex >= outputRowType.getFieldCount) {
+      return litmus.fail(
+        "expand_id field index should be greater than 0 and less than output 
field count.")
+    }
+    val expandIdValues = new util.HashSet[Any]()
+    for (project <- projects) {
+      project.get(expandIdIndex) match {
+        case literal: RexLiteral => expandIdValues.add(literal.getValue)
+        case _ => return litmus.fail("expand_id value should not be null.")
+      }
+    }
+    if (expandIdValues.size() != projects.size()) {
+      return litmus.fail("values of expand_id should be unique.")
+    }
+    litmus.succeed()
+  }
+
+  override def deriveRowType(): RelDataType = outputRowType
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val rowCnt = mq.getRowCount(this.getInput) * projects.size()
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
+  }
+
+  override def estimateRowCount(mq: RelMetadataQuery): Double = {
+    val childRowCnt = mq.getRowCount(this.getInput)
+    if (childRowCnt != null) {
+      childRowCnt * projects.size()
+    } else {
+      null.asInstanceOf[Double]
+    }
+  }
+}
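
A standalone sketch (not part of the commit; rows and project lists are illustrative)
of what Expand computes: each input row is emitted once per project list, and the
distinct expand_id literal tells the copies apart, which is how constructs such as
GROUPING SETS can be rewritten:

    object ExpandDemo extends App {
      val input = Seq((1, "a"), (2, "b"))
      // two "projects" per row: keep only the first column (expand_id = 0),
      // or keep only the second column (expand_id = 1)
      val expanded = input.flatMap { case (x, y) =>
        Seq((Some(x), None, 0L), (None, Some(y), 1L))
      }
      expanded.foreach(println) // 2 input rows -> 4 output rows
    }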
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalExpand.scala
new file mode 100644
index 0000000..38ae0fb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalExpand.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.calcite
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode}
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Expand]] that is a relational expression
+  * which returns multiple rows expanded from one input row.
+  * This class corresponds to Calcite logical rel.
+  */
+final class LogicalExpand(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    outputRowType: RelDataType,
+    projects: util.List[util.List[RexNode]],
+    expandIdIndex: Int)
+  extends Expand(cluster, traits, input, outputRowType, projects, expandIdIndex) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new LogicalExpand(cluster, traitSet, inputs.get(0), outputRowType, projects, expandIdIndex)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val names = outputRowType.getFieldNames
+    val terms = projects.map {
+      project =>
+        project.zipWithIndex.map {
+          case (r: RexInputRef, i: Int) => s"${names.get(i)}=[${r.getName}]"
+          case (l: RexLiteral, i: Int) => s"${names.get(i)}=[${l.getValue3}]"
+          case (o, _) => s"$o"
+        }.mkString("{", ", ", "}")
+    }.mkString(", ")
+    super.explainTerms(pw).item("projects", terms)
+  }
+}
+
+object LogicalExpand {
+  def create(
+      input: RelNode,
+      outputRowType: RelDataType,
+      projects: util.List[util.List[RexNode]],
+      expandIdIndex: Int): LogicalExpand = {
+    val traits = input.getCluster.traitSetOf(Convention.NONE)
+    new LogicalExpand(input.getCluster, traits, input, outputRowType, projects, expandIdIndex)
+  }
+}
+
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalRank.scala
new file mode 100644
index 0000000..c5eb9ab
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalRank.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.calcite
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelCollation, RelNode}
+import org.apache.calcite.sql.SqlRankFunction
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Rank]] that is a relational expression which returns
+  * the rows in which the rank function value of each row is in the given range.
+  * This class corresponds to Calcite logical rel.
+  */
+final class LogicalRank(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rankFunction: SqlRankFunction,
+    partitionKey: ImmutableBitSet,
+    sortCollation: RelCollation,
+    rankRange: RankRange)
+  extends Rank(
+    cluster,
+    traitSet,
+    input,
+    rankFunction,
+    partitionKey,
+    sortCollation,
+    rankRange) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new LogicalRank(
+      cluster,
+      traitSet,
+      inputs.head,
+      rankFunction,
+      partitionKey,
+      sortCollation,
+      rankRange
+    )
+  }
+}
+
+object LogicalRank {
+
+  def create(
+      input: RelNode,
+      rankFunction: SqlRankFunction,
+      partitionKey: ImmutableBitSet,
+      sortCollation: RelCollation,
+      rankRange: RankRange): LogicalRank = {
+    val traits = input.getCluster.traitSetOf(Convention.NONE)
+    new LogicalRank(
+      input.getCluster,
+      traits,
+      input,
+      rankFunction,
+      partitionKey,
+      sortCollation,
+      rankRange
+    )
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalSink.scala
new file mode 100644
index 0000000..dcd6a29
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalSink.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.calcite
+
+import java.util
+
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Sink]] that is a relational expression
+  * which writes out data of input node into a [[TableSink]].
+  * This class corresponds to Calcite logical rel.
+  */
+final class LogicalSink(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    sink: TableSink[_],
+    sinkName: String)
+  extends Sink(cluster, traitSet, input, sink, sinkName) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new LogicalSink(cluster, traitSet, inputs.head, sink, sinkName)
+  }
+
+}
+
+object LogicalSink {
+
+  def create(input: RelNode,
+      sink: TableSink[_],
+      sinkName: String): LogicalSink = {
+    val traits = input.getCluster.traitSetOf(Convention.NONE)
+    new LogicalSink(input.getCluster, traits, input, sink, sinkName)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWatermarkAssigner.scala
similarity index 54%
copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWatermarkAssigner.scala
index 48b0bc6..ab92a16 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWatermarkAssigner.scala
@@ -16,13 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.plan.nodes
+package org.apache.flink.table.plan.nodes.calcite
 
+import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 
+import java.util
+
 /**
-  * Base class for flink relational expression.
+  * Sub-class of [[WatermarkAssigner]] that is a relational operator
+  * which generates [[org.apache.flink.streaming.api.watermark.Watermark]].
+  * This class corresponds to Calcite logical rel.
   */
-trait FlinkRelNode extends RelNode {
+final class LogicalWatermarkAssigner(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    rowtimeField: String,
+    watermarkOffset: Long)
+  extends WatermarkAssigner(cluster, traits, input, rowtimeField, watermarkOffset) {
 
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new LogicalWatermarkAssigner(cluster, traitSet, inputs.get(0), rowtimeField, watermarkOffset)
+  }
 }
+
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala
new file mode 100644
index 0000000..b7008f8
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.calcite
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.util._
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
+import org.apache.calcite.sql.SqlRankFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.{ImmutableBitSet, NumberUtil}
+
+import java.util
+import scala.collection.JavaConversions._
+
+/**
+  * Relational expression that returns the rows in which the rank function value of each row
+  * is in the given range.
+  *
+  * <p>NOTES: Different from [[org.apache.calcite.sql.fun.SqlStdOperatorTable.RANK]],
+  * [[Rank]] is a Relational expression, not a window function.
+  *
+  * <p>[[Rank]] will output rank function value as its last column.
+  *
+  * <p>This RelNode handles only a single rank function; it is an optimization for some cases, e.g.
+  * <ol>
+  * <li>
+  *   single rank function (on `OVER`) with filter in a SQL query statement
+  * </li>
+  * <li>
+  *   `ORDER BY` with `LIMIT` in a SQL query statement
+  *   (equivalent to `ROW_NUMBER` with filter and project)
+  * </li>
+  * </ol>
+  *
+  * @param cluster        cluster that this relational expression belongs to
+  * @param traitSet       the traits of this rel
+  * @param input          input relational expression
+  * @param rankFunction   rank function, including: CUME_DIST, DENSE_RANK, PERCENT_RANK, RANK,
+  *                       ROW_NUMBER
+  * @param partitionKey   partition keys (may be empty)
+  * @param sortCollation  order keys for rank function
+  * @param rankRange      the expected range of rank function value
+  */
+abstract class Rank(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    val rankFunction: SqlRankFunction,
+    val partitionKey: ImmutableBitSet,
+    val sortCollation: RelCollation,
+    val rankRange: RankRange)
+  extends SingleRel(cluster, traitSet, input) {
+
+  rankRange match {
+    case r: ConstantRankRange =>
+      if (r.rankEnd <= 0) {
+        throw new TableException(s"Rank end can't smaller than zero. The rank 
end is ${r.rankEnd}")
+      }
+      if (r.rankStart > r.rankEnd) {
+        throw new TableException(
+          s"Rank start '${r.rankStart}' can't greater than rank end 
'${r.rankEnd}'.")
+      }
+    case v: VariableRankRange =>
+      if (v.rankEndIndex < 0) {
+        throw new TableException(s"Rank end index can't smaller than zero.")
+      }
+      if (v.rankEndIndex >= input.getRowType.getFieldCount) {
+        throw new TableException(s"Rank end index can't greater than input 
field count.")
+      }
+  }
+
+  override def deriveRowType(): RelDataType = {
+    val typeFactory = cluster.getRexBuilder.getTypeFactory
+    val typeBuilder = typeFactory.builder()
+    input.getRowType.getFieldList.foreach(typeBuilder.add)
+    // rank function column is always the last column, and its type is BIGINT NOT NULL
+    val allFieldNames = new util.HashSet[String]()
+    allFieldNames.addAll(input.getRowType.getFieldNames)
+    val rankFieldName = FlinkRelOptUtil.buildUniqueFieldName(allFieldNames, "rk")
+    val bigIntType = typeFactory.createSqlType(SqlTypeName.BIGINT)
+    typeBuilder.add(rankFieldName, typeFactory.createTypeWithNullability(bigIntType, false))
+    typeBuilder.build()
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val select = getRowType.getFieldNames.zipWithIndex.map {
+      case (name, idx) => s"$name=$$$idx"
+    }.mkString(", ")
+    super.explainTerms(pw)
+      .item("rankFunction", rankFunction)
+      .item("partitionBy", partitionKey.map(i => s"$$$i").mkString(","))
+      .item("orderBy", Rank.sortFieldsToString(sortCollation))
+      .item("rankRange", rankRange.toString())
+      .item("select", select)
+  }
+
+  override def estimateRowCount(mq: RelMetadataQuery): Double = {
+    val countPerGroup = FlinkRelMdUtil.getRankRangeNdv(rankRange)
+    if (partitionKey.isEmpty) {
+      // only one group
+      countPerGroup
+    } else {
+      val inputRowCount = mq.getRowCount(input)
+      val numOfGroup = mq.getDistinctRowCount(input, partitionKey, null)
+      if (numOfGroup != null) {
+        NumberUtil.min(numOfGroup * countPerGroup, inputRowCount)
+      } else {
+        NumberUtil.min(mq.getRowCount(input) * 0.1, inputRowCount)
+      }
+    }
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val rowCount = mq.getRowCount(input)
+    val cpuCost = rowCount
+    planner.getCostFactory.makeCost(rowCount, cpuCost, 0)
+  }
+
+}
+
+sealed trait RankRange extends Serializable {
+  def toString(inputFieldNames: Seq[String]): String
+}
+
+/** [[ConstantRankRangeWithoutEnd]] is a RankRange which does not specify a rank end. */
+case class ConstantRankRangeWithoutEnd(rankStart: Long) extends RankRange {
+  override def toString(inputFieldNames: Seq[String]): String = this.toString
+
+  override def toString: String = s"rankStart=$rankStart"
+}
+
+/** rankStart and rankEnd are inclusive, and rankStart always starts from one. */
+case class ConstantRankRange(rankStart: Long, rankEnd: Long) extends RankRange {
+
+  override def toString(inputFieldNames: Seq[String]): String = this.toString
+
+  override def toString: String = s"rankStart=$rankStart, rankEnd=$rankEnd"
+}
+
+/** A rank range whose rank end is read from an input field, so the limit can change per row. */
+case class VariableRankRange(rankEndIndex: Int) extends RankRange {
+  override def toString(inputFieldNames: Seq[String]): String = {
+    s"rankEnd=${inputFieldNames(rankEndIndex)}"
+  }
+
+  override def toString: String = {
+    s"rankEnd=$$$rankEndIndex"
+  }
+}
+
+object Rank {
+  def sortFieldsToString(collationSort: RelCollation): String = {
+    val fieldCollations = collationSort.getFieldCollations
+      .map(c => (c.getFieldIndex, SortUtil.directionToOrder(c.getDirection)))
+
+    fieldCollations.map {
+      case (index, order) => s"$$$index ${order.getShortName}"
+    }.mkString(", ")
+  }
+
+  def sortFieldsToString(collationSort: RelCollation, inputType: RelDataType): String = {
+    val fieldCollations = collationSort.getFieldCollations
+      .map(c => (c.getFieldIndex, SortUtil.directionToOrder(c.getDirection)))
+    val inputFieldNames = inputType.getFieldNames
+
+    fieldCollations.map {
+      case (index, order) => s"${inputFieldNames.get(index)} 
${order.getShortName}"
+    }.mkString(", ")
+  }
+}
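
A standalone sketch (not part of the commit; the data is illustrative) of what a
ConstantRankRange describes: an ORDER BY score DESC LIMIT 3 keeps exactly the rows
whose ROW_NUMBER falls into the inclusive range [1, 3]:

    object RankRangeDemo extends App {
      val (rankStart, rankEnd) = (1L, 3L)
      val scores = Seq(42, 17, 99, 5, 63).sorted(Ordering[Int].reverse)
      val topN = scores.zipWithIndex.collect {
        // the rank of a row is its 1-based position in the sorted order
        case (s, i) if i + 1 >= rankStart && i + 1 <= rankEnd => s
      }
      println(topN) // List(99, 63, 42)
    }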
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Sink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Sink.scala
new file mode 100644
index 0000000..29623c3
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Sink.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.calcite
+
+import org.apache.flink.table.`type`.TypeConverters
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+
+/**
+  * Relational expression that writes out the data of the input node into a [[TableSink]].
+  *
+  * @param cluster  cluster that this relational expression belongs to
+  * @param traitSet the traits of this rel
+  * @param input    input relational expression
+  * @param sink     Table sink to write into
+  * @param sinkName Name of the table sink; it is not a required property and may be null
+  */
+abstract class Sink(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    val sink: TableSink[_],
+    val sinkName: String)
+  extends SingleRel(cluster, traitSet, input) {
+
+  override def deriveRowType(): RelDataType = {
+    val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val outputType = sink.getOutputType
+    val internalType = TypeConverters.createInternalTypeFromTypeInfo(outputType)
+    typeFactory.createTypeFromInternalType(internalType, isNullable = true)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .itemIf("name", sinkName, sinkName != null)
+      .item("fields", sink.getFieldNames.mkString(", "))
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala
new file mode 100644
index 0000000..5976648
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.calcite
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Relational operator that generates [[org.apache.flink.streaming.api.watermark.Watermark]].
+  */
+abstract class WatermarkAssigner(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    inputNode: RelNode,
+    val rowtimeField: String,
+    val watermarkOffset: Long)
+  extends SingleRel(cluster, traits, inputNode) {
+
+  override def deriveRowType(): RelDataType = {
+    val inputRowType = inputNode.getRowType
+    val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    val newFieldList = inputRowType.getFieldList.map { f =>
+      if (f.getName.equals(rowtimeField)) {
+        val rowtimeIndicatorType = typeFactory.createRowtimeIndicatorType()
+        new RelDataTypeFieldImpl(rowtimeField, f.getIndex, rowtimeIndicatorType)
+      } else {
+        f
+      }
+    }
+
+    val builder = typeFactory.builder
+    builder.addAll(newFieldList)
+    builder.build()
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("fields", getRowType.getFieldNames)
+      .item("rowtimeField", rowtimeField)
+      .item("watermarkOffset", watermarkOffset)
+  }
+}
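
A standalone sketch (not part of the commit; field names and the type rendering are
illustrative) of what deriveRowType does above: the input fields are copied unchanged,
except that the declared rowtime field gets the rowtime indicator type:

    object WatermarkRowTypeDemo extends App {
      case class Field(name: String, tpe: String)
      val inputFields = Seq(Field("id", "BIGINT"), Field("ts", "TIMESTAMP"))
      val rowtimeField = "ts"
      val outputFields = inputFields.map { f =>
        if (f.name == rowtimeField) f.copy(tpe = "TIMESTAMP *ROWTIME*") else f
      }
      println(outputFields) // List(Field(id,BIGINT), Field(ts,TIMESTAMP *ROWTIME*))
    }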
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
new file mode 100644
index 0000000..32ca930
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.util.AggregateUtil
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Aggregate]] that is a relational operator which eliminates
+  * duplicates and computes totals in Flink.
+  */
+class FlinkLogicalAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    indicator: Boolean,
+    groupSet: ImmutableBitSet,
+    groupSets: util.List[ImmutableBitSet],
+    aggCalls: util.List[AggregateCall])
+  extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
+  with FlinkLogicalRel {
+
+  override def copy(
+      traitSet: RelTraitSet,
+      input: RelNode,
+      indicator: Boolean,
+      groupSet: ImmutableBitSet,
+      groupSets: util.List[ImmutableBitSet],
+      aggCalls: util.List[AggregateCall]): Aggregate = {
+    new FlinkLogicalAggregate(
+      cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    if (getGroupSets.size > 1 || AggregateUtil.getGroupIdExprIndexes(getAggCallList).nonEmpty) {
+      planner.getCostFactory.makeInfiniteCost()
+    } else {
+      val child = this.getInput
+      val rowCnt = mq.getRowCount(child)
+      val rowSize = mq.getAverageRowSize(child)
+      val aggCnt = this.getAggCallList.size
+      // group by CPU cost (multiply by 1.1 to encourage fewer group keys) + agg call CPU cost
+      val cpuCost: Double = rowCnt * getGroupCount * 1.1 + rowCnt * aggCnt
+      planner.getCostFactory.makeCost(rowCnt, cpuCost, rowCnt * rowSize)
+    }
+  }
+
+}
+
+private class FlinkLogicalAggregateBatchConverter
+  extends ConverterRule(
+    classOf[LogicalAggregate],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalAggregateBatchConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the FlinkAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {
+      // we support AVG
+      case SqlKind.AVG => true
+      // but none of the other AVG agg functions
+      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+      case _ => true
+    }
+
+    val hasAccurateDistinctCall = AggregateUtil.containsAccurateDistinctCall(agg.getAggCallList)
+
+    !hasAccurateDistinctCall && supported
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val agg = rel.asInstanceOf[LogicalAggregate]
+    val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalAggregate.create(
+      newInput,
+      agg.indicator,
+      agg.getGroupSet,
+      agg.getGroupSets,
+      agg.getAggCallList)
+  }
+}
+
+private class FlinkLogicalAggregateStreamConverter
+  extends ConverterRule(
+    classOf[LogicalAggregate],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalAggregateStreamConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the FlinkAggregateReduceFunctionsRule
+    agg.getAggCallList.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
+      case _ => true
+    }
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val agg = rel.asInstanceOf[LogicalAggregate]
+    val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalAggregate.create(
+      newInput,
+      agg.indicator,
+      agg.getGroupSet,
+      agg.getGroupSets,
+      agg.getAggCallList)
+  }
+}
+
+object FlinkLogicalAggregate {
+  val BATCH_CONVERTER: ConverterRule = new FlinkLogicalAggregateBatchConverter()
+  val STREAM_CONVERTER: ConverterRule = new FlinkLogicalAggregateStreamConverter()
+
+  def create(
+      input: RelNode,
+      indicator: Boolean,
+      groupSet: ImmutableBitSet,
+      groupSets: util.List[ImmutableBitSet],
+      aggCalls: util.List[AggregateCall]): FlinkLogicalAggregate = {
+    val cluster = input.getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
+  }
+}
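
A standalone sketch (not part of the commit; SqlKind values are modeled as strings)
of the batch converter's matches() filter: plain AVG is handled natively, while the
other members of AVG_AGG_FUNCTIONS must first be rewritten by
FlinkAggregateReduceFunctionsRule:

    object AggregateMatchDemo extends App {
      val avgAggFunctions = Set("AVG", "STDDEV_POP", "STDDEV_SAMP", "VAR_POP", "VAR_SAMP")
      def supported(kinds: Seq[String]): Boolean = kinds.forall {
        case "AVG" => true                             // AVG itself is supported
        case k if avgAggFunctions.contains(k) => false // the other AVG-family calls are not
        case _ => true
      }
      println(supported(Seq("SUM", "AVG"))) // true
      println(supported(Seq("VAR_POP")))    // false -> must be reduced first
    }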
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
new file mode 100644
index 0000000..61a2244
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.util.CalcUtil
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexProgram}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Calc]] that is a relational expression which computes project expressions
+  * and also filters in Flink.
+  */
+class FlinkLogicalCalc(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    calcProgram: RexProgram)
+  extends Calc(cluster, traitSet, input, calcProgram)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
+    new FlinkLogicalCalc(cluster, traitSet, child, program)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    FlinkLogicalCalc.computeCost(calcProgram, planner, mq, this)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    pw.input("input", getInput)
+      .item("select", CalcUtil.selectionToString(calcProgram, 
getExpressionString))
+      .itemIf("where",
+        CalcUtil.conditionToString(calcProgram, getExpressionString),
+        calcProgram.getCondition != null)
+  }
+
+}
+
+private class FlinkLogicalCalcConverter
+  extends ConverterRule(
+    classOf[LogicalCalc],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalCalcConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val calc = rel.asInstanceOf[LogicalCalc]
+    val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalCalc.create(newInput, calc.getProgram)
+  }
+}
+
+object FlinkLogicalCalc {
+  val CONVERTER: ConverterRule = new FlinkLogicalCalcConverter()
+
+  def create(
+      input: RelNode,
+      calcProgram: RexProgram): FlinkLogicalCalc = {
+    val cluster = input.getCluster
+    val traitSet = cluster.traitSet.replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalCalc(cluster, traitSet, input, calcProgram)
+  }
+
+  def computeCost(
+      calcProgram: RexProgram,
+      planner: RelOptPlanner,
+      mq: RelMetadataQuery,
+      calc: Calc): RelOptCost = {
+    // compute number of expressions that do not access a field or literal, i.e. computations,
+    // conditions, etc. We only want to account for computations, not for simple projections.
+    // CASTs in RexProgram are reduced as far as possible by ReduceExpressionsRule in the
+    // normalization stage, so we should ignore CASTs here in the optimization stage.
+    val compCnt = calcProgram.getProjectList.map(calcProgram.expandLocalRef).toList.count {
+      case _: RexInputRef => false
+      case _: RexLiteral => false
+      case c: RexCall if c.getOperator.getName.equals("CAST") => false
+      case _ => true
+    }
+    val newRowCnt = mq.getRowCount(calc)
+    // TODO use inputRowCnt to compute cpu cost
+    planner.getCostFactory.makeCost(newRowCnt, newRowCnt * compCnt, 0)
+  }
+}
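
A standalone sketch (not part of the commit; the expression ADT below is a stand-in
for RexNode) of the counting rule in computeCost: only genuine computations
contribute to the CPU cost, while field references, literals and CASTs are free:

    object CalcCostDemo extends App {
      sealed trait Expr
      case class InputRef(i: Int) extends Expr
      case class Literal(v: Any) extends Expr
      case class Call(op: String, operands: Expr*) extends Expr

      val projects: Seq[Expr] = Seq(
        InputRef(0),                         // plain projection: free
        Literal(1),                          // literal: free
        Call("CAST", InputRef(1)),           // CAST: ignored, reduced in normalization
        Call("+", InputRef(0), InputRef(1))) // the only counted computation

      val compCnt = projects.count {
        case (_: InputRef) | (_: Literal) => false
        case Call("CAST", _*) => false
        case _ => true
      }
      println(compCnt) // 1
    }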
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
new file mode 100644
index 0000000..53800f6
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Correlate, CorrelationId}
+import org.apache.calcite.rel.logical.LogicalCorrelate
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.calcite.util.ImmutableBitSet
+
+/**
+  * Sub-class of [[Correlate]] that is a relational operator
+  * which performs nested-loop joins in Flink.
+  */
+class FlinkLogicalCorrelate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    correlationId: CorrelationId,
+    requiredColumns: ImmutableBitSet,
+    joinType: SemiJoinType)
+  extends Correlate(cluster, traitSet, left, right, correlationId, requiredColumns, joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+      traitSet: RelTraitSet,
+      left: RelNode,
+      right: RelNode,
+      correlationId: CorrelationId,
+      requiredColumns: ImmutableBitSet,
+      joinType: SemiJoinType): Correlate = {
+
+    new FlinkLogicalCorrelate(
+      cluster,
+      traitSet,
+      left,
+      right,
+      correlationId,
+      requiredColumns,
+      joinType)
+  }
+
+}
+
+class FlinkLogicalCorrelateConverter
+  extends ConverterRule(
+    classOf[LogicalCorrelate],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalCorrelateConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val correlate = rel.asInstanceOf[LogicalCorrelate]
+    val newLeft = RelOptRule.convert(correlate.getLeft, FlinkConventions.LOGICAL)
+    val newRight = RelOptRule.convert(correlate.getRight, FlinkConventions.LOGICAL)
+    FlinkLogicalCorrelate.create(
+      newLeft,
+      newRight,
+      correlate.getCorrelationId,
+      correlate.getRequiredColumns,
+      correlate.getJoinType)
+  }
+}
+
+object FlinkLogicalCorrelate {
+  val CONVERTER: ConverterRule = new FlinkLogicalCorrelateConverter()
+
+  def create(
+      left: RelNode,
+      right: RelNode,
+      correlationId: CorrelationId,
+      requiredColumns: ImmutableBitSet,
+      joinType: SemiJoinType): FlinkLogicalCorrelate = {
+    val cluster = left.getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalCorrelate(
+      cluster,
+      traitSet,
+      left,
+      right,
+      correlationId,
+      requiredColumns,
+      joinType)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
index b16f629..f6ca632 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
@@ -30,6 +30,10 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 
 import java.util
 
+/**
+  * Sub-class of [[TableScan]] that is a relational operator
+  * which returns the contents of a [[DataStreamTable]] in Flink.
+  */
 class FlinkLogicalDataStreamTableScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalExpand.scala
new file mode 100644
index 0000000..b7f37d0
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalExpand.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.calcite.{Expand, LogicalExpand}
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode}
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Expand]] that is a relational expression
+  * which returns multiple rows expanded from one input row.
+  */
+class FlinkLogicalExpand(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    outputRowType: RelDataType,
+    projects: util.List[util.List[RexNode]],
+    expandIdIndex: Int)
+  extends Expand(cluster, traits, input, outputRowType, projects, expandIdIndex)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new FlinkLogicalExpand(cluster, traitSet, inputs.get(0), outputRowType, projects, expandIdIndex)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val names = outputRowType.getFieldNames
+    val terms = projects.map {
+      project =>
+        project.zipWithIndex.map {
+          case (r: RexInputRef, i: Int) => s"${names.get(i)}=[${r.getName}]"
+          case (l: RexLiteral, i: Int) => s"${names.get(i)}=[${l.getValue3}]"
+          case (o, _) => s"$o"
+        }.mkString("{", ", ", "}")
+    }.mkString(", ")
+    super.explainTerms(pw).item("projects", terms)
+  }
+
+}
+
+private class FlinkLogicalExpandConverter
+  extends ConverterRule(
+    classOf[LogicalExpand],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalExpandConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val expand = rel.asInstanceOf[LogicalExpand]
+    val newInput = RelOptRule.convert(expand.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalExpand.create(
+      newInput,
+      expand.getRowType,
+      expand.projects,
+      expand.expandIdIndex)
+  }
+}
+
+object FlinkLogicalExpand {
+  val CONVERTER: ConverterRule = new FlinkLogicalExpandConverter()
+
+  def create(
+      input: RelNode,
+      outputRowType: RelDataType,
+      projects: util.List[util.List[RexNode]],
+      expandIdIndex: Int): FlinkLogicalExpand = {
+    val cluster = input.getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalExpand(
+      cluster,
+      traitSet,
+      input,
+      outputRowType,
+      projects,
+      expandIdIndex)
+  }
+}
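
For intuition, Expand fans each input row out once per project list (one list per
grouping set), tagging every copy with an expand-id. A planner-free sketch in plain
Scala, with a hypothetical two-column row and GROUPING SETS ((a), (b)):

    val row = (1, 2) // (a, b)
    val expanded = Seq(
      (Some(row._1), None, 0), // grouping set {a}: b is null, expand-id 0
      (None, Some(row._2), 1)  // grouping set {b}: a is null, expand-id 1
    )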
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalIntersect.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalIntersect.scala
new file mode 100644
index 0000000..6c6a857
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalIntersect.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Intersect, SetOp}
+import org.apache.calcite.rel.logical.LogicalIntersect
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Intersect]] that is a relational expression
+  * which returns the intersection of the rows of its inputs in Flink.
+  */
+class FlinkLogicalIntersect(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputs: util.List[RelNode],
+    all: Boolean)
+  extends Intersect(cluster, traitSet, inputs, all)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode], all: Boolean): SetOp = {
+    new FlinkLogicalIntersect(cluster, traitSet, inputs, all)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val zeroCost = planner.getCostFactory.makeCost(0, 0, 0)
+    this.getInputs.foldLeft(zeroCost) {
+      (cost, input) =>
+        val rowCnt = mq.getRowCount(input)
+        val rowSize = mq.getAverageRowSize(input)
+        val inputCost = planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
+        cost.plus(inputCost)
+    }
+  }
+
+}
+
+private class FlinkLogicalIntersectConverter
+  extends ConverterRule(
+    classOf[LogicalIntersect],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalIntersectConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val intersect = rel.asInstanceOf[LogicalIntersect]
+    val newInputs = intersect.getInputs.map {
+      input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
+    }
+    FlinkLogicalIntersect.create(newInputs, intersect.all)
+  }
+}
+
+object FlinkLogicalIntersect {
+  val CONVERTER: ConverterRule = new FlinkLogicalIntersectConverter()
+
+  def create(
+      inputs: util.List[RelNode],
+      all: Boolean): FlinkLogicalIntersect = {
+    val cluster = inputs.get(0).getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalIntersect(cluster, traitSet, inputs, all)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
new file mode 100644
index 0000000..d33630b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{CorrelationId, Join, JoinRelType}
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Join]] that is a relational expression
+  * which combines two relational expressions according to some condition in Flink.
+  */
+class FlinkLogicalJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    condition: RexNode,
+    joinType: JoinRelType)
+  extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId], joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+      traitSet: RelTraitSet,
+      conditionExpr: RexNode,
+      left: RelNode,
+      right: RelNode,
+      joinType: JoinRelType,
+      semiJoinDone: Boolean): Join = {
+    new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val leftRowCnt = mq.getRowCount(getLeft)
+    val leftRowSize = mq.getAverageRowSize(getLeft)
+
+    val rightRowCnt = mq.getRowCount(getRight)
+    val rightRowSize = mq.getAverageRowSize(getRight)
+
+    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+    val cpuCost = leftRowCnt + rightRowCnt
+    val rowCnt = leftRowCnt + rightRowCnt
+
+    planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+}
+
+/**
+  * Supports all join types.
+  */
+private class FlinkLogicalJoinConverter
+  extends ConverterRule(
+    classOf[LogicalJoin],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalJoinConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val join = rel.asInstanceOf[LogicalJoin]
+    val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
+    val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)
+    FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getJoinType)
+  }
+}
+
+object FlinkLogicalJoin {
+  val CONVERTER: ConverterRule = new FlinkLogicalJoinConverter
+
+  def create(
+      left: RelNode,
+      right: RelNode,
+      condition: RexNode,
+      joinType: JoinRelType): FlinkLogicalJoin = {
+    val cluster = left.getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalJoin(cluster, traitSet, left, right, condition, joinType)
+  }
+}
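
The join cost model above charges one CPU unit per input row and one IO unit per
input byte. A self-contained illustration of the arithmetic (the row counts and
sizes are made-up numbers, not taken from this patch):

    val (leftRowCnt, leftRowSize) = (1000.0, 8.0)
    val (rightRowCnt, rightRowSize) = (500.0, 16.0)
    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize) // 16000.0
    val cpuCost = leftRowCnt + rightRowCnt                                 // 1500.0
    val rowCnt = leftRowCnt + rightRowCnt                                  // 1500.0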
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalMinus.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalMinus.scala
new file mode 100644
index 0000000..870262b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalMinus.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Minus, SetOp}
+import org.apache.calcite.rel.logical.LogicalMinus
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Minus]] that is a relational expression which returns the rows of
+  * its first input minus any matching rows from its other inputs in Flink.
+  */
+class FlinkLogicalMinus(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputs: JList[RelNode],
+    all: Boolean)
+  extends Minus(cluster, traitSet, inputs, all)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode], all: Boolean): SetOp = {
+    new FlinkLogicalMinus(cluster, traitSet, inputs, all)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val zeroCost = planner.getCostFactory.makeCost(0, 0, 0)
+    this.getInputs.foldLeft(zeroCost) {
+      (cost, input) =>
+        val rowCnt = mq.getRowCount(input)
+        val rowSize = mq.getAverageRowSize(input)
+        val inputCost = planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
+        cost.plus(inputCost)
+    }
+  }
+
+}
+
+private class FlinkLogicalMinusConverter
+  extends ConverterRule(
+    classOf[LogicalMinus],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalMinusConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val minus = rel.asInstanceOf[LogicalMinus]
+    val newInputs = minus.getInputs.map {
+      input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
+    }
+    FlinkLogicalMinus.create(newInputs, minus.all)
+  }
+}
+
+object FlinkLogicalMinus {
+  val CONVERTER: ConverterRule = new FlinkLogicalMinusConverter()
+
+  def create(inputs: JList[RelNode], all: Boolean): FlinkLogicalMinus = {
+    val cluster = inputs.get(0).getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalMinus(cluster, traitSet, inputs, all)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala
new file mode 100644
index 0000000..9550bb6
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Window
+import org.apache.calcite.rel.logical.LogicalWindow
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.sql.SqlRankFunction
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Window]] that is a relational expression
+  * which represents a set of over window aggregates in Flink.
+  */
+class FlinkLogicalOverWindow(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    windowConstants: JList[RexLiteral],
+    rowType: RelDataType,
+    windowGroups: JList[Window.Group])
+  extends Window(cluster, traitSet, input, windowConstants, rowType, windowGroups)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    new FlinkLogicalOverWindow(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      windowConstants,
+      getRowType,
+      windowGroups)
+  }
+
+}
+
+class FlinkLogicalOverWindowConverter
+  extends ConverterRule(
+    classOf[LogicalWindow],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalOverWindowConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val window = rel.asInstanceOf[LogicalWindow]
+    val cluster = rel.getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    val newInput = RelOptRule.convert(window.getInput, FlinkConventions.LOGICAL)
+
+    window.groups.foreach { group =>
+      val orderKeySize = group.orderKeys.getFieldCollations.size()
+      group.aggCalls.foreach { winAggCall =>
+        if (orderKeySize == 0 && winAggCall.op.isInstanceOf[SqlRankFunction]) {
+          throw new ValidationException("Over Agg: The window rank function 
without order by. " +
+            "please re-check the over window statement.")
+        }
+      }
+    }
+
+    new FlinkLogicalOverWindow(
+      rel.getCluster,
+      traitSet,
+      newInput,
+      window.constants,
+      window.getRowType,
+      window.groups)
+  }
+}
+
+object FlinkLogicalOverWindow {
+  val CONVERTER = new FlinkLogicalOverWindowConverter
+}
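
The converter rejects rank-style over aggregates that lack an ORDER BY clause. A
minimal sketch of just that guard, extracted from the loop above (isRankFunction
stands in for winAggCall.op.isInstanceOf[SqlRankFunction]):

    def invalidRankCall(isRankFunction: Boolean, orderKeySize: Int): Boolean =
      isRankFunction && orderKeySize == 0

    assert(invalidRankCall(isRankFunction = true, orderKeySize = 0))  // rejected
    assert(!invalidRankCall(isRankFunction = true, orderKeySize = 1)) // accepted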
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRank.scala
new file mode 100644
index 0000000..c93fd29
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRank.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.calcite.{LogicalRank, Rank, RankRange}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
+import org.apache.calcite.sql.SqlRankFunction
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Rank]] that is a relational expression which returns
+  * the rows in which the rank function value of each row is in the given range.
+  */
+class FlinkLogicalRank(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rankFunction: SqlRankFunction,
+    partitionKey: ImmutableBitSet,
+    sortCollation: RelCollation,
+    rankRange: RankRange,
+    val outputRankFunColumn: Boolean)
+  extends Rank(cluster, traitSet, input, rankFunction, partitionKey, sortCollation, rankRange)
+  with FlinkLogicalRel {
+
+  override def deriveRowType(): RelDataType = {
+    if (outputRankFunColumn) {
+      super.deriveRowType()
+    } else {
+      input.getRowType
+    }
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val inputFieldNames = input.getRowType.getFieldNames
+    pw.item("input", getInput)
+      .item("rankFunction", rankFunction)
+      .item("partitionBy", 
partitionKey.map(inputFieldNames.get(_)).mkString(","))
+      .item("orderBy", Rank.sortFieldsToString(sortCollation, 
input.getRowType))
+      .item("rankRange", rankRange.toString(inputFieldNames))
+      .item("select", getRowType.getFieldNames.mkString(", "))
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new FlinkLogicalRank(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rankFunction,
+      partitionKey,
+      sortCollation,
+      rankRange,
+      outputRankFunColumn)
+  }
+
+}
+
+private class FlinkLogicalRankConverter extends ConverterRule(
+  classOf[LogicalRank],
+  Convention.NONE,
+  FlinkConventions.LOGICAL,
+  "FlinkLogicalRankConverter") {
+  override def convert(rel: RelNode): RelNode = {
+    val rank = rel.asInstanceOf[LogicalRank]
+    val newInput = RelOptRule.convert(rank.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalRank.create(
+      newInput,
+      rank.rankFunction,
+      rank.partitionKey,
+      rank.sortCollation,
+      rank.rankRange,
+      outputRankFunColumn = true
+    )
+  }
+}
+
+object FlinkLogicalRank {
+  val CONVERTER: ConverterRule = new FlinkLogicalRankConverter
+
+  def create(
+      input: RelNode,
+      rankFunction: SqlRankFunction,
+      partitionKey: ImmutableBitSet,
+      sortCollation: RelCollation,
+      rankRange: RankRange,
+      outputRankFunColumn: Boolean): FlinkLogicalRank = {
+    val cluster = input.getCluster
+    val traits = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalRank(cluster, traits, input, rankFunction, partitionKey,
+      sortCollation, rankRange, outputRankFunColumn)
+  }
+}
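
Note how deriveRowType depends on outputRankFunColumn: the rank value appears as an
extra output field only when the flag is set. A minimal sketch of that behavior
(field names are illustrative):

    def outputFields(inputFields: Seq[String], outputRankFunColumn: Boolean): Seq[String] =
      if (outputRankFunColumn) inputFields :+ "rank_col" else inputFields

    assert(outputFields(Seq("a", "b"), outputRankFunColumn = true) == Seq("a", "b", "rank_col"))
    assert(outputFields(Seq("a", "b"), outputRankFunColumn = false) == Seq("a", "b"))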
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala
new file mode 100644
index 0000000..28b98a2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.calcite.{LogicalSink, Sink}
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Sink]] that is a relational expression
+  * which writes out data of input node into a [[TableSink]].
+  */
+class FlinkLogicalSink(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    sink: TableSink[_],
+    sinkName: String)
+  extends Sink(cluster, traitSet, input, sink, sinkName)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new FlinkLogicalSink(cluster, traitSet, inputs.head, sink, sinkName)
+  }
+
+}
+
+private class FlinkLogicalSinkConverter
+  extends ConverterRule(
+    classOf[LogicalSink],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalSinkConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val sink = rel.asInstanceOf[LogicalSink]
+    val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalSink.create(
+      newInput,
+      sink.sink,
+      sink.sinkName)
+  }
+}
+
+object FlinkLogicalSink {
+  val CONVERTER: ConverterRule = new FlinkLogicalSinkConverter()
+
+  def create(
+      input: RelNode,
+      sink: TableSink[_],
+      sinkName: String): FlinkLogicalSink = {
+    val cluster = input.getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalSink(cluster, traitSet, input, sink, sinkName)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSort.scala
new file mode 100644
index 0000000..38e7bd5
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSort.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+
+/**
+  * Sub-class of [[Sort]] that is a relational expression which imposes
+  * a particular sort order on its input without otherwise changing its content in Flink.
+  */
+class FlinkLogicalSort(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    child: RelNode,
+    collation: RelCollation,
+    sortOffset: RexNode,
+    sortFetch: RexNode)
+  extends Sort(cluster, traits, child, collation, sortOffset, sortFetch)
+  with FlinkLogicalRel {
+
+  private lazy val limitStart: Long = if (offset != null) RexLiteral.intValue(offset) else 0L
+
+  override def copy(
+      traitSet: RelTraitSet,
+      newInput: RelNode,
+      newCollation: RelCollation,
+      offset: RexNode,
+      fetch: RexNode): Sort = {
+    new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
+  }
+
+  override def estimateRowCount(mq: RelMetadataQuery): Double = {
+    val inputRowCnt = mq.getRowCount(this.getInput)
+    if (inputRowCnt == null) {
+      inputRowCnt
+    } else {
+      val rowCount = (inputRowCnt - limitStart).max(1.0)
+      if (fetch != null) {
+        val limit = RexLiteral.intValue(fetch)
+        rowCount.min(limit)
+      } else {
+        rowCount
+      }
+    }
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    // by default, assume cost is proportional to number of rows
+    val rowCount: Double = mq.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCount, rowCount, 0)
+  }
+
+}
+
+class FlinkLogicalSortStreamConverter
+  extends ConverterRule(
+    classOf[LogicalSort],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalSortStreamConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val sort = rel.asInstanceOf[LogicalSort]
+    val newInput = RelOptRule.convert(sort.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalSort.create(newInput, sort.getCollation, sort.offset, sort.fetch)
+  }
+}
+
+class FlinkLogicalSortBatchConverter extends ConverterRule(
+  classOf[LogicalSort],
+  Convention.NONE,
+  FlinkConventions.LOGICAL,
+  "FlinkLogicalSortBatchConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val sort = rel.asInstanceOf[LogicalSort]
+    val newInput = RelOptRule.convert(sort.getInput, FlinkConventions.LOGICAL)
+    // TODO: support range sort
+    FlinkLogicalSort.create(newInput, sort.getCollation, sort.offset, sort.fetch)
+  }
+}
+
+object FlinkLogicalSort {
+  val BATCH_CONVERTER: RelOptRule = new FlinkLogicalSortBatchConverter
+  val STREAM_CONVERTER: RelOptRule = new FlinkLogicalSortStreamConverter
+
+  def create(
+      input: RelNode,
+      collation: RelCollation,
+      sortOffset: RexNode,
+      sortFetch: RexNode): FlinkLogicalSort = {
+    val cluster = input.getCluster
+    val collationTrait = RelCollationTraitDef.INSTANCE.canonize(collation)
+    val traitSet = input.getTraitSet.replace(FlinkConventions.LOGICAL)
+      .replace(collationTrait).simplify()
+    new FlinkLogicalSort(cluster, traitSet, input, collation, sortOffset, sortFetch)
+  }
+}
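
estimateRowCount above clamps the post-offset count at 1.0 and caps it by fetch. A
worked example with assumed numbers (an input of 100 rows with OFFSET 10 FETCH 5):

    val inputRowCnt = 100.0
    val limitStart = 10L
    val rowCount = (inputRowCnt - limitStart).max(1.0) // 90.0
    val estimate = rowCount.min(5)                     // 5.0 expected output rows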
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
new file mode 100644
index 0000000..18d3d35
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.metadata.RelColumnMapping
+import org.apache.calcite.rex.{RexLiteral, RexNode, RexUtil}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.lang.reflect.Type
+import java.util
+
+/**
+  * Sub-class of [[TableFunctionScan]] that is a relational expression
+  * which calls a table-valued function in Flink.
+  */
+class FlinkLogicalTableFunctionScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputs: util.List[RelNode],
+    rexCall: RexNode,
+    elementType: Type,
+    rowType: RelDataType,
+    columnMappings: util.Set[RelColumnMapping])
+  extends TableFunctionScan(
+    cluster,
+    traitSet,
+    inputs,
+    rexCall,
+    elementType,
+    rowType,
+    columnMappings)
+  with FlinkLogicalRel {
+
+  override def copy(
+      traitSet: RelTraitSet,
+      inputs: util.List[RelNode],
+      rexCall: RexNode,
+      elementType: Type,
+      rowType: RelDataType,
+      columnMappings: util.Set[RelColumnMapping]): TableFunctionScan = {
+
+    new FlinkLogicalTableFunctionScan(
+      cluster,
+      traitSet,
+      inputs,
+      rexCall,
+      elementType,
+      rowType,
+      columnMappings)
+  }
+
+}
+
+class FlinkLogicalTableFunctionScanConverter
+  extends ConverterRule(
+    classOf[LogicalTableFunctionScan],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalTableFunctionScanConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    // TODO: this rule should not match TemporalTableFunction
+    super.matches(call)
+  }
+
+  def convert(rel: RelNode): RelNode = {
+    val scan = rel.asInstanceOf[LogicalTableFunctionScan]
+    val traitSet = rel.getCluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+
+    val constantTableFunction = RexUtil.isConstant(scan.getCall) && 
scan.getInputs.isEmpty
+    if (constantTableFunction) {
+      convertConstantFunctionTableScan(scan, traitSet)
+    } else {
+      createFlinkLogicalTableScan(scan, traitSet)
+    }
+  }
+
+  def createFlinkLogicalTableScan(
+      scan: LogicalTableFunctionScan,
+      traitSet: RelTraitSet): FlinkLogicalTableFunctionScan = {
+    new FlinkLogicalTableFunctionScan(
+      scan.getCluster,
+      traitSet,
+      scan.getInputs,
+      scan.getCall,
+      scan.getElementType,
+      scan.getRowType,
+      scan.getColumnMappings
+    )
+  }
+
+  /**
+    * Converts [[LogicalTableFunctionScan]] with constant RexCall to
+    * {{{
+    *                    [[FlinkLogicalCorrelate]]
+    *                          /       \
+    * empty [[FlinkLogicalValues]]  [[FlinkLogicalTableFunctionScan]]
+    * }}}
+    */
+  def convertConstantFunctionTableScan(
+      scan: LogicalTableFunctionScan,
+      traitSet: RelTraitSet): RelNode = {
+    val cluster = scan.getCluster
+
+    // create correlate left
+    val values = new FlinkLogicalValues(
+      cluster,
+      traitSet,
+      cluster.getTypeFactory.createStructType(ImmutableList.of(), ImmutableList.of()),
+      ImmutableList.of(ImmutableList.of[RexLiteral]())
+    )
+
+    // create correlate right
+    val newScan = createFlinkLogicalTableScan(scan, traitSet)
+
+    new FlinkLogicalCorrelate(
+      cluster,
+      traitSet,
+      values,
+      newScan,
+      cluster.createCorrel(), // a dummy CorrelationId
+      ImmutableBitSet.of(),
+      SemiJoinType.INNER)
+  }
+}
+
+object FlinkLogicalTableFunctionScan {
+  val CONVERTER = new FlinkLogicalTableFunctionScanConverter
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
new file mode 100644
index 0000000..6b6cd03
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan.isTableSourceScan
+import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.sources._
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+
+/**
+  * Sub-class of [[TableScan]] that is a relational operator
+  * which returns the contents of a [[TableSource]] in Flink.
+  */
+class FlinkLogicalTableSourceScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    relOptTable: FlinkRelOptTable)
+  extends TableScan(cluster, traitSet, relOptTable)
+  with FlinkLogicalRel {
+
+  val tableSource: TableSource[_] = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+
+  def copy(
+      traitSet: RelTraitSet,
+      relOptTable: FlinkRelOptTable): FlinkLogicalTableSourceScan = {
+    new FlinkLogicalTableSourceScan(cluster, traitSet, relOptTable)
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new FlinkLogicalTableSourceScan(cluster, traitSet, relOptTable)
+  }
+
+  override def deriveRowType(): RelDataType = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    tableSource match {
+      case s: StreamTableSource[_] =>
+        TableSourceUtil.getRelDataType(s, None, streaming = true, flinkTypeFactory)
+      case _: BatchTableSource[_] =>
+        flinkTypeFactory.buildLogicalRowType(
+          tableSource.getTableSchema, isStreaming = Option.apply(false))
+      case _ => throw new TableException("Unknown TableSource type.")
+    }
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val rowCnt = mq.getRowCount(this)
+    val rowSize = mq.getAverageRowSize(this)
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("fields", tableSource.getTableSchema.getColumnNames.mkString(", "))
+  }
+
+}
+
+class FlinkLogicalTableSourceScanConverter
+  extends ConverterRule(
+    classOf[LogicalTableScan],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalTableSourceScanConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan: TableScan = call.rel(0)
+    isTableSourceScan(scan)
+  }
+
+  def convert(rel: RelNode): RelNode = {
+    val scan = rel.asInstanceOf[TableScan]
+    val table = scan.getTable.asInstanceOf[FlinkRelOptTable]
+    FlinkLogicalTableSourceScan.create(rel.getCluster, table)
+  }
+}
+
+object FlinkLogicalTableSourceScan {
+  val CONVERTER = new FlinkLogicalTableSourceScanConverter
+
+  def isTableSourceScan(scan: TableScan): Boolean = {
+    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    tableSourceTable != null
+  }
+
+  def create(cluster: RelOptCluster, relOptTable: FlinkRelOptTable): FlinkLogicalTableSourceScan = {
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalTableSourceScan(cluster, traitSet, relOptTable)
+  }
+}
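
The scan resolves its TableSource by unwrapping the Calcite table handle; the same
pattern works from any scan over a FlinkRelOptTable (a hedged usage sketch; scan is
assumed to be a TableScan in scope):

    // unwrap recovers the wrapped Flink table from the RelOptTable.
    val sourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
    val source: TableSource[_] = sourceTable.tableSource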
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUnion.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUnion.scala
new file mode 100644
index 0000000..a723f12
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUnion.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{SetOp, Union}
+import org.apache.calcite.rel.logical.LogicalUnion
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Union]] that is a relational expression
+  * which returns the union of the rows of its inputs in Flink.
+  */
+class FlinkLogicalUnion(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputs: JList[RelNode],
+    all: Boolean)
+  extends Union(cluster, traitSet, inputs, all)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode], all: Boolean): SetOp = {
+    new FlinkLogicalUnion(cluster, traitSet, inputs, all)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val rowCnt = this.getInputs.foldLeft(0D) {
+      (rows, input) =>
+        val inputRowCount = mq.getRowCount(input)
+        rows + inputRowCount
+    }
+    planner.getCostFactory.makeCost(rowCnt, 0, 0)
+  }
+
+}
+
+private class FlinkLogicalUnionConverter
+  extends ConverterRule(
+    classOf[LogicalUnion],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalUnionConverter") {
+
+  /**
+    * Only translate UNION ALL.
+    */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val union: LogicalUnion = call.rel(0)
+    union.all
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val union = rel.asInstanceOf[LogicalUnion]
+    val newInputs = union.getInputs.map {
+      input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
+    }
+    FlinkLogicalUnion.create(newInputs, union.all)
+  }
+}
+
+object FlinkLogicalUnion {
+  val CONVERTER: ConverterRule = new FlinkLogicalUnionConverter()
+
+  def create(inputs: JList[RelNode], all: Boolean): FlinkLogicalUnion = {
+    val cluster = inputs.get(0).getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalUnion(cluster, traitSet, inputs, all)
+  }
+}
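
Since matches() only accepts UNION ALL, a plain UNION presumably has to be rewritten
before this rule fires, e.g. by Calcite's UnionToDistinctRule, which turns UNION into
UNION ALL plus a distinct aggregate (an assumption about rule ordering, not stated in
this patch):

    import org.apache.calcite.plan.RelOptRule
    import org.apache.calcite.rel.rules.UnionToDistinctRule

    // If this rewrite runs first, only UNION ALL nodes ever reach matches().
    val rewriteUnion: RelOptRule = UnionToDistinctRule.INSTANCE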
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalValues.scala
new file mode 100644
index 0000000..e27def1
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalValues.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Values
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexLiteral
+
+import java.util
+
+/**
+  * Sub-class of [[Values]] that is a relational expression
+  * whose value is a sequence of zero or more literal row values in Flink.
+  */
+class FlinkLogicalValues(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    rowRelDataType: RelDataType,
+    tuples: ImmutableList[ImmutableList[RexLiteral]])
+  extends Values(cluster, rowRelDataType, tuples, traitSet)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new FlinkLogicalValues(cluster, traitSet, rowRelDataType, tuples)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val dRows = mq.getRowCount(this)
+    // Assume CPU is negligible since values are precomputed.
+    val dCpu = 1
+    val dIo = 0
+    planner.getCostFactory.makeCost(dRows, dCpu, dIo)
+  }
+
+}
+
+private class FlinkLogicalValuesConverter
+  extends ConverterRule(
+    classOf[LogicalValues],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalValuesConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val values = rel.asInstanceOf[LogicalValues]
+    FlinkLogicalValues.create(rel.getCluster, values.getRowType, values.getTuples())
+  }
+}
+
+object FlinkLogicalValues {
+  val CONVERTER: ConverterRule = new FlinkLogicalValuesConverter()
+
+  def create(
+      cluster: RelOptCluster,
+      rowType: RelDataType,
+      tuples: ImmutableList[ImmutableList[RexLiteral]]): FlinkLogicalValues = {
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalValues(cluster, traitSet, rowType, tuples)
+  }
+}
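
The create helper is also what FlinkLogicalTableFunctionScanConverter uses to build
its empty-row left input; as a usage sketch (cluster and the imports from the file
above are assumed in scope):

    // A Values node with a single empty row and an empty row type, mirroring
    // convertConstantFunctionTableScan above.
    val emptyValues = FlinkLogicalValues.create(
      cluster,
      cluster.getTypeFactory.createStructType(ImmutableList.of(), ImmutableList.of()),
      ImmutableList.of(ImmutableList.of[RexLiteral]()))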
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
new file mode 100644
index 0000000..5703315
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.calcite.{LogicalWatermarkAssigner, WatermarkAssigner}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+import java.util
+
+/**
+  * Sub-class of [[WatermarkAssigner]] that is a relational operator
+  * which generates [[org.apache.flink.streaming.api.watermark.Watermark]].
+  */
+class FlinkLogicalWatermarkAssigner(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    rowtimeField: String,
+    watermarkOffset: Long)
+  extends WatermarkAssigner(cluster, traits, input, rowtimeField, watermarkOffset)
+  with FlinkLogicalRel {
+
+  override def copy(
+      traitSet: RelTraitSet,
+      inputs: util.List[RelNode]): RelNode = {
+    new FlinkLogicalWatermarkAssigner(
+      cluster, traitSet, inputs.get(0), rowtimeField, watermarkOffset)
+  }
+
+}
+
+class FlinkLogicalWatermarkAssignerConverter extends ConverterRule(
+  classOf[LogicalWatermarkAssigner],
+  Convention.NONE,
+  FlinkConventions.LOGICAL,
+  "FlinkLogicalWatermarkAssignerConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val watermark = rel.asInstanceOf[LogicalWatermarkAssigner]
+    val newInput = RelOptRule.convert(watermark.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalWatermarkAssigner.create(
+      newInput,
+      watermark.rowtimeField,
+      watermark.watermarkOffset)
+  }
+}
+
+object FlinkLogicalWatermarkAssigner {
+  val CONVERTER = new FlinkLogicalWatermarkAssignerConverter
+
+  def create(
+      input: RelNode,
+      rowtimeField: String,
+      watermarkOffset: Long): FlinkLogicalWatermarkAssigner = {
+    val cluster = input.getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtimeField, watermarkOffset)
+  }
+}
+
+
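As a usage sketch of the factory (the field name and offset are illustrative; input
is assumed to be a RelNode in scope):

    // Hypothetical: emit watermarks on "rowtime", lagging 5000 ms behind it.
    val withWatermarks = FlinkLogicalWatermarkAssigner.create(input, "rowtime", 5000L)
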
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
index 2fe6f3b..a23bbde 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
@@ -21,6 +21,9 @@ package org.apache.flink.table.plan.schema
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.plan.stats.FlinkStatistic
 
+/**
+  * The class that wraps [[DataStream]] as a Calcite Table.
+  */
 class DataStreamTable[T](
     val dataStream: DataStream[T],
     val producesUpdates: Boolean,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
new file mode 100644
index 0000000..0644e5a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema}
+import org.apache.calcite.prepare.Prepare.AbstractPreparingTable
+import org.apache.calcite.prepare.{CalcitePrepareImpl, RelOptTableImpl}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema._
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+/**
+  * [[FlinkRelOptTable]] wraps a [[FlinkTable]].
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be unique with
+  *                respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+class FlinkRelOptTable protected(
+    schema: RelOptSchema,
+    rowType: RelDataType,
+    names: JList[String],
+    table: FlinkTable)
+  extends AbstractPreparingTable {
+
+  // Default value of rowCount when no statistics are available.
+  // Uses a large default value to avoid broadcast joins.
+  val DEFAULT_ROWCOUNT: Double = 1E8
+
+  def copy(newTable: FlinkTable, newRowType: RelDataType): FlinkRelOptTable =
+    new FlinkRelOptTable(schema, newRowType, names, newTable)
+
+  /**
+    * Obtains an identifier for this table.
+    *
+    * Note: the qualified names are used for computing the digest of TableScan.
+    *
+    * @return qualified name
+    */
+  override def getQualifiedName: JList[String] = names
+
+  /**
+    * Obtains the access type of the table.
+    *
+    * @return all access types including SELECT/UPDATE/INSERT/DELETE
+    */
+  override def getAllowedAccess: SqlAccessType = SqlAccessType.ALL
+
+  override def unwrap[T](clazz: Class[T]): T = {
+    if (clazz.isInstance(this)) {
+      clazz.cast(this)
+    } else if (clazz.isInstance(table)) {
+      clazz.cast(table)
+    } else {
+      null.asInstanceOf[T]
+    }
+  }
+
+  /**
+    * Returns true if the given modality is supported, else false.
+    */
+  override def supportsModality(modality: SqlModality): Boolean = modality match {
+    case SqlModality.STREAM =>
+      table.isInstanceOf[StreamableTable]
+    case _ =>
+      !table.isInstanceOf[StreamableTable]
+  }
+
+  /**
+    * Returns the type of rows returned by this table.
+    */
+  override def getRowType: RelDataType = rowType
+
+  /**
+    * Obtains whether a given column is monotonic.
+    *
+    * @param columnName column name
+    * @return true if the given column is monotonic
+    */
+  override def getMonotonicity(columnName: String): SqlMonotonicity = {
+    val columnIdx = rowType.getFieldNames.indexOf(columnName)
+    if (columnIdx >= 0) {
+      for (collation: RelCollation <- table.getStatistic.getCollations) {
+        val fieldCollation: RelFieldCollation = collation.getFieldCollations.get(0)
+        if (fieldCollation.getFieldIndex == columnIdx) {
+          return fieldCollation.direction.monotonicity
+        }
+      }
+    }
+    SqlMonotonicity.NOT_MONOTONIC
+  }
+
+  /**
+    * Returns flink table statistics.
+    */
+  def getFlinkStatistic: FlinkStatistic = {
+    if (table.getStatistic != null) {
+      table.getStatistic
+    } else {
+      FlinkStatistic.UNKNOWN
+    }
+  }
+
+  /**
+    * Returns an estimate of the number of rows in the table.
+    */
+  override def getRowCount: Double =
+    if (table.getStatistic != null) {
+      table.getStatistic match {
+        case stats: FlinkStatistic =>
+          if (stats.getRowCount != null) {
+            stats.getRowCount
+          } else {
+            DEFAULT_ROWCOUNT
+          }
+        case _ => throw new AssertionError
+      }
+    } else {
+      DEFAULT_ROWCOUNT
+    }
+
+  /**
+    * Converts this table into a [[RelNode]] relational expression.
+    *
+    * @return the RelNode converted from this table
+    */
+  override def toRel(context: ToRelContext): RelNode = {
+    val cluster: RelOptCluster = context.getCluster
+    if (table.isInstanceOf[TranslatableTable]) {
+      table.asInstanceOf[TranslatableTable].toRel(context, this)
+    } else if (Hook.ENABLE_BINDABLE.get(false)) {
+      LogicalTableScan.create(cluster, this)
+    } else if (CalcitePrepareImpl.ENABLE_ENUMERABLE) {
+      EnumerableTableScan.create(cluster, this)
+    } else {
+      throw new AssertionError
+    }
+  }
+
+  /**
+    * Returns the [[RelOptSchema]] this table belongs to.
+    */
+  override def getRelOptSchema: RelOptSchema = schema
+
+  /**
+    * Returns whether the given columns are a key or a superset of a unique key
+    * of this table.
+    *
+    * Note: returning true means TRUE; however, returning false means FALSE or NOT KNOWN.
+    * It's better to use [[org.apache.calcite.rel.metadata.RelMetadataQuery]].areRowsUnique to
+    * distinguish FALSE from NOT KNOWN.
+    *
+    * @param columns Ordinals of key columns
+    * @return true if the given column bits represent a unique column set; false if not (or
+    *         if no metadata is available)
+    */
+  @Deprecated
+  override def isKey(columns: ImmutableBitSet): Boolean = false
+
+  /**
+    * Returns the referential constraints existing for this table. These constraints
+    * are represented over other tables using [[RelReferentialConstraint]] nodes.
+    */
+  override def getReferentialConstraints: JList[RelReferentialConstraint] = {
+    if (table.getStatistic != null) {
+      table.getStatistic.getReferentialConstraints
+    } else {
+      ImmutableList.of()
+    }
+  }
+
+  /**
+    * Returns a description of the physical ordering (or orderings) of the rows
+    * returned from this table.
+    *
+    * @see [[org.apache.calcite.rel.metadata.RelMetadataQuery#collations(RelNode)]]
+    */
+  override def getCollationList: JList[RelCollation] = {
+    if (table.getStatistic != null) {
+      table.getStatistic.getCollations
+    } else {
+      ImmutableList.of()
+    }
+  }
+
+  /**
+    * Returns a description of the physical distribution of the rows
+    * in this table.
+    *
+    * @see [[org.apache.calcite.rel.metadata.RelMetadataQuery#distribution(RelNode)]]
+    */
+  override def getDistribution: RelDistribution = {
+    if (table.getStatistic != null) {
+      table.getStatistic.getDistribution
+    } else {
+      null
+    }
+  }
+
+  /**
+    * Generates code for this table, which is not supported yet.
+    *
+    * @param clazz The desired collection class; for example [[org.apache.calcite.linq4j.Queryable]]
+    */
+  override def getExpression(clazz: java.lang.Class[_]): Expression =
+    throw new UnsupportedOperationException
+
+  /**
+    * Obtains whether the ordinal column has a default value, which is not supported yet.
+    *
+    * @param rowType            row type of the field
+    * @param ordinal            index of the given column
+    * @param initializerContext the context for
+    *                           [[org.apache.calcite.sql2rel.InitializerExpressionFactory]] methods
+    * @return true if the column has a default value
+    */
+  override def columnHasDefaultValue(
+      rowType: RelDataType,
+      ordinal: Int,
+      initializerContext: InitializerContext): Boolean = false
+
+  override def getColumnStrategies: JList[ColumnStrategy] =
+    RelOptTableImpl.columnStrategies(this)
+
+  override def extend(extendedTable: Table) =
+    throw new RuntimeException("Extending columns is not supported")
+
+}
+
+object FlinkRelOptTable {
+
+  def create(
+      schema: RelOptSchema,
+      rowType: RelDataType,
+      names: JList[String],
+      table: FlinkTable): FlinkRelOptTable =
+    new FlinkRelOptTable(schema, rowType, names, table)
+
+}
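
For illustration, here is a minimal sketch (not part of this commit) of how a
planner rule might consult the FlinkRelOptTable attached to a scan; the `scan`
parameter is an assumed input:

    import org.apache.calcite.rel.core.TableScan
    import org.apache.flink.table.plan.schema.{FlinkRelOptTable, FlinkTable}

    // Recovers the wrapped FlinkTable via unwrap and reads the row-count
    // estimate, which falls back to DEFAULT_ROWCOUNT (1E8) without stats.
    def describeScan(scan: TableScan): Unit = {
      val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
      val flinkTable = relOptTable.unwrap(classOf[FlinkTable])
      println(s"rowCount=${relOptTable.getRowCount}, table=$flinkTable")
    }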
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
similarity index 50%
copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index 48b0bc6..0fc8078 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -16,13 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.plan.nodes
+package org.apache.flink.table.plan.schema
 
-import org.apache.calcite.rel.RelNode
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.TableSource
 
 /**
-  * Base class for flink relational expression.
+  * Abstract class that defines the interfaces required to
+  * convert a [[TableSource]] to a Calcite Table.
   */
-trait FlinkRelNode extends RelNode {
+abstract class TableSourceTable[T](
+    val tableSource: TableSource[T],
+    val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+  extends FlinkTable {
 
+  /**
+    * Returns the statistics of the current table.
+    */
+  override def getStatistic: FlinkStatistic = statistic
+
+  /**
+    * Replaces the table source with the given one and creates a new table source table.
+    *
+    * @param tableSource the table source to replace the current one
+    * @return a new TableSourceTable
+    */
+  def replaceTableSource(tableSource: TableSource[T]): TableSourceTable[T]
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
new file mode 100644
index 0000000..c6b4bcc
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.RelDataTypeSystem
+import org.apache.calcite.sql.`type`.BasicSqlType
+
+import java.lang
+
+/**
+  * Creates a time indicator type for event-time or processing-time, with properties
+  * similar to those of a basic SQL type.
+  */
+class TimeIndicatorRelDataType(
+    typeSystem: RelDataTypeSystem,
+    originalType: BasicSqlType,
+    val isEventTime: Boolean)
+  extends BasicSqlType(
+    typeSystem,
+    originalType.getSqlTypeName,
+    originalType.getPrecision) {
+
+  override def equals(other: Any): Boolean = other match {
+    case that: TimeIndicatorRelDataType =>
+      super.equals(that) &&
+        isEventTime == that.isEventTime
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    super.hashCode() + 42 // we change the hash code to differentiate from regular timestamps
+  }
+
+  override def toString: String = {
+    s"TIME ATTRIBUTE(${if (isEventTime) "ROWTIME" else "PROCTIME"})"
+  }
+
+  override def generateTypeString(sb: lang.StringBuilder, withDetail: Boolean): Unit = {
+    sb.append(toString)
+  }
+}
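
As a sketch of the equality semantics above (assuming Calcite's default type
system), two indicators over the same TIMESTAMP base type compare unequal when
their time semantics differ:

    import org.apache.calcite.rel.`type`.RelDataTypeSystem
    import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}

    val typeSystem = RelDataTypeSystem.DEFAULT
    val timestamp = new BasicSqlType(typeSystem, SqlTypeName.TIMESTAMP)
    val rowtime = new TimeIndicatorRelDataType(typeSystem, timestamp, isEventTime = true)
    val proctime = new TimeIndicatorRelDataType(typeSystem, timestamp, isEventTime = false)
    assert(rowtime != proctime) // isEventTime differs
    assert(rowtime.toString == "TIME ATTRIBUTE(ROWTIME)")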
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
similarity index 51%
copy from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
index 12aa689..3f09a7d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
@@ -15,30 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.table.plan.util
 
-package org.apache.flink.table.type;
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.sql.SqlKind
 
-/**
- * Sql timestamp type.
- */
-public class TimestampType implements AtomicType {
-
-       public static final TimestampType INSTANCE = new TimestampType();
-
-       private TimestampType() {}
-
-       @Override
-       public boolean equals(Object o) {
-               return this == o || o != null && getClass() == o.getClass();
-       }
+object AggregateUtil {
 
-       @Override
-       public int hashCode() {
-               return getClass().hashCode();
-       }
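+  /**
+    * Returns the indexes of the aggregate calls whose kind is GROUP_ID,
+    * GROUPING or GROUPING_ID.
+    */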
+  def getGroupIdExprIndexes(aggCalls: Seq[AggregateCall]): Seq[Int] = {
+    aggCalls.zipWithIndex.filter { case (call, _) =>
+      call.getAggregation.getKind match {
+        case SqlKind.GROUP_ID | SqlKind.GROUPING | SqlKind.GROUPING_ID => true
+        case _ => false
+      }
+    }.map { case (_, idx) => idx }
+  }
 
-       @Override
-       public String toString() {
-               return getClass().getSimpleName();
-       }
+  /**
+    * Returns whether any of the aggregates are accurate DISTINCT.
+    *
+    * @return Whether any of the aggregates are accurate DISTINCT
+    */
+  def containsAccurateDistinctCall(aggCalls: Seq[AggregateCall]): Boolean = {
+    aggCalls.exists(call => call.isDistinct && !call.isApproximate)
+  }
 }
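
A short usage sketch (the aggregate calls would normally come from a
LogicalAggregate; the `aggCalls` parameter is an assumed input):

    import org.apache.calcite.rel.core.AggregateCall

    def analyzeAggregates(aggCalls: Seq[AggregateCall]): Unit = {
      // indexes of the GROUP_ID/GROUPING/GROUPING_ID calls
      val groupIdIndexes = AggregateUtil.getGroupIdExprIndexes(aggCalls)
      // true if any call is DISTINCT and not approximate
      val exactDistinct = AggregateUtil.containsAccurateDistinctCall(aggCalls)
      println(s"groupIdIndexes=$groupIdIndexes, exactDistinct=$exactDistinct")
    }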
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CalcUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CalcUtil.scala
new file mode 100644
index 0000000..f8dfbeb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CalcUtil.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.flink.table.plan.nodes.ExpressionFormat
+import org.apache.flink.table.plan.nodes.ExpressionFormat.ExpressionFormat
+
+import org.apache.calcite.rex.{RexNode, RexProgram}
+
+import scala.collection.JavaConversions._
+
+object CalcUtil {
+
+  private[flink] def conditionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+    val cond = calcProgram.getCondition
+    val inFields = calcProgram.getInputRowType.getFieldNames.toList
+    val localExprs = calcProgram.getExprList.toList
+
+    if (cond != null) {
+      expression(cond, inFields, Some(localExprs))
+    } else {
+      ""
+    }
+  }
+
+  private[flink] def selectionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]], ExpressionFormat) => String,
+      expressionFormat: ExpressionFormat = ExpressionFormat.Prefix): String = {
+
+    val proj = calcProgram.getProjectList.toList
+    val inFields = calcProgram.getInputRowType.getFieldNames.toList
+    val localExprs = calcProgram.getExprList.toList
+    val outFields = calcProgram.getOutputRowType.getFieldNames.toList
+
+    proj
+      .map(expression(_, inFields, Some(localExprs), expressionFormat))
+      .zip(outFields).map { case (e, o) =>
+      if (e != o) {
+        e + " AS " + o
+      } else {
+        e
+      }
+    }.mkString(", ")
+  }
+
+}
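
To illustrate the rendering contract, a toy `expression` callback (a stand-in,
not part of this commit): selectionToString applies it to every projection and
appends " AS <name>" only when the rendered expression differs from the output
field name:

    import org.apache.calcite.rex.{RexInputRef, RexNode}
    import org.apache.flink.table.plan.nodes.ExpressionFormat.ExpressionFormat

    // Renders input refs by field name, everything else via toString.
    val render = (e: RexNode, inFields: List[String],
        localExprs: Option[List[RexNode]], fmt: ExpressionFormat) => e match {
      case ref: RexInputRef => inFields(ref.getIndex)
      case other => other.toString
    }
    // For a RexProgram projecting input field "amount" as output "total":
    //   CalcUtil.selectionToString(program, render) == "amount AS total"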
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
similarity index 65%
copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
index 48b0bc6..b75283d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
@@ -16,13 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.plan.nodes
+package org.apache.flink.table.plan.util
 
-import org.apache.calcite.rel.RelNode
+import org.apache.flink.table.plan.nodes.calcite.{ConstantRankRange, RankRange}
+
+import java.lang.Double
 
 /**
-  * Base class for flink relational expression.
+  * FlinkRelMdUtil provides utility methods used by the metadata providers.
   */
-trait FlinkRelNode extends RelNode {
+object FlinkRelMdUtil {
+
+  def getRankRangeNdv(rankRange: RankRange): Double = rankRange match {
+    case r: ConstantRankRange => (r.rankEnd - r.rankStart + 1).toDouble
+    case _ => 100D // default value for now
+  }
 
 }
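
A sketch of the estimate, assuming ConstantRankRange is the case class from
Rank.scala with rankStart/rankEnd bounds:

    // rank BETWEEN 1 AND 10 yields at most 10 distinct rank values;
    // variable ranges fall back to the default of 100.
    val ndv = FlinkRelMdUtil.getRankRangeNdv(ConstantRankRange(1, 10)) // 10.0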
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
similarity index 58%
copy from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
index 12aa689..f483f08 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
@@ -15,30 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.table.plan.util
 
-package org.apache.flink.table.type;
+import java.util
 
-/**
- * Sql timestamp type.
- */
-public class TimestampType implements AtomicType {
-
-       public static final TimestampType INSTANCE = new TimestampType();
-
-       private TimestampType() {}
-
-       @Override
-       public boolean equals(Object o) {
-               return this == o || o != null && getClass() == o.getClass();
-       }
+object FlinkRelOptUtil {
 
-       @Override
-       public int hashCode() {
-               return getClass().hashCode();
-       }
+  /**
+    * Gets a unique field name based on the existing `allFieldNames` collection.
+    * NOTE: the new unique field name will be added to the existing `allFieldNames` collection.
+    */
+  def buildUniqueFieldName(
+      allFieldNames: util.Set[String],
+      toAddFieldName: String): String = {
+    var name: String = toAddFieldName
+    var i: Int = 0
+    while (allFieldNames.contains(name)) {
+      name = toAddFieldName + "_" + i
+      i += 1
+    }
+    allFieldNames.add(name)
+    name
+  }
 
-       @Override
-       public String toString() {
-               return getClass().getSimpleName();
-       }
 }
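
A sketch of the renaming behavior; note the side effect on the set:

    import java.util

    val names = new util.HashSet[String](util.Arrays.asList("f0", "f0_0"))
    val unique = FlinkRelOptUtil.buildUniqueFieldName(names, "f0")
    // unique == "f0_1", and "f0_1" has been added to `names`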
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SortUtil.scala
similarity index 60%
copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SortUtil.scala
index 48b0bc6..d78a711 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SortUtil.scala
@@ -16,13 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.plan.nodes
+package org.apache.flink.table.plan.util
 
-import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.operators.Order
+
+import org.apache.calcite.rel.RelFieldCollation.Direction
 
 /**
-  * Base class for flink relational expression.
+  * Common methods for Flink sort operators.
   */
-trait FlinkRelNode extends RelNode {
+object SortUtil {
 
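+  /** Converts a Calcite collation [[Direction]] into a Flink [[Order]]. */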
+  def directionToOrder(direction: Direction): Order = {
+    direction match {
+      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
+      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
+      case _ => throw new IllegalArgumentException("Unsupported direction.")
+    }
+  }
 }
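
A minimal check of the mapping (strict and non-strict directions collapse onto
the same Order; any other direction throws IllegalArgumentException):

    import org.apache.calcite.rel.RelFieldCollation.Direction
    import org.apache.flink.api.common.operators.Order

    assert(SortUtil.directionToOrder(Direction.ASCENDING) == Order.ASCENDING)
    assert(SortUtil.directionToOrder(Direction.STRICTLY_DESCENDING) == Order.DESCENDING)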
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
similarity index 55%
copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
index 48b0bc6..bdd5641 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
@@ -16,13 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.plan.nodes
+package org.apache.flink.table.sources
 
-import org.apache.calcite.rel.RelNode
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 
 /**
-  * Base class for flink relational expression.
+  * Defines an external batch table and provides access to its data.
+  *
+  * @tparam T Type of the [[DataStream]] created by this [[TableSource]].
   */
-trait FlinkRelNode extends RelNode {
+trait BatchTableSource[T] extends TableSource[T] {
 
+  /**
+    * Returns the data of the table as a [[DataStream]].
+    *
+    * NOTE: This method is for internal use only for defining a [[TableSource]].
+    * Do not use it in Table API programs.
+    */
+  def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStream[T]
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
similarity index 55%
copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
index 48b0bc6..07d7fa0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
@@ -16,13 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.plan.nodes
+package org.apache.flink.table.sources
 
-import org.apache.calcite.rel.RelNode
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 
 /**
-  * Base class for flink relational expression.
+  * Defines an external stream table and provides access to its data.
+  *
+  * @tparam T Type of the [[DataStream]] created by this [[TableSource]].
   */
-trait FlinkRelNode extends RelNode {
+trait StreamTableSource[T] extends TableSource[T] {
 
+  /**
+    * Returns the data of the table as a [[DataStream]].
+    *
+    * NOTE: This method is for internal use only for defining a [[TableSource]].
+    * Do not use it in Table API programs.
+    */
+  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
 }
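
A sketch of a trivial implementation (the schema-related members of the
TableSource base trait are elided, so this illustrates the shape rather than
compiling as-is; a BatchTableSource looks the same with getBoundedStream):

    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

    class WordsStreamSource extends StreamTableSource[String] {
      override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[String] =
        execEnv.fromElements("hello", "world")
      // ... remaining TableSource methods (schema, return type) elided ...
    }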
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
new file mode 100644
index 0000000..985cc37
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.`type`.TypeConverters
+import org.apache.flink.table.calcite.FlinkTypeFactory
+
+import org.apache.calcite.rel.`type`.RelDataType
+
+/** Util class for [[TableSource]]. */
+object TableSourceUtil {
+
+  /**
+    * Returns the Calcite schema of a [[TableSource]].
+    *
+    * @param tableSource The [[TableSource]] for which the Calcite schema is generated.
+    * @param selectedFields The indices of all selected fields. None, if all fields are selected.
+    * @param streaming Flag to determine whether the schema of a stream or batch table is created.
+    * @param typeFactory The type factory to create the schema.
+    * @return The Calcite schema for the selected fields of the given [[TableSource]].
+    */
+  def getRelDataType(
+      tableSource: TableSource[_],
+      selectedFields: Option[Array[Int]],
+      streaming: Boolean,
+      typeFactory: FlinkTypeFactory): RelDataType = {
+
+    val fieldNames = tableSource.getTableSchema.getFieldNames
+    val fieldTypes = tableSource.getTableSchema.getFieldTypes
+      .map(TypeConverters.createInternalTypeFromTypeInfo)
+    // TODO get fieldNullables from TableSchema
+    val fieldNullables = fieldTypes.map(_ => true)
+
+    // TODO support time attributes after Expression is ready
+
+    val (selectedFieldNames, selectedFieldTypes, selectedFieldNullables) =
+      if (selectedFields.isDefined) {
+        // filter field names and types by selected fields
+        (
+          selectedFields.get.map(fieldNames(_)),
+          selectedFields.get.map(fieldTypes(_)),
+          selectedFields.get.map(fieldNullables(_)))
+      } else {
+        (fieldNames, fieldTypes, fieldNullables)
+      }
+    typeFactory.buildRelDataType(selectedFieldNames, selectedFieldTypes, selectedFieldNullables)
+  }
+
+}
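
A usage sketch; the `source` and `typeFactory` values are assumed to come from
the catalog and the planner, respectively:

    // For a source with schema (a: INT, b: STRING, c: LONG), pushing the
    // projection [2, 0] yields the row type (c, a); every field is nullable
    // until nullability is read from the TableSchema (see TODO above).
    val rowType = TableSourceUtil.getRelDataType(
      source,
      Some(Array(2, 0)),
      streaming = true,
      typeFactory)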
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
index f3429b0..2b94cd7 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
@@ -53,6 +53,10 @@ public class InternalTypes {
 
 	public static final DecimalType SYSTEM_DEFAULT_DECIMAL = DecimalType.SYSTEM_DEFAULT;
 
+	public static final TimestampType ROWTIME_INDICATOR = TimestampType.ROWTIME_INDICATOR;
+
+	public static final TimestampType PROCTIME_INDICATOR = TimestampType.PROCTIME_INDICATOR;
+
        public static ArrayType createArrayType(InternalType elementType) {
                return new ArrayType(elementType);
        }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
index 12aa689..26ab8cd 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
@@ -23,22 +23,42 @@ package org.apache.flink.table.type;
  */
 public class TimestampType implements AtomicType {
 
-       public static final TimestampType INSTANCE = new TimestampType();
+	public static final TimestampType INSTANCE = new TimestampType(0, "TimestampType");
+       public static final TimestampType INTERVAL_MILLIS =
+                       new TimestampType(1, "IntervalMillis");
+       public static final TimestampType ROWTIME_INDICATOR =
+                       new TimestampType(2, "RowTimeIndicator");
+       public static final TimestampType PROCTIME_INDICATOR =
+			new TimestampType(3, "ProcTimeIndicator");
 
-       private TimestampType() {}
+	private final int id;
+	private final String name;
+
+       private TimestampType(int id, String name) {
+               this.id = id;
+               this.name = name;
+       }
 
        @Override
        public boolean equals(Object o) {
-               return this == o || o != null && getClass() == o.getClass();
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               return id == ((TimestampType) o).id;
        }
 
        @Override
        public int hashCode() {
-               return getClass().hashCode();
+               int result = getClass().hashCode();
+               result = 31 * result + name.hashCode();
+               return result;
        }
 
        @Override
        public String toString() {
-               return getClass().getSimpleName();
+               return name;
        }
 }
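
A sketch of the new equality semantics: comparison is id-based, so the
indicator singletons are distinct from the plain TIMESTAMP instance while
still being TimestampTypes:

    assert(TimestampType.ROWTIME_INDICATOR != TimestampType.INSTANCE)
    assert(TimestampType.PROCTIME_INDICATOR != TimestampType.ROWTIME_INDICATOR)
    assert(TimestampType.INSTANCE.toString == "TimestampType")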
