Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite d720b002a -> 3f8cea74a


[FLINK-3604][tableAPI] Enable and fix ignored tests.

This closes #1782.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f8cea74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f8cea74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f8cea74

Branch: refs/heads/tableOnCalcite
Commit: 3f8cea74a79d31ee1d0cb74a767635f70a29e3c1
Parents: d720b00
Author: Fabian Hueske <fhue...@apache.org>
Authored: Thu Mar 10 14:59:18 2016 +0100
Committer: vasia <va...@apache.org>
Committed: Fri Mar 11 15:44:20 2016 +0100

----------------------------------------------------------------------
 .../api/java/table/JavaBatchTranslator.scala    |   6 +-
 .../api/scala/table/DataSetConversions.scala    |  14 +-
 .../apache/flink/api/scala/table/package.scala  |   2 +-
 .../flink/api/table/plan/TypeConverter.scala    |  23 +-
 .../plan/nodes/dataset/DataSetAggregate.scala   |   2 +-
 .../org/apache/flink/api/table/table.scala      | 361 ++++++++++---------
 .../flink/api/table/typeinfo/RowTypeInfo.scala  |  15 +-
 .../api/java/table/test/AggregationsITCase.java |   8 +-
 .../flink/api/java/table/test/AsITCase.java     |  84 ++---
 .../table/test/GroupedAggregationsITCase.java   |   1 -
 .../api/java/table/test/PojoGroupingITCase.java |   3 +-
 .../table/test/StringExpressionsITCase.java     |  16 +-
 .../scala/table/test/PageRankTableITCase.java   |   4 +-
 .../scala/table/test/AggregationsITCase.scala   |  16 +-
 .../flink/api/scala/table/test/AsITCase.scala   |  49 ++-
 .../api/scala/table/test/CastingITCase.scala    |   3 +-
 .../api/scala/table/test/FilterITCase.scala     |  35 +-
 .../table/test/StringExpressionsITCase.scala    |  13 +-
 18 files changed, 316 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 14ee78e..dbbe7e8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -88,11 +88,13 @@ class JavaBatchTranslator(config: TableConfig) extends 
PlanTranslator {
           s"Cannot generate a valid execution plan for the given query: \n\n" +
           s"${RelOptUtil.toString(lPlan)}\n" +
           "Please consider filing a bug report.", e)
+      case a: AssertionError =>
+        throw a.getCause
     }
 
-    println("---------------")
+    println("-------------")
     println("DataSet Plan:")
