http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
deleted file mode 100644
index bdebfb1..0000000
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.table.test;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.flink.api.table.ExpressionException;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class AggregationsITCase extends MultipleProgramsTestBase {
-
-
-       public AggregationsITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       @Test
-       public void testAggregationTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               Table table = 
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-
-               Table result = table.select("f0.sum, f0.min, f0.max, f0.count, 
f0.avg");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "231,1,21,21,11";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAggregationOnNonExistingField() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-
-               Table result =
-                               table.select("foo.avg");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testWorkingAggregationDataTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
-                               env.fromElements(
-                                               new Tuple7<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, "Hello"),
-                                               new Tuple7<>((byte) 2, (short) 
2, 2, 2L, 2.0f, 2.0d, "Ciao"));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result =
-                               table.select("f0.avg, f1.avg, f2.avg, f3.avg, 
f4.avg, f5.avg, f6.count");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,1,1,1,1.5,1.5,2";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testAggregationWithArithmetic() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple2<Float, String>> input =
-                               env.fromElements(
-                                               new Tuple2<>(1f, "Hello"),
-                                               new Tuple2<>(2f, "Ciao"));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result =
-                               table.select("(f0 + 2).avg + 2, f1.count + \" 
THE COUNT\"");
-
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "5.5,2 THE COUNT";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testAggregationWithTwoCount() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple2<Float, String>> input =
-                       env.fromElements(
-                               new Tuple2<>(1f, "Hello"),
-                               new Tuple2<>(2f, "Ciao"));
-
-               Table table =
-                       tableEnv.fromDataSet(input);
-
-               Table result =
-                       table.select("f0.count, f1.count");
-
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "2,2";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testNonWorkingDataTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple2<Float, String>> input = env.fromElements(new 
Tuple2<>(1f, "Hello"));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result =
-                               table.select("f1.sum");
-
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testNoNestedAggregation() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple2<Float, String>> input = env.fromElements(new 
Tuple2<>(1f, "Hello"));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result =
-                               table.select("f0.sum.sum");
-
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
deleted file mode 100644
index f6ab54e..0000000
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.ExpressionException;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class AsITCase extends MultipleProgramsTestBase {
-
-
-       public AsITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       @Test
-       public void testAs() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
-
-               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
-                               "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
-                               "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-                               "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-                               "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-                               "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithToFewFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
-
-               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithToManyFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
-
-               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithAmbiguousFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
-
-               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithNonFieldReference1() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
-
-               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithNonFieldReference2() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               Table table =
-                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," +
-                                               " c");
-
-               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
deleted file mode 100644
index 7e9e3dc..0000000
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class CastingITCase extends MultipleProgramsTestBase {
-
-
-       public CastingITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       @Test
-       public void testAutoCastToString() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
-                               env.fromElements(new Tuple7<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, "Hello"));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result = table.select(
-                               "f0 + 'b', f1 + 's', f2 + 'i', f3 + 'L', f4 + 
'f', f5 + \"d\"");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1b,1s,1i,1L,1.0f,1.0d";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testNumericAutocastInArithmetic() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
-                               env.fromElements(new Tuple7<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, "Hello"));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result = table.select("f0 + 1, f1 +" +
-                               " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "2,2,2,2.0,2.0,2.0";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testNumericAutocastInComparison() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
-                               env.fromElements(
-                                               new Tuple7<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, "Hello"),
-                                               new Tuple7<>((byte) 2, (short) 
2, 2, 2L, 2.0f, 2.0d, "Hello"));
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a,b,c,d,e,f,g");
-
-               Table result = table
-                               .filter("a > 1 && b > 1 && c > 1L && d > 1.0f 
&& e > 1.0d && f > 1");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "2,2,2,2,2.0,2.0,Hello";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testCastFromString() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple3<String, String, String>> input =
-                               env.fromElements(new Tuple3<>("1", "true", 
"2.0"));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result = table.select(
-                               "f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), 
f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,1,1,1,2.0,2.0,true\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testCastDateFromString() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple4<String, String, String, String>> input =
-                               env.fromElements(new Tuple4<>("2011-05-03", 
"15:51:36", "2011-05-03 15:51:36.000", "1446473775"));
-
-               Table table =
-                               tableEnv.fromDataSet(input);
-
-               Table result = table
-                               .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS 
f1, f2.cast(DATE) AS f2, f3.cast(DATE) AS f3")
-                               .select("f0.cast(STRING), f1.cast(STRING), 
f2.cast(STRING), f3.cast(STRING)");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "2011-05-03 00:00:00.000,1970-01-01 
15:51:36.000,2011-05-03 15:51:36.000," +
-                               "1970-01-17 17:47:53.775\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testCastDateToStringAndLong() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple2<String, String>> input =
-                       env.fromElements(new Tuple2<>("2011-05-03 
15:51:36.000", "1304437896000"));
-
-               Table table =
-                       tableEnv.fromDataSet(input);
-
-               Table result = table
-                       .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1")
-                       .select("f0.cast(STRING), f0.cast(LONG), 
f1.cast(STRING), f1.cast(LONG)");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "2011-05-03 
15:51:36.000,1304437896000,2011-05-03 15:51:36.000,1304437896000\n";
-               compareResultAsText(results, expected);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
deleted file mode 100644
index c9bba62..0000000
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.ExpressionException;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class ExpressionsITCase extends MultipleProgramsTestBase {
-
-
-       public ExpressionsITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       @Test
-       public void testArithmetic() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple2<Integer, Integer>> input =
-                               env.fromElements(new Tuple2<>(5, 10));
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b");
-
-               Table result = table.select(
-                               "a - 5, a + 5, a / 2, a * 2, a % 2, -a");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "0,10,2,10,1,-5";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testLogic() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple2<Integer, Boolean>> input =
-                               env.fromElements(new Tuple2<>(5, true));
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b");
-
-               Table result = table.select(
-                               "b && true, b && false, b || false, !b");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "true,false,true,false";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testComparisons() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple3<Integer, Integer, Integer>> input =
-                               env.fromElements(new Tuple3<>(5, 5, 4));
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table.select(
-                               "a > c, a >= b, a < c, a.isNull, a.isNotNull");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "true,true,false,false,true";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testBitwiseOperation() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple2<Byte, Byte>> input =
-                               env.fromElements(new Tuple2<>((byte) 3, (byte) 
5));
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b");
-
-               Table result = table.select(
-                               "a & b, a | b, a ^ b, ~a");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,7,6,-4";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testBitwiseWithAutocast() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple2<Integer, Byte>> input =
-                               env.fromElements(new Tuple2<>(3, (byte) 5));
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b");
-
-               Table result = table.select(
-                               "a & b, a | b, a ^ b, ~a");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,7,6,-4";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testBitwiseWithNonWorkingAutocast() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSource<Tuple2<Float, Byte>> input =
-                               env.fromElements(new Tuple2<>(3.0f, (byte) 5));
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b");
-
-               Table result =
-                               table.select("a & b, a | b, a ^ b, ~a");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
deleted file mode 100644
index 44e0def..0000000
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class FilterITCase extends MultipleProgramsTestBase {
-
-
-       public FilterITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       @Test
-       public void testAllRejectingFilter() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .filter("false");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testAllPassingFilter() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .filter("true");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
-                               "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
-                               "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-                               "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-                               "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-                               "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testFilterOnIntegerTupleField() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .filter(" a % 2 = 0 ");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "2,2,Hello\n" + "4,3,Hello world, how are 
you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-                               "Comment#2\n" + "10,4,Comment#4\n" + 
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-                               "Comment#10\n" + "18,6,Comment#12\n" + 
"20,6,Comment#14\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testNotEquals() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .filter("!( a % 2 <> 0 ) ");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "2,2,Hello\n" + "4,3,Hello world, how are 
you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-                               "Comment#2\n" + "10,4,Comment#4\n" + 
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-                               "Comment#10\n" + "18,6,Comment#12\n" + 
"20,6,Comment#14\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testIntegerBiggerThan128() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
env.fromElements(new Tuple3<>(300, 1L, "Hello"));
-
-               Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table.filter("a = 300 ");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "300,1,Hello\n";
-               compareResultAsText(results, expected);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
 
b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
deleted file mode 100644
index f5c9185..0000000
--- 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.ExpressionException;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
-
-
-       public GroupedAggregationsITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testGroupingOnNonExistentField() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .groupBy("foo").select("a.avg");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testGroupedAggregate() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .groupBy("b").select("b, a.sum");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + 
"5,65\n" + "6,111\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testGroupingKeyForwardIfNotUsed() throws Exception {
-
-               // the grouping key needs to be forwarded to the intermediate 
DataSet, even
-               // if we don't want the key in the output
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table table =
-                               tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                               .groupBy("b").select("a.sum");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + 
"111\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testGroupNoAggregation() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table table =
-                       tableEnv.fromDataSet(input, "a, b, c");
-
-               Table result = table
-                       .groupBy("b").select("a.sum as d, b").groupBy("b, 
d").select("b");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-
-               String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
-               List<Row> results = ds.collect();
-               compareResultAsText(results, expected);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
 
b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
deleted file mode 100644
index 428aec5..0000000
--- 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.ExpressionException;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class JoinITCase extends MultipleProgramsTestBase {
-
-
-       public JoinITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Test
-       public void testJoin() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table result = in1.join(in2).where("b === e").select("c, g");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testJoinWithFilter() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table result = in1.join(in2).where("b === e && b < 
2").select("c, g");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "Hi,Hallo\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testJoinWithMultipleKeys() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.get3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table result = in1.join(in2).where("a === d && b === 
h").select("c, g");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt wie gehts?\n" +
-                               "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I 
am fine.,IJK\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testJoinNonExistingKey() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table result = in1.join(in2).where("foo === e").select("c, g");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testJoinWithNonMatchingKeyTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table result = in1
-                               .join(in2).where("a === g").select("c, g");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testJoinWithAmbiguousFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, c");
-
-               Table result = in1
-                               .join(in2).where("a === d").select("c, g");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testJoinWithAggregation() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table result = in1
-                               .join(in2).where("a === d").select("g.count");
-
-               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "6";
-               compareResultAsText(results, expected);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
 
b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
deleted file mode 100644
index d61912b..0000000
--- 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class PojoGroupingITCase extends MultipleProgramsTestBase {
-
-       public PojoGroupingITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Test
-       public void testPojoGrouping() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple3<String, Double, String>> data = env.fromElements(
-                       new Tuple3<>("A", 23.0, "Z"),
-                       new Tuple3<>("A", 24.0, "Y"),
-                       new Tuple3<>("B", 1.0, "Z"));
-
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               Table table = tableEnv
-                       .fromDataSet(data, "groupMe, value, name")
-                       .select("groupMe, value, name")
-                       .where("groupMe != 'B'");
-
-               DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, 
MyPojo.class);
-
-               DataSet<MyPojo> result = myPojos.groupBy("groupMe")
-                       .sortGroup("value", Order.DESCENDING)
-                       .first(1);
-               List<MyPojo> resultList = result.collect();
-
-               compareResultAsText(resultList, "A,24.0,Y");
-       }
-
-       public static class MyPojo implements Serializable {
-               private static final long serialVersionUID = 
8741918940120107213L;
-
-               public String groupMe;
-               public double value;
-               public String name;
-
-               public MyPojo() {
-                       // for serialization
-               }
-
-               public MyPojo(String groupMe, double value, String name) {
-                       this.groupMe = groupMe;
-                       this.value = value;
-                       this.name = name;
-               }
-
-               @Override
-               public String toString() {
-                       return groupMe + "," + value + "," + name;
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
 
b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
deleted file mode 100644
index 9e42f53..0000000
--- 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.ExpressionException;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class SelectITCase extends MultipleProgramsTestBase {
-
-
-       public SelectITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Test
-       public void testSimpleSelectAllWithAs() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table in = tableEnv.fromDataSet(ds, "a,b,c");
-
-               Table result = in
-                               .select("a, b, c");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
-                               "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
-                               "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-                               "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-                               "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-                               "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
-               compareResultAsText(results, expected);
-
-       }
-
-       @Test
-       public void testSimpleSelectWithNaming() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table in = tableEnv.fromDataSet(ds);
-
-               Table result = in
-                               .select("f0 as a, f1 as b")
-                               .select("a, b");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + 
"5,3\n" + "6,3\n" + "7,4\n" +
-                               "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + 
"12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-                               "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + 
"20,6\n" + "21,6\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithToFewFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table in = tableEnv.fromDataSet(ds, "a, b");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = " sorry dude ";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithToManyFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table in = tableEnv.fromDataSet(ds, "a, b, c, d");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = " sorry dude ";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithAmbiguousFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table in = tableEnv.fromDataSet(ds, "a, b, c, b");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = " today's not your day ";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testOnlyFieldRefInAs() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               Table in = tableEnv.fromDataSet(ds, "a, b as c, d");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "sorry bro";
-               compareResultAsText(results, expected);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
 
b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
deleted file mode 100644
index e73b5a2..0000000
--- 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.table.Table;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Scanner;
-
-import static org.junit.Assert.assertEquals;
-
-public class SqlExplainITCase {
-
-       private static String testFilePath = 
SqlExplainITCase.class.getResource("/").getFile();
-
-       public static class WC {
-               public String word;
-               public int count;
-
-               // Public constructor to make it a Flink POJO
-               public WC() {}
-
-               public WC(int count, String word) {
-                       this.word = word;
-                       this.count = count;
-               }
-       }
-
-       @Test
-       public void testGroupByWithoutExtended() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<WC> input = env.fromElements(
-                               new WC(1,"d"),
-                               new WC(2,"d"),
-                               new WC(3,"d"));
-
-               Table table = tableEnv.fromDataSet(input).as("a, b");
-
-               String result = table
-                               .filter("a % 2 = 0")
-                               .explain();
-               String source = new Scanner(new File(testFilePath +
-                               
"../../src/test/scala/resources/testFilter0.out"))
-                               .useDelimiter("\\A").next();
-               assertEquals(result, source);
-       }
-
-       @Test
-       public void testGroupByWithExtended() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<WC> input = env.fromElements(
-                               new WC(1, "d"),
-                               new WC(2, "d"),
-                               new WC(3, "d"));
-
-               Table table = tableEnv.fromDataSet(input).as("a, b");
-
-               String result = table
-                               .filter("a % 2 = 0")
-                               .explain(true);
-               String source = new Scanner(new File(testFilePath +
-                               
"../../src/test/scala/resources/testFilter1.out"))
-                               .useDelimiter("\\A").next();
-               assertEquals(result, source);
-       }
-
-       @Test
-       public void testJoinWithoutExtended() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<WC> input1 = env.fromElements(
-                               new WC(1, "d"),
-                               new WC(1, "d"),
-                               new WC(1, "d"));
-
-               Table table1 = tableEnv.fromDataSet(input1).as("a, b");
-
-               DataSet<WC> input2 = env.fromElements(
-                               new WC(1,"d"),
-                               new WC(1,"d"),
-                               new WC(1,"d"));
-
-               Table table2 = tableEnv.fromDataSet(input2).as("c, d");
-
-               String result = table1
-                               .join(table2)
-                               .where("b = d")
-                               .select("a, c")
-                               .explain();
-               String source = new Scanner(new File(testFilePath +
-                               "../../src/test/scala/resources/testJoin0.out"))
-                               .useDelimiter("\\A").next();
-               assertEquals(result, source);
-       }
-
-       @Test
-       public void testJoinWithExtended() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<WC> input1 = env.fromElements(
-                               new WC(1, "d"),
-                               new WC(1, "d"),
-                               new WC(1, "d"));
-
-               Table table1 = tableEnv.fromDataSet(input1).as("a, b");
-
-               DataSet<WC> input2 = env.fromElements(
-                               new WC(1, "d"),
-                               new WC(1, "d"),
-                               new WC(1, "d"));
-
-               Table table2 = tableEnv.fromDataSet(input2).as("c, d");
-
-               String result = table1
-                               .join(table2)
-                               .where("b = d")
-                               .select("a, c")
-                               .explain(true);
-               String source = new Scanner(new File(testFilePath +
-                               "../../src/test/scala/resources/testJoin1.out"))
-                               .useDelimiter("\\A").next();
-               assertEquals(result, source);
-       }
-
-       @Test
-       public void testUnionWithoutExtended() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<WC> input1 = env.fromElements(
-                               new WC(1, "d"),
-                               new WC(1, "d"),
-                               new WC(1, "d"));
-
-               Table table1 = tableEnv.fromDataSet(input1);
-
-               DataSet<WC> input2 = env.fromElements(
-                               new WC(1, "d"),
-                               new WC(1, "d"),
-                               new WC(1, "d"));
-
-               Table table2 = tableEnv.fromDataSet(input2);
-
-               String result = table1
-                               .unionAll(table2)
-                               .explain();
-               String source = new Scanner(new File(testFilePath +
-                               
"../../src/test/scala/resources/testUnion0.out"))
-                               .useDelimiter("\\A").next();
-               assertEquals(result, source);
-       }
-
-       @Test
-       public void testUnionWithExtended() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<WC> input1 = env.fromElements(
-                               new WC(1, "d"),
-                               new WC(1, "d"),
-                               new WC(1, "d"));
-
-               Table table1 = tableEnv.fromDataSet(input1);
-
-               DataSet<WC> input2 = env.fromElements(
-                               new WC(1, "d"),
-                               new WC(1, "d"),
-                               new WC(1, "d"));
-
-               Table table2 = tableEnv.fromDataSet(input2);
-
-               String result = table1
-                               .unionAll(table2)
-                               .explain(true);
-               String source = new Scanner(new File(testFilePath +
-                               
"../../src/test/scala/resources/testUnion1.out"))
-                               .useDelimiter("\\A").next();
-               assertEquals(result, source);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
 
b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
deleted file mode 100644
index 7936f8c..0000000
--- 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.ExpressionException;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class StringExpressionsITCase extends MultipleProgramsTestBase {
-
-
-       public StringExpressionsITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Test
-       public void testSubstring() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-                               new Tuple2<>("AAAA", 2),
-                               new Tuple2<>("BBBB", 1));
-
-               Table in = tableEnv.fromDataSet(ds, "a, b");
-
-               Table result = in
-                               .select("a.substring(0, b)");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "AA\nB";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testSubstringWithMaxEnd() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-                               new Tuple2<>("ABCD", 2),
-                               new Tuple2<>("ABCD", 1));
-
-               Table in = tableEnv.fromDataSet(ds, "a, b");
-
-               Table result = in
-                               .select("a.substring(b)");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "CD\nBCD";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testNonWorkingSubstring1() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple2<String, Float>> ds = env.fromElements(
-                               new Tuple2<>("ABCD", 2.0f),
-                               new Tuple2<>("ABCD", 1.0f));
-
-               Table in = tableEnv.fromDataSet(ds, "a, b");
-
-               Table result = in
-                               .select("a.substring(0, b)");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testNonWorkingSubstring2() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple2<String, String>> ds = env.fromElements(
-                               new Tuple2<>("ABCD", "a"),
-                               new Tuple2<>("ABCD", "b"));
-
-               Table in = tableEnv.fromDataSet(ds, "a, b");
-
-               Table result = in
-                               .select("a.substring(b, 15)");
-
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
 
b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
deleted file mode 100644
index 7fd3a28..0000000
--- 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.ExpressionException;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class UnionITCase extends MultipleProgramsTestBase {
-
-
-       public UnionITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Test
-       public void testUnion() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "a, b, c");
-
-               Table selected = in1.unionAll(in2).select("c");
-               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-               List<Row> results = ds.collect();
-
-               String expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" 
+ "Hello\n" + "Hello world\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testUnionWithFilter() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, 
e").select("a, b, c");
-
-               Table selected = in1.unionAll(in2).where("b < 2").select("c");
-               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-               List<Row> results = ds.collect();
-
-               String expected = "Hi\n" + "Hallo\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testUnionFieldsNameNotOverlap1() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               Table selected = in1.unionAll(in2);
-               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-               List<Row> results = ds.collect();
-
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testUnionFieldsNameNotOverlap2() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "a, b, c, d, 
e").select("a, b, c");
-
-               Table selected = in1.unionAll(in2);
-               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-               List<Row> results = ds.collect();
-
-               String expected = "";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testUnionWithAggregation() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, 
e").select("a, b, c");
-
-               Table selected = in1.unionAll(in2).select("c.count");
-               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-               List<Row> results = ds.collect();
-
-               String expected = "18";
-               compareResultAsText(results, expected);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
 
b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
deleted file mode 100644
index 1816614..0000000
--- 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.api.scala.table.test;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.scala.PageRankTable;
-import org.apache.flink.test.testdata.PageRankData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@RunWith(Parameterized.class)
-public class PageRankTableITCase extends JavaProgramTestBase {
-
-       private static int NUM_PROGRAMS = 2;
-
-       private int curProgId = config.getInteger("ProgramId", -1);
-
-       private String verticesPath;
-       private String edgesPath;
-       private String resultPath;
-       private String expectedResult;
-
-       public PageRankTableITCase(Configuration config) {
-               super(config);
-       }
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-               verticesPath = createTempFile("vertices.txt", 
PageRankData.VERTICES);
-               edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               expectedResult = runProgram(curProgId);
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 
0.01);
-       }
-
-       @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
-
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
-
-               for(int i=1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
-               }
-
-               return toParameterList(tConfigs);
-       }
-
-
-       public String runProgram(int progId) throws Exception {
-
-               switch(progId) {
-               case 1: {
-                       PageRankTable.main(new String[]{verticesPath, 
edgesPath, resultPath, PageRankData
-                                       .NUM_VERTICES + "", "3"});
-                       return PageRankData.RANKS_AFTER_3_ITERATIONS;
-               }
-               case 2: {
-                       // start with a very high number of iteration such that 
the dynamic convergence criterion must handle termination
-                       PageRankTable.main(new String[] {verticesPath, 
edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
-                       return 
PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
-               }
-
-               default:
-                       throw new IllegalArgumentException("Invalid program 
id");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
 
b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
deleted file mode 100644
index acb7ded..0000000
--- 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.ExpressionException
-import org.junit.Test
-
-class TypeExceptionTest {
-
-  @Test(expected = classOf[ExpressionException])
-  def testInnerCaseClassException(): Unit = {
-    case class WC(word: String, count: Int)
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
-    val expr = input.toTable // this should fail
-    val result = expr
-      .groupBy('word)
-      .select('word, 'count.sum as 'count)
-      .toDataSet[WC]
-
-    result.print()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
 
b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
deleted file mode 100644
index ee5d9e8..0000000
--- 
a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.table.{Row, ExpressionException}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class AggregationsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testAggregationTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
-      .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg).toDataSet[Row]
-    val results = ds.collect()
-    val expected = "231,1,21,21,11"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAggregationOnNonExistingField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
-      .select('foo.avg).toDataSet[Row]
-    val expected = ""
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
-      .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
-      .toDataSet[Row]
-    val expected = "1,1,1,1,1.5,1.5,2"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAggregationWithArithmetic(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
-      .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT").toDataSet[Row]
-    val expected = "5.5,2 THE COUNT"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAggregationWithTwoCount(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
-      .select('_1.count, '_2.count).toDataSet[Row]
-    val expected = "2,2"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testNonWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("Hello", 1)).toTable
-      .select('_1.sum).toDataSet[Row]
-    val expected = ""
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testNoNestedAggregations(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("Hello", 1)).toTable
-      .select('_2.sum.sum).toDataSet[Row]
-    val expected = ""
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
 
b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
deleted file mode 100644
index 59573eb..0000000
--- 
a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.table.{Row, ExpressionException}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testAs(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).toDataSet[Row]
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithToFewFields(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b).toDataSet[Row]
-    val expected = "no"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithToManyFields(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd).toDataSet[Row]
-    val expected = "no"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithAmbiguousFields(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b).toDataSet[Row]
-    val expected = "no"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithNonFieldReference1(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    // as can only have field references
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b).toDataSet[Row]
-    val expected = "no"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithNonFieldReference2(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    // as can only have field references
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b).toDataSet[Row]
-    val expected = "no"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}

Reply via email to