http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index f946ed9..095cf04 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -18,57 +18,49 @@
 
 package org.apache.flink.api.table.plan
 
-import org.apache.calcite.tools.RelBuilder.AggCall
 import org.apache.flink.api.table.TableEnvironment
-
 import org.apache.flink.api.table.expressions._
 
 object RexNodeTranslator {
 
   /**
-    * Extracts all aggregation expressions (zero, one, or more) from an expression, translates
-    * these aggregation expressions into Calcite AggCalls, and replaces the original aggregation
-    * expressions by field accesses expressions.
+    * Extracts all aggregation expressions (zero, one, or more) from an expression
+    * and replaces the original aggregation expressions with field access expressions.
     */
-  def extractAggCalls(
+  def extractAggregations(
     exp: Expression,
-    tableEnv: TableEnvironment): Pair[Expression, List[AggCall]] = {
-
-    val relBuilder = tableEnv.getRelBuilder
+    tableEnv: TableEnvironment): Pair[Expression, List[NamedExpression]] = {
 
     exp match {
       case agg: Aggregation =>
         val name = tableEnv.createUniqueAttributeName()
-        val aggCall = agg.toAggCall(name)(relBuilder)
+        val aggCall = Alias(agg, name)
         val fieldExp = new UnresolvedFieldReference(name)
         (fieldExp, List(aggCall))
-      case n@Naming(agg: Aggregation, name) =>
-        val aggCall = agg.toAggCall(name)(relBuilder)
+      case n @ Alias(agg: Aggregation, name) =>
         val fieldExp = new UnresolvedFieldReference(name)
-        (fieldExp, List(aggCall))
+        (fieldExp, List(n))
       case l: LeafExpression =>
         (l, Nil)
       case u: UnaryExpression =>
-        val c = extractAggCalls(u.child, tableEnv)
-        (u.makeCopy(List(c._1)), c._2)
+        val c = extractAggregations(u.child, tableEnv)
+        (u.makeCopy(Array(c._1)), c._2)
       case b: BinaryExpression =>
-        val l = extractAggCalls(b.left, tableEnv)
-        val r = extractAggCalls(b.right, tableEnv)
-        (b.makeCopy(List(l._1, r._1)), l._2 ::: r._2)
-      case e: Eval =>
-        val c = extractAggCalls(e.condition, tableEnv)
-        val t = extractAggCalls(e.ifTrue, tableEnv)
-        val f = extractAggCalls(e.ifFalse, tableEnv)
-        (e.makeCopy(List(c._1, t._1, f._1)), c._2 ::: t._2 ::: f._2)
+        val l = extractAggregations(b.left, tableEnv)
+        val r = extractAggregations(b.right, tableEnv)
+        (b.makeCopy(Array(l._1, r._1)), l._2 ::: r._2)
 
       // Scalar functions
-      case c@Call(name, args@_*) =>
-        val newArgs = args.map(extractAggCalls(_, tableEnv)).toList
-        (c.makeCopy(name :: newArgs.map(_._1)), newArgs.flatMap(_._2))
+      case c @ Call(name, args) =>
+        val newArgs = args.map(extractAggregations(_, tableEnv))
+        (c.makeCopy((name :: newArgs.map(_._1) :: Nil).toArray), newArgs.flatMap(_._2).toList)
 
-      case e@AnyRef =>
-        throw new IllegalArgumentException(
-          s"Expression $e of type ${e.getClass} not supported yet")
+      case e: Expression =>
+        val newArgs = e.productIterator.map {
+          case arg: Expression =>
+            extractAggregations(arg, tableEnv)
+        }
+        (e.makeCopy(newArgs.map(_._1).toArray), newArgs.flatMap(_._2).toList)
     }
   }
 }
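
The net effect of the new extractAggregations is easiest to see on a small
example: an expression such as "sum(a) + 1" is rewritten into a projection over
a fresh field reference, while the aggregate itself is collected on the side to
be evaluated by a separate Aggregate node. A self-contained sketch of the same
technique over a toy expression ADT (Expr, Agg, Field, Plus and Lit are
illustrative stand-ins, not Flink classes):

    object ExtractSketch {
      sealed trait Expr
      case class Field(name: String) extends Expr
      case class Lit(v: Int) extends Expr
      case class Plus(l: Expr, r: Expr) extends Expr
      case class Agg(fn: String, arg: Expr) extends Expr  // e.g. Agg("sum", Field("a"))

      // Replace every aggregate with a fresh field reference and collect the
      // replaced aggregates, paired with their generated names, on the side.
      def extract(e: Expr, fresh: Iterator[String]): (Expr, List[(String, Agg)]) = e match {
        case a: Agg =>
          val name = fresh.next()
          (Field(name), List(name -> a))
        case Plus(l, r) =>
          val (l2, la) = extract(l, fresh)
          val (r2, ra) = extract(r, fresh)
          (Plus(l2, r2), la ::: ra)
        case leaf => (leaf, Nil)
      }

      def main(args: Array[String]): Unit = {
        val fresh = Iterator.from(0).map(i => s"TMP_$i")
        // "sum(a) + 1" becomes TMP_0 + 1 plus one collected aggregate:
        println(extract(Plus(Agg("sum", Field("a")), Lit(1)), fresh))
        // (Plus(Field(TMP_0),Lit(1)),List((TMP_0,Agg(sum,Field(a)))))
      }
    }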

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
new file mode 100644
index 0000000..dae02bd
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.{TableEnvironment, ValidationException}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.typeutils.TypeCoercion
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct a query plan using the Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  *
+  * Expressions' resolution and transformation ([[resolveExpressions]]):
+  *
+  * - translate [[UnresolvedFieldReference]] into [[ResolvedFieldReference]]
+  *     using child operator's output
+  * - translate [[Call]](UnresolvedFunction) into a concrete Expression
+  * - generate alias names for query output
+  * - ....
+  *
+  * LogicalNode validation ([[validate]]):
+  *
+  * - check that no [[UnresolvedFieldReference]] remains
+  * - check if all expressions have children of the needed type
+  * - check that each logical operator has the desired input
+  *
+  * Once we pass the validation phase, we can safely convert LogicalNode into Calcite's RelNode.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    // resolve references and function calls
+    val exprResolved = expressionPostOrderTransform {
+      case u @ UnresolvedFieldReference(name) =>
+        resolveReference(name).getOrElse(u)
+      case c @ Call(name, children) if c.childrenValid =>
+        tableEnv.getFunctionCatalog.lookupFunction(name, children)
+    }
+
+    exprResolved.expressionPostOrderTransform {
+      case ips: InputTypeSpec if ips.childrenValid =>
+        var changed: Boolean = false
+        val newChildren = ips.expectedTypes.zip(ips.children).map { case (tpe, child) =>
+          val childType = child.resultType
+          if (childType != tpe && TypeCoercion.canSafelyCast(childType, tpe)) {
+            changed = true
+            Cast(child, tpe)
+          } else {
+            child
+          }
+        }.toArray[AnyRef]
+        if (changed) ips.makeCopy(newChildren) else ips
+    }
+  }
+
+  final def toRelNode(relBuilder: RelBuilder): RelNode = construct(relBuilder).build()
+
+  protected[logical] def construct(relBuilder: RelBuilder): RelBuilder
+
+  def validate(tableEnv: TableEnvironment): LogicalNode = {
+    val resolvedNode = resolveExpressions(tableEnv)
+    resolvedNode.expressionPostOrderTransform {
+      case a: Attribute if !a.valid =>
+        val from = children.flatMap(_.output).map(_.name).mkString(", ")
+        failValidation(s"cannot resolve [${a.name}] given input [$from]")
+
+      case e: Expression if e.validateInput().isFailure =>
+        failValidation(s"Expression $e failed on input check: " +
+          s"${e.validateInput().asInstanceOf[ValidationFailure].message}")
+    }
+  }
+
+  /**
+    * Resolves the given string to a [[NamedExpression]] using the input from all child
+    * nodes of this LogicalPlan.
+    */
+  def resolveReference(name: String): Option[NamedExpression] = {
+    val childrenOutput = children.flatMap(_.output)
+    val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(name))
+    if (candidates.length > 1) {
+      failValidation(s"Reference $name is ambiguous")
+    } else if (candidates.length == 0) {
+      None
+    } else {
+      Some(candidates.head.withName(name))
+    }
+  }
+
+  /**
+    * Runs [[postOrderTransform]] with `rule` on all expressions present in this logical node.
+    *
+    * @param rule the rule to be applied to every expression in this logical node.
+    */
+  def expressionPostOrderTransform(rule: PartialFunction[Expression, Expression]): LogicalNode = {
+    var changed = false
+
+    def expressionPostOrderTransform(e: Expression): Expression = {
+      val newExpr = e.postOrderTransform(rule)
+      if (newExpr.fastEquals(e)) {
+        e
+      } else {
+        changed = true
+        newExpr
+      }
+    }
+
+    val newArgs = productIterator.map {
+      case e: Expression => expressionPostOrderTransform(e)
+      case Some(e: Expression) => Some(expressionPostOrderTransform(e))
+      case seq: Traversable[_] => seq.map {
+        case e: Expression => expressionPostOrderTransform(e)
+        case other => other
+      }
+      case other: AnyRef => other
+    }.toArray
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  protected def failValidation(msg: String): Nothing = {
+    throw new ValidationException(msg)
+  }
+}
+
+abstract class LeafNode extends LogicalNode {
+  override def children: Seq[LogicalNode] = Nil
+}
+
+abstract class UnaryNode extends LogicalNode {
+  def child: LogicalNode
+
+  override def children: Seq[LogicalNode] = child :: Nil
+}
+
+abstract class BinaryNode extends LogicalNode {
+  def left: LogicalNode
+  def right: LogicalNode
+
+  override def children: Seq[LogicalNode] = left :: right :: Nil
+}
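
resolveReference above implements a simple contract: zero candidates leaves the
reference unresolved for a later pass, exactly one candidate resolves it, and
more than one fails validation. The same contract restated as a standalone
function (Attribute here is an illustrative stand-in, not Flink's trait):

    object ResolveSketch {
      case class Attribute(name: String)

      def resolveReference(childOutput: Seq[Attribute], name: String): Option[Attribute] =
        childOutput.filter(_.name.equalsIgnoreCase(name)) match {
          case Seq()       => None          // unresolved; the caller keeps the raw reference
          case Seq(single) => Some(single)
          case _           => sys.error(s"Reference $name is ambiguous")
        }

      def main(args: Array[String]): Unit = {
        val out = Seq(Attribute("a"), Attribute("b"))
        println(resolveReference(out, "A"))  // Some(Attribute(a)): matching is case-insensitive
        println(resolveReference(out, "c"))  // None
      }
    }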

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
new file mode 100644
index 0000000..d347651
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project]
+    val newProjectList =
+      afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+        e match {
+          case u @ UnresolvedAlias(child) => child match {
+            case ne: NamedExpression => ne
+            case e if !e.valid => u
+            case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
+            case other => Alias(other, s"_c$i")
+          }
+          case _ => throw new IllegalArgumentException
+        }
+    }
+    Project(newProjectList, child)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    val resolvedProject = super.validate(tableEnv).asInstanceOf[Project]
+
+    def checkUniqueNames(exprs: Seq[Expression]): Unit = {
+      val names: mutable.Set[String] = mutable.Set()
+      exprs.foreach {
+        case n: Alias =>
+          // explicit name
+          if (names.contains(n.name)) {
+            throw new ValidationException(s"Duplicate field name ${n.name}.")
+          } else {
+            names.add(n.name)
+          }
+        case r: ResolvedFieldReference =>
+          // simple field forwarding
+          if (names.contains(r.name)) {
+            throw new ValidationException(s"Duplicate field name ${r.name}.")
+          } else {
+            names.add(r.name)
+          }
+        case _ => // Do nothing
+      }
+    }
+    checkUniqueNames(resolvedProject.projectList)
+    resolvedProject
+  }
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    val allAlias = projectList.forall(_.isInstanceOf[Alias])
+    child.construct(relBuilder)
+    if (allAlias) {
+      // Calcite's RelBuilder does not translate identity projects even if they rename fields.
+      //   Add a projection ourselves (will be automatically removed by translation rules).
+      relBuilder.push(
+        LogicalProject.create(relBuilder.peek(),
+          projectList.map(_.toRexNode(relBuilder)).asJava,
+          projectList.map(_.name).asJava))
+    } else {
+      relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+    }
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] =
+    throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder =
+    throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    if (aliasList.length > child.output.length) {
+      failValidation("Aliasing more fields than we actually have")
+    } else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+      failValidation("Alias only accept name expressions as arguments")
+    } else {
+      val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+      val input = child.output
+      Project(
+        names.zip(input).map { case (name, attr) =>
+          Alias(attr, name)} ++ input.drop(names.length), child)
+    }
+  }
+}
+
+case class Distinct(child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    child.construct(relBuilder)
+    relBuilder.distinct()
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new TableException(s"Distinct on stream tables is currently not 
supported.")
+    }
+    this
+  }
+}
+
+case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    child.construct(relBuilder)
+    relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new TableException(s"Distinct on stream tables is currently not 
supported.")
+    }
+    super.validate(tableEnv)
+  }
+}
+
+case class Filter(condition: Expression, child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    child.construct(relBuilder)
+    relBuilder.filter(condition.toRexNode(relBuilder))
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    val resolvedFilter = super.validate(tableEnv).asInstanceOf[Filter]
+    if (resolvedFilter.condition.resultType != BOOLEAN_TYPE_INFO) {
+      failValidation(s"filter expression ${resolvedFilter.condition} of" +
+        s" ${resolvedFilter.condition.resultType} is not a boolean")
+    }
+    resolvedFilter
+  }
+}
+
+case class Aggregate(
+    groupingExpressions: Seq[Expression],
+    aggregateExpressions: Seq[NamedExpression],
+    child: LogicalNode) extends UnaryNode {
+
+  override def output: Seq[Attribute] = {
+    (groupingExpressions ++ aggregateExpressions) map { agg =>
+      agg match {
+        case ne: NamedExpression => ne.toAttribute
+        case e => Alias(e, e.toString).toAttribute
+      }
+    }
+  }
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    child.construct(relBuilder)
+    relBuilder.aggregate(
+      relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
+      aggregateExpressions.map { e =>
+        e match {
+          case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder)
+          case _ => throw new RuntimeException("This should never happen.")
+        }
+      }.asJava)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new TableException(s"Aggregate on stream tables is currently not 
supported.")
+    }
+
+    val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
+    val groupingExprs = resolvedAggregate.groupingExpressions
+    val aggregateExprs = resolvedAggregate.aggregateExpressions
+    aggregateExprs.foreach(validateAggregateExpression)
+    groupingExprs.foreach(validateGroupingExpression)
+
+    def validateAggregateExpression(expr: Expression): Unit = expr match {
+      // check no nested aggregation exists.
+      case aggExpr: Aggregation =>
+        aggExpr.children.foreach { child =>
+          child.preOrderVisit {
+            case agg: Aggregation =>
+              failValidation(
+                "It's not allowed to use an aggregate function as " +
+                  "input of another aggregate function")
+            case _ => // OK
+          }
+        }
+      case a: Attribute if !groupingExprs.exists(_.checkEquals(a)) =>
+        failValidation(
+          s"expression '$a' is invalid because it is neither" +
+            " present in group by nor an aggregate function")
+      case e if groupingExprs.exists(_.checkEquals(e)) => // OK
+      case e => e.children.foreach(validateAggregateExpression)
+    }
+
+    def validateGroupingExpression(expr: Expression): Unit = {
+      if (!expr.resultType.isKeyType) {
+        failValidation(
+          s"expression $expr cannot be used as a grouping expression " +
+            "because it's not a valid key type")
+      }
+    }
+    resolvedAggregate
+  }
+}
+
+case class Union(left: LogicalNode, right: LogicalNode) extends BinaryNode {
+  override def output: Seq[Attribute] = left.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    left.construct(relBuilder)
+    right.construct(relBuilder)
+    relBuilder.union(true)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union]
+    if (left.output.length != right.output.length) {
+      failValidation(s"Union two table of different column sizes:" +
+        s" ${left.output.size} and ${right.output.size}")
+    }
+    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
+      l.resultType == r.resultType && l.name == r.name }
+    if (!sameSchema) {
+      failValidation(s"Union two table of different schema:" +
+        s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] 
and" +
+        s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
+    }
+    resolvedUnion
+  }
+}
+
+case class Join(
+    left: LogicalNode,
+    right: LogicalNode,
+    joinType: JoinType,
+    condition: Option[Expression]) extends BinaryNode {
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case JoinType.INNER => left.output ++ right.output
+      case j => throw new ValidationException(s"Unsupported JoinType: $j")
+    }
+  }
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    joinType match {
+      case JoinType.INNER =>
+        left.construct(relBuilder)
+        right.construct(relBuilder)
+        relBuilder.join(JoinRelType.INNER,
+          condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)))
+      case _ =>
+        throw new ValidationException(s"Unsupported JoinType: $joinType")
+    }
+  }
+
+  private def ambiguousName: Set[String] =
+    left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new TableException(s"Join on stream tables is currently not 
supported.")
+    }
+
+    val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
+    if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
+      failValidation(s"join condition ${resolvedJoin.condition} is not a boolean")
+    } else if (ambiguousName.nonEmpty) {
+      failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
+    }
+    resolvedJoin
+  }
+}
+
+case class CatalogNode(
+    tableName: String,
+    rowType: RelDataType) extends LeafNode {
+
+  val output: Seq[Attribute] = rowType.getFieldList.asScala.map { field =>
+    ResolvedFieldReference(
+      field.getName, TypeConverter.sqlTypeToTypeInfo(field.getType.getSqlTypeName))
+  }
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    relBuilder.scan(tableName)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = this
+}
+
+/**
+  * Wrapper for valid logical plans generated from SQL String.
+  */
+case class LogicalRelNode(
+    relNode: RelNode) extends LeafNode {
+
+  val output: Seq[Attribute] = relNode.getRowType.getFieldList.asScala.map { field =>
+    ResolvedFieldReference(
+      field.getName, TypeConverter.sqlTypeToTypeInfo(field.getType.getSqlTypeName))
+  }
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    relBuilder.push(relNode)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = this
+}
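
Each of these operators validates eagerly when constructed from the Table API.
Union, for instance, requires equal arity and then pairwise name/type equality.
The check in isolation (FieldSchema is an illustrative stand-in for an output
Attribute):

    object UnionCheckSketch {
      case class FieldSchema(name: String, tpe: String)

      // Mirrors Union.validate: same column count, then same name and type per position.
      def sameSchema(left: Seq[FieldSchema], right: Seq[FieldSchema]): Boolean =
        left.length == right.length &&
          left.zip(right).forall { case (l, r) => l.name == r.name && l.tpe == r.tpe }

      def main(args: Array[String]): Unit = {
        val a = Seq(FieldSchema("id", "Int"), FieldSchema("name", "String"))
        val b = Seq(FieldSchema("id", "Int"), FieldSchema("label", "String"))
        println(sameSchema(a, a))  // true
        println(sameSchema(a, b))  // false: the second column names differ
      }
    }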

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 5356a9d..4f111c9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -17,28 +17,18 @@
  */
 package org.apache.flink.api.table
 
+import scala.collection.JavaConverters._
+
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataTypeField
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rel.logical.LogicalProject
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexNode}
-import org.apache.calcite.sql.SqlKind
-import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
-import org.apache.calcite.util.NlsString
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggCalls
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggregations
 import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.plan.logical._
 import org.apache.flink.api.table.sinks.TableSink
 import org.apache.flink.api.table.typeutils.TypeConverter
 
-import scala.collection.mutable
-import scala.collection.JavaConverters._
-
-case class BaseTable(
-  private[flink] val relNode: RelNode,
-  private[flink] val tableEnv: TableEnvironment)
-
 /**
   * A Table is the core component of the Table API.
   * Similar to how the batch and streaming APIs have DataSet and DataStream,
@@ -66,18 +56,16 @@ case class BaseTable(
  * in a Scala DSL or as an expression String. Please refer to the documentation for the expression
   * syntax.
   *
-  * @param relNode The root node of the relational Calcite [[RelNode]] tree.
   * @param tableEnv The [[TableEnvironment]] to which the table is bound.
+  * @param logicalPlan The logical plan from which this table is derived.
   */
 class Table(
-  private[flink] override val relNode: RelNode,
-  private[flink] override val tableEnv: TableEnvironment)
-  extends BaseTable(relNode, tableEnv)
-{
+    private[flink] val tableEnv: TableEnvironment,
+    private[flink] val logicalPlan: LogicalNode) {
 
   def relBuilder = tableEnv.getRelBuilder
 
-  def getRelNode: RelNode = relNode
+  def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder)
 
   /**
    * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
@@ -90,44 +78,17 @@ class Table(
     * }}}
     */
   def select(fields: Expression*): Table = {
-
-    checkUniqueNames(fields)
-
-    relBuilder.push(relNode)
-
-    // separate aggregations and selection expressions
-    val extractedAggCalls: List[(Expression, List[AggCall])] = fields
-      .map(extractAggCalls(_, tableEnv)).toList
-
-    // get aggregation calls
-    val aggCalls: List[AggCall] = extractedAggCalls.flatMap(_._2)
-
-    // apply aggregations
-    if (aggCalls.nonEmpty) {
-      // aggregation on stream table is not currently supported
-      tableEnv match {
-        case _: StreamTableEnvironment =>
-          throw new TableException("Aggregation on stream tables is currently 
not supported.")
-        case _ =>
-          val emptyKey: GroupKey = relBuilder.groupKey()
-          relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava)
-      }
-    }
-
-    // get selection expressions
-    val exprs: List[RexNode] = extractedAggCalls.map(_._1.toRexNode(relBuilder))
-
-    relBuilder.project(exprs.toIterable.asJava)
-    val projected = relBuilder.build()
-
-    if(relNode == projected) {
-      // Calcite's RelBuilder does not translate identity projects even if they rename fields.
-      //   Add a projection ourselves (will be automatically removed by translation rules).
-      new Table(createRenamingProject(exprs), tableEnv)
-      new Table(createRenamingProject(exprs), tableEnv)
+    val projectionOnAggregates = fields.map(extractAggregations(_, tableEnv))
+    val aggregations = projectionOnAggregates.flatMap(_._2)
+    if (aggregations.nonEmpty) {
+      new Table(tableEnv,
+        Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
+          Aggregate(Nil, aggregations, logicalPlan).validate(tableEnv)).validate(tableEnv))
     } else {
-      new Table(projected, tableEnv)
+      new Table(tableEnv,
+        Project(
+          projectionOnAggregates.map(e => UnresolvedAlias(e._1)), logicalPlan).validate(tableEnv))
     }
-
   }
 
   /**
@@ -156,30 +117,7 @@ class Table(
     * }}}
     */
   def as(fields: Expression*): Table = {
-
-    val curNames = relNode.getRowType.getFieldNames.asScala
-
-    // validate that AS has only field references
-    if (! fields.forall( _.isInstanceOf[UnresolvedFieldReference] )) {
-      throw new IllegalArgumentException("All expressions must be field 
references.")
-    }
-    // validate that we have not more field references than fields
-    if ( fields.length > curNames.size) {
-      throw new IllegalArgumentException("More field references than fields.")
-    }
-
-    val curFields = curNames.map(new UnresolvedFieldReference(_))
-
-    val renamings = fields.zip(curFields).map {
-      case (newName, oldName) => new Naming(oldName, newName.name)
-    }
-    val remaining = curFields.drop(fields.size)
-
-    relBuilder.push(relNode)
-
-    val exprs = (renamings ++ remaining).map(_.toRexNode(relBuilder))
-
-    new Table(createRenamingProject(exprs), tableEnv)
+    new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv))
   }
 
   /**
@@ -208,11 +146,7 @@ class Table(
     * }}}
     */
   def filter(predicate: Expression): Table = {
-
-    relBuilder.push(relNode)
-    relBuilder.filter(predicate.toRexNode(relBuilder))
-
-    new Table(relBuilder.build(), tableEnv)
+    new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
   }
 
   /**
@@ -269,19 +203,10 @@ class Table(
     * }}}
     */
   def groupBy(fields: Expression*): GroupedTable = {
-
-    // group by on stream tables is currently not supported
-    tableEnv match {
-      case _: StreamTableEnvironment =>
-        throw new TableException("Group by on stream tables is currently not 
supported.")
-      case _ => {
-        relBuilder.push(relNode)
-        val groupExpr = fields.map(_.toRexNode(relBuilder)).toIterable.asJava
-        val groupKey = relBuilder.groupKey(groupExpr)
-
-        new GroupedTable(relBuilder.build(), tableEnv, groupKey)
-      }
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new TableException(s"Group by on stream tables is currently not 
supported.")
     }
+    new GroupedTable(this, fields)
   }
 
   /**
@@ -309,15 +234,7 @@ class Table(
     * }}}
     */
   def distinct(): Table = {
-    // distinct on stream table is not currently supported
-    tableEnv match {
-      case _: StreamTableEnvironment =>
-        throw new TableException("Distinct on stream tables is currently not 
supported.")
-      case _ =>
-        relBuilder.push(relNode)
-        relBuilder.distinct()
-        new Table(relBuilder.build(), tableEnv)
-    }
+    new Table(tableEnv, Distinct(logicalPlan).validate(tableEnv))
   }
 
   /**
@@ -334,32 +251,12 @@ class Table(
     * }}}
     */
   def join(right: Table): Table = {
-
-    // join on stream tables is currently not supported
-    tableEnv match {
-      case _: StreamTableEnvironment =>
-        throw new TableException("Join on stream tables is currently not 
supported.")
-      case _ => {
-        // check that right table belongs to the same TableEnvironment
-        if (right.tableEnv != this.tableEnv) {
-          throw new TableException("Only tables from the same TableEnvironment 
can be joined.")
-        }
-
-        // check that join inputs do not have overlapping field names
-        val leftFields = relNode.getRowType.getFieldNames.asScala.toSet
-        val rightFields = right.relNode.getRowType.getFieldNames.asScala.toSet
-        if (leftFields.intersect(rightFields).nonEmpty) {
-          throw new IllegalArgumentException("Overlapping fields names on join 
input.")
-        }
-
-        relBuilder.push(relNode)
-        relBuilder.push(right.relNode)
-
-        relBuilder.join(JoinRelType.INNER, relBuilder.literal(true))
-        val join = relBuilder.build()
-        new Table(join, tableEnv)
-      }
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException("Only tables from the same 
TableEnvironment can be joined.")
     }
+    new Table(tableEnv,
+      Join(this.logicalPlan, right.logicalPlan, JoinType.INNER, None).validate(tableEnv))
   }
 
   /**
@@ -375,32 +272,11 @@ class Table(
     * }}}
     */
   def unionAll(right: Table): Table = {
-
     // check that right table belongs to the same TableEnvironment
     if (right.tableEnv != this.tableEnv) {
-      throw new TableException("Only tables from the same TableEnvironment can 
be unioned.")
-    }
-
-    val leftRowType: List[RelDataTypeField] = relNode.getRowType.getFieldList.asScala.toList
-    val rightRowType: List[RelDataTypeField] = right.relNode.getRowType.getFieldList.asScala.toList
-
-    if (leftRowType.length != rightRowType.length) {
-      throw new IllegalArgumentException("Unioned tables have varying row 
schema.")
+      throw new ValidationException("Only tables from the same 
TableEnvironment can be unioned.")
     }
-    else {
-      val zipped: List[(RelDataTypeField, RelDataTypeField)] = leftRowType.zip(rightRowType)
-      zipped.foreach { case (x, y) =>
-        if (!x.getName.equals(y.getName) || x.getType != y.getType) {
-          throw new IllegalArgumentException("Unioned tables have varying row 
schema.")
-        }
-      }
-    }
-
-    relBuilder.push(relNode)
-    relBuilder.push(right.relNode)
-
-    relBuilder.union(true)
-    new Table(relBuilder.build(), tableEnv)
+    new Table(tableEnv, Union(logicalPlan, right.logicalPlan).validate(tableEnv))
   }
 
   /**
@@ -414,22 +290,13 @@ class Table(
     * }}}
     */
   def orderBy(fields: Expression*): Table = {
-    relBuilder.push(relNode)
-
-    if (! fields.forall {
-      case x : UnresolvedFieldReference => true
-      case x : Ordering => x.child.isInstanceOf[UnresolvedFieldReference]
-      case _ => false
-    }) {
-      throw new IllegalArgumentException("All expressions must be field 
references " +
-        "or asc/desc expressions.")
+    val order: Seq[Ordering] = fields.map { case e =>
+      e match {
+        case o: Ordering => o
+        case _ => Asc(e)
+      }
     }
-
-    val exprs = fields.map(_.toRexNode(relBuilder))
-
-    relBuilder.sort(exprs.asJava)
-    new Table(relBuilder.build(), tableEnv)
-
+    new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))
   }
 
   /**
@@ -460,7 +327,7 @@ class Table(
   def toSink[T](sink: TableSink[T]): Unit = {
 
     // get schema information of table
-    val rowType = relNode.getRowType
+    val rowType = getRelNode.getRowType
     val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
     val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
      .map(f => TypeConverter.sqlTypeToTypeInfo(f.getType.getSqlTypeName)).toArray
@@ -471,62 +338,14 @@ class Table(
     // emit the table to the configured table sink
     tableEnv.emitToSink(this, configuredSink)
   }
-
-  private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = {
-
-    val names = exprs.map{ e =>
-      e.getKind match {
-        case SqlKind.AS =>
-          e.asInstanceOf[RexCall].getOperands.get(1)
-            .asInstanceOf[RexLiteral].getValue
-            .asInstanceOf[NlsString].getValue
-        case SqlKind.INPUT_REF =>
-          relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex)
-        case _ =>
-          throw new PlanGenException("Unexpected expression type encountered.")
-      }
-
-    }
-    LogicalProject.create(relNode, exprs.toList.asJava, names.toList.asJava)
-  }
-
-  private def checkUniqueNames(exprs: Seq[Expression]): Unit = {
-    val names: mutable.Set[String] = mutable.Set()
-
-    exprs.foreach {
-      case n: Naming =>
-        // explicit name
-        if (names.contains(n.name)) {
-          throw new IllegalArgumentException(s"Duplicate field name $n.name.")
-        } else {
-          names.add(n.name)
-        }
-      case u: UnresolvedFieldReference =>
-        // simple field forwarding
-        if (names.contains(u.name)) {
-          throw new IllegalArgumentException(s"Duplicate field name $u.name.")
-        } else {
-          names.add(u.name)
-        }
-      case _ => // Do nothing
-    }
-  }
-
 }
 
 /**
   * A table that has been grouped on a set of grouping keys.
-  *
-  * @param relNode The root node of the relational Calcite [[RelNode]] tree.
-  * @param tableEnv The [[TableEnvironment]] to which the table is bound.
-  * @param groupKey The Calcite [[GroupKey]] of this table.
   */
 class GroupedTable(
-  private[flink] override val relNode: RelNode,
-  private[flink] override val tableEnv: TableEnvironment,
-  private[flink] val groupKey: GroupKey) extends BaseTable(relNode, tableEnv) {
-
-  def relBuilder = tableEnv.getRelBuilder
+  private[flink] val table: Table,
+  private[flink] val groupKey: Seq[Expression]) {
 
   /**
    * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
@@ -540,31 +359,19 @@ class GroupedTable(
     */
   def select(fields: Expression*): Table = {
 
-    relBuilder.push(relNode)
-
-    // separate aggregations and selection expressions
-    val extractedAggCalls: List[(Expression, List[AggCall])] = fields
-      .map(extractAggCalls(_, tableEnv)).toList
-
-    // get aggregation calls
-    val aggCalls: List[AggCall] = extractedAggCalls.flatMap(_._2)
+    val projectionOnAggregates = fields.map(extractAggregations(_, table.tableEnv))
+    val aggregations = projectionOnAggregates.flatMap(_._2)
 
-    // apply aggregations
-    relBuilder.aggregate(groupKey, aggCalls.toIterable.asJava)
-
-    // get selection expressions
-    val exprs: List[RexNode] = try {
-      extractedAggCalls.map(_._1.toRexNode(relBuilder))
-    } catch {
-      case iae: IllegalArgumentException  =>
-        throw new IllegalArgumentException(
-          "Only grouping fields and aggregations allowed after groupBy.", iae)
-      case e: Exception => throw e
+    val logical = if (aggregations.nonEmpty) {
+      Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
+        Aggregate(groupKey, aggregations, table.logicalPlan).validate(table.tableEnv)
+      )
+    } else {
+      Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
+        Aggregate(groupKey, Nil, table.logicalPlan).validate(table.tableEnv))
     }
 
-    relBuilder.project(exprs.toIterable.asJava)
-
-    new Table(relBuilder.build(), tableEnv)
+    new Table(table.tableEnv, logical.validate(table.tableEnv))
   }
 
   /**
@@ -581,5 +388,4 @@ class GroupedTable(
     val fieldExprs = ExpressionParser.parseExpressionList(fields)
     select(fieldExprs: _*)
   }
-
 }
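
With this change every operator call builds and validates a LogicalNode
immediately, so errors such as unknown fields, non-boolean predicates, or
nested aggregates surface at call time instead of during translation to
Calcite. A usage sketch, assuming a Scala Table `table` with a numeric field 'a
and a field 'b, and the implicit expression conversions from
org.apache.flink.api.scala.table._ in scope:

    val result = table
      .filter('a > 0)      // builds Filter(condition, plan) and validates it
      .groupBy('b)         // builds GroupedTable(table, Seq('b))
      .select('b, 'a.sum)  // Project over Aggregate, both validated eagerly

    // table.select('a as 'x, 'b as 'x) now fails with a ValidationException
    // ("Duplicate field name x.") instead of failing later during translation.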

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
new file mode 100644
index 0000000..63c7013
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.trees
+
+import org.apache.commons.lang.ClassUtils
+
+/**
+ * Generic base class for trees that can be transformed and traversed.
+ */
+abstract class TreeNode[A <: TreeNode[A]] extends Product { self: A =>
+
+  /**
+   * List of child nodes that should be considered when doing transformations. Other values
+   * in the Product will not be transformed, only handed through.
+   */
+  def children: Seq[A]
+
+  /**
+   * Tests for equality by first testing for reference equality.
+   */
+  def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other
+
+  /**
+    * Do tree transformation in post order.
+    */
+  def postOrderTransform(rule: PartialFunction[A, A]): A = {
+    def childrenTransform(rule: PartialFunction[A, A]): A = {
+      var changed = false
+      val newArgs = productIterator.map {
+        case arg: TreeNode[_] if children.contains(arg) =>
+          val newChild = arg.asInstanceOf[A].postOrderTransform(rule)
+          if (!(newChild fastEquals arg)) {
+            changed = true
+            newChild
+          } else {
+            arg
+          }
+        case args: Traversable[_] => args.map {
+          case arg: TreeNode[_] if children.contains(arg) =>
+            val newChild = arg.asInstanceOf[A].postOrderTransform(rule)
+            if (!(newChild fastEquals arg)) {
+              changed = true
+              newChild
+            } else {
+              arg
+            }
+          case other => other
+        }
+        case nonChild: AnyRef => nonChild
+        case null => null
+      }.toArray
+      if (changed) makeCopy(newArgs) else this
+    }
+
+    val afterChildren = childrenTransform(rule)
+    if (afterChildren fastEquals this) {
+      rule.applyOrElse(this, identity[A])
+    } else {
+      rule.applyOrElse(afterChildren, identity[A])
+    }
+  }
+
+  /**
+    * Runs the given function first on the node and then recursively on all its children.
+    */
+  def preOrderVisit(f: A => Unit): Unit = {
+    f(this)
+    children.foreach(_.preOrderVisit(f))
+  }
+
+  /**
+   * Creates a new copy of this expression with new children. This is used during transformation
+   * if children change.
+   */
+  def makeCopy(newArgs: Array[AnyRef]): A = {
+    val ctors = getClass.getConstructors.filter(_.getParameterTypes.size > 0)
+    if (ctors.isEmpty) {
+      throw new RuntimeException(s"No valid constructor for 
${getClass.getSimpleName}")
+    }
+
+    val defaultCtor = ctors.find { ctor =>
+      if (ctor.getParameterTypes.size != newArgs.length) {
+        false
+      } else if (newArgs.contains(null)) {
+        false
+      } else {
+        val argsClasses: Array[Class[_]] = newArgs.map(_.getClass)
+        ClassUtils.isAssignable(argsClasses, ctor.getParameterTypes)
+      }
+    }.getOrElse(ctors.maxBy(_.getParameterTypes.size))
+
+    try {
+      defaultCtor.newInstance(newArgs: _*).asInstanceOf[A]
+    } catch {
+      case e: java.lang.IllegalArgumentException =>
+        throw new IllegalArgumentException(s"Fail to copy treeNode 
${getClass.getName}")
+    }
+  }
+}
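
postOrderTransform rewrites children before applying the rule to the node
itself, so a rule always sees already-rewritten children; makeCopy then
rebuilds changed nodes reflectively through the case-class constructor. The
traversal order in miniature (toy types, not the TreeNode class above):

    object PostOrderSketch {
      sealed trait T
      case class Leaf(v: Int) extends T
      case class Node(kids: List[T]) extends T

      // Children first, then the node itself; unmatched nodes pass through unchanged.
      def postOrder(t: T)(rule: PartialFunction[T, T]): T = {
        val withNewKids = t match {
          case Node(kids) => Node(kids.map(postOrder(_)(rule)))
          case leaf       => leaf
        }
        rule.applyOrElse(withNewKids, identity[T])
      }

      def main(args: Array[String]): Unit = {
        println(postOrder(Node(List(Leaf(1), Leaf(2)))) { case Leaf(v) => Leaf(v * 10) })
        // Node(List(Leaf(10), Leaf(20)))
      }
    }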

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
new file mode 100644
index 0000000..1da1d2c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeutils
+
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
+import org.apache.flink.api.table.validate._
+
+object TypeCheckUtils {
+
+  def assertNumericExpr(dataType: TypeInformation[_], caller: String): ExprValidationResult = {
+    if (dataType.isInstanceOf[NumericTypeInfo[_]]) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$caller requires numeric types, get $dataType here")
+    }
+  }
+
+  def assertOrderableExpr(dataType: TypeInformation[_], caller: String): ExprValidationResult = {
+    if (dataType.isSortKeyType) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$caller requires orderable types, get $dataType 
here")
+    }
+  }
+}
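
Expected behaviour of the two checks, assuming the BasicTypeInfo constants from
flink-core:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.api.table.typeutils.TypeCheckUtils._

    assertNumericExpr(INT_TYPE_INFO, "plus")         // ValidationSuccess
    assertNumericExpr(STRING_TYPE_INFO, "plus")      // ValidationFailure("plus requires numeric types, ...")
    assertOrderableExpr(STRING_TYPE_INFO, "orderBy") // ValidationSuccess: strings are valid sort keys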

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
new file mode 100644
index 0000000..218996d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
+
+/**
+  * Utilities for type conversions.
+  */
+object TypeCoercion {
+
+  val numericWideningPrecedence: IndexedSeq[TypeInformation[_]] =
+    IndexedSeq(
+      BYTE_TYPE_INFO,
+      SHORT_TYPE_INFO,
+      INT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      FLOAT_TYPE_INFO,
+      DOUBLE_TYPE_INFO)
+
+  def widerTypeOf(tp1: TypeInformation[_], tp2: TypeInformation[_]): Option[TypeInformation[_]] = {
+    (tp1, tp2) match {
+      case (tp1, tp2) if tp1 == tp2 => Some(tp1)
+
+      case (_, STRING_TYPE_INFO) => Some(STRING_TYPE_INFO)
+      case (STRING_TYPE_INFO, _) => Some(STRING_TYPE_INFO)
+
+      case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) =>
+        val higherIndex = numericWideningPrecedence.lastIndexWhere(t => t == tp1 || t == tp2)
+        Some(numericWideningPrecedence(higherIndex))
+
+      case _ => None
+    }
+  }
+
+  /**
+    * Tests whether we can cast safely, without loss of information.
+    */
+  def canSafelyCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean = (from, to) match {
+    case (_, STRING_TYPE_INFO) => true
+
+    case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) =>
+      if (numericWideningPrecedence.indexOf(from) < numericWideningPrecedence.indexOf(to)) {
+        true
+      } else {
+        false
+      }
+
+    case _ => false
+  }
+
+  /**
+    * All the supported cast types in flink-table.
+    * Note: This may lose information during the cast.
+    */
+  def canCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean = (from, to) match {
+    case (from, to) if from == to => true
+
+    case (_, STRING_TYPE_INFO) => true
+
+    case (_, DATE_TYPE_INFO) => false // Date type not supported yet.
+    case (_, VOID_TYPE_INFO) => false // Void type not supported
+    case (_, CHAR_TYPE_INFO) => false // Character type not supported.
+
+    case (STRING_TYPE_INFO, _: NumericTypeInfo[_]) => true
+    case (STRING_TYPE_INFO, BOOLEAN_TYPE_INFO) => true
+
+    case (BOOLEAN_TYPE_INFO, _: NumericTypeInfo[_]) => true
+    case (_: NumericTypeInfo[_], BOOLEAN_TYPE_INFO) => true
+
+    case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) => true
+
+    case _ => false
+  }
+}
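
Concrete results implied by the rules above, again using the flink-core
BasicTypeInfo constants:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.api.table.typeutils.TypeCoercion._

    widerTypeOf(INT_TYPE_INFO, LONG_TYPE_INFO)      // Some(LONG_TYPE_INFO): later in the precedence list
    widerTypeOf(INT_TYPE_INFO, STRING_TYPE_INFO)    // Some(STRING_TYPE_INFO): strings absorb everything
    canSafelyCast(INT_TYPE_INFO, DOUBLE_TYPE_INFO)  // true: widening
    canSafelyCast(DOUBLE_TYPE_INFO, INT_TYPE_INFO)  // false: narrowing
    canCast(STRING_TYPE_INFO, INT_TYPE_INFO)        // true, though the cast may still fail at runtime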

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ExprValidationResult.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ExprValidationResult.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ExprValidationResult.scala
new file mode 100644
index 0000000..8571051
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ExprValidationResult.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.validate
+
+/**
+  * Represents the result of `Expression.validateInput`.
+  */
+sealed trait ExprValidationResult {
+  def isFailure: Boolean = !isSuccess
+  def isSuccess: Boolean
+}
+
+/**
+  * Represents the successful result of `Expression.validateInput`.
+  */
+object ValidationSuccess extends ExprValidationResult {
+  val isSuccess: Boolean = true
+}
+
+/**
+  * Represents the failing result of `Expression.validateInput`,
+  * with an error message describing the reason for the failure.
+  */
+case class ValidationFailure(message: String) extends ExprValidationResult {
+  val isSuccess: Boolean = false
+}
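
The result type is consumed by pattern matching, as LogicalNode.validate does
above; a minimal consumer:

    import org.apache.flink.api.table.validate._

    def describe(result: ExprValidationResult): String = result match {
      case ValidationSuccess          => "ok"
      case ValidationFailure(message) => s"failed: $message"
    }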

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
new file mode 100644
index 0000000..726917e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.validate
+
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.ValidationException
+
+/**
+  * A catalog for looking up user defined functions, used during the validation phase.
+  */
+class FunctionCatalog {
+
+  private val functionBuilders = mutable.HashMap.empty[String, Class[_]]
+
+  def registerFunction(name: String, builder: Class[_]): Unit =
+    functionBuilders.put(name.toLowerCase, builder)
+
+  /**
+    * Looks up and creates an expression if a match is found.
+    */
+  def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+    val funcClass = functionBuilders.get(name.toLowerCase).getOrElse {
+      throw new ValidationException(s"undefined function $name")
+    }
+    withChildren(funcClass, children)
+  }
+
+  /**
+    * Instantiate a function using the provided `children`.
+    */
+  private def withChildren(func: Class[_], children: Seq[Expression]): Expression = {
+    // Try to find a constructor that accepts `Seq[Expression]`
+    Try(func.getDeclaredConstructor(classOf[Seq[_]])) match {
+      case Success(seqCtor) =>
+        Try(seqCtor.newInstance(children).asInstanceOf[Expression]) match {
+          case Success(expr) => expr
+          case Failure(e) => throw new ValidationException(e.getMessage)
+        }
+      case Failure(e) =>
+        val childrenClass = Seq.fill(children.length)(classOf[Expression])
+        // Try to find a constructor matching the exact number of children
+        Try(func.getDeclaredConstructor(childrenClass: _*)) match {
+          case Success(ctor) =>
+            Try(ctor.newInstance(children: _*).asInstanceOf[Expression]) match {
+              case Success(expr) => expr
+              case Failure(e) => throw new ValidationException(e.getMessage)
+            }
+          case Failure(e) =>
+            throw new ValidationException(s"Invalid number of arguments for 
function $func")
+        }
+    }
+  }
+
+  /**
+    * Drops a function and returns whether the function existed.
+    */
+  def dropFunction(name: String): Boolean =
+    functionBuilders.remove(name.toLowerCase).isDefined
+
+  /**
+    * Drop all registered functions.
+    */
+  def clear(): Unit = functionBuilders.clear()
+}
+
+object FunctionCatalog {
+
+  val buildInFunctions: Map[String, Class[_]] = Map(
+    // aggregate functions
+    "avg" -> classOf[Avg],
+    "count" -> classOf[Count],
+    "max" -> classOf[Max],
+    "min" -> classOf[Min],
+    "sum" -> classOf[Sum],
+
+    // string functions
+    "charLength" -> classOf[CharLength],
+    "initCap" -> classOf[InitCap],
+    "like" -> classOf[Like],
+    "lowerCase" -> classOf[Lower],
+    "similar" -> classOf[Similar],
+    "subString" -> classOf[SubString],
+    "trim" -> classOf[Trim],
+    "upperCase" -> classOf[Upper],
+
+    // math functions
+    "abs" -> classOf[Abs],
+    "ceil" -> classOf[Ceil],
+    "exp" -> classOf[Exp],
+    "floor" -> classOf[Floor],
+    "log10" -> classOf[Log10],
+    "ln" -> classOf[Ln],
+    "power" -> classOf[Power],
+    "mod" -> classOf[Mod]
+  )
+
+  /**
+    * Create a new function catalog with built-in functions.
+    */
+  def withBuildIns: FunctionCatalog = {
+    val catalog = new FunctionCatalog()
+    buildInFunctions.foreach { case (n, c) => catalog.registerFunction(n, c) }
+    catalog
+  }
+}
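
For illustration only (this sketch is not part of the patch): how the new catalog is intended to be used. It assumes `Sum` exposes a constructor taking a single Expression, which is what the reflective fallback in withChildren looks for, and it reuses UnresolvedFieldReference from the expressions package.

    // Hypothetical usage sketch
    // (assumes: import org.apache.flink.api.table.expressions._).
    // Registration and lookup are case-insensitive, so "SUM" resolves
    // to classOf[Sum].
    val catalog = FunctionCatalog.withBuildIns

    // withChildren first tries a Seq[Expression] constructor, then falls
    // back to a constructor taking exactly one Expression per child.
    val sum = catalog.lookupFunction("SUM", Seq(new UnresolvedFieldReference("a")))

    // An unregistered name fails eagerly during validation:
    //   catalog.lookupFunction("noSuchFunction", Nil)  // ValidationException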

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
index 0a2cf57..8fdb2da 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
@@ -24,10 +24,7 @@ import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.TableException;
+import org.apache.flink.api.table.*;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -99,7 +96,7 @@ public class TableEnvironmentITCase extends TableProgramsTestBase {
                tableEnv.registerDataSet("MyTable", ds2);
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testScanUnregisteredTable() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -127,7 +124,7 @@ public class TableEnvironmentITCase extends TableProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testIllegalName() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -138,7 +135,7 @@ public class TableEnvironmentITCase extends TableProgramsTestBase {
                tableEnv.registerTable("_DataSetTable_42", t);
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testRegisterTableFromOtherEnv() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv1 = TableEnvironment.getTableEnvironment(env, config());

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
index d4af08d..ddf5884 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.plan.PlanGenException;
+import org.apache.flink.api.table.ValidationException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
@@ -59,7 +59,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = ValidationException.class)
        public void testAggregationOnNonExistingField() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -141,7 +141,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = PlanGenException.class)
+       @Test(expected = ValidationException.class)
        public void testNonWorkingDataTypes() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -161,7 +161,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = UnsupportedOperationException.class)
+       @Test(expected = ValidationException.class)
        public void testNoNestedAggregation() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java
index 0246904..83c7cfd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java
@@ -30,6 +30,8 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.table.TableEnvironment;
 import org.apache.flink.api.table.codegen.CodeGenException;
 import static org.junit.Assert.fail;
+
+import org.apache.flink.api.table.ValidationException;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -154,7 +156,7 @@ public class ExpressionsITCase extends TableProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = ValidationException.class)
        public void testEvalInvalidTypes() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java
index 2848347..7a2bedf 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.ValidationException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -152,7 +153,7 @@ public class FilterITCase extends TableProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = ValidationException.class)
        public void testFilterInvalidField() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java
index 70dd793..1906040 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.ValidationException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
@@ -40,7 +41,7 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = ValidationException.class)
        public void testGroupingOnNonExistentField() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -54,7 +55,7 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
                        .select("a.avg");
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = ValidationException.class)
        public void testGroupingInvalidSelection() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
index 4c40596..e6db3b0 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.TableException;
+import org.apache.flink.api.table.ValidationException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
@@ -121,7 +121,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = ValidationException.class)
        public void testJoinNonExistingKey() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -136,7 +136,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
                in1.join(in2).where("foo === e").select("c, g");
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testJoinWithNonMatchingKeyTypes() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -154,7 +154,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
                tableEnv.toDataSet(result, Row.class).collect();
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = ValidationException.class)
        public void testJoinWithAmbiguousFields() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -189,7 +189,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testJoinTablesFromDifferentEnvs() {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
index 7c9478a..e48914c 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.ValidationException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -103,7 +104,7 @@ public class SelectITCase extends TableProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = ValidationException.class)
        public void testSelectInvalidField() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -115,7 +116,7 @@ public class SelectITCase extends TableProgramsTestBase {
                        .select("a + 1, foo + 2");
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = ValidationException.class)
        public void testSelectAmbiguousFieldNames() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
index e55bd22..db5eac9 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.java.batch.table;
 
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.ValidationException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.codegen.CodeGenException;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.BatchTableEnvironment;
@@ -64,6 +64,26 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
        }
 
        @Test
+       public void testSubstringWithByteStart() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+               DataSet<Tuple2<String, Byte>> ds = env.fromElements(
+                       new Tuple2<>("AAAA", (byte) 2),
+                       new Tuple2<>("BBBB", (byte) 1));
+
+               Table in = tableEnv.fromDataSet(ds, "a, b");
+
+               Table result = in
+                       .select("a.substring(1, b)");
+
+               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = resultSet.collect();
+               String expected = "AA\nB";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
        public void testSubstringWithMaxEnd() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -83,7 +103,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = CodeGenException.class)
+       @Test(expected = ValidationException.class)
        public void testNonWorkingSubstring1() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -102,7 +122,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
                resultSet.collect();
        }
 
-       @Test(expected = CodeGenException.class)
+       @Test(expected = ValidationException.class)
        public void testNonWorkingSubstring2() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -121,7 +141,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
                resultSet.collect();
        }
 
-       @Test(expected = CodeGenException.class)
+       @Test(expected = ValidationException.class)
        public void testGeneratedCodeForStringComparison() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -132,7 +152,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
                DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
        }
 
-       @Test(expected = CodeGenException.class)
+       @Test(expected = ValidationException.class)
        public void testGeneratedCodeForIntegerEqualsComparison() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -143,7 +163,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
                DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
        }
 
-       @Test(expected = CodeGenException.class)
+       @Test(expected = ValidationException.class)
        public void testGeneratedCodeForIntegerGreaterComparison() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
index a7805f8..853cd7f 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.TableException;
+import org.apache.flink.api.table.ValidationException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
@@ -80,7 +80,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = ValidationException.class)
        public void testUnionIncompatibleNumberOfFields() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -95,7 +95,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
                in1.unionAll(in2);
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = ValidationException.class)
        public void testUnionIncompatibleFieldsName() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -110,7 +110,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
                in1.unionAll(in2);
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = ValidationException.class)
        public void testUnionIncompatibleFieldTypes() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
@@ -168,7 +168,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
            compareResultAsText(results, expected);
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testUnionTablesFromDifferentEnvs() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
index 80a491b..c33e1ef 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -89,7 +89,7 @@ class TableEnvironmentITCase(
     tEnv.registerDataSet("MyTable", ds2)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testScanUnregisteredTable(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -130,7 +130,7 @@ class TableEnvironmentITCase(
     tEnv.registerDataSet("MyTable", t2)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testRegisterTableFromOtherEnv(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv1 = TableEnvironment.getTableEnvironment(env, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
index c34edbc..6c413e5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
@@ -21,8 +21,7 @@ package org.apache.flink.api.scala.batch.table
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.examples.scala.WordCountTable.{WC => MyWC}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
@@ -49,7 +48,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[ValidationException])
   def testAggregationOnNonExistingField(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -137,7 +136,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
-  @Test(expected = classOf[PlanGenException])
+  @Test(expected = classOf[ValidationException])
   def testNonWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -150,7 +149,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     t.collect()
   }
 
-  @Test(expected = classOf[UnsupportedOperationException])
+  @Test(expected = classOf[ValidationException])
   def testNoNestedAggregations(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala
index fdbe3c9..9a0a035 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.codegen.CodeGenException
 import org.apache.flink.api.table.expressions.{Literal, Null}
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit.Assert._
@@ -144,7 +144,7 @@ class ExpressionsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[ValidationException])
   def testEvalInvalidTypes(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala
index 5deff9e..ee0356f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -174,7 +174,7 @@ class FilterITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[ValidationException])
   def testFilterInvalidFieldName(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala
index cbd42b2..8889b37 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.batch.table
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
 import org.junit._
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[ValidationException])
   def testGroupingOnNonExistentField(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -45,7 +45,7 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
       .select('a.avg)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[ValidationException])
   def testGroupingInvalidSelection(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
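
The test migrations above all encode one behavioral shift: errors that previously surfaced as IllegalArgumentException, TableException, PlanGenException, CodeGenException, or UnsupportedOperationException during planning or code generation are now raised as ValidationException when the Table operation is defined. A minimal sketch of the new expectation (a hypothetical test, not part of the patch, assuming the same Scala Table API surface the tests above use):

    @Test(expected = classOf[ValidationException])
    def testInvalidFieldFailsEagerly(): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val tEnv = TableEnvironment.getTableEnvironment(env)

      val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)

      // 'foo is not in the schema; validation rejects the program here,
      // before any plan is generated or code is compiled.
      t.select('foo + 1)
    }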
