[FLINK-3754] [tableAPI] Add validation phase to Table API before construction of RelNodes.

This closes #1958


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0d543f8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0d543f8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0d543f8

Branch: refs/heads/master
Commit: f0d543f8cf95efecef88c77155456104a6d742b9
Parents: f2e6057
Author: Yijie Shen <henry.yijies...@gmail.com>
Authored: Wed Apr 13 16:46:58 2016 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed May 18 00:12:22 2016 +0200

----------------------------------------------------------------------
 .../flink/api/scala/table/expressionDsl.scala   |  74 ++--
 .../flink/api/table/BatchTableEnvironment.scala |  34 +-
 .../api/table/ExpressionParserException.scala   |  23 --
 .../flink/api/table/FlinkPlannerImpl.scala      |   4 +-
 .../api/table/StreamTableEnvironment.scala      |  17 +-
 .../flink/api/table/TableEnvironment.scala      |  43 ++-
 .../apache/flink/api/table/TableException.scala |  23 --
 .../org/apache/flink/api/table/exceptions.scala |  39 +++
 .../api/table/expressions/Expression.scala      |  64 ++--
 .../table/expressions/ExpressionParser.scala    |  42 +--
 .../api/table/expressions/InputTypeSpec.scala   |  55 +++
 .../flink/api/table/expressions/TreeNode.scala  | 120 -------
 .../api/table/expressions/aggregations.scala    |  33 +-
 .../api/table/expressions/arithmetic.scala      |  44 ++-
 .../flink/api/table/expressions/call.scala      |  75 +---
 .../flink/api/table/expressions/cast.scala      |  21 +-
 .../api/table/expressions/comparison.scala      |  41 ++-
 .../api/table/expressions/fieldExpression.scala |  73 +++-
 .../flink/api/table/expressions/literals.scala  |  14 +-
 .../flink/api/table/expressions/logic.scala     |  50 ++-
 .../api/table/expressions/mathExpressions.scala | 115 +++++++
 .../flink/api/table/expressions/ordering.scala  |  21 +-
 .../table/expressions/stringExpressions.scala   | 220 ++++++++++++
 .../api/table/plan/RexNodeTranslator.scala      |  50 ++-
 .../api/table/plan/logical/LogicalNode.scala    | 162 +++++++++
 .../api/table/plan/logical/operators.scala      | 339 +++++++++++++++++++
 .../org/apache/flink/api/table/table.scala      | 296 +++-------------
 .../apache/flink/api/table/trees/TreeNode.scala | 114 +++++++
 .../api/table/typeutils/TypeCheckUtils.scala    |  40 +++
 .../api/table/typeutils/TypeCoercion.scala      |  92 +++++
 .../table/validate/ExprValidationResult.scala   |  41 +++
 .../api/table/validate/FunctionCatalog.scala    | 124 +++++++
 .../api/java/batch/TableEnvironmentITCase.java  |  11 +-
 .../java/batch/table/AggregationsITCase.java    |   8 +-
 .../api/java/batch/table/ExpressionsITCase.java |   4 +-
 .../api/java/batch/table/FilterITCase.java      |   3 +-
 .../batch/table/GroupedAggregationsITCase.java  |   5 +-
 .../flink/api/java/batch/table/JoinITCase.java  |  10 +-
 .../api/java/batch/table/SelectITCase.java      |   5 +-
 .../batch/table/StringExpressionsITCase.java    |  32 +-
 .../flink/api/java/batch/table/UnionITCase.java |  10 +-
 .../scala/batch/TableEnvironmentITCase.scala    |   6 +-
 .../scala/batch/table/AggregationsITCase.scala  |   9 +-
 .../scala/batch/table/ExpressionsITCase.scala   |   4 +-
 .../api/scala/batch/table/FilterITCase.scala    |   4 +-
 .../batch/table/GroupedAggregationsITCase.scala |   6 +-
 .../api/scala/batch/table/JoinITCase.scala      |  10 +-
 .../api/scala/batch/table/SelectITCase.scala    |   8 +-
 .../batch/table/StringExpressionsITCase.scala   |   7 +-
 .../api/scala/batch/table/UnionITCase.scala     |   8 +-
 .../expression/utils/ExpressionEvaluator.scala  |  10 +-
 .../api/scala/stream/table/UnionITCase.scala    |  10 +-
 .../scala/stream/table/UnsupportedOpsTest.scala |   7 +
 53 files changed, 1915 insertions(+), 765 deletions(-)
----------------------------------------------------------------------

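In outline, the patch inserts a validation pass between the Table API and Calcite: every Expression now reports a resultType and a validateInput() result, and a tree is translated to RelNodes only once it is valid. A self-contained sketch of that post-order scheme (illustrative only; the committed types appear in the diffs below):

  // Illustrative sketch of the validation scheme added by this commit.
  // The real ExprValidationResult / Expression types are in the diffs below;
  // IntLiteral and Div here are hypothetical stand-ins.
  sealed trait ExprValidationResult { def isSuccess: Boolean }
  case object ValidationSuccess extends ExprValidationResult { val isSuccess = true }
  case class ValidationFailure(message: String) extends ExprValidationResult {
    val isSuccess = false
  }

  abstract class Expr {
    def children: Seq[Expr]
    // One-pass, post-order validation: children first, then this node.
    lazy val valid: Boolean = children.forall(_.valid) && validateInput().isSuccess
    def validateInput(): ExprValidationResult = ValidationSuccess
  }

  case class IntLiteral(v: Int) extends Expr { val children = Nil }
  case class Div(left: Expr, right: Expr) extends Expr {
    def children = Seq(left, right)
    override def validateInput(): ExprValidationResult = right match {
      case IntLiteral(0) => ValidationFailure("division by constant zero")
      case _             => ValidationSuccess
    }
  }

  // Div(IntLiteral(7), IntLiteral(0)).valid == false, so the error can be
  // reported before any Calcite RelNode is built.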

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 0f0b93c..11fb64a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -17,11 +17,11 @@
  */
 package org.apache.flink.api.scala.table
 
+import scala.language.implicitConversions
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.expressions._
 
-import scala.language.implicitConversions
-
 /**
 * These are all the operations that can be used to construct an [[Expression]] AST for expression
  * operations.
@@ -63,7 +63,7 @@ trait ImplicitExpressionOperations {
 
   def cast(toType: TypeInformation[_]) = Cast(expr, toType)
 
-  def as(name: Symbol) = Naming(expr, name.name)
+  def as(name: Symbol) = Alias(expr, name.name)
 
   def asc = Asc(expr)
   def desc = Desc(expr)
@@ -91,37 +91,37 @@ trait ImplicitExpressionOperations {
   /**
     * Calculates the Euler's number raised to the given power.
     */
-  def exp() = Call(BuiltInFunctionNames.EXP, expr)
+  def exp() = Exp(expr)
 
   /**
     * Calculates the base 10 logarithm of given value.
     */
-  def log10() = Call(BuiltInFunctionNames.LOG10, expr)
+  def log10() = Log10(expr)
 
   /**
     * Calculates the natural logarithm of given value.
     */
-  def ln() = Call(BuiltInFunctionNames.LN, expr)
+  def ln() = Ln(expr)
 
   /**
     * Calculates the given number raised to the power of the other value.
     */
-  def power(other: Expression) = Call(BuiltInFunctionNames.POWER, expr, other)
+  def power(other: Expression) = Power(expr, other)
 
   /**
     * Calculates the absolute value of given one.
     */
-  def abs() = Call(BuiltInFunctionNames.ABS, expr)
+  def abs() = Abs(expr)
 
   /**
     * Calculates the largest integer less than or equal to a given number.
     */
-  def floor() = Call(BuiltInFunctionNames.FLOOR, expr)
+  def floor() = Floor(expr)
 
   /**
     * Calculates the smallest integer greater than or equal to a given number.
     */
-  def ceil() = Call(BuiltInFunctionNames.CEIL, expr)
+  def ceil() = Ceil(expr)
 
   /**
     * Creates a substring of the given string between the given indices.
@@ -130,9 +130,8 @@ trait ImplicitExpressionOperations {
    * @param endIndex last character of the substring (starting at 1, inclusive)
     * @return substring
     */
-  def substring(beginIndex: Expression, endIndex: Expression) = {
-    Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex)
-  }
+  def substring(beginIndex: Expression, endIndex: Expression) =
+    SubString(expr, beginIndex, endIndex)
 
   /**
    * Creates a substring of the given string beginning at the given index to the end.
@@ -140,9 +139,8 @@ trait ImplicitExpressionOperations {
    * @param beginIndex first character of the substring (starting at 1, inclusive)
     * @return substring
     */
-  def substring(beginIndex: Expression) = {
-    Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex)
-  }
+  def substring(beginIndex: Expression) =
+    new SubString(expr, beginIndex)
 
   /**
     * Removes leading and/or trailing characters from the given string.
@@ -155,25 +153,13 @@ trait ImplicitExpressionOperations {
   def trim(
       removeLeading: Boolean = true,
       removeTrailing: Boolean = true,
-      character: Expression = BuiltInFunctionConstants.TRIM_DEFAULT_CHAR) = {
+      character: Expression = TrimConstants.TRIM_DEFAULT_CHAR) = {
     if (removeLeading && removeTrailing) {
-      Call(
-        BuiltInFunctionNames.TRIM,
-        BuiltInFunctionConstants.TRIM_BOTH,
-        character,
-        expr)
+      Trim(TrimConstants.TRIM_BOTH, character, expr)
     } else if (removeLeading) {
-      Call(
-        BuiltInFunctionNames.TRIM,
-        BuiltInFunctionConstants.TRIM_LEADING,
-        character,
-        expr)
+      Trim(TrimConstants.TRIM_LEADING, character, expr)
     } else if (removeTrailing) {
-      Call(
-        BuiltInFunctionNames.TRIM,
-        BuiltInFunctionConstants.TRIM_TRAILING,
-        character,
-        expr)
+      Trim(TrimConstants.TRIM_TRAILING, character, expr)
     } else {
       expr
     }
@@ -182,51 +168,39 @@ trait ImplicitExpressionOperations {
   /**
     * Returns the length of a String.
     */
-  def charLength() = {
-    Call(BuiltInFunctionNames.CHAR_LENGTH, expr)
-  }
+  def charLength() = CharLength(expr)
 
   /**
    * Returns all of the characters in a String in upper case using the rules of
     * the default locale.
     */
-  def upperCase() = {
-    Call(BuiltInFunctionNames.UPPER_CASE, expr)
-  }
+  def upperCase() = Upper(expr)
 
   /**
    * Returns all of the characters in a String in lower case using the rules of
     * the default locale.
     */
-  def lowerCase() = {
-    Call(BuiltInFunctionNames.LOWER_CASE, expr)
-  }
+  def lowerCase() = Lower(expr)
 
   /**
     * Converts the initial letter of each word in a String to uppercase.
    * Assumes a String containing only [A-Za-z0-9], everything else is treated as whitespace.
     */
-  def initCap() = {
-    Call(BuiltInFunctionNames.INIT_CAP, expr)
-  }
+  def initCap() = InitCap(expr)
 
   /**
     * Returns true, if a String matches the specified LIKE pattern.
     *
     * e.g. "Jo_n%" matches all Strings that start with "Jo(arbitrary letter)n"
     */
-  def like(pattern: Expression) = {
-    Call(BuiltInFunctionNames.LIKE, expr, pattern)
-  }
+  def like(pattern: Expression) = Like(expr, pattern)
 
   /**
     * Returns true, if a String matches the specified SQL regex pattern.
     *
     * e.g. "A+" matches all Strings that consist of at least one A
     */
-  def similar(pattern: Expression) = {
-    Call(BuiltInFunctionNames.SIMILAR, expr, pattern)
-  }
+  def similar(pattern: Expression) = Similar(expr, pattern)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index 39e3105..207500a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
 import org.apache.flink.api.java.io.DiscardingOutputFormat
@@ -31,7 +32,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.table.explain.PlanJsonParser
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetRel, DataSetConvention}
+import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataSetTable}
 import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink}
@@ -72,7 +74,7 @@ abstract class BatchTableEnvironment(
     val m = internalNamePattern.findFirstIn(name)
     m match {
       case Some(_) =>
-        throw new TableException(s"Illegal Table name. " +
+        throw new ValidationException(s"Illegal Table name. " +
           s"Please choose a name that does not contain the pattern 
$internalNamePattern")
       case None =>
     }
@@ -87,18 +89,15 @@ abstract class BatchTableEnvironment(
    * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
     *
     * @param tableName The name of the table to scan.
-    * @throws TableException if no table is registered under the given name.
+    * @throws ValidationException if no table is registered under the given name.
     * @return The scanned table.
     */
-  @throws[TableException]
+  @throws[ValidationException]
   def scan(tableName: String): Table = {
-
     if (isRegistered(tableName)) {
-      relBuilder.scan(tableName)
-      new Table(relBuilder.build(), this)
-    }
-    else {
-      throw new TableException(s"Table \'$tableName\' was not found in the 
registry.")
+      new Table(this, CatalogNode(tableName, getRowType(tableName)))
+    } else {
+      throw new ValidationException(s"Table \'$tableName\' was not found in 
the registry.")
     }
   }
 
@@ -133,7 +132,7 @@ abstract class BatchTableEnvironment(
     // transform to a relational tree
     val relational = planner.rel(validated)
 
-    new Table(relational.rel, this)
+    new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
@@ -169,7 +168,7 @@ abstract class BatchTableEnvironment(
     */
   private[flink] def explain(table: Table, extended: Boolean): String = {
 
-    val ast = RelOptUtil.toString(table.relNode)
+    val ast = RelOptUtil.toString(table.getRelNode)
    val dataSet = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
     dataSet.output(new DiscardingOutputFormat[Row])
     val env = dataSet.getExecutionEnvironment
@@ -219,15 +218,10 @@ abstract class BatchTableEnvironment(
     * @tparam T The type of the [[DataSet]].
     */
   protected def registerDataSetInternal[T](
-    name: String, dataSet: DataSet[T],
-    fields: Array[Expression]): Unit = {
+      name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
 
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields.toArray)
-    val dataSetTable = new DataSetTable[T](
-      dataSet,
-      fieldIndexes.toArray,
-      fieldNames.toArray
-    )
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields)
+    val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
     registerTableInternal(name, dataSetTable)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionParserException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionParserException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionParserException.scala
deleted file mode 100644
index 2d6fae6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionParserException.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * Exception for all errors occurring during expression evaluation.
- */
-class ExpressionParserException(msg: String) extends RuntimeException(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
index 5a1b3fe..9d0a146 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
@@ -34,7 +34,7 @@ import org.apache.calcite.sql.parser.{SqlParser, SqlParseException}
 import org.apache.calcite.sql.validate.SqlValidator
 import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{RelDecorrelator, SqlToRelConverter, SqlRexConvertletTable}
-import org.apache.calcite.tools.{RelConversionException, ValidationException, Frameworks, FrameworkConfig}
+import org.apache.calcite.tools.{RelConversionException, ValidationException => CValidationException, Frameworks, FrameworkConfig}
 import org.apache.calcite.util.Util
 import scala.collection.JavaConversions._
 
@@ -96,7 +96,7 @@ class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) {
     }
     catch {
       case e: RuntimeException => {
-        throw new ValidationException(e)
+        throw new CValidationException(e)
       }
     }
     validatedSqlNode

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index be1c005..8ba3000 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -24,10 +24,12 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamRel, DataStreamConvention}
+import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
+import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
 import org.apache.flink.api.table.plan.schema.
@@ -86,18 +88,17 @@ abstract class StreamTableEnvironment(
    * The table to ingest must be registered in the [[TableEnvironment]]'s catalog.
     *
     * @param tableName The name of the table to ingest.
-    * @throws TableException if no table is registered under the given name.
+    * @throws ValidationException if no table is registered under the given name.
     * @return The ingested table.
     */
-  @throws[TableException]
+  @throws[ValidationException]
   def ingest(tableName: String): Table = {
 
     if (isRegistered(tableName)) {
-      relBuilder.scan(tableName)
-      new Table(relBuilder.build(), this)
+      new Table(this, CatalogNode(tableName, getRowType(tableName)))
     }
     else {
-      throw new TableException(s"Table \'$tableName\' was not found in the 
registry.")
+      throw new ValidationException(s"Table \'$tableName\' was not found in 
the registry.")
     }
   }
 
@@ -132,7 +133,7 @@ abstract class StreamTableEnvironment(
     // transform to a relational tree
     val relational = planner.rel(validated)
 
-    new Table(relational.rel, this)
+    new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
@@ -240,7 +241,7 @@ abstract class StreamTableEnvironment(
     */
  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
 
-    val relNode = table.relNode
+    val relNode = table.getRelNode
 
     // decorrelate
     val decorPlan = RelDecorrelator.decorrelateQuery(relNode)

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index 6ccde47..8aa9e10 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -21,11 +21,13 @@ package org.apache.flink.api.table
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.config.Lex
-import org.apache.calcite.plan.RelOptPlanner
+import org.apache.calcite.plan.{RelOptCluster, RelOptPlanner}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.tools.{Frameworks, FrameworkConfig, RelBuilder}
+import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
+
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
 import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv}
@@ -35,11 +37,11 @@ import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
 import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv}
 import org.apache.flink.api.scala.table.{StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression}
+import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.api.table.plan.cost.DataSetCostFactory
-import org.apache.flink.api.table.plan.schema.{TransStreamTable, RelTable}
 import org.apache.flink.api.table.sinks.TableSink
-import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.api.table.plan.schema.{RelTable, TransStreamTable}
+import org.apache.flink.api.table.validate.FunctionCatalog
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
 
@@ -72,10 +74,16 @@ abstract class TableEnvironment(val config: TableConfig) {
  // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
   protected val relBuilder: RelBuilder = RelBuilder.create(frameworkConfig)
 
-  // the planner instance used to optimize queries of this TableEnvironment
-  private val planner: RelOptPlanner = relBuilder
+  private val cluster: RelOptCluster = relBuilder
     .values(Array("dummy"), new Integer(1))
-    .build().getCluster.getPlanner
+    .build().getCluster
+
+  // the planner instance used to optimize queries of this TableEnvironment
+  private val planner: RelOptPlanner = cluster.getPlanner
+
+  private val typeFactory: RelDataTypeFactory = cluster.getTypeFactory
+
+  private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuildIns
 
   // a counter for unique attribute names
   private val attrNameCntr: AtomicInteger = new AtomicInteger(0)
@@ -94,7 +102,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 
     // check that table belongs to this table environment
     if (table.tableEnv != this) {
-      throw new TableException(
+      throw new ValidationException(
         "Only tables that belong to this TableEnvironment can be registered.")
     }
 
@@ -152,7 +160,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     *
     * @param name The name under which the table is registered.
     * @param table The table to register in the catalog
-    * @throws TableException if another table is registered under the provided name.
+    * @throws ValidationException if another table is registered under the provided name.
     */
   @throws[TableException]
  protected def registerTableInternal(name: String, table: AbstractTable): Unit = {
@@ -182,6 +190,10 @@ abstract class TableEnvironment(val config: TableConfig) {
     tables.getTableNames.contains(name)
   }
 
+  protected def getRowType(name: String): RelDataType = {
+    tables.getTable(name).getRowType(typeFactory)
+  }
+
   /** Returns a unique temporary attribute name. */
   private[flink] def createUniqueAttributeName(): String = {
     "TMP_" + attrNameCntr.getAndIncrement()
@@ -197,6 +209,10 @@ abstract class TableEnvironment(val config: TableConfig) {
     planner
   }
 
+  private[flink] def getFunctionCatalog: FunctionCatalog = {
+    functionCatalog
+  }
+
   /** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */
   private[flink] def getFrameworkConfig: FrameworkConfig = {
     frameworkConfig
@@ -253,7 +269,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       case t: TupleTypeInfo[A] =>
         exprs.zipWithIndex.map {
           case (UnresolvedFieldReference(name), idx) => (idx, name)
-          case (Naming(UnresolvedFieldReference(origName), name), _) =>
+          case (Alias(UnresolvedFieldReference(origName), name), _) =>
             val idx = t.getFieldIndex(origName)
             if (idx < 0) {
              throw new IllegalArgumentException(s"$origName is not a field of type $t")
@@ -265,7 +281,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       case c: CaseClassTypeInfo[A] =>
         exprs.zipWithIndex.map {
           case (UnresolvedFieldReference(name), idx) => (idx, name)
-          case (Naming(UnresolvedFieldReference(origName), name), _) =>
+          case (Alias(UnresolvedFieldReference(origName), name), _) =>
             val idx = c.getFieldIndex(origName)
             if (idx < 0) {
              throw new IllegalArgumentException(s"$origName is not a field of type $c")
@@ -276,7 +292,7 @@ abstract class TableEnvironment(val config: TableConfig) {
         }
       case p: PojoTypeInfo[A] =>
         exprs.map {
-          case Naming(UnresolvedFieldReference(origName), name) =>
+          case Alias(UnresolvedFieldReference(origName), name) =>
             val idx = p.getFieldIndex(origName)
             if (idx < 0) {
              throw new IllegalArgumentException(s"$origName is not a field of type $p")
@@ -389,5 +405,4 @@ object TableEnvironment {
 
     new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableException.scala
deleted file mode 100644
index 3e298a4..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableException.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * General Exception for all errors during table handling.
- */
-class TableException(msg: String) extends RuntimeException(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
new file mode 100644
index 0000000..a3ab6fd
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+/**
+  * Exception for all errors occurring during expression parsing.
+  */
+case class ExpressionParserException(msg: String) extends RuntimeException(msg)
+
+/**
+  * General Exception for all errors during table handling.
+  */
+case class TableException(msg: String) extends RuntimeException(msg)
+
+/**
+  * Exception for all errors occurring during validation phase.
+  */
+case class ValidationException(msg: String) extends RuntimeException(msg)
+
+/**
+  * Exception for unwanted method calls on an unresolved expression.
+  */
+case class UnresolvedException(msg: String) extends RuntimeException(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
index 6960a9f..14e464e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
@@ -17,13 +17,34 @@
  */
 package org.apache.flink.api.table.expressions
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationSuccess}
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+    * Returns the [[TypeInformation]] for evaluating this expression.
+    * It is sometimes not available until the expression is valid.
+    */
+  def resultType: TypeInformation[_]
+
+  /**
+    * One pass validation of the expression tree in post order.
+    */
+  lazy val valid: Boolean = childrenValid && validateInput().isSuccess
+
+  def childrenValid: Boolean = children.forall(_.valid)
+
+  /**
+    * Checks input data types, the number of inputs, or other properties specified by this expression.
+    * Returns `ValidationSuccess` if it passes the check,
+    * or `ValidationFailure` with a supplementary message explaining the error.
+    * Note: this method should only be called once `childrenValid == true`.
+    */
+  def validateInput(): ExprValidationResult = ValidationSuccess
 
   /**
     * Convert Expression to its counterpart in Calcite, i.e. RexNode
@@ -32,31 +53,36 @@ abstract class Expression extends TreeNode[Expression] { self: Product =>
     throw new UnsupportedOperationException(
       s"${this.getClass.getName} cannot be transformed to RexNode"
     )
+
+  def checkEquals(other: Expression): Boolean = {
+    if (this.getClass != other.getClass) {
+      false
+    } else {
+      def checkEquality(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
+        elements1.length == elements2.length && elements1.zip(elements2).forall {
+          case (e1: Expression, e2: Expression) => e1.checkEquals(e2)
+          case (t1: Seq[_], t2: Seq[_]) => checkEquality(t1, t2)
+          case (i1, i2) => i1 == i2
+        }
+      }
+      val elements1 = this.productIterator.toSeq
+      val elements2 = other.productIterator.toSeq
+      checkEquality(elements1, elements2)
+    }
+  }
 }
 
-abstract class BinaryExpression extends Expression { self: Product =>
+abstract class BinaryExpression extends Expression {
   def left: Expression
   def right: Expression
   def children = Seq(left, right)
 }
 
-abstract class UnaryExpression extends Expression { self: Product =>
+abstract class UnaryExpression extends Expression {
   def child: Expression
   def children = Seq(child)
 }
 
-abstract class LeafExpression extends Expression { self: Product =>
+abstract class LeafExpression extends Expression {
   val children = Nil
 }
-
-case class NopExpression() extends LeafExpression {
-  override val name = Expression.freshName("nop")
-}
-
-object Expression {
-  def freshName(prefix: String): String = {
-    s"$prefix-${freshNameCounter.getAndIncrement}"
-  }
-
-  val freshNameCounter = new AtomicInteger
-}

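Against the new base class, a node only supplies resultType and, where needed, validateInput(). A hypothetical example under the assumption that the classes added above are on the classpath (SquareRoot is not part of this patch):

  // Hypothetical node written against the new API; not in the commit.
  import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
  import org.apache.flink.api.table.expressions.{Expression, UnaryExpression}
  import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure, ValidationSuccess}

  case class SquareRoot(child: Expression) extends UnaryExpression {
    override def resultType: TypeInformation[_] = BasicTypeInfo.DOUBLE_TYPE_INFO

    // Per the contract above, only consulted once childrenValid == true.
    override def validateInput(): ExprValidationResult =
      if (child.resultType == BasicTypeInfo.DOUBLE_TYPE_INFO) ValidationSuccess
      else ValidationFailure(s"sqrt expects DOUBLE, got ${child.resultType}")
  }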
http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index ffadca5..db3d187 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -111,8 +111,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
       stringLiteralFlink | singleQuoteStringLiteral |
       boolLiteral | nullLiteral
 
-  lazy val fieldReference: PackratParser[Expression] = ident ^^ {
-    case sym => UnresolvedFieldReference(sym)
+  lazy val fieldReference: PackratParser[NamedExpression] = ident ^^ {
+    sym => UnresolvedFieldReference(sym)
   }
 
   lazy val atom: PackratParser[Expression] =
@@ -155,7 +155,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   lazy val suffixAs: PackratParser[Expression] =
     composite ~ "." ~ AS ~ "(" ~ fieldReference ~ ")" ^^ {
-    case e ~ _ ~ _ ~ _ ~ target ~ _ => Naming(e, target.name)
+    case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.name)
   }
 
   lazy val suffixEval: PackratParser[Expression] =
@@ -165,27 +165,23 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   lazy val suffixFunctionCall =
     composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
-    case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args : _*)
+    case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
   }
 
  lazy val suffixTrim = composite ~ ".trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~
       expression ~ ")" ^^ {
     case operand ~ _ ~ trimType ~ _ ~ trimCharacter ~ _ =>
       val flag = trimType match {
-        case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH
-        case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING
-        case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING
+        case "BOTH" => TrimConstants.TRIM_BOTH
+        case "LEADING" => TrimConstants.TRIM_LEADING
+        case "TRAILING" => TrimConstants.TRIM_TRAILING
       }
-      Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
+      Trim(flag, trimCharacter, operand)
   }
 
   lazy val suffixTrimWithoutArgs = composite <~ ".trim" ~ opt("()") ^^ {
     case e =>
-      Call(
-        BuiltInFunctionNames.TRIM,
-        BuiltInFunctionConstants.TRIM_BOTH,
-        BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
-        e)
+      Trim(TrimConstants.TRIM_BOTH, TrimConstants.TRIM_DEFAULT_CHAR, e)
   }
 
   lazy val suffixed: PackratParser[Expression] =
@@ -223,7 +219,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   lazy val prefixAs: PackratParser[Expression] =
     AS ~ "(" ~ expression ~ "," ~ fieldReference ~ ")" ^^ {
-    case _ ~ _ ~ e ~ _ ~ target ~ _ => Naming(e, target.name)
+    case _ ~ _ ~ e ~ _ ~ target ~ _ => Alias(e, target.name)
   }
 
   lazy val prefixEval: PackratParser[Expression] = composite ~
@@ -232,27 +228,23 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   }
 
  lazy val prefixFunctionCall = functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
-    case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args: _*)
+    case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args)
   }
 
  lazy val prefixTrim = "trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~ expression ~
       "," ~ expression ~ ")" ^^ {
     case _ ~ trimType ~ _ ~ trimCharacter ~ _ ~ operand ~ _ =>
       val flag = trimType match {
-        case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH
-        case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING
-        case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING
+        case "BOTH" => TrimConstants.TRIM_BOTH
+        case "LEADING" => TrimConstants.TRIM_LEADING
+        case "TRAILING" => TrimConstants.TRIM_TRAILING
       }
-      Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
+      Trim(flag, trimCharacter, operand)
   }
 
   lazy val prefixTrimWithoutArgs = "trim(" ~ expression ~ ")" ^^ {
     case _ ~ operand ~ _ =>
-      Call(
-        BuiltInFunctionNames.TRIM,
-        BuiltInFunctionConstants.TRIM_BOTH,
-        BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
-        operand)
+      Trim(TrimConstants.TRIM_BOTH, TrimConstants.TRIM_DEFAULT_CHAR, operand)
   }
 
   lazy val prefixed: PackratParser[Expression] =
@@ -322,7 +314,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   // alias
 
   lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
-    case e ~ _ ~ name => Naming(e, name.name)
+    case e ~ _ ~ name => Alias(e, name.name)
   } | logic
 
   lazy val expression: PackratParser[Expression] = alias

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
new file mode 100644
index 0000000..9cb52d5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import scala.collection.mutable
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.validate._
+
+/**
+  * Expressions that have specifications on their inputs.
+  */
+trait InputTypeSpec extends Expression {
+
+  /**
+    * Input type specification for each child.
+    *
+    * For example, [[Power]], which expects both of its children to be of Double type, should use:
+    * {{{
+    *   def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
+    * }}}
+    */
+  def expectedTypes: Seq[TypeInformation[_]]
+
+  override def validateInput(): ExprValidationResult = {
+    val typeMismatches = mutable.ArrayBuffer.empty[String]
+    children.zip(expectedTypes).zipWithIndex.foreach { case ((e, tpe), i) =>
+      if (e.resultType != tpe) {
+        typeMismatches += s"expecting $tpe on ${i}th input, get ${e.resultType}"
+      }
+    }
+    if (typeMismatches.isEmpty) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"$this fails on input type checking: ${typeMismatches.mkString("[", 
", ", "]")}")
+    }
+  }
+}

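A hypothetical expression mixing in the new trait; supplying expectedTypes is enough, since the validateInput() above performs the per-child check (Atan2 is not part of this patch):

  // Hypothetical use of InputTypeSpec; Atan2 is not in the commit.
  import org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO
  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.api.table.expressions.{BinaryExpression, Expression, InputTypeSpec}

  case class Atan2(left: Expression, right: Expression)
      extends BinaryExpression with InputTypeSpec {

    override def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO

    // InputTypeSpec.validateInput() compares each child's resultType
    // against this list, index by index.
    override def expectedTypes: Seq[TypeInformation[_]] =
      DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
  }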
http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TreeNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TreeNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TreeNode.scala
deleted file mode 100644
index 9d4ca80..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TreeNode.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-/**
- * Generic base class for trees that can be transformed and traversed.
- */
-abstract class TreeNode[A <: TreeNode[A]] { self: A with Product =>
-
-  /**
-   * List of child nodes that should be considered when doing transformations. Other values
-   * in the Product will not be transformed, only handed through.
-   */
-  def children: Seq[A]
-
-  /**
-   * Tests for equality by first testing for reference equality.
-   */
-  def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other
-
-  def transformPre(rule: PartialFunction[A, A]): A = {
-    val afterTransform = rule.applyOrElse(this, identity[A])
-
-    if (afterTransform fastEquals this) {
-      this.transformChildrenPre(rule)
-    } else {
-      afterTransform.transformChildrenPre(rule)
-    }
-  }
-
-  def transformChildrenPre(rule: PartialFunction[A, A]): A = {
-    var changed = false
-    val newArgs = productIterator map {
-      case child: A if children.contains(child) =>
-        val newChild = child.transformPre(rule)
-        if (newChild fastEquals child) {
-          child
-        } else {
-          changed = true
-          newChild
-        }
-      case other: AnyRef => other
-      case null => null
-    } toArray
-
-    if (changed) makeCopy(newArgs) else this
-  }
-
-  def transformPost(rule: PartialFunction[A, A]): A = {
-    val afterChildren = transformChildrenPost(rule)
-    if (afterChildren fastEquals this) {
-      rule.applyOrElse(this, identity[A])
-    } else {
-      rule.applyOrElse(afterChildren, identity[A])
-    }
-  }
-
-  def transformChildrenPost(rule: PartialFunction[A, A]): A = {
-    var changed = false
-    val newArgs = productIterator map {
-      case child: A if children.contains(child) =>
-        val newChild = child.transformPost(rule)
-        if (newChild fastEquals child) {
-          child
-        } else {
-          changed = true
-          newChild
-        }
-      case other: AnyRef => other
-      case null => null
-    } toArray
-    // toArray forces evaluation, toSeq does not seem to work here
-
-    if (changed) makeCopy(newArgs) else this
-  }
-
-  def exists(predicate: A => Boolean): Boolean = {
-    var exists = false
-    this.transformPre {
-      case e: A => if (predicate(e)) {
-        exists = true
-      }
-        e
-    }
-    exists
-  }
-
-  /**
-   * Creates a new copy of this expression with new children. This is used during transformation
-   * if children change. This must be overridden by tree nodes that don't have the Constructor
-   * arguments in the same order as the `children`.
-   */
-  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
-    val defaultCtor =
-      this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head
-    try {
-      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
-    } catch {
-      case iae: IllegalArgumentException =>
-        println("IAE " + this)
-        throw new RuntimeException("Should never happen.")
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
index 8cd9dc3..24ce85f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
@@ -22,7 +22,10 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.tools.RelBuilder.AggCall
 
-abstract sealed class Aggregation extends UnaryExpression { self: Product =>
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.typeutils.TypeCheckUtils
+
+abstract sealed class Aggregation extends UnaryExpression {
 
   override def toString = s"Aggregate($child)"
 
@@ -36,41 +39,59 @@ abstract sealed class Aggregation extends UnaryExpression { self: Product =>
 }
 
 case class Sum(child: Expression) extends Aggregation {
-  override def toString = s"($child).sum"
+  override def toString = s"sum($child)"
 
   override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
     relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, null, name, child.toRexNode)
   }
+
+  override def resultType = child.resultType
+
+  override def validateInput = TypeCheckUtils.assertNumericExpr(child.resultType, "sum")
 }
 
 case class Min(child: Expression) extends Aggregation {
-  override def toString = s"($child).min"
+  override def toString = s"min($child)"
 
   override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
     relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, null, name, child.toRexNode)
   }
+
+  override def resultType = child.resultType
+
+  override def validateInput = TypeCheckUtils.assertOrderableExpr(child.resultType, "min")
 }
 
 case class Max(child: Expression) extends Aggregation {
-  override def toString = s"($child).max"
+  override def toString = s"max($child)"
 
   override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
     relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, null, name, child.toRexNode)
   }
+
+  override def resultType = child.resultType
+
+  override def validateInput = TypeCheckUtils.assertOrderableExpr(child.resultType, "max")
 }
 
 case class Count(child: Expression) extends Aggregation {
-  override def toString = s"($child).count"
+  override def toString = s"count($child)"
 
   override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
     relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, name, child.toRexNode)
   }
+
+  override def resultType = BasicTypeInfo.LONG_TYPE_INFO
 }
 
 case class Avg(child: Expression) extends Aggregation {
-  override def toString = s"($child).avg"
+  override def toString = s"avg($child)"
 
   override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
     relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, null, name, child.toRexNode)
   }
+
+  override def resultType = child.resultType
+
+  override def validateInput = TypeCheckUtils.assertNumericExpr(child.resultType, "avg")
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
index ca67697..0ce4685 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
@@ -25,15 +25,34 @@ import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo, TypeInformation}
+import org.apache.flink.api.table.typeutils.{TypeCheckUtils, TypeCoercion, TypeConverter}
+import org.apache.flink.api.table.validate._
 
-abstract class BinaryArithmetic extends BinaryExpression { self: Product =>
+abstract class BinaryArithmetic extends BinaryExpression {
   def sqlOperator: SqlOperator
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.call(sqlOperator, children.map(_.toRexNode))
   }
+
+  override def resultType: TypeInformation[_] =
+    TypeCoercion.widerTypeOf(left.resultType, right.resultType) match {
+      case Some(t) => t
+      case None =>
+        throw new RuntimeException("This should never happen.")
+    }
+
+  // TODO: tighten this rule once we implemented type coercion rules during validation
+  override def validateInput(): ExprValidationResult = {
+    if (!left.resultType.isInstanceOf[NumericTypeInfo[_]] ||
+      !right.resultType.isInstanceOf[NumericTypeInfo[_]]) {
+      ValidationFailure(s"$this requires both operands Numeric, get" +
+        s"${left.resultType} and ${right.resultType}")
+    } else {
+      ValidationSuccess
+    }
+  }
 }
 
 case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
@@ -56,6 +75,20 @@ case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
       relBuilder.call(SqlStdOperatorTable.PLUS, l, r)
     }
   }
+
+  // TODO: tighten this rule once we implemented type coercion rules during validation
+  override def validateInput(): ExprValidationResult = {
+    if (left.resultType == BasicTypeInfo.STRING_TYPE_INFO ||
+        right.resultType == BasicTypeInfo.STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else if (!left.resultType.isInstanceOf[NumericTypeInfo[_]] ||
+        !right.resultType.isInstanceOf[NumericTypeInfo[_]]) {
+      ValidationFailure(s"$this requires Numeric or String input," +
+        s" get ${left.resultType} and ${right.resultType}")
+    } else {
+      ValidationSuccess
+    }
+  }
 }
 
 case class UnaryMinus(child: Expression) extends UnaryExpression {
@@ -64,6 +97,11 @@ case class UnaryMinus(child: Expression) extends UnaryExpression {
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.call(SqlStdOperatorTable.UNARY_MINUS, child.toRexNode)
   }
+
+  override def resultType = child.resultType
+
+  override def validateInput(): ExprValidationResult =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "unary minus")
 }
 
 case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {

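BinaryArithmetic.resultType above delegates to TypeCoercion.widerTypeOf, which is added in TypeCoercion.scala (listed in the diffstat but not shown in this mail). A sketch of its assumed contract, with a hypothetical widening table:

  // Assumed contract of TypeCoercion.widerTypeOf; the real table lives in
  // TypeCoercion.scala, which this mail does not show.
  import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
  import org.apache.flink.api.common.typeinfo.TypeInformation

  object WideningSketch {
    private val numericOrder: Seq[TypeInformation[_]] = Seq(
      BYTE_TYPE_INFO, SHORT_TYPE_INFO, INT_TYPE_INFO,
      LONG_TYPE_INFO, FLOAT_TYPE_INFO, DOUBLE_TYPE_INFO)

    // The wider of two numeric types; None if either type is non-numeric,
    // which BinaryArithmetic.validateInput() rules out beforehand.
    def widerTypeOf(a: TypeInformation[_], b: TypeInformation[_]): Option[TypeInformation[_]] = {
      val (ia, ib) = (numericOrder.indexOf(a), numericOrder.indexOf(b))
      if (ia < 0 || ib < 0) None else Some(numericOrder(math.max(ia, ib)))
    }
  }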
http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
index e36a784..bf2e6ba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
@@ -18,85 +18,28 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure}
+
 /**
   * General expression for unresolved function calls. The function can be a built-in
   * scalar function or a user-defined scalar function.
   */
-case class Call(functionName: String, args: Expression*) extends Expression {
+case class Call(functionName: String, args: Seq[Expression]) extends Expression {
 
   override def children: Seq[Expression] = args
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(
-      BuiltInFunctionNames.toSqlOperator(functionName),
-      args.map(_.toRexNode): _*)
+    throw new UnresolvedException(s"trying to convert UnresolvedFunction 
$functionName to RexNode")
   }
 
   override def toString = s"\\$functionName(${args.mkString(", ")})"
 
-  override def makeCopy(newArgs: Seq[AnyRef]): this.type = {
-    val copy = Call(
-      newArgs.head.asInstanceOf[String],
-      newArgs.drop(1).asInstanceOf[Seq[Expression]]: _*)
-
-    copy.asInstanceOf[this.type]
-  }
-}
-
-/**
-  * Enumeration of common function names.
-  */
-object BuiltInFunctionNames {
-  val SUBSTRING = "SUBSTRING"
-  val TRIM = "TRIM"
-  val CHAR_LENGTH = "CHARLENGTH"
-  val UPPER_CASE = "UPPERCASE"
-  val LOWER_CASE = "LOWERCASE"
-  val INIT_CAP = "INITCAP"
-  val LIKE = "LIKE"
-  val SIMILAR = "SIMILAR"
-  val MOD = "MOD"
-  val EXP = "EXP"
-  val LOG10 = "LOG10"
-  val POWER = "POWER"
-  val LN = "LN"
-  val ABS = "ABS"
-  val FLOOR = "FLOOR"
-  val CEIL = "CEIL"
-
-  def toSqlOperator(name: String): SqlOperator = {
-    name match {
-      case BuiltInFunctionNames.SUBSTRING => SqlStdOperatorTable.SUBSTRING
-      case BuiltInFunctionNames.TRIM => SqlStdOperatorTable.TRIM
-      case BuiltInFunctionNames.CHAR_LENGTH => SqlStdOperatorTable.CHAR_LENGTH
-      case BuiltInFunctionNames.UPPER_CASE => SqlStdOperatorTable.UPPER
-      case BuiltInFunctionNames.LOWER_CASE => SqlStdOperatorTable.LOWER
-      case BuiltInFunctionNames.INIT_CAP => SqlStdOperatorTable.INITCAP
-      case BuiltInFunctionNames.LIKE => SqlStdOperatorTable.LIKE
-      case BuiltInFunctionNames.SIMILAR => SqlStdOperatorTable.SIMILAR_TO
-      case BuiltInFunctionNames.EXP => SqlStdOperatorTable.EXP
-      case BuiltInFunctionNames.LOG10 => SqlStdOperatorTable.LOG10
-      case BuiltInFunctionNames.POWER => SqlStdOperatorTable.POWER
-      case BuiltInFunctionNames.LN => SqlStdOperatorTable.LN
-      case BuiltInFunctionNames.ABS => SqlStdOperatorTable.ABS
-      case BuiltInFunctionNames.MOD => SqlStdOperatorTable.MOD
-      case BuiltInFunctionNames.FLOOR => SqlStdOperatorTable.FLOOR
-      case BuiltInFunctionNames.CEIL => SqlStdOperatorTable.CEIL
-      case _ => ???
-    }
-  }
-}
+  override def resultType =
+    throw new UnresolvedException(s"calling resultType on UnresolvedFunction 
$functionName")
 
-/**
-  * Enumeration of common function flags.
-  */
-object BuiltInFunctionConstants {
-  val TRIM_BOTH = Literal(0)
-  val TRIM_LEADING = Literal(1)
-  val TRIM_TRAILING = Literal(2)
-  val TRIM_DEFAULT_CHAR = Literal(" ")
+  override def validateInput(): ExprValidationResult =
+    ValidationFailure(s"Unresolved function call: $functionName")
 }
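
A hedged sketch of what the rewritten Call now reports before resolution (the function name below is made up; actual calls are resolved against the new FunctionCatalog during validation):

    // imports as in the Plus sketch above
    val call = Call("myFunc", Seq(UnresolvedFieldReference("a")))
    println(call.validateInput())  // ValidationFailure: Unresolved function call: myFunc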

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
index fdad1f6..3b8b0e7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
@@ -21,18 +21,27 @@ import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.typeutils.{TypeCoercion, TypeConverter}
+import org.apache.flink.api.table.validate._
 
-case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
+case class Cast(child: Expression, resultType: TypeInformation[_]) extends UnaryExpression {
 
-  override def toString = s"$child.cast($tpe)"
+  override def toString = s"$child.cast($resultType)"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(tpe))
+    relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(resultType))
   }
 
-  override def makeCopy(anyRefs: Seq[AnyRef]): this.type = {
+  override def makeCopy(anyRefs: Array[AnyRef]): this.type = {
     val child: Expression = anyRefs.head.asInstanceOf[Expression]
-    copy(child, tpe).asInstanceOf[this.type]
+    copy(child, resultType).asInstanceOf[this.type]
+  }
+
+  override def validateInput(): ExprValidationResult = {
+    if (TypeCoercion.canCast(child.resultType, resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Unsupported cast from ${child.resultType} to 
$resultType")
+    }
   }
 }
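
A small usage sketch, assuming TypeCoercion.canCast permits the Int-to-Long widening (the rule table itself lives in typeutils/TypeCoercion.scala):

    val widen = Cast(
      ResolvedFieldReference("a", BasicTypeInfo.INT_TYPE_INFO),
      BasicTypeInfo.LONG_TYPE_INFO)
    println(widen.validateInput())  // ValidationSuccess, if canCast(INT, LONG) holds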

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
index 124393c..63caeaa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -24,24 +24,59 @@ import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 
-abstract class BinaryComparison extends BinaryExpression { self: Product =>
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo
+import org.apache.flink.api.table.validate._
+
+abstract class BinaryComparison extends BinaryExpression {
   def sqlOperator: SqlOperator
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.call(sqlOperator, children.map(_.toRexNode))
   }
+
+  override def resultType = BOOLEAN_TYPE_INFO
+
+  // TODO: tighten this rule once we implement type coercion rules during validation
+  override def validateInput(): ExprValidationResult = (left.resultType, right.resultType) match {
+    case (STRING_TYPE_INFO, STRING_TYPE_INFO) => ValidationSuccess
+    case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) => ValidationSuccess
+    case (lType, rType) =>
+      ValidationFailure(
+        s"Comparison is only supported for Strings and numeric types, get 
$lType and $rType")
+  }
 }
 
case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
   override def toString = s"$left === $right"
 
   val sqlOperator: SqlOperator = SqlStdOperatorTable.EQUALS
+
+  override def validateInput(): ExprValidationResult = (left.resultType, right.resultType) match {
+    case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) => ValidationSuccess
+    case (lType, rType) =>
+      if (lType != rType) {
+        ValidationFailure(s"Equality predicate on incompatible types: $lType 
and $rType")
+      } else {
+        ValidationSuccess
+      }
+  }
 }
 
case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
   override def toString = s"$left !== $right"
 
   val sqlOperator: SqlOperator = SqlStdOperatorTable.NOT_EQUALS
+
+  override def validateInput(): ExprValidationResult = (left.resultType, right.resultType) match {
+    case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) => ValidationSuccess
+    case (lType, rType) =>
+      if (lType != rType) {
+        ValidationFailure(s"Equality predicate on incompatible types: $lType 
and $rType")
+      } else {
+        ValidationSuccess
+      }
+  }
 }
 
case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
@@ -74,6 +109,8 @@ case class IsNull(child: Expression) extends UnaryExpression {
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.isNull(child.toRexNode)
   }
+
+  override def resultType = BOOLEAN_TYPE_INFO
 }
 
 case class IsNotNull(child: Expression) extends UnaryExpression {
@@ -82,4 +119,6 @@ case class IsNotNull(child: Expression) extends UnaryExpression {
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.isNotNull(child.toRexNode)
   }
+
+  override def resultType = BOOLEAN_TYPE_INFO
 }
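
A short sketch of the two rule sets above (resolved fields assumed): ordering comparisons need Strings or numerics on both sides, while EqualTo/NotEqualTo also accept any pair of identical types:

    // (INT, BOOLEAN) is neither (String, String) nor (Numeric, Numeric):
    val cmp = GreaterThan(
      ResolvedFieldReference("a", BasicTypeInfo.INT_TYPE_INFO),
      Literal(true, BasicTypeInfo.BOOLEAN_TYPE_INFO))
    println(cmp.validateInput())  // ValidationFailure

    // Identical non-numeric types are fine for equality:
    val eq = EqualTo(
      ResolvedFieldReference("b", BasicTypeInfo.BOOLEAN_TYPE_INFO),
      Literal(false, BasicTypeInfo.BOOLEAN_TYPE_INFO))
    println(eq.validateInput())   // ValidationSuccess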

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
index 82f7653..24283d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
@@ -20,27 +20,88 @@ package org.apache.flink.api.table.expressions
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure}
+
+trait NamedExpression extends Expression {
+  def name: String
+  def toAttribute: Attribute
+}
+
+abstract class Attribute extends LeafExpression with NamedExpression {
+  override def toAttribute: Attribute = this
+
+  def withName(newName: String): Attribute
+}
+
+case class UnresolvedFieldReference(name: String) extends Attribute {
+
   override def toString = "\"" + name
 
+  override def withName(newName: String): Attribute = UnresolvedFieldReference(newName)
+
+  override def resultType: TypeInformation[_] =
+    throw new UnresolvedException(s"calling resultType on ${this.getClass}")
+
+  override def validateInput(): ExprValidationResult =
+    ValidationFailure(s"Unresolved reference $name")
+}
+
+case class ResolvedFieldReference(
+    name: String,
+    resultType: TypeInformation[_]) extends Attribute {
+
+  override def toString = s"'$name"
+
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.field(name)
   }
-}
 
-case class ResolvedFieldReference(override val name: String) extends LeafExpression {
-  override def toString = s"'$name"
+  override def withName(newName: String): Attribute = {
+    if (newName == name) {
+      this
+    } else {
+      ResolvedFieldReference(newName, resultType)
+    }
+  }
 }
 
-case class Naming(child: Expression, override val name: String) extends UnaryExpression {
+case class Alias(child: Expression, name: String)
+    extends UnaryExpression with NamedExpression {
+
   override def toString = s"$child as '$name"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.alias(child.toRexNode, name)
   }
 
-  override def makeCopy(anyRefs: Seq[AnyRef]): this.type = {
+  override def resultType: TypeInformation[_] = child.resultType
+
+  override def makeCopy(anyRefs: Array[AnyRef]): this.type = {
     val child: Expression = anyRefs.head.asInstanceOf[Expression]
     copy(child, name).asInstanceOf[this.type]
   }
+
+  override def toAttribute: Attribute = {
+    if (valid) {
+      ResolvedFieldReference(name, child.resultType)
+    } else {
+      UnresolvedFieldReference(name)
+    }
+  }
+}
+
+case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression {
+
+  override def name: String =
+    throw new UnresolvedException("Invalid call to name on UnresolvedAlias")
+
+  override def toAttribute: Attribute =
+    throw new UnresolvedException("Invalid call to toAttribute on 
UnresolvedAlias")
+
+  override def resultType: TypeInformation[_] =
+    throw new UnresolvedException("Invalid call to resultType on 
UnresolvedAlias")
+
+  override lazy val valid = false
 }
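
For illustration, a sketch of how the new naming chain fits together, assuming the `valid` flag is set once the subtree passes validation:

    val aliased = Alias(
      ResolvedFieldReference("a", BasicTypeInfo.INT_TYPE_INFO), "renamed")
    // Once valid, the alias materializes as a typed attribute:
    val attr = aliased.toAttribute  // ResolvedFieldReference("renamed", Int type)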

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
index 1fbe5a3..9caec26 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
@@ -22,7 +22,6 @@ import java.util.Date
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.scala.table.ImplicitExpressionOperations
 import org.apache.flink.api.table.typeutils.TypeConverter
 
 object Literal {
@@ -39,11 +38,7 @@ object Literal {
   }
 }
 
-case class Literal(value: Any, tpe: TypeInformation[_])
-  extends LeafExpression with ImplicitExpressionOperations {
-  def expr = this
-  def typeInfo = tpe
-
+case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression {
   override def toString = s"$value"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
@@ -51,13 +46,10 @@ case class Literal(value: Any, tpe: TypeInformation[_])
   }
 }
 
-case class Null(tpe: TypeInformation[_]) extends LeafExpression {
-  def expr = this
-  def typeInfo = tpe
-
+case class Null(resultType: TypeInformation[_]) extends LeafExpression {
   override def toString = s"null"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.getRexBuilder.makeNullLiteral(TypeConverter.typeInfoToSqlType(tpe))
+    relBuilder.getRexBuilder.makeNullLiteral(TypeConverter.typeInfoToSqlType(resultType))
   }
 }
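
A trivial sketch of the slimmed-down literals (Literal.apply, elided in the hunk above, can also infer the TypeInformation from the raw value):

    val pi = Literal(3.14, BasicTypeInfo.DOUBLE_TYPE_INFO)
    println(pi.resultType)   // Double type info
    val nul = Null(BasicTypeInfo.STRING_TYPE_INFO)
    println(nul.resultType)  // String type info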

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
index 37a6597..90d3dbc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
@@ -21,25 +21,47 @@ import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 
-abstract class BinaryPredicate extends BinaryExpression { self: Product => }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.validate._
+
+abstract class BinaryPredicate extends BinaryExpression {
+  override def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (left.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+        right.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$this only accepts children of Boolean Type, " +
+        s"get ${left.resultType} and ${right.resultType}")
+    }
+  }
+}
 
 case class Not(child: Expression) extends UnaryExpression {
 
-  override val name = Expression.freshName("not-" + child.name)
-
   override def toString = s"!($child)"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.not(child.toRexNode)
   }
+
+  override def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (child.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Not only accepts child of Boolean Type, " +
+        s"get ${child.resultType}")
+    }
+  }
 }
 
 case class And(left: Expression, right: Expression) extends BinaryPredicate {
 
   override def toString = s"$left && $right"
 
-  override val name = Expression.freshName(left.name + "-and-" + right.name)
-
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.and(left.toRexNode, right.toRexNode)
   }
@@ -49,8 +71,6 @@ case class Or(left: Expression, right: Expression) extends BinaryPredicate {
 
   override def toString = s"$left || $right"
 
-  override val name = Expression.freshName(left.name + "-or-" + right.name)
-
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.or(left.toRexNode, right.toRexNode)
   }
@@ -63,10 +83,9 @@ case class Eval(
   extends Expression {
   def children = Seq(condition, ifTrue, ifFalse)
 
-  override def toString = s"($condition)? $ifTrue : $ifFalse"
+  override def resultType = ifTrue.resultType
 
-  override val name = Expression.freshName("if-" + condition.name +
-    "-then-" + ifTrue.name + "-else-" + ifFalse.name)
+  override def toString = s"($condition)? $ifTrue : $ifFalse"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     val c = condition.toRexNode
@@ -74,4 +93,15 @@ case class Eval(
     val f = ifFalse.toRexNode
     relBuilder.call(SqlStdOperatorTable.CASE, c, t, f)
   }
+
+  override def validateInput(): ExprValidationResult = {
+    if (condition.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+        ifTrue.resultType == ifFalse.resultType) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"Eval should have boolean condition and same type of ifTrue and 
ifFalse, get " +
+          s"(${condition.resultType}, ${ifTrue.resultType}, 
${ifFalse.resultType})")
+    }
+  }
 }
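
A minimal sketch of the ternary check above (illustrative names):

    val eval = Eval(
      ResolvedFieldReference("flag", BasicTypeInfo.BOOLEAN_TYPE_INFO),
      Literal(1, BasicTypeInfo.INT_TYPE_INFO),
      Literal("x", BasicTypeInfo.STRING_TYPE_INFO))
    println(eval.validateInput())  // ValidationFailure: branch types differ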

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
new file mode 100644
index 0000000..cf734f0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.typeutils.TypeCheckUtils
+import org.apache.flink.api.table.validate._
+
+case class Abs(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = child.resultType
+
+  override def validateInput(): ExprValidationResult =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "Abs")
+
+  override def toString(): String = s"abs($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.ABS, child.toRexNode)
+  }
+}
+
+case class Ceil(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "Ceil")
+
+  override def toString(): String = s"ceil($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.CEIL, child.toRexNode)
+  }
+}
+
+case class Exp(child: Expression) extends UnaryExpression with InputTypeSpec {
+  override def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override def toString(): String = s"exp($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.EXP, child.toRexNode)
+  }
+}
+
+
+case class Floor(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "Floor")
+
+  override def toString(): String = s"floor($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.FLOOR, child.toRexNode)
+  }
+}
+
+case class Log10(child: Expression) extends UnaryExpression with InputTypeSpec {
+  override def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override def toString(): String = s"log10($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.LOG10, child.toRexNode)
+  }
+}
+
+case class Ln(child: Expression) extends UnaryExpression with InputTypeSpec {
+  override def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override def toString(): String = s"ln($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.LN, child.toRexNode)
+  }
+}
+
+case class Power(left: Expression, right: Expression) extends BinaryExpression with InputTypeSpec {
+  override def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
+
+  override def toString(): String = s"pow($left, $right)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.POWER, left.toRexNode, right.toRexNode)
+  }
+}
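
The new file shows both validation styles side by side: Abs, Ceil and Floor check for numeric input by hand, while Exp, Log10, Ln and Power declare expectedTypes and leave the check to the InputTypeSpec mixin. A hedged usage sketch:

    // Ln expects exactly one DOUBLE input, enforced via InputTypeSpec:
    val ln = Ln(Literal(2.0, BasicTypeInfo.DOUBLE_TYPE_INFO))
    println(ln.resultType)  // Double type info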

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
index 75fa078..887cf60 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
@@ -16,28 +16,39 @@
  * limitations under the License.
  */
 package org.apache.flink.api.table.expressions
+
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-abstract class Ordering extends UnaryExpression { self: Product =>
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.validate._
+
+abstract class Ordering extends UnaryExpression {
+  override def validateInput(): ExprValidationResult = {
+    if (!child.isInstanceOf[NamedExpression]) {
+      ValidationFailure(s"Sort should only based on field reference")
+    } else {
+      ValidationSuccess
+    }
+  }
 }
 
 case class Asc(child: Expression) extends Ordering {
   override def toString: String = s"($child).asc"
 
-  override def name: String = child.name + "-asc"
-
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     child.toRexNode
   }
+
+  override def resultType: TypeInformation[_] = child.resultType
 }
 
 case class Desc(child: Expression) extends Ordering {
   override def toString: String = s"($child).desc"
 
-  override def name: String = child.name + "-desc"
-
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.desc(child.toRexNode)
   }
+
+  override def resultType: TypeInformation[_] = child.resultType
 }
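
A quick sketch of the ordering rule (Asc forwards the child's RexNode unchanged, since ascending is the default sort direction in Calcite's RelBuilder):

    // Field references are NamedExpressions, so this validates:
    val asc = Asc(ResolvedFieldReference("a", BasicTypeInfo.INT_TYPE_INFO))
    println(asc.validateInput())  // ValidationSuccess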

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
new file mode 100644
index 0000000..11825f7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import scala.collection.JavaConversions._
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.validate._
+
+/**
+  * Returns the length of `str`.
+  */
+case class CharLength(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = INT_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"CharLength only accepts String input, get 
${child.resultType}")
+    }
+  }
+
+  override def toString(): String = s"($child).charLength()"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.CHAR_LENGTH, child.toRexNode)
+  }
+}
+
+/**
+  * Returns str with the first letter of each word in uppercase.
+  * All other letters are in lowercase. Words are delimited by white space.
+  */
+case class InitCap(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"InitCap only accepts String input, get 
${child.resultType}")
+    }
+  }
+
+  override def toString(): String = s"($child).initCap()"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.INITCAP, child.toRexNode)
+  }
+}
+
+/**
+  * Returns true if `str` matches `pattern`.
+  */
+case class Like(str: Expression, pattern: Expression) extends BinaryExpression {
+  def left: Expression = str
+  def right: Expression = pattern
+
+  override def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Like only accepts (String, String) input, " +
+        s"get (${str.resultType}, ${pattern.resultType})")
+    }
+  }
+
+  override def toString(): String = s"($str).like($pattern)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.LIKE, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Returns str with all characters changed to lowercase.
+  */
+case class Lower(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Lower only accepts String input, get 
${child.resultType}")
+    }
+  }
+
+  override def toString(): String = s"($child).toLowerCase()"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.LOWER, child.toRexNode)
+  }
+}
+
+/**
+  * Returns true if `str` is similar to `pattern`.
+  */
+case class Similar(str: Expression, pattern: Expression) extends BinaryExpression {
+  def left: Expression = str
+  def right: Expression = pattern
+
+  override def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Similar only accepts (String, String) input, " +
+        s"get (${str.resultType}, ${pattern.resultType})")
+    }
+  }
+
+  override def toString(): String = s"($str).similarTo($pattern)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.SIMILAR_TO, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Returns a substring of `str` from `begin` (inclusive) to `end` (exclusive).
+  */
+case class SubString(
+    str: Expression,
+    begin: Expression,
+    end: Expression) extends Expression with InputTypeSpec {
+
+  def this(str: Expression, begin: Expression) = this(str, begin, CharLength(str))
+
+  override def children: Seq[Expression] = str :: begin :: end :: Nil
+
+  override def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+  override def toString(): String = s"$str.subString($begin, $end)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.SUBSTRING, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Trim `trimString` from `str` according to `trimFlag`:
+  * 0 for TRIM_BOTH, 1 for TRIM_LEADING and 2 for TRIM_TRAILING.
+  */
+case class Trim(
+    trimFlag: Expression,
+    trimString: Expression,
+    str: Expression) extends Expression with InputTypeSpec {
+
+  override def children: Seq[Expression] = trimFlag :: trimString :: str :: Nil
+
+  override def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO)
+
+  override def toString(): String = s"trim($trimFlag, $trimString, $str)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.TRIM, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Enumeration of trim flags.
+  */
+object TrimConstants {
+  val TRIM_BOTH = Literal(0)
+  val TRIM_LEADING = Literal(1)
+  val TRIM_TRAILING = Literal(2)
+  val TRIM_DEFAULT_CHAR = Literal(" ")
+}
+
+/**
+  * Returns str with all characters changed to uppercase.
+  */
+case class Upper(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Upper only accepts String input, get 
${child.resultType}")
+    }
+  }
+
+  override def toString(): String = s"($child).toUpperCase()"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.UPPER, child.toRexNode)
+  }
+}
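
Finally, a hedged sketch of the trim flags in use (TrimConstants replaces the removed BuiltInFunctionConstants):

    // Trim both ends of `s`, stripping the default character (a space):
    val trimmed = Trim(
      TrimConstants.TRIM_BOTH,
      TrimConstants.TRIM_DEFAULT_CHAR,
      ResolvedFieldReference("s", BasicTypeInfo.STRING_TYPE_INFO))
    println(trimmed.resultType)  // String type info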
