http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
deleted file mode 100644
index c177184..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import java.util.Date
-
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testAutoCastToString(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable
-      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d", '_7 + "Date")
-      .toDataSet[Row]
-    val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testNumericAutoCastInArithmetic(): Unit = {
-
-    // don't test everything, just some common cast directions
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
-      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
-      .toDataSet[Row]
-    val expected = "2,2,2,2.0,2.0,2.0"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testNumericAutoCastInComparison(): Unit = {
-
-    // don't test everything, just some common cast directions
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
-      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d  && 'f > 1)
-      .toDataSet[Row]
-    val expected = "2,2,2,2,2.0,2.0"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCastFromString: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("1", "true", "2.0",
-        "2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775"))
-      .toTable
-      .select(
-        '_1.cast(BasicTypeInfo.BYTE_TYPE_INFO),
-        '_1.cast(BasicTypeInfo.SHORT_TYPE_INFO),
-        '_1.cast(BasicTypeInfo.INT_TYPE_INFO),
-        '_1.cast(BasicTypeInfo.LONG_TYPE_INFO),
-        '_3.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
-        '_3.cast(BasicTypeInfo.FLOAT_TYPE_INFO),
-        '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO),
-        '_4.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
-        '_5.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
-        '_6.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
-        '_7.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
-    .toDataSet[Row]
-    val expected = "1,1,1,1,2.0,2.0,true," +
-      "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," +
-      "1970-01-17 17:47:53.775\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCastDateToStringAndLong {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000"))
-    val result = ds.toTable
-      .select('_1.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f0),
-        '_2.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f1))
-      .select('f0.cast(BasicTypeInfo.STRING_TYPE_INFO),
-        'f0.cast(BasicTypeInfo.LONG_TYPE_INFO),
-        'f1.cast(BasicTypeInfo.STRING_TYPE_INFO),
-        'f1.cast(BasicTypeInfo.LONG_TYPE_INFO))
-      .toDataSet[Row]
-      .collect
-    val expected = "2011-05-03 15:51:36.000,1304437896000," +
-      "2011-05-03 15:51:36.000,1304437896000\n"
-    TestBaseUtils.compareResultAsText(result.asJava, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
deleted file mode 100644
index 017cbf1..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import java.util.Date
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{Row, ExpressionException}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testArithmetic(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((5, 10)).as('a, 'b)
-      .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a).toDataSet[Row]
-    val expected = "0,10,2,10,1,-5"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testLogic(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((5, true)).as('a, 'b)
-      .select('b && true, 'b && false, 'b || false, !'b).toDataSet[Row]
-    val expected = "true,false,true,false"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testComparisons(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
-      .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull).toDataSet[Row]
-    val expected = "true,true,false,false,true"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testBitwiseOperations(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
-    val expected = "1,7,6,-4"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testBitwiseWithAutocast(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row]
-    val expected = "1,7,6,-4"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testBitwiseWithNonWorkingAutocast(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val ds = env.fromElements((3.0, 5)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row] 
-    val expected = "1,7,6,-4"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCaseInsensitiveForAs(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
-      .groupBy("a").select("a, a.count As cnt").toDataSet[Row]
-    val expected = "3,1"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testDateLiteral(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val ds = env.fromElements((0L, "test")).as('a, 'b)
-      .select('a,
-        Literal(new Date(0)).cast(BasicTypeInfo.STRING_TYPE_INFO),
-        'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
-      .toDataSet[Row]
-    val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
deleted file mode 100644
index c0b86f8..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-
-@RunWith(classOf[Parameterized])
-class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-    /*
-     * Test all-rejecting filter.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-    val expected = "\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-    /*
-     * Test all-passing filter.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnStringTupleField(): Unit = {
-    /*
-     * Test filter on String tuple field.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val filterDs = ds.filter( _._3.contains("world") )
-    val expected = "(3,2,Hello world)\n" + "(4,3,Hello world, how are you?)\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
-      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
-      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  // These two not yet done, but are planned
-
-  @Ignore
-  @Test
-  def testFilterBasicType(): Unit = {
-    /*
-     * Test filter on basic type
-     */
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.getStringDataSet(env)
-
-    val filterDs = ds.filter( _.startsWith("H") )
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Ignore
-  @Test
-  def testFilterOnCustomType(): Unit = {
-    /*
-     * Test filter on custom type
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.getCustomTypeDataSet(env)
-    val filterDs = ds.filter( _.myString.contains("a") )
-    val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
deleted file mode 100644
index fb76507..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.table.{Row, ExpressionException}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test(expected = classOf[ExpressionException])
-  def testGroupingOnNonExistentField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .groupBy('_foo)
-      .select('a.avg).toDataSet[Row]
-    val expected = ""
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregate(): Unit = {
-
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum).toDataSet[Row]
-    val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupingKeyForwardIfNotUsed(): Unit = {
-
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum).toDataSet[Row]
-    val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSQLStyleAggregations(): Unit = {
-
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .select(
-        """Sum( a) as a1, a.sum as a2,
-          |Min (a) as b1, a.min as b2,
-          |Max (a ) as c1, a.max as c2,
-          |Avg ( a ) as d1, a.avg as d2,
-          |Count(a) as e1, a.count as e2
-        """.stripMargin).toDataSet[Row]
-    val expected = "231,231,1,1,21,21,11,11,21,21"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupNoAggregation(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-      .as('a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum as 'd, 'b)
-      .groupBy('b, 'd)
-      .select('b)
-      .toDataSet[Row]
-
-    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
deleted file mode 100644
index e12c9d6..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.table.{Row, ExpressionException}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g).toDataSet[Row]
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
-    val results = joinDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithFilter(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g).toDataSet[Row]
-    val expected = "Hi,Hallo\n"
-    val results = joinDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g).toDataSet[Row]
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
-    val results = joinDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testJoinNonExistingKey(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g).toDataSet[Row]
-    val expected = ""
-    val results = joinDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testJoinWithNonMatchingKeyTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g).toDataSet[Row]
-    val expected = ""
-    val results = joinDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testJoinWithAmbiguousFields(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)
-
-    val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g).toDataSet[Row]
-    val expected = ""
-    val results = joinDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('a === 'd).select('g.count).toDataSet[Row]
-    val expected = "6"
-    val results = joinDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
deleted file mode 100644
index fa3f283..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.table.{Row, ExpressionException}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3)
-      .toDataSet[Row]
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
-      .toDataSet[Row]
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable
-      .select('_1 as 'a, '_2 as 'b)
-      .select('a, 'b).toDataSet[Row]
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithToFewFields(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b).toDataSet[Row]
-    val expected = "no"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithToManyFields(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd).toDataSet[Row]
-    val expected = "no"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithAmbiguousFields(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b).toDataSet[Row]
-    val expected = "no"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-  @Test(expected = classOf[ExpressionException])
-  def testOnlyFieldRefInAs(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd).toDataSet[Row]
-    val expected = "no"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
deleted file mode 100644
index bead02f..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-
-import org.junit._
-import org.junit.Assert.assertEquals
-
-case class WC(count: Int, word: String)
-
-class SqlExplainITCase {
-
-  val testFilePath = SqlExplainITCase.this.getClass.getResource("/").getFile
-
-  @Test
-  def testGroupByWithoutExtended() : Unit = {
-    val env = ExecutionEnvironment.createLocalEnvironment()
-    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b)
-    val result = expr.filter("a % 2 = 0").explain()
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter0.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testGroupByWithExtended() : Unit = {
-    val env = ExecutionEnvironment.createLocalEnvironment()
-    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b)
-    val result = expr.filter("a % 2 = 0").explain(true)
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter1.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testJoinWithoutExtended() : Unit = {
-    val env = ExecutionEnvironment.createLocalEnvironment()
-    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b)
-    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd)
-    val result = expr1.join(expr2).where("b = d").select("a, c").explain()
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin0.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testJoinWithExtended() : Unit = {
-    val env = ExecutionEnvironment.createLocalEnvironment()
-    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b)
-    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd)
-    val result = expr1.join(expr2).where("b = d").select("a, c").explain(true)
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin1.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testUnionWithoutExtended() : Unit = {
-    val env = ExecutionEnvironment.createLocalEnvironment()
-    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
-    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
-    val result = expr1.unionAll(expr2).explain()
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion0.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testUnionWithExtended() : Unit = {
-    val env = ExecutionEnvironment.createLocalEnvironment()
-    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
-    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
-    val result = expr1.unionAll(expr2).explain(true)
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion1.out").mkString
-    assertEquals(result, source)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
deleted file mode 100644
index 10bc8fd..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.table.{Row, ExpressionException}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testSubstring(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
-      .select('a.substring(0, 'b)).toDataSet[Row]
-    val expected = "AA\nB"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSubstringWithMaxEnd(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
-      .select('a.substring('b)).toDataSet[Row]
-    val expected = "CD\nBCD"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testNonWorkingSubstring1(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
-      .select('a.substring(0, 'b)).toDataSet[Row]
-    val expected = "AAA\nBB"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testNonWorkingSubstring2(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
-      .select('a.substring('b, 15)).toDataSet[Row]
-    val expected = "AAA\nBB"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
deleted file mode 100644
index a47d4b7..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{ExpressionException, Row}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testUnion(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUnionWithFilter(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
-
-    val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
-
-    val results = joinDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hallo\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testUnionFieldsNameNotOverlap1(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
-
-    val unionDs = ds1.unionAll(ds2)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = ""
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testUnionFieldsNameNotOverlap2(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'c, 'd, 'e).select('a, 'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = ""
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUnionWithAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
-
-    val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).select('c.count)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "18"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
deleted file mode 100644
index ef616a9..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeinfo
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.util.TestLogger
-import org.junit.Test
-import org.scalatest.junit.JUnitSuiteLike
-
-class RenamingProxyTypeInfoTest extends TestLogger with JUnitSuiteLike {
-
-  @Test
-  def testRenamingProxyTypeEquality(): Unit = {
-    val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo])
-      .asInstanceOf[CompositeType[TestPojo]]
-
-    val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo](
-      pojoTypeInfo1,
-      Array("someInt", "aString", "doubleArray"))
-
-    val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo](
-      pojoTypeInfo1,
-      Array("someInt", "aString", "doubleArray"))
-
-    assert(tpeInfo1.equals(tpeInfo2))
-    assert(tpeInfo1.hashCode() == tpeInfo2.hashCode())
-  }
-
-  @Test
-  def testRenamingProxyTypeInequality(): Unit = {
-    val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo])
-      .asInstanceOf[CompositeType[TestPojo]]
-
-    val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo](
-      pojoTypeInfo1,
-      Array("someInt", "aString", "doubleArray"))
-
-    val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo](
-      pojoTypeInfo1,
-      Array("foobar", "aString", "doubleArray"))
-
-    assert(!tpeInfo1.equals(tpeInfo2))
-  }
-}
-
-final class TestPojo {
-  var someInt: Int = 0
-  private var aString: String = null
-  var doubleArray: Array[Double] = null
-
-  def setaString(aString: String) {
-    this.aString = aString
-  }
-
-  def getaString: String = {
-    return aString
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/resources/testFilter0.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testFilter0.out b/flink-staging/flink-table/src/test/scala/resources/testFilter0.out
deleted file mode 100644
index 062fc90..0000000
--- a/flink-staging/flink-table/src/test/scala/resources/testFilter0.out
+++ /dev/null
@@ -1,28 +0,0 @@
-== Abstract Syntax Tree ==
-Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0)
-
-== Physical Execution Plan ==
-Stage 3 : Data Source
-       content : collect elements with CollectionInputFormat
-       Partitioning : RANDOM_PARTITIONED
-
-       Stage 2 : Map
-               content : Map at select('count as 'count,'word as 'word)
-               ship_strategy : Forward
-               exchange_mode : PIPELINED
-               driver_strategy : Map
-               Partitioning : RANDOM_PARTITIONED
-
-               Stage 1 : Filter
-                       content : ('a * 2) === 0
-                       ship_strategy : Forward
-                       exchange_mode : PIPELINED
-                       driver_strategy : FlatMap
-                       Partitioning : RANDOM_PARTITIONED
-
-                       Stage 0 : Data Sink
-                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
-                               ship_strategy : Forward
-                               exchange_mode : PIPELINED
-                               Partitioning : RANDOM_PARTITIONED
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/resources/testFilter1.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testFilter1.out b/flink-staging/flink-table/src/test/scala/resources/testFilter1.out
deleted file mode 100644
index 83378e6..0000000
--- a/flink-staging/flink-table/src/test/scala/resources/testFilter1.out
+++ /dev/null
@@ -1,96 +0,0 @@
-== Abstract Syntax Tree ==
-Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0)
-
-== Physical Execution Plan ==
-Stage 3 : Data Source
-       content : collect elements with CollectionInputFormat
-       Partitioning : RANDOM_PARTITIONED
-       Partitioning Order : (none)
-       Uniqueness : not unique
-       Order : (none)
-       Grouping : not grouped
-       Uniqueness : not unique
-       Est. Output Size : (unknown)
-       Est. Cardinality : (unknown)
-       Network : 0.0
-       Disk I/O : 0.0
-       CPU : 0.0
-       Cumulative Network : 0.0
-       Cumulative Disk I/O : 0.0
-       Cumulative CPU : 0.0
-       Output Size (bytes) : (none)
-       Output Cardinality : (none)
-       Avg. Output Record Size (bytes) : (none)
-       Filter Factor : (none)
-
-       Stage 2 : Map
-               content : Map at select('count as 'count,'word as 'word)
-               ship_strategy : Forward
-               exchange_mode : PIPELINED
-               driver_strategy : Map
-               Partitioning : RANDOM_PARTITIONED
-               Partitioning Order : (none)
-               Uniqueness : not unique
-               Order : (none)
-               Grouping : not grouped
-               Uniqueness : not unique
-               Est. Output Size : (unknown)
-               Est. Cardinality : (unknown)
-               Network : 0.0
-               Disk I/O : 0.0
-               CPU : 0.0
-               Cumulative Network : 0.0
-               Cumulative Disk I/O : 0.0
-               Cumulative CPU : 0.0
-               Output Size (bytes) : (none)
-               Output Cardinality : (none)
-               Avg. Output Record Size (bytes) : (none)
-               Filter Factor : (none)
-
-               Stage 1 : Filter
-                       content : ('a * 2) === 0
-                       ship_strategy : Forward
-                       exchange_mode : PIPELINED
-                       driver_strategy : FlatMap
-                       Partitioning : RANDOM_PARTITIONED
-                       Partitioning Order : (none)
-                       Uniqueness : not unique
-                       Order : (none)
-                       Grouping : not grouped
-                       Uniqueness : not unique
-                       Est. Output Size : 0.0
-                       Est. Cardinality : 0.0
-                       Network : 0.0
-                       Disk I/O : 0.0
-                       CPU : 0.0
-                       Cumulative Network : 0.0
-                       Cumulative Disk I/O : 0.0
-                       Cumulative CPU : 0.0
-                       Output Size (bytes) : (none)
-                       Output Cardinality : (none)
-                       Avg. Output Record Size (bytes) : (none)
-                       Filter Factor : (none)
-
-                       Stage 0 : Data Sink
-                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
-                               ship_strategy : Forward
-                               exchange_mode : PIPELINED
-                               Partitioning : RANDOM_PARTITIONED
-                               Partitioning Order : (none)
-                               Uniqueness : not unique
-                               Order : (none)
-                               Grouping : not grouped
-                               Uniqueness : not unique
-                               Est. Output Size : 0.0
-                               Est. Cardinality : 0.0
-                               Network : 0.0
-                               Disk I/O : 0.0
-                               CPU : 0.0
-                               Cumulative Network : 0.0
-                               Cumulative Disk I/O : 0.0
-                               Cumulative CPU : 0.0
-                               Output Size (bytes) : (none)
-                               Output Cardinality : (none)
-                               Avg. Output Record Size (bytes) : (none)
-                               Filter Factor : (none)
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/resources/testJoin0.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testJoin0.out b/flink-staging/flink-table/src/test/scala/resources/testJoin0.out
deleted file mode 100644
index e6e30be..0000000
--- a/flink-staging/flink-table/src/test/scala/resources/testJoin0.out
+++ /dev/null
@@ -1,39 +0,0 @@
-== Abstract Syntax Tree ==
-Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c)
-
-== Physical Execution Plan ==
-Stage 3 : Data Source
-       content : collect elements with CollectionInputFormat
-       Partitioning : RANDOM_PARTITIONED
-
-       Stage 2 : Map
-               content : Map at select('count as 'count,'word as 'word)
-               ship_strategy : Forward
-               exchange_mode : PIPELINED
-               driver_strategy : Map
-               Partitioning : RANDOM_PARTITIONED
-
-Stage 5 : Data Source
-       content : collect elements with CollectionInputFormat
-       Partitioning : RANDOM_PARTITIONED
-
-       Stage 4 : Map
-               content : Map at select('count as 'count,'word as 'word)
-               ship_strategy : Forward
-               exchange_mode : PIPELINED
-               driver_strategy : Map
-               Partitioning : RANDOM_PARTITIONED
-
-               Stage 1 : Join
-                       content : Join at 'b === 'd
-                       ship_strategy : Hash Partition on [1]
-                       exchange_mode : PIPELINED
-                       driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word))
-                       Partitioning : RANDOM_PARTITIONED
-
-                       Stage 0 : Data Sink
-                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
-                               ship_strategy : Forward
-                               exchange_mode : PIPELINED
-                               Partitioning : RANDOM_PARTITIONED
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/resources/testJoin1.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testJoin1.out b/flink-staging/flink-table/src/test/scala/resources/testJoin1.out
deleted file mode 100644
index a8f05dd..0000000
--- a/flink-staging/flink-table/src/test/scala/resources/testJoin1.out
+++ /dev/null
@@ -1,141 +0,0 @@
-== Abstract Syntax Tree ==
-Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c)
-
-== Physical Execution Plan ==
-Stage 3 : Data Source
-       content : collect elements with CollectionInputFormat
-       Partitioning : RANDOM_PARTITIONED
-       Partitioning Order : (none)
-       Uniqueness : not unique
-       Order : (none)
-       Grouping : not grouped
-       Uniqueness : not unique
-       Est. Output Size : (unknown)
-       Est. Cardinality : (unknown)
-       Network : 0.0
-       Disk I/O : 0.0
-       CPU : 0.0
-       Cumulative Network : 0.0
-       Cumulative Disk I/O : 0.0
-       Cumulative CPU : 0.0
-       Output Size (bytes) : (none)
-       Output Cardinality : (none)
-       Avg. Output Record Size (bytes) : (none)
-       Filter Factor : (none)
-
-       Stage 2 : Map
-               content : Map at select('count as 'count,'word as 'word)
-               ship_strategy : Forward
-               exchange_mode : PIPELINED
-               driver_strategy : Map
-               Partitioning : RANDOM_PARTITIONED
-               Partitioning Order : (none)
-               Uniqueness : not unique
-               Order : (none)
-               Grouping : not grouped
-               Uniqueness : not unique
-               Est. Output Size : (unknown)
-               Est. Cardinality : (unknown)
-               Network : 0.0
-               Disk I/O : 0.0
-               CPU : 0.0
-               Cumulative Network : 0.0
-               Cumulative Disk I/O : 0.0
-               Cumulative CPU : 0.0
-               Output Size (bytes) : (none)
-               Output Cardinality : (none)
-               Avg. Output Record Size (bytes) : (none)
-               Filter Factor : (none)
-
-Stage 5 : Data Source
-       content : collect elements with CollectionInputFormat
-       Partitioning : RANDOM_PARTITIONED
-       Partitioning Order : (none)
-       Uniqueness : not unique
-       Order : (none)
-       Grouping : not grouped
-       Uniqueness : not unique
-       Est. Output Size : (unknown)
-       Est. Cardinality : (unknown)
-       Network : 0.0
-       Disk I/O : 0.0
-       CPU : 0.0
-       Cumulative Network : 0.0
-       Cumulative Disk I/O : 0.0
-       Cumulative CPU : 0.0
-       Output Size (bytes) : (none)
-       Output Cardinality : (none)
-       Avg. Output Record Size (bytes) : (none)
-       Filter Factor : (none)
-
-       Stage 4 : Map
-               content : Map at select('count as 'count,'word as 'word)
-               ship_strategy : Forward
-               exchange_mode : PIPELINED
-               driver_strategy : Map
-               Partitioning : RANDOM_PARTITIONED
-               Partitioning Order : (none)
-               Uniqueness : not unique
-               Order : (none)
-               Grouping : not grouped
-               Uniqueness : not unique
-               Est. Output Size : (unknown)
-               Est. Cardinality : (unknown)
-               Network : 0.0
-               Disk I/O : 0.0
-               CPU : 0.0
-               Cumulative Network : 0.0
-               Cumulative Disk I/O : 0.0
-               Cumulative CPU : 0.0
-               Output Size (bytes) : (none)
-               Output Cardinality : (none)
-               Avg. Output Record Size (bytes) : (none)
-               Filter Factor : (none)
-
-               Stage 1 : Join
-                       content : Join at 'b === 'd
-                       ship_strategy : Hash Partition on [1]
-                       exchange_mode : PIPELINED
-                       driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word))
-                       Partitioning : RANDOM_PARTITIONED
-                       Partitioning Order : (none)
-                       Uniqueness : not unique
-                       Order : (none)
-                       Grouping : not grouped
-                       Uniqueness : not unique
-                       Est. Output Size : (unknown)
-                       Est. Cardinality : (unknown)
-                       Network : (unknown)
-                       Disk I/O : (unknown)
-                       CPU : (unknown)
-                       Cumulative Network : (unknown)
-                       Cumulative Disk I/O : (unknown)
-                       Cumulative CPU : (unknown)
-                       Output Size (bytes) : (none)
-                       Output Cardinality : (none)
-                       Avg. Output Record Size (bytes) : (none)
-                       Filter Factor : (none)
-
-                       Stage 0 : Data Sink
-                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
-                               ship_strategy : Forward
-                               exchange_mode : PIPELINED
-                               Partitioning : RANDOM_PARTITIONED
-                               Partitioning Order : (none)
-                               Uniqueness : not unique
-                               Order : (none)
-                               Grouping : not grouped
-                               Uniqueness : not unique
-                               Est. Output Size : (unknown)
-                               Est. Cardinality : (unknown)
-                               Network : 0.0
-                               Disk I/O : 0.0
-                               CPU : 0.0
-                               Cumulative Network : (unknown)
-                               Cumulative Disk I/O : (unknown)
-                               Cumulative CPU : (unknown)
-                               Output Size (bytes) : (none)
-                               Output Cardinality : (none)
-                               Avg. Output Record Size (bytes) : (none)
-                               Filter Factor : (none)
-
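
testJoin0.out and testJoin1.out above are two explains of the same plan: the first compact, the second extended with the optimizer's size, cardinality, and cost estimates. The plan itself is a hybrid-hash join of two collection sources, hash-partitioned on field 1. An illustrative DataSet API program of the same shape (a sketch; the actual test went through the Table API, and the data values here are ours):

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class JoinPlanSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<Integer, String>> left =
                env.fromElements(new Tuple2<>(1, "hello"));
            DataSet<Tuple2<Integer, String>> right =
                env.fromElements(new Tuple2<>(1, "hello"));
            left.join(right)
                .where(1)   // 'b
                .equalTo(1) // 'd
                .with(new JoinFunction<Tuple2<Integer, String>,
                        Tuple2<Integer, String>, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> join(Tuple2<Integer, String> l,
                            Tuple2<Integer, String> r) {
                        return new Tuple2<>(l.f0, r.f0); // select 'a, 'c
                    }
                })
                .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
            env.execute("join plan sketch");
        }
    }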

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/resources/testUnion0.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testUnion0.out b/flink-staging/flink-table/src/test/scala/resources/testUnion0.out
deleted file mode 100644
index db9d2f9..0000000
--- a/flink-staging/flink-table/src/test/scala/resources/testUnion0.out
+++ /dev/null
@@ -1,38 +0,0 @@
-== Abstract Syntax Tree ==
-Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String))))
-
-== Physical Execution Plan ==
-Stage 3 : Data Source
-       content : collect elements with CollectionInputFormat
-       Partitioning : RANDOM_PARTITIONED
-
-       Stage 2 : Map
-               content : Map at select('count as 'count,'word as 'word)
-               ship_strategy : Forward
-               exchange_mode : PIPELINED
-               driver_strategy : Map
-               Partitioning : RANDOM_PARTITIONED
-
-Stage 5 : Data Source
-       content : collect elements with CollectionInputFormat
-       Partitioning : RANDOM_PARTITIONED
-
-       Stage 4 : Map
-               content : Map at select('count as 'count,'word as 'word)
-               ship_strategy : Forward
-               exchange_mode : PIPELINED
-               driver_strategy : Map
-               Partitioning : RANDOM_PARTITIONED
-
-               Stage 1 : Union
-                       content : 
-                       ship_strategy : Redistribute
-                       exchange_mode : PIPELINED
-                       Partitioning : RANDOM_PARTITIONED
-
-                       Stage 0 : Data Sink
-                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
-                               ship_strategy : Forward
-                               exchange_mode : PIPELINED
-                               Partitioning : RANDOM_PARTITIONED
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/resources/testUnion1.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testUnion1.out b/flink-staging/flink-table/src/test/scala/resources/testUnion1.out
deleted file mode 100644
index 8dc1e53..0000000
--- a/flink-staging/flink-table/src/test/scala/resources/testUnion1.out
+++ /dev/null
@@ -1,140 +0,0 @@
-== Abstract Syntax Tree ==
-Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String))))
-
-== Physical Execution Plan ==
-Stage 3 : Data Source
-       content : collect elements with CollectionInputFormat
-       Partitioning : RANDOM_PARTITIONED
-       Partitioning Order : (none)
-       Uniqueness : not unique
-       Order : (none)
-       Grouping : not grouped
-       Uniqueness : not unique
-       Est. Output Size : (unknown)
-       Est. Cardinality : (unknown)
-       Network : 0.0
-       Disk I/O : 0.0
-       CPU : 0.0
-       Cumulative Network : 0.0
-       Cumulative Disk I/O : 0.0
-       Cumulative CPU : 0.0
-       Output Size (bytes) : (none)
-       Output Cardinality : (none)
-       Avg. Output Record Size (bytes) : (none)
-       Filter Factor : (none)
-
-       Stage 2 : Map
-               content : Map at select('count as 'count,'word as 'word)
-               ship_strategy : Forward
-               exchange_mode : PIPELINED
-               driver_strategy : Map
-               Partitioning : RANDOM_PARTITIONED
-               Partitioning Order : (none)
-               Uniqueness : not unique
-               Order : (none)
-               Grouping : not grouped
-               Uniqueness : not unique
-               Est. Output Size : (unknown)
-               Est. Cardinality : (unknown)
-               Network : 0.0
-               Disk I/O : 0.0
-               CPU : 0.0
-               Cumulative Network : 0.0
-               Cumulative Disk I/O : 0.0
-               Cumulative CPU : 0.0
-               Output Size (bytes) : (none)
-               Output Cardinality : (none)
-               Avg. Output Record Size (bytes) : (none)
-               Filter Factor : (none)
-
-Stage 5 : Data Source
-       content : collect elements with CollectionInputFormat
-       Partitioning : RANDOM_PARTITIONED
-       Partitioning Order : (none)
-       Uniqueness : not unique
-       Order : (none)
-       Grouping : not grouped
-       Uniqueness : not unique
-       Est. Output Size : (unknown)
-       Est. Cardinality : (unknown)
-       Network : 0.0
-       Disk I/O : 0.0
-       CPU : 0.0
-       Cumulative Network : 0.0
-       Cumulative Disk I/O : 0.0
-       Cumulative CPU : 0.0
-       Output Size (bytes) : (none)
-       Output Cardinality : (none)
-       Avg. Output Record Size (bytes) : (none)
-       Filter Factor : (none)
-
-       Stage 4 : Map
-               content : Map at select('count as 'count,'word as 'word)
-               ship_strategy : Forward
-               exchange_mode : PIPELINED
-               driver_strategy : Map
-               Partitioning : RANDOM_PARTITIONED
-               Partitioning Order : (none)
-               Uniqueness : not unique
-               Order : (none)
-               Grouping : not grouped
-               Uniqueness : not unique
-               Est. Output Size : (unknown)
-               Est. Cardinality : (unknown)
-               Network : 0.0
-               Disk I/O : 0.0
-               CPU : 0.0
-               Cumulative Network : 0.0
-               Cumulative Disk I/O : 0.0
-               Cumulative CPU : 0.0
-               Output Size (bytes) : (none)
-               Output Cardinality : (none)
-               Avg. Output Record Size (bytes) : (none)
-               Filter Factor : (none)
-
-               Stage 1 : Union
-                       content : 
-                       ship_strategy : Redistribute
-                       exchange_mode : PIPELINED
-                       Partitioning : RANDOM_PARTITIONED
-                       Partitioning Order : (none)
-                       Uniqueness : not unique
-                       Order : (none)
-                       Grouping : not grouped
-                       Uniqueness : not unique
-                       Est. Output Size : (unknown)
-                       Est. Cardinality : (unknown)
-                       Network : 0.0
-                       Disk I/O : 0.0
-                       CPU : 0.0
-                       Cumulative Network : (unknown)
-                       Cumulative Disk I/O : 0.0
-                       Cumulative CPU : 0.0
-                       Output Size (bytes) : (none)
-                       Output Cardinality : (none)
-                       Avg. Output Record Size (bytes) : (none)
-                       Filter Factor : (none)
-
-                       Stage 0 : Data Sink
-                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
-                               ship_strategy : Forward
-                               exchange_mode : PIPELINED
-                               Partitioning : RANDOM_PARTITIONED
-                               Partitioning Order : (none)
-                               Uniqueness : not unique
-                               Order : (none)
-                               Grouping : not grouped
-                               Uniqueness : not unique
-                               Est. Output Size : (unknown)
-                               Est. Cardinality : (unknown)
-                               Network : 0.0
-                               Disk I/O : 0.0
-                               CPU : 0.0
-                               Cumulative Network : (unknown)
-                               Cumulative Disk I/O : 0.0
-                               Cumulative CPU : 0.0
-                               Output Size (bytes) : (none)
-                               Output Cardinality : (none)
-                               Avg. Output Record Size (bytes) : (none)
-                               Filter Factor : (none)
-
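
Likewise, testUnion0.out and testUnion1.out are the compact and extended explains of one union plan; the Union stage itself carries no content and ships with a Redistribute strategy. A fragment with the same shape, reusing env, left, and right from the join sketch above:

    // Union of two DataSets of the same type, result discarded.
    left.union(right)
        .output(new DiscardingOutputFormat<Tuple2<Integer, String>>());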

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/pom.xml b/flink-staging/flink-tez/pom.xml
deleted file mode 100644
index 6083f7f..0000000
--- a/flink-staging/flink-tez/pom.xml
+++ /dev/null
@@ -1,224 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-staging</artifactId>
-        <version>1.0-SNAPSHOT</version>
-        <relativePath>..</relativePath>
-    </parent>
-
-    <artifactId>flink-tez</artifactId>
-    <name>flink-tez</name>
-
-    <packaging>jar</packaging>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-optimizer</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-examples-batch</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-tests</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.tez</groupId>
-            <artifactId>tez-api</artifactId>
-            <version>${tez.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.tez</groupId>
-            <artifactId>tez-common</artifactId>
-            <version>${tez.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.tez</groupId>
-            <artifactId>tez-dag</artifactId>
-            <version>${tez.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.tez</groupId>
-            <artifactId>tez-runtime-library</artifactId>
-            <version>${tez.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-client</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-common</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>commons-codec</groupId>
-            <artifactId>commons-codec</artifactId>
-            <version>1.4</version>
-        </dependency>
-
-
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${guava.version}</version>
-        </dependency>
-
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>2.4.1</version>
-                <configuration>
-                    <descriptors>
-                        <descriptor>${basedir}/src/assembly/flink-fat-jar.xml</descriptor>
-                    </descriptors>
-                    <archive>
-                        <manifest>
-                            <mainClass>org.apache.flink.tez.examples.ExampleDriver</mainClass>
-                        </manifest>
-                    </archive>
-                </configuration>
-                <executions>
-                    <execution>
-                        <!--<id>assemble-all</id>-->
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml b/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml
deleted file mode 100644
index 504761a..0000000
--- a/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~ http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-       <id>flink-fat-jar</id>
-       <formats>
-               <format>jar</format>
-       </formats>
-       <includeBaseDirectory>false</includeBaseDirectory>
-       <dependencySets>
-               <dependencySet>
-                       <outputDirectory>/</outputDirectory>
-                       <useProjectArtifact>true</useProjectArtifact>
-                       <!--<excludes>
-                               <exclude>org.apache.flink:*</exclude>
-                       </excludes>-->
-                       <useTransitiveFiltering>true</useTransitiveFiltering>
-                       <unpack>true</unpack>
-                       <scope>runtime</scope>
-            <excludes>
-                <exclude>com.google.guava:guava</exclude>
-            </excludes>
-               </dependencySet>
-       </dependencySets>
-</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
deleted file mode 100644
index 4c091e5..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.client;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-
-public class LocalTezEnvironment extends ExecutionEnvironment {
-
-       TezExecutor executor;
-       Optimizer compiler;
-
-       private LocalTezEnvironment() {
-               compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
-               executor = new TezExecutor(compiler, this.getParallelism());
-       }
-
-       public static LocalTezEnvironment create() {
-               return new LocalTezEnvironment();
-       }
-
-       @Override
-       public JobExecutionResult execute(String jobName) throws Exception {
-               TezConfiguration tezConf = new TezConfiguration();
-               tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-               tezConf.set("fs.defaultFS", "file:///");
-               tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
-               executor.setConfiguration(tezConf);
-               return executor.executePlan(createProgramPlan(jobName));
-       }
-
-       @Override
-       public String getExecutionPlan() throws Exception {
-               Plan p = createProgramPlan(null, false);
-               return executor.getOptimizerPlanAsJSON(p);
-       }
-
-       public void setAsContext() {
-               ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
-                       @Override
-                       public ExecutionEnvironment createExecutionEnvironment() {
-                               return LocalTezEnvironment.this;
-                       }
-               };
-               initializeContextEnvironment(factory);
-       }
-
-       @Override
-       public void startNewSession() throws Exception {
-               throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez.");
-       }
-}
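
The deleted LocalTezEnvironment wired Flink's optimizer to a Tez executor running in local mode (TEZ_LOCAL_MODE, local fetch optimization, file:/// as default FS). A hedged usage sketch built only from the public methods shown above (create(), setAsContext(), execute(String)); the program body and output path are illustrative:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.tez.client.LocalTezEnvironment;

    public class LocalTezUsage {
        public static void main(String[] args) throws Exception {
            LocalTezEnvironment env = LocalTezEnvironment.create();
            // Make ExecutionEnvironment.getExecutionEnvironment() return this
            // environment, so an unmodified Flink program runs on local Tez.
            env.setAsContext();
            DataSet<String> lines = env.fromElements("to be", "or not to be");
            lines.writeAsText("/tmp/flink-tez-out"); // illustrative path
            env.execute("Flink on Tez (local)");
        }
    }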

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
deleted file mode 100644
index 131937e..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.client;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ClassUtil;
-import org.apache.hadoop.util.ToolRunner;
-
-
-public class RemoteTezEnvironment extends ExecutionEnvironment {
-
-       private static final Log LOG = LogFactory.getLog(RemoteTezEnvironment.class);
-       
-       private Optimizer compiler;
-       private TezExecutor executor;
-       private Path jarPath = null;
-       
-
-       public void registerMainClass (Class mainClass) {
-               jarPath = new Path(ClassUtil.findContainingJar(mainClass));
-               LOG.info ("Registering main class " + mainClass.getName() + " contained in " + jarPath.toString());
-       }
-
-       @Override
-       public JobExecutionResult execute(String jobName) throws Exception {
-               TezExecutorTool tool = new TezExecutorTool(executor, createProgramPlan());
-               if (jarPath != null) {
-                       tool.setJobJar(jarPath);
-               }
-               try {
-                       int executionResult = ToolRunner.run(new Configuration(), tool, new String[]{jobName});
-               }
-               finally {
-                       return new JobExecutionResult(null, -1, null);
-               }
-
-       }
-
-       @Override
-       public String getExecutionPlan() throws Exception {
-               Plan p = createProgramPlan(null, false);
-               return executor.getOptimizerPlanAsJSON(p);
-       }
-
-       public static RemoteTezEnvironment create () {
-               return new RemoteTezEnvironment();
-       }
-
-       public RemoteTezEnvironment() {
-               compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration());
-               executor = new TezExecutor(compiler, getParallelism());
-       }
-
-       @Override
-       public void startNewSession() throws Exception {
-               throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez.");
-       }
-}
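
RemoteTezEnvironment submitted the optimized plan to a cluster through Hadoop's ToolRunner. Note a quirk in the deleted execute(): the ToolRunner exit code is assigned but never checked, and the dummy JobExecutionResult is returned from a finally block, which also swallows any exception thrown during submission. A hedged usage sketch from the public methods shown (create(), registerMainClass(Class), execute(String)); the program body and output path are illustrative:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.tez.client.RemoteTezEnvironment;

    public class RemoteTezUsage {
        public static void main(String[] args) throws Exception {
            RemoteTezEnvironment env = RemoteTezEnvironment.create();
            // Ship the jar that contains this class with the Tez job.
            env.registerMainClass(RemoteTezUsage.class);
            DataSet<String> lines = env.fromElements("to be", "or not to be");
            lines.writeAsText("hdfs:///tmp/flink-tez-out"); // illustrative path
            env.execute("Flink on Tez (cluster)");
        }
    }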
