http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
deleted file mode 100644
index 4927fb2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeinfo
-
-import java.util
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
-FlatFieldDescriptor}
-import org.apache.flink.api.common.typeutils.{CompositeType, TypeSerializer}
-
-/**
- * A TypeInformation that is used to rename fields of an underlying CompositeType. This
- * allows the system to translate "as" Table API operations to a [[RenameOperator]]
- * that does not get translated to a runtime operator.
- */
-class RenamingProxyTypeInfo[T](
-    val tpe: CompositeType[T],
-    val fieldNames: Array[String])
-  extends CompositeType[T](tpe.getTypeClass) {
-
-  def getUnderlyingType: CompositeType[T] = tpe
-
-  if (tpe.getArity != fieldNames.length) {
-    throw new IllegalArgumentException(s"Number of field names '${fieldNames.mkString(",")}' and " +
-      s"number of fields in underlying type $tpe do not match.")
-  }
-
-  if (fieldNames.toSet.size != fieldNames.length) {
-    throw new IllegalArgumentException(s"New field names must be unique. " +
-      s"Names: ${fieldNames.mkString(",")}.")
-  }
-
-  override def getFieldIndex(fieldName: String): Int = {
-    val result = fieldNames.indexOf(fieldName)
-    if (result != fieldNames.lastIndexOf(fieldName)) {
-      -2
-    } else {
-      result
-    }
-  }
-  override def getFieldNames: Array[String] = fieldNames
-
-  override def isBasicType: Boolean = tpe.isBasicType
-
-  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] =
-    tpe.createSerializer(executionConfig)
-
-  override def getArity: Int = tpe.getArity
-
-  override def isKeyType: Boolean = tpe.isKeyType
-
-  override def getTypeClass: Class[T] = tpe.getTypeClass
-
-  override def getGenericParameters: java.util.List[TypeInformation[_]] = tpe.getGenericParameters
-
-  override def isTupleType: Boolean = tpe.isTupleType
-
-  override def toString = {
-    s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " +
-      s"fields: ${fieldNames.mkString(", ")})"
-  }
-
-  override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos)
-
-  override def getTotalFields: Int = tpe.getTotalFields
-
-  override def createComparator(
-        logicalKeyFields: Array[Int],
-        orders: Array[Boolean],
-        logicalFieldOffset: Int,
-        executionConfig: ExecutionConfig) =
-    tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig)
-
-  override def getFlatFields(
-      fieldExpression: String,
-      offset: Int,
-      result: util.List[FlatFieldDescriptor]): Unit = {
-
-    // split of head of field expression
-    val (head, tail) = if (fieldExpression.indexOf('.') >= 0) {
-      fieldExpression.splitAt(fieldExpression.indexOf('.'))
-    } else {
-      (fieldExpression, "")
-    }
-
-    // replace proxy field name by original field name of wrapped type
-    val headPos = getFieldIndex(head)
-    if (headPos >= 0) {
-      val resolvedHead = tpe.getFieldNames()(headPos)
-      val resolvedFieldExpr = resolvedHead + tail
-
-      // get flat fields with wrapped field name
-      tpe.getFlatFields(resolvedFieldExpr, offset, result)
-    }
-    else {
-      throw new IllegalArgumentException(s"Invalid field expression: ${fieldExpression}")
-    }
-  }
-
-  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = {
-    tpe.getTypeAt(fieldExpression)
-  }
-
-  override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[T] = {
-    throw new RuntimeException("This method should never be called because createComparator is " +
-      "overwritten.")
-  }
-
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case renamingProxy: RenamingProxyTypeInfo[_] =>
-        renamingProxy.canEqual(this) &&
-        tpe.equals(renamingProxy.tpe) &&
-        fieldNames.sameElements(renamingProxy.fieldNames)
-      case _ => false
-    }
-  }
-
-  override def hashCode(): Int = {
-    31 * tpe.hashCode() + util.Arrays.hashCode(fieldNames.asInstanceOf[Array[AnyRef]])
-  }
-
-  override def canEqual(obj: Any): Boolean = {
-    obj.isInstanceOf[RenamingProxyTypeInfo[_]]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
index 39fc1d8..522c7f3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
@@ -20,32 +20,29 @@ package org.apache.flink.api.table.typeinfo
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType.TypeComparatorBuilder
-import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.common.typeutils.TypeComparator
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.expressions.Expression
 
 import scala.collection.mutable.ArrayBuffer
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.table.Row
 
 /**
  * TypeInformation for [[Row]].
  */
-class RowTypeInfo(
-    fieldTypes: Seq[TypeInformation[_]],
-    fieldNames: Seq[String])
-  extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, fieldNames) {
+class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]])
+  extends CaseClassTypeInfo[Row](
+    classOf[Row],
+    Array(),
+    fieldTypes,
+    for (i <- fieldTypes.indices) yield "f" + i)
+{
 
   /**
    * Temporary variable for directly passing orders to comparators.
    */
   var comparatorOrders: Option[Array[Boolean]] = None
 
-  def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), fields.map(_.name))
-
-  if (fieldNames.toSet.size != fieldNames.size) {
-    throw new IllegalArgumentException("Field names must be unique.")
-  }
-
   override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = {
     val fieldSerializers: Array[TypeSerializer[Any]] = new Array[TypeSerializer[Any]](getArity)
     for (i <- 0 until getArity) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index a3d5edc..dd51b14 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -57,7 +57,6 @@ import java.util.List;
 @RunWith(Parameterized.class)
 public class AggregationsITCase extends MultipleProgramsTestBase {
 
-
        public AggregationsITCase(TestExecutionMode mode){
                super(mode);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
index 0f35d75..492596c 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.table.test;
 
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.codegen.CodeGenException;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
@@ -35,7 +36,6 @@ import java.util.List;
 @RunWith(Parameterized.class)
 public class AsITCase extends MultipleProgramsTestBase {
 
-
        public AsITCase(TestExecutionMode mode){
                super(mode);
        }
@@ -60,7 +60,7 @@ public class AsITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test
+       @Test(expected = CodeGenException.class)
        public void testAsFromPojo() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
index cde78ce..957c093 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -23,15 +23,18 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.codegen.CodeGenException;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+
 import scala.NotImplementedError;
 
 import java.util.List;
@@ -39,11 +42,11 @@ import java.util.List;
 @RunWith(Parameterized.class)
 public class CastingITCase extends MultipleProgramsTestBase {
 
-
        public CastingITCase(TestExecutionMode mode){
                super(mode);
        }
 
+       @Ignore
        @Test(expected = NotImplementedError.class)
        public void testNumericAutocastInArithmetic() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -64,7 +67,7 @@ public class CastingITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testNumericAutocastInComparison() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -86,7 +89,7 @@ public class CastingITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test(expected = CodeGenException.class)
        public void testCastFromString() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -106,7 +109,7 @@ public class CastingITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test(expected = CodeGenException.class)
        public void testCastDateFromString() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -128,7 +131,7 @@ public class CastingITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test(expected = CodeGenException.class)
        public void testCastDateToStringAndLong() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
index 2a17087..51f666e 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.test.TableProgramsTestBase;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -35,17 +36,16 @@ import scala.NotImplementedError;
 import java.util.List;
 
 @RunWith(Parameterized.class)
-public class ExpressionsITCase extends MultipleProgramsTestBase {
+public class ExpressionsITCase extends TableProgramsTestBase {
 
-
-       public ExpressionsITCase(TestExecutionMode mode){
-               super(mode);
+       public ExpressionsITCase(TestExecutionMode mode, TableConfigMode configMode) {
+               super(mode, configMode);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testArithmetic() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSource<Tuple2<Integer, Integer>> input =
                                env.fromElements(new Tuple2<>(5, 10));
@@ -62,10 +62,10 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testLogic() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSource<Tuple2<Integer, Boolean>> input =
                                env.fromElements(new Tuple2<>(5, true));
@@ -82,10 +82,10 @@ public class ExpressionsITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testComparisons() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSource<Tuple3<Integer, Integer, Integer>> input =
                                env.fromElements(new Tuple3<>(5, 5, 4));

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
index cd08879..f48be48 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.test.TableProgramsTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
@@ -34,17 +35,16 @@ import scala.NotImplementedError;
 import java.util.List;
 
 @RunWith(Parameterized.class)
-public class FilterITCase extends MultipleProgramsTestBase {
+public class FilterITCase extends TableProgramsTestBase {
 
-
-       public FilterITCase(TestExecutionMode mode){
-               super(mode);
+       public FilterITCase(TestExecutionMode mode, TableConfigMode configMode){
+               super(mode, configMode);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testAllRejectingFilter() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
 
@@ -63,7 +63,7 @@ public class FilterITCase extends MultipleProgramsTestBase {
        @Test
        public void testAllPassingFilter() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
 
@@ -85,10 +85,10 @@ public class FilterITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testFilterOnIntegerTupleField() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
 
@@ -106,10 +106,10 @@ public class FilterITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testNotEquals() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
 
@@ -127,10 +127,10 @@ public class FilterITCase extends MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testIntegerBiggerThan128() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
index d5dc56a..524dd4e 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
@@ -36,7 +36,6 @@ import java.util.List;
 @RunWith(Parameterized.class)
 public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
 
-
        public GroupedAggregationsITCase(TestExecutionMode mode){
                super(mode);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
index 2e1f4a7..95213ee 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
@@ -39,7 +39,6 @@ import java.util.List;
 @RunWith(Parameterized.class)
 public class JoinITCase extends MultipleProgramsTestBase {
 
-
        public JoinITCase(TestExecutionMode mode) {
                super(mode);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
index 5ef0235..993638d 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
@@ -26,11 +26,11 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableException;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.NotImplementedError;
 
 @RunWith(Parameterized.class)
 public class PojoGroupingITCase extends MultipleProgramsTestBase {
@@ -39,7 +39,7 @@ public class PojoGroupingITCase extends MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test(expected = TableException.class)
        public void testPojoGrouping() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
index a66219c..ada0e06 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.test.TableProgramsTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
@@ -34,17 +35,16 @@ import scala.NotImplementedError;
 import java.util.List;
 
 @RunWith(Parameterized.class)
-public class SelectITCase extends MultipleProgramsTestBase {
+public class SelectITCase extends TableProgramsTestBase {
 
-
-       public SelectITCase(TestExecutionMode mode) {
-               super(mode);
+       public SelectITCase(TestExecutionMode mode, TableConfigMode configMode) {
+               super(mode, configMode);
        }
 
        @Test
        public void testSimpleSelectAllWithAs() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 
@@ -66,10 +66,10 @@ public class SelectITCase extends MultipleProgramsTestBase {
 
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testSimpleSelectWithNaming() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 
@@ -90,7 +90,7 @@ public class SelectITCase extends MultipleProgramsTestBase {
        @Test(expected = IllegalArgumentException.class)
        public void testAsWithToFewFields() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 
@@ -105,7 +105,7 @@ public class SelectITCase extends MultipleProgramsTestBase {
        @Test(expected = IllegalArgumentException.class)
        public void testAsWithToManyFields() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 
@@ -120,7 +120,7 @@ public class SelectITCase extends MultipleProgramsTestBase {
        @Test(expected = IllegalArgumentException.class)
        public void testAsWithAmbiguousFields() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 
@@ -135,7 +135,7 @@ public class SelectITCase extends MultipleProgramsTestBase {
        @Test(expected = IllegalArgumentException.class)
        public void testOnlyFieldRefInAs() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
 
                DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
index c4b2f01..315fe9f 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.table.test;
 
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.codegen.CodeGenException;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
@@ -29,19 +30,17 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.NotImplementedError;
 
 import java.util.List;
 
 @RunWith(Parameterized.class)
 public class StringExpressionsITCase extends MultipleProgramsTestBase {
 
-
        public StringExpressionsITCase(TestExecutionMode mode) {
                super(mode);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test(expected = CodeGenException.class)
        public void testSubstring() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -61,7 +60,7 @@ public class StringExpressionsITCase extends 
MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = NotImplementedError.class)
+       @Test(expected = CodeGenException.class)
        public void testSubstringWithMaxEnd() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
index d6297d9..ec4cd1c 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
@@ -38,7 +38,6 @@ import java.util.List;
 @RunWith(Parameterized.class)
 public class UnionITCase extends MultipleProgramsTestBase {
 
-
        public UnionITCase(TestExecutionMode mode) {
                super(mode);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 4a37737..d6a853d 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -19,19 +19,17 @@
 package org.apache.flink.api.scala.table.test
 
 import java.util.Date
-
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.Row
 import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-
 import scala.collection.JavaConverters._
+import org.apache.flink.api.table.codegen.CodeGenException
 
 @RunWith(classOf[Parameterized])
 class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
@@ -49,6 +47,7 @@ class CastingITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mo
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Ignore // gives different types of exceptions for cluster and collection modes
   @Test(expected = classOf[NotImplementedError])
   def testNumericAutoCastInArithmetic(): Unit = {
 
@@ -63,7 +62,7 @@ class CastingITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mo
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testNumericAutoCastInComparison(): Unit = {
 
     // don't test everything, just some common cast directions
@@ -79,7 +78,7 @@ class CastingITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mo
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test(expected = classOf[CodeGenException])
   def testCastFromString: Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -106,7 +105,7 @@ class CastingITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mo
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test(expected = classOf[CodeGenException])
   def testCastDateToStringAndLong {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000"))

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 9f20043..f300547 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -21,12 +21,14 @@ package org.apache.flink.api.scala.table.test
 import java.util.Date
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.Row
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.table.test.TableProgramsTestBase
+import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -34,9 +36,12 @@ import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
-class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+class ExpressionsITCase(
+    mode: TestExecutionMode,
+    config: TableConfigMode)
+  extends TableProgramsTestBase(mode, config) {
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testArithmetic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -44,11 +49,11 @@ class ExpressionsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBas
       .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
 
     val expected = "0,10,2,10,1,-5"
-    val results = t.toDataSet[Row].collect()
+    val results = t.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testLogic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -56,11 +61,11 @@ class ExpressionsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBas
       .select('b && true, 'b && false, 'b || false, !'b)
 
     val expected = "true,false,true,false"
-    val results = t.toDataSet[Row].collect()
+    val results = t.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testComparisons(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -68,11 +73,13 @@ class ExpressionsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBas
       .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
 
     val expected = "true,true,false,false,true"
-    val results = t.toDataSet[Row].collect()
+    val results = t.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  // advanced functions not supported yet
+  @Ignore
+  @Test
   def testCaseInsensitiveForAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -81,7 +88,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBas
       .groupBy("a").select("a, a.count As cnt")
 
     val expected = "3,1"
-    val results = t.toDataSet[Row].collect()
+    val results = t.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -97,7 +104,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
         'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
 
     val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000"
-    val results = t.toDataSet[Row].collect()
+    val results = t.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 7b1c5de..96b9341 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -18,12 +18,15 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.expressions.Literal
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.table.test.TableProgramsTestBase
+import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -32,7 +35,10 @@ import scala.collection.JavaConverters._
 
 
 @RunWith(classOf[Parameterized])
-class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+class FilterITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
 
   @Test
   def testAllRejectingFilter(): Unit = {
@@ -44,9 +50,9 @@ class FilterITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mod
 
     val filterDs = ds.filter( Literal(false) )
 
-//    val expected = "\n"
-//    val results = filterDs.collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "\n"
+    val results = filterDs.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -57,17 +63,19 @@ class FilterITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mod
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
 
-//    val filterDs = ds.filter( Literal(true) )
-//    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-//      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-//      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-//      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-//      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-//      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-//    val results = filterDs.collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val filterDs = ds.filter( Literal(true) )
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = filterDs.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  // TODO test broken does not test Table API
+  @Ignore
   @Test
   def testFilterOnStringTupleField(): Unit = {
     /*
@@ -78,7 +86,7 @@ class FilterITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mod
     val filterDs = ds.filter( _._3.contains("world") )
 
 //    val expected = "(3,2,Hello world)\n" + "(4,3,Hello world, how are you?)\n"
-//    val results = filterDs.collect()
+//    val results = filterDs.toDataSet[Row](getConfig).collect()
 //    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -92,12 +100,12 @@ class FilterITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mod
 
     val filterDs = ds.filter( 'a % 2 === 0 )
 
-//    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
-//      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
-//      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-//      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-//    val results = filterDs.collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
+      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
+      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+    val results = filterDs.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
@@ -118,7 +126,7 @@ class FilterITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mod
   }
 
   // These two not yet done, but are planned
-
+  // TODO test broken does not test Table API
   @Ignore
   @Test
   def testFilterBasicType(): Unit = {
@@ -132,10 +140,11 @@ class FilterITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mod
     val filterDs = ds.filter( _.startsWith("H") )
 
 //    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
-//    val results = filterDs.collect()
+//    val results = filterDs.toDataSet[Row](getConfig).collect()
 //    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  // TODO test broken does not test Table API
   @Ignore
   @Test
   def testFilterOnCustomType(): Unit = {
@@ -147,7 +156,7 @@ class FilterITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mod
     val filterDs = ds.filter( _.myString.contains("a") )
 
 //    val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
-//    val results = filterDs.collect()
+//    val results = filterDs.toDataSet[Row](getConfig).collect()
 //    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 3700d67..e181e0b 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.test.TableProgramsTestBase
+import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -31,7 +33,10 @@ import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
-class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+class SelectITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
 
   @Test
   def testSimpleSelectAll(): Unit = {
@@ -45,7 +50,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
       "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
       "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
       "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
+    val results = t.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -61,11 +66,11 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
       "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
       "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
       "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
+    val results = t.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testSimpleSelectWithNaming(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -76,7 +81,7 @@ class SelectITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mod
     val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
       "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
       "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
+    val results = t.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -87,7 +92,7 @@ class SelectITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mod
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
 
     val expected = "no"
-    val results = t.toDataSet[Row].collect()
+    val results = t.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -98,7 +103,7 @@ class SelectITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mod
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
 
     val expected = "no"
-    val results = t.toDataSet[Row].collect()
+    val results = t.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -109,7 +114,7 @@ class SelectITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mod
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
 
     val expected = "no"
-    val results = t.toDataSet[Row].collect()
+    val results = t.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -121,7 +126,7 @@ class SelectITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mod
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd)
 
     val expected = "no"
-    val results = t.toDataSet[Row].collect()
+    val results = t.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index 565f444..f2e9aca 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -26,13 +26,13 @@ import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-
 import scala.collection.JavaConverters._
+import org.apache.flink.api.table.codegen.CodeGenException
 
 @RunWith(classOf[Parameterized])
 class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test(expected = classOf[CodeGenException])
   def testSubstring(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
@@ -43,7 +43,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) 
extends MultipleProgramsT
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test(expected = classOf[CodeGenException])
   def testSubstringWithMaxEnd(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala
new file mode 100644
index 0000000..0962f4e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.test
+
+import java.util
+
+import org.apache.flink.api.java.table.{TableEnvironment => JavaTableEnv}
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
+import org.apache.flink.api.scala.table.{TableEnvironment => ScalaTableEnv}
+import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaEnv}
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode.{EFFICIENT, NULL}
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConversions._
+
+class TableProgramsTestBase(
+    mode: TestExecutionMode,
+    tableConfigMode: TableConfigMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  def getJavaTableEnvironment: JavaTableEnv = {
+    val env = JavaEnv.getExecutionEnvironment // TODO pass it to tableEnv
+    val tableEnv = new JavaTableEnv
+    configure(tableEnv.getConfig)
+    tableEnv
+  }
+
+  def getScalaTableEnvironment: ScalaTableEnv = {
+    val env = ScalaEnv.getExecutionEnvironment // TODO pass it to tableEnv
+    val tableEnv = new ScalaTableEnv
+    configure(tableEnv.getConfig)
+    tableEnv
+  }
+
+  def getConfig: TableConfig = {
+    val config = new TableConfig()
+    configure(config)
+    config
+  }
+
+  def configure(config: TableConfig): Unit = {
+    tableConfigMode match {
+      case NULL =>
+        config.setNullCheck(true)
+      case EFFICIENT =>
+        config.setEfficientTypeUsage(true)
+      case _ => // keep default
+    }
+  }
+
+}
+
+object TableProgramsTestBase {
+  sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: Boolean }
+  object TableConfigMode {
+    case object DEFAULT extends TableConfigMode {
+      val nullCheck = false; val efficientTypes = false
+    }
+    case object NULL extends TableConfigMode {
+      val nullCheck = true; val efficientTypes = false
+    }
+    case object EFFICIENT extends TableConfigMode {
+      val nullCheck = false; val efficientTypes = true
+    }
+  }
+
+  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  def tableConfigs(): util.Collection[Array[java.lang.Object]] = {
+    Seq(
+      // TODO more tests in cluster mode?
+      Array[AnyRef](TestExecutionMode.CLUSTER, TableConfigMode.DEFAULT),
+      Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT),
+      Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.NULL),
+      Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.EFFICIENT)
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
deleted file mode 100644
index ef616a9..0000000
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeinfo
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.util.TestLogger
-import org.junit.Test
-import org.scalatest.junit.JUnitSuiteLike
-
-class RenamingProxyTypeInfoTest extends TestLogger with JUnitSuiteLike {
-
-  @Test
-  def testRenamingProxyTypeEquality(): Unit = {
-    val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo])
-      .asInstanceOf[CompositeType[TestPojo]]
-
-    val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo](
-      pojoTypeInfo1,
-      Array("someInt", "aString", "doubleArray"))
-
-    val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo](
-      pojoTypeInfo1,
-      Array("someInt", "aString", "doubleArray"))
-
-    assert(tpeInfo1.equals(tpeInfo2))
-    assert(tpeInfo1.hashCode() == tpeInfo2.hashCode())
-  }
-
-  @Test
-  def testRenamingProxyTypeInequality(): Unit = {
-    val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo])
-      .asInstanceOf[CompositeType[TestPojo]]
-
-    val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo](
-      pojoTypeInfo1,
-      Array("someInt", "aString", "doubleArray"))
-
-    val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo](
-      pojoTypeInfo1,
-      Array("foobar", "aString", "doubleArray"))
-
-    assert(!tpeInfo1.equals(tpeInfo2))
-  }
-}
-
-final class TestPojo {
-  var someInt: Int = 0
-  private var aString: String = null
-  var doubleArray: Array[Double] = null
-
-  def setaString(aString: String) {
-    this.aString = aString
-  }
-
-  def getaString: String = {
-    return aString
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowComparatorTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowComparatorTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowComparatorTest.scala
index a58d0b7..11d1a8a 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowComparatorTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowComparatorTest.scala
@@ -38,8 +38,7 @@ class RowComparatorTest extends ComparatorTestBase[Row] {
         BasicTypeInfo.INT_TYPE_INFO,
         BasicTypeInfo.BOOLEAN_TYPE_INFO,
         BasicTypeInfo.SHORT_TYPE_INFO),
-      TypeExtractor.createTypeInfo(classOf[MyPojo])),
-    Array("f0", "f1", "f2", "f3", "f4"))
+      TypeExtractor.createTypeInfo(classOf[MyPojo])))
 
   val testPojo1 = new MyPojo()
   // TODO we cannot test null here as PojoComparator has no support for null keys

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
index 60a02ae..fc000fd 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
@@ -51,7 +51,7 @@ class RowSerializerTest {
   @Test
   def testRowSerializer(): Unit = {
     val rowInfo: TypeInformation[Row] = new RowTypeInfo(
-      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name"))
+      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
 
     val row1 = new Row(2)
     row1.setField(0, 1)
@@ -85,20 +85,7 @@ class RowSerializerTest {
       BasicTypeInfo.INT_TYPE_INFO,
       BasicTypeInfo.INT_TYPE_INFO,
       BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO), Seq(
-      "id0",
-      "id1",
-      "id2",
-      "id3",
-      "id4",
-      "id5",
-      "id6",
-      "id7",
-      "id8",
-      "id9",
-      "id10",
-      "id11",
-      "name"))
+      BasicTypeInfo.STRING_TYPE_INFO))
 
     val row = new Row(13)
     row.setField(0, 2)
@@ -134,8 +121,7 @@ class RowSerializerTest {
           BasicTypeInfo.INT_TYPE_INFO,
           BasicTypeInfo.BOOLEAN_TYPE_INFO,
           BasicTypeInfo.SHORT_TYPE_INFO),
-        TypeExtractor.createTypeInfo(classOf[MyPojo])),
-      Array("f0", "f1", "f2", "f3", "f4"))
+        TypeExtractor.createTypeInfo(classOf[MyPojo])))
 
     val testPojo1 = new MyPojo()
     testPojo1.name = null

Reply via email to