-    println("---------------")
+    println("-------------")
     println(RelOptUtil.toString(dataSetPlan))
 
     dataSetPlan match {

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
index 2508a3d..6e630ea 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.table
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table._
 import org.apache.flink.api.table.expressions.{UnresolvedFieldReference, 
Expression}
 import org.apache.flink.api.common.typeutils.CompositeType
@@ -27,7 +28,7 @@ import org.apache.flink.api.scala._
  * Methods for converting a [[DataSet]] to a [[Table]]. A [[DataSet]] is
  * wrapped in this by the implicit conversions in 
[[org.apache.flink.api.scala.table]].
  */
-class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) {
+class DataSetConversions[T](set: DataSet[T], inputType: TypeInformation[T]) {
 
   /**
    * Converts the [[DataSet]] to a [[Table]]. The field names can be specified 
like this:
@@ -59,8 +60,15 @@ class DataSetConversions[T](set: DataSet[T], inputType: 
CompositeType[T]) {
    * of type `Int`.
    */
   def toTable: Table = {
-    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
-    as(resultFields: _*)
+
+    inputType match {
+      case c: CompositeType[T] =>
+        val resultFields = c.getFieldNames.map(UnresolvedFieldReference)
+        as(resultFields: _*)
+      case _ =>
+        throw new IllegalArgumentException("" +
+          "Please specify a field name with 'as' to convert an atomic type 
dataset to a table ")
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
index 86bb7c0..5c3ba44 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -72,7 +72,7 @@ package object table extends ImplicitExpressionConversions {
   }
 
   implicit def dataSet2DataSetConversions[T](set: DataSet[T]): 
DataSetConversions[T] = {
-    new DataSetConversions[T](set, set.getType.asInstanceOf[CompositeType[T]])
+    new DataSetConversions[T](set, set.getType)
   }
 
   implicit def table2RowDataSet(

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
index 62c87a1..030d577 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
@@ -37,7 +37,7 @@ import scala.collection.JavaConversions._
 
 object TypeConverter {
 
-  val DEFAULT_ROW_TYPE = new 
RowTypeInfo(Seq()).asInstanceOf[TypeInformation[Any]]
+  val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq(), 
Seq()).asInstanceOf[TypeInformation[Any]]
 
   def typeInfoToSqlType(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo 
match {
     case BOOLEAN_TYPE_INFO => BOOLEAN
@@ -132,17 +132,16 @@ object TypeConverter {
 
           // POJO type expected
           case pt: PojoTypeInfo[_] =>
-            logicalFieldTypes.zipWithIndex foreach {
-              case (fieldTypeInfo, i) =>
-                val fieldName = logicalFieldNames(i)
-                val index = pt.getFieldIndex(fieldName)
-                if (index < 0) {
-                  throw new TableException(s"POJO does not define field name: 
$fieldName")
+            logicalFieldNames.zip(logicalFieldTypes) foreach {
+              case (fName, fType) =>
+                val pojoIdx = pt.getFieldIndex(fName)
+                if (pojoIdx < 0) {
+                  throw new TableException(s"POJO does not define field name: 
$fName")
                 }
-                val expectedTypeInfo = pt.getTypeAt(i)
-                if (fieldTypeInfo != expectedTypeInfo) {
+                val expectedTypeInfo = pt.getTypeAt(pojoIdx)
+                if (fType != expectedTypeInfo) {
                   throw new TableException(s"Result field does not match 
expected type. " +
-                    s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
+                    s"Expected: $expectedTypeInfo; Actual: $fType")
                 }
             }
 
@@ -172,7 +171,7 @@ object TypeConverter {
 
       // Row is expected, create the arity for it
       case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] =>
-        new RowTypeInfo(logicalFieldTypes)
+        new RowTypeInfo(logicalFieldTypes, logicalFieldNames)
 
       // no physical type
       // determine type based on logical fields and configuration parameters
@@ -180,7 +179,7 @@ object TypeConverter {
         // no need for efficient types -> use Row
         // we cannot use efficient types if row arity > tuple arity or nullable
         if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY 
|| nullable) {
-          new RowTypeInfo(logicalFieldTypes)
+          new RowTypeInfo(logicalFieldTypes, logicalFieldNames)
         }
         // use efficient type tuple or atomic type
         else {

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index 9a9bf99..d3416ee 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -91,7 +91,7 @@ class DataSetAggregate(
     .map(n => TypeConverter.sqlTypeToTypeInfo(n))
     .toArray
 
-    val rowTypeInfo = new RowTypeInfo(fieldTypes)
+    val rowTypeInfo = new RowTypeInfo(fieldTypes, 
rowType.getFieldNames.asScala)
     val mappedInput = inputDS.map(aggregateResult._1)
     val groupReduceFunction = aggregateResult._2
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 0763f34..43c097e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -28,39 +28,39 @@ import org.apache.calcite.tools.RelBuilder.{AggCall, 
GroupKey}
 import org.apache.calcite.util.NlsString
 import org.apache.flink.api.table.plan.{PlanGenException, RexNodeTranslator}
 import RexNodeTranslator.{toRexNode, extractAggCalls}
-import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.expressions.{Naming, 
UnresolvedFieldReference, Expression}
 import org.apache.flink.api.table.parser.ExpressionParser
 
 import scala.collection.JavaConverters._
 
 case class BaseTable(
-    private[flink] val relNode: RelNode,
-    private[flink] val relBuilder: RelBuilder)
+  private[flink] val relNode: RelNode,
+  private[flink] val relBuilder: RelBuilder)
 
 /**
- * The abstraction for writing Table API programs. Similar to how the batch 
and streaming APIs
- * have [[org.apache.flink.api.scala.DataSet]] and
- * [[org.apache.flink.streaming.api.scala.DataStream]].
- *
- * Use the methods of [[Table]] to transform data. Use
- * [[org.apache.flink.api.java.table.TableEnvironment]] to convert a [[Table]] 
back to a DataSet
- * or DataStream.
- *
- * When using Scala a [[Table]] can also be converted using implicit 
conversions.
- *
- * Example:
- *
- * {{{
- *   val table = set.toTable('a, 'b)
- *   ...
- *   val table2 = ...
- *   val set = table2.toDataSet[MyType]
- * }}}
- *
- * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either 
take arguments
- * in a Scala DSL or as an expression String. Please refer to the 
documentation for the expression
- * syntax.
- */
+  * The abstraction for writing Table API programs. Similar to how the batch 
and streaming APIs
+  * have [[org.apache.flink.api.scala.DataSet]] and
+  * [[org.apache.flink.streaming.api.scala.DataStream]].
+  *
+  * Use the methods of [[Table]] to transform data. Use
+  * [[org.apache.flink.api.java.table.TableEnvironment]] to convert a 
[[Table]] back to a DataSet
+  * or DataStream.
+  *
+  * When using Scala a [[Table]] can also be converted using implicit 
conversions.
+  *
+  * Example:
+  *
+  * {{{
+  *   val table = set.toTable('a, 'b)
+  *   ...
+  *   val table2 = ...
+  *   val set = table2.toDataSet[MyType]
+  * }}}
+  *
+  * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either 
take arguments
+  * in a Scala DSL or as an expression String. Please refer to the 
documentation for the expression
+  * syntax.
+  */
 class Table(
   private[flink] override val relNode: RelNode,
   private[flink] override val relBuilder: RelBuilder)
@@ -68,15 +68,15 @@ class Table(
 {
 
   /**
-   * Performs a selection operation. Similar to an SQL SELECT statement. The 
field expressions
-   * can contain complex expressions and aggregations.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.select('key, 'value.avg + " The average" as 'average, 
'other.substring(0, 10))
-   * }}}
-   */
+    * Performs a selection operation. Similar to an SQL SELECT statement. The 
field expressions
+    * can contain complex expressions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.select('key, 'value.avg + " The average" as 'average, 
'other.substring(0, 10))
+    * }}}
+    */
   def select(fields: Expression*): Table = {
 
     relBuilder.push(relNode)
@@ -101,90 +101,95 @@ class Table(
       .map(toRexNode(_, relBuilder))
 
     relBuilder.project(exprs.toIterable.asJava)
-    var projected = relBuilder.build()
+    val projected = relBuilder.build()
 
     if(relNode == projected) {
       // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
       //   Add a projection ourselves (will be automatically removed by 
translation rules).
-      val names = exprs.map{ e =>
-        e.getKind match {
-          case SqlKind.AS =>
-            e.asInstanceOf[RexCall].getOperands.get(1)
-              .asInstanceOf[RexLiteral].getValue
-              .asInstanceOf[NlsString].getValue
-          case SqlKind.INPUT_REF =>
-            
relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex)
-          case _ =>
-            throw new PlanGenException("Unexpected expression type 
encountered.")
-        }
-
-      }
-
-      projected = LogicalProject.create(relNode, exprs.toList.asJava, 
names.toList.asJava)
+      new Table(createRenamingProject(exprs), relBuilder)
+    } else {
+      new Table(projected, relBuilder)
     }
 
-    new Table(projected, relBuilder)
   }
 
   /**
-   * Performs a selection operation. Similar to an SQL SELECT statement. The 
field expressions
-   * can contain complex expressions and aggregations.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.select("key, value.avg + " The average" as average, 
other.substring(0, 10)")
-   * }}}
-   */
+    * Performs a selection operation. Similar to an SQL SELECT statement. The 
field expressions
+    * can contain complex expressions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.select("key, value.avg + " The average" as average, 
other.substring(0, 10)")
+    * }}}
+    */
   def select(fields: String): Table = {
     val fieldExprs = ExpressionParser.parseExpressionList(fields)
     select(fieldExprs: _*)
   }
 
   /**
-   * Renames the fields of the expression result. Use this to disambiguate 
fields before
-   * joining to operations.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.as('a, 'b)
-   * }}}
-   */
+    * Renames the fields of the expression result. Use this to disambiguate 
fields before
+    * joining to operations.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.as('a, 'b)
+    * }}}
+    */
   def as(fields: Expression*): Table = {
 
+    val curNames = relNode.getRowType.getFieldNames.asScala
+
+    // validate that AS has only field references
+    if (! fields.forall( _.isInstanceOf[UnresolvedFieldReference] )) {
+      throw new IllegalArgumentException("All expressions must be field 
references.")
+    }
+    // validate that we have not more field references than fields
+    if ( fields.length > curNames.size) {
+      throw new IllegalArgumentException("More field references than fields.")
+    }
+
+    val curFields = curNames.map(new UnresolvedFieldReference(_))
+
+    val renamings = fields.zip(curFields).map {
+      case (newName, oldName) => new Naming(oldName, newName.name)
+    }
+    val remaining = curFields.drop(fields.size)
+
     relBuilder.push(relNode)
-    val expressions = fields.map(toRexNode(_, relBuilder)).toIterable.asJava
-    val names = fields.map(_.name).toIterable.asJava
-    relBuilder.project(expressions, names)
-    new Table(relBuilder.build(), relBuilder)
+
+    val exprs = (renamings ++ remaining).map(toRexNode(_, relBuilder))
+
+    new Table(createRenamingProject(exprs), relBuilder)
   }
 
   /**
-   * Renames the fields of the expression result. Use this to disambiguate 
fields before
-   * joining to operations.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.as("a, b")
-   * }}}
-   */
+    * Renames the fields of the expression result. Use this to disambiguate 
fields before
+    * joining to operations.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.as("a, b")
+    * }}}
+    */
   def as(fields: String): Table = {
     val fieldExprs = ExpressionParser.parseExpressionList(fields)
     as(fieldExprs: _*)
   }
 
   /**
-   * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
-   * clause.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.filter('name === "Fred")
-   * }}}
-   */
+    * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
+    * clause.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.filter('name === "Fred")
+    * }}}
+    */
   def filter(predicate: Expression): Table = {
 
     relBuilder.push(relNode)
@@ -194,58 +199,58 @@ class Table(
   }
 
   /**
-   * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
-   * clause.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.filter("name = 'Fred'")
-   * }}}
-   */
+    * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
+    * clause.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.filter("name = 'Fred'")
+    * }}}
+    */
   def filter(predicate: String): Table = {
     val predicateExpr = ExpressionParser.parseExpression(predicate)
     filter(predicateExpr)
   }
 
   /**
-   * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
-   * clause.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.where('name === "Fred")
-   * }}}
-   */
+    * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
+    * clause.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.where('name === "Fred")
+    * }}}
+    */
   def where(predicate: Expression): Table = {
     filter(predicate)
   }
 
   /**
-   * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
-   * clause.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.where("name = 'Fred'")
-   * }}}
-   */
+    * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
+    * clause.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.where("name = 'Fred'")
+    * }}}
+    */
   def where(predicate: String): Table = {
     filter(predicate)
   }
 
   /**
-   * Groups the elements on some grouping keys. Use this before a selection 
with aggregations
-   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP 
BY statement.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.groupBy('key).select('key, 'value.avg)
-   * }}}
-   */
+    * Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+    * to perform the aggregation on a per-group basis. Similar to a SQL GROUP 
BY statement.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.groupBy('key).select('key, 'value.avg)
+    * }}}
+    */
   def groupBy(fields: Expression*): GroupedTable = {
 
     relBuilder.push(relNode)
@@ -256,29 +261,29 @@ class Table(
   }
 
   /**
-   * Groups the elements on some grouping keys. Use this before a selection 
with aggregations
-   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP 
BY statement.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.groupBy("key").select("key, value.avg")
-   * }}}
-   */
+    * Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+    * to perform the aggregation on a per-group basis. Similar to a SQL GROUP 
BY statement.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.groupBy("key").select("key, value.avg")
+    * }}}
+    */
   def groupBy(fields: String): GroupedTable = {
     val fieldsExpr = ExpressionParser.parseExpressionList(fields)
     groupBy(fieldsExpr: _*)
   }
 
   /**
-   * Removes duplicate values and returns only distinct (different) values.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.select("key, value").distinct()
-   * }}}
-   */
+    * Removes duplicate values and returns only distinct (different) values.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.select("key, value").distinct()
+    * }}}
+    */
   def distinct(): Table = {
     relBuilder.push(relNode)
     relBuilder.distinct()
@@ -286,16 +291,16 @@ class Table(
   }
 
   /**
-   * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
-   * operations must not overlap, use [[as]] to rename fields if necessary. 
You can use
-   * where and select clauses after a join to further specify the behaviour of 
the join.
-   *
-   * Example:
-   *
-   * {{{
-   *   left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
-   * }}}
-   */
+    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two 
joined
+    * operations must not overlap, use [[as]] to rename fields if necessary. 
You can use
+    * where and select clauses after a join to further specify the behaviour 
of the join.
+    *
+    * Example:
+    *
+    * {{{
+    *   left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
+    * }}}
+    */
   def join(right: Table): Table = {
 
     // check that join inputs do not have overlapping field names
@@ -314,15 +319,15 @@ class Table(
   }
 
   /**
-   * Union two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two 
union operations
-   * must fully overlap.
-   *
-   * Example:
-   *
-   * {{{
-   *   left.unionAll(right)
-   * }}}
-   */
+    * Union two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two 
union operations
+    * must fully overlap.
+    *
+    * Example:
+    *
+    * {{{
+    *   left.unionAll(right)
+    * }}}
+    */
   def unionAll(right: Table): Table = {
 
     val leftRowType: List[RelDataTypeField] = 
relNode.getRowType.getFieldList.asScala.toList
@@ -370,12 +375,32 @@ class Table(
   }
 
   def explain(): String = explain(false)
+
+  private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = {
+
+    val names = exprs.map{ e =>
+      e.getKind match {
+        case SqlKind.AS =>
+          e.asInstanceOf[RexCall].getOperands.get(1)
+            .asInstanceOf[RexLiteral].getValue
+            .asInstanceOf[NlsString].getValue
+        case SqlKind.INPUT_REF =>
+          
relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex)
+        case _ =>
+          throw new PlanGenException("Unexpected expression type encountered.")
+      }
+
+    }
+
+    LogicalProject.create(relNode, exprs.toList.asJava, names.toList.asJava)
+  }
+
 }
 
 class GroupedTable(
-    private[flink] override val relNode: RelNode,
-    private[flink] override val relBuilder: RelBuilder,
-    private[flink] val groupKey: GroupKey) extends BaseTable(relNode, 
relBuilder) {
+  private[flink] override val relNode: RelNode,
+  private[flink] override val relBuilder: RelBuilder,
+  private[flink] val groupKey: GroupKey) extends BaseTable(relNode, 
relBuilder) {
 
   /**
     * Performs a selection operation. Similar to an SQL SELECT statement. The 
field expressions
@@ -404,7 +429,7 @@ class GroupedTable(
 
     // get selection expressions
     val exprs: List[RexNode] = try {
-       extractedAggCalls
+      extractedAggCalls
         .map(_._1)
         .map(toRexNode(_, relBuilder))
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
index 522c7f3..81c3836 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
@@ -30,14 +30,25 @@ import org.apache.flink.api.table.Row
 /**
  * TypeInformation for [[Row]].
  */
-class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]])
+class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]], fieldNames: Seq[String])
   extends CaseClassTypeInfo[Row](
     classOf[Row],
     Array(),
     fieldTypes,
-    for (i <- fieldTypes.indices) yield "f" + i)
+    fieldNames)
 {
 
+  if (fieldTypes.length != fieldNames.length) {
+    throw new IllegalArgumentException("Number of field types and names is 
different.")
+  }
+  if (fieldNames.length != fieldNames.toSet.size) {
+    throw new IllegalArgumentException("Field names are not unique.")
+  }
+
+  def this(fieldTypes: Seq[TypeInformation[_]]) = {
+    this(fieldTypes, for (i <- fieldTypes.indices) yield "f" + i)
+  }
+
   /**
    * Temporary variable for directly passing orders to comparators.
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index 30598c4..e26bc32 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -44,15 +44,13 @@ import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.table.plan.PlanGenException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import scala.NotImplementedError;
-
 import java.util.List;
 
 @RunWith(Parameterized.class)
@@ -160,9 +158,7 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       // Calcite does not eagerly check type compatibility
-       @Ignore
-       @Test(expected = ExpressionException.class)
+       @Test(expected = PlanGenException.class)
        public void testNonWorkingDataTypes() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
index 628cbef..097339e 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -49,8 +49,9 @@ public class AsITCase extends TableProgramsTestBase {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = getJavaTableEnvironment();
 
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
+               Table table = tableEnv
+                       .fromDataSet(CollectionDataSets.get3TupleDataSet(env), 
"a, b, c")
+                       .select("a, b, c");
 
                DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
                List<Row> results = ds.collect();
@@ -69,8 +70,9 @@ public class AsITCase extends TableProgramsTestBase {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = getJavaTableEnvironment();
 
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
+               Table table = tableEnv
+                       .fromDataSet(CollectionDataSets.get3TupleDataSet(env), 
"a, b, c")
+                       .select("a, b, c");
 
                TypeInformation<?> ti = new TupleTypeInfo<Tuple3<Integer, Long, 
String>>(
                        BasicTypeInfo.INT_TYPE_INFO,
@@ -99,8 +101,9 @@ public class AsITCase extends TableProgramsTestBase {
                data.add(new Tuple4<>("lol", 2, 1.0, "Hi"));
                data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));
 
-               Table table =
-                               tableEnv.fromDataSet(env.fromCollection(data), 
"a, b, c, d");
+               Table table = tableEnv
+                       .fromDataSet(env.fromCollection(data), "a, b, c, d")
+                       .select("a, b, c, d");
 
                DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, 
SmallPojo2.class);
                List<SmallPojo2> results = ds.collect();
@@ -118,12 +121,13 @@ public class AsITCase extends TableProgramsTestBase {
                data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
                data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
 
-               Table table =
-                       tableEnv.fromDataSet(env.fromCollection(data),
+               Table table = tableEnv
+                       .fromDataSet(env.fromCollection(data),
                                "department AS a, " +
                                "age AS b, " +
                                "salary AS c, " +
-                               "name AS d");
+                               "name AS d")
+                       .select("a, b, c, d");
 
                DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
                List<Row> results = ds.collect();
@@ -144,12 +148,13 @@ public class AsITCase extends TableProgramsTestBase {
                data.add(new PrivateSmallPojo("Anna", 56, 10000.00, 
"Engineering"));
                data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
 
-               Table table =
-                       tableEnv.fromDataSet(env.fromCollection(data),
+               Table table = tableEnv
+                       .fromDataSet(env.fromCollection(data),
                                "department AS a, " +
                                "age AS b, " +
                                "salary AS c, " +
-                               "name AS d");
+                               "name AS d")
+                       .select("a, b, c, d");
 
                DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
                List<Row> results = ds.collect();
@@ -170,12 +175,13 @@ public class AsITCase extends TableProgramsTestBase {
                data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
                data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
 
-               Table table =
-                       tableEnv.fromDataSet(env.fromCollection(data),
+               Table table = tableEnv
+                       .fromDataSet(env.fromCollection(data),
                                "department AS a, " +
                                "age AS b, " +
                                "salary AS c, " +
-                               "name AS d");
+                               "name AS d")
+                       .select("a, b, c, d");
 
                DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, 
SmallPojo2.class);
                List<SmallPojo2> results = ds.collect();
@@ -196,12 +202,13 @@ public class AsITCase extends TableProgramsTestBase {
                data.add(new PrivateSmallPojo("Anna", 56, 10000.00, 
"Engineering"));
                data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
 
-               Table table =
-                       tableEnv.fromDataSet(env.fromCollection(data),
+               Table table = tableEnv
+                       .fromDataSet(env.fromCollection(data),
                                "department AS a, " +
                                "age AS b, " +
                                "salary AS c, " +
-                               "name AS d");
+                               "name AS d")
+                       .select("a, b, c, d");
 
                DataSet<PrivateSmallPojo2> ds = tableEnv.toDataSet(table, 
PrivateSmallPojo2.class);
                List<PrivateSmallPojo2> results = ds.collect();
@@ -217,13 +224,7 @@ public class AsITCase extends TableProgramsTestBase {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = getJavaTableEnvironment();
 
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
-
-               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
+               tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), 
"a, b");
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -231,13 +232,7 @@ public class AsITCase extends TableProgramsTestBase {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = getJavaTableEnvironment();
 
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
-
-               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
+               tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), 
"a, b, c, d");
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -245,13 +240,7 @@ public class AsITCase extends TableProgramsTestBase {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = getJavaTableEnvironment();
 
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
-
-               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
+               tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), 
"a, b, b");
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -259,13 +248,7 @@ public class AsITCase extends TableProgramsTestBase {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = getJavaTableEnvironment();
 
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
-
-               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
+               tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), 
"a + 1, b, c");
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -273,14 +256,7 @@ public class AsITCase extends TableProgramsTestBase {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = getJavaTableEnvironment();
 
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," +
-                                               " c");
-
-               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
+               tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), 
"a as foo, b,  c");
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
index b8a16b3..91d1976 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.NotImplementedError;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
index 993638d..b8b84f6 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableException;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,7 +38,7 @@ public class PojoGroupingITCase extends 
MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test(expected = TableException.class)
+       @Test
        public void testPojoGrouping() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
index 6b8f984..707ee66 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -80,9 +80,7 @@ public class StringExpressionsITCase extends 
MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       // Calcite does eagerly check expression types
-       @Ignore
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = CodeGenException.class)
        public void testNonWorkingSubstring1() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -97,14 +95,10 @@ public class StringExpressionsITCase extends 
MultipleProgramsTestBase {
                                .select("a.substring(0, b)");
 
                DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
+               resultSet.collect();
        }
 
-       // Calcite does eagerly check expression types
-       @Ignore
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = CodeGenException.class)
        public void testNonWorkingSubstring2() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -119,8 +113,6 @@ public class StringExpressionsITCase extends 
MultipleProgramsTestBase {
                                .select("a.substring(b, 15)");
 
                DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
+               resultSet.collect();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
index 55f1bde..a893d4d 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
@@ -74,7 +74,9 @@ public class PageRankTableITCase extends JavaProgramTestBase {
                        tConfigs.add(config);
                }
 
-               // TODO: Disabling test until Table API is operational again
+               // TODO: Enable test again once:
+               //   1) complex types (long[]) can be shipped through Table API
+               //   2) abs function is available
 //             return toParameterList(tConfigs);
                return new LinkedList<>();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 00261c0..ad9a66d 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.table.test
 
+import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.{ExpressionException, Row}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
@@ -50,10 +51,6 @@ class AggregationsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBa
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).toTable
       .select('foo.avg)
-
-    val expected = ""
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -124,17 +121,14 @@ class AggregationsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBa
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
-  @Ignore // Calcite does not eagerly check types
-  @Test(expected = classOf[ExpressionException])
+  @Test(expected = classOf[PlanGenException])
   def testNonWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("Hello", 1)).toTable
       .select('_1.sum)
 
-    val expected = ""
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    t.collect()
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -143,10 +137,6 @@ class AggregationsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBa
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("Hello", 1)).toTable
       .select('_2.sum.sum)
-
-    val expected = ""
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index f2120ef..9f9a3b4 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -43,7 +43,9 @@ class AsITCase(
   def testAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val t = CollectionDataSets.get3TupleDataSet(env)
+      .as('a, 'b, 'c)
+      .select('a, 'b, 'c)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke 
Skywalker\n" +
@@ -64,7 +66,9 @@ class AsITCase(
       SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
       SomeCaseClass("Lucy", 42, 6000.00, "HR"))
 
-    val t =  env.fromCollection(data).as('a, 'b, 'c, 'd)
+    val t =  env.fromCollection(data)
+      .as('a, 'b, 'c, 'd)
+      .select('a, 'b, 'c, 'd)
 
     val expected: String =
       "Peter,28,4000.0,Sales\n" +
@@ -83,7 +87,9 @@ class AsITCase(
       SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
       SomeCaseClass("Lucy", 42, 6000.00, "HR"))
 
-    val t =  env.fromCollection(data).as('a, 'b, 'c, 'd)
+    val t =  env.fromCollection(data)
+      .as('a, 'b, 'c, 'd)
+      .select('a, 'b, 'c, 'd)
 
     val expected: String =
       "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
@@ -97,33 +103,24 @@ class AsITCase(
   def testAsWithToFewFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
-
-    val expected = "no"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val t = CollectionDataSets.get3TupleDataSet(env)
+      .as('a, 'b)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def testAsWithToManyFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
-
-    val expected = "no"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val t = CollectionDataSets.get3TupleDataSet(env)
+      .as('a, 'b, 'c, 'd)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def testAsWithAmbiguousFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
-
-    val expected = "no"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val t = CollectionDataSets.get3TupleDataSet(env)
+      .as('a, 'b, 'b)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -131,11 +128,9 @@ class AsITCase(
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     // as can only have field references
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b)
-
-    val expected = "no"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val t = CollectionDataSets.get3TupleDataSet(env)
+      .as('a, 'b, 'c)
+      .as('a + 1, 'b, 'c)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -143,11 +138,9 @@ class AsITCase(
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     // as can only have field references
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b)
-
-    val expected = "no"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val t = CollectionDataSets.get3TupleDataSet(env)
+      .as('a, 'b, 'c)
+      .as('a as 'foo, 'b, 'c)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 8e11f76..111525f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -66,9 +66,8 @@ class CastingITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mo
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // TODO support advanced String operations
 
-  @Ignore
+  @Ignore // TODO support advanced String operations
   @Test
   def testAutoCastToString(): Unit = {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 0febd4d..ae8ebef 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -74,20 +74,18 @@ class FilterITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // TODO test broken does not test Table API
-  @Ignore
   @Test
   def testFilterOnStringTupleField(): Unit = {
     /*
      * Test filter on String tuple field.
      */
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val filterDs = ds.filter( _._3.contains("world") )
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val filterDs = ds.filter( 'c.like("%world%") )
 
-//    val expected = "(3,2,Hello world)\n" + "(4,3,Hello world, how are 
you?)\n"
-//    val results = filterDs.toDataSet[Row](getConfig).collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+    val results = filterDs.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -152,27 +150,21 @@ class FilterITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // These two not yet done, but are planned
-  // TODO test broken does not test Table API
-  @Ignore
   @Test
   def testFilterBasicType(): Unit = {
     /*
      * Test filter on basic type
      */
-
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.getStringDataSet(env)
 
-    val filterDs = ds.filter( _.startsWith("H") )
+    val filterDs = ds.as('a).filter( 'a.like("H%") )
 
-//    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how 
are you?\n"
-//    val results = filterDs.toDataSet[Row](getConfig).collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how 
are you?\n"
+    val results = filterDs.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // TODO test broken does not test Table API
-  @Ignore
   @Test
   def testFilterOnCustomType(): Unit = {
     /*
@@ -180,11 +172,12 @@ class FilterITCase(
      */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.getCustomTypeDataSet(env)
-    val filterDs = ds.filter( _.myString.contains("a") )
+    val filterDs = ds.as('myInt as 'i, 'myLong as 'l, 'myString as 's)
+      .filter( 's.like("%a%") )
 
-//    val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + 
"3,5,Luke Skywalker\n"
-//    val results = filterDs.toDataSet[Row](getConfig).collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + 
"3,5,Luke Skywalker\n"
+    val results = filterDs.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index c4fc346..7977547 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.scala.table.test
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.codegen.CodeGenException
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
 import org.junit._
@@ -54,32 +55,24 @@ class StringExpressionsITCase(mode: TestExecutionMode) 
extends MultipleProgramsT
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // Calcite does eagerly check expression types
-  @Ignore
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[CodeGenException])
   def testNonWorkingSubstring1(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
       .select('a.substring(0, 'b))
 
-    val expected = "AAA\nBB"
     val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // Calcite does eagerly check expression types
-  @Ignore
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[CodeGenException])
   def testNonWorkingSubstring2(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
       .select('a.substring('b, 15))
 
-    val expected = "AAA\nBB"
     val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
 

Reply via email to