http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala deleted file mode 100644 index c177184..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import java.util.Date - -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.Row -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test - def testAutoCastToString(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable - .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d", '_7 + "Date") - .toDataSet[Row] - val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testNumericAutoCastInArithmetic(): Unit = { - - // don't test everything, just some common cast directions - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable - .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1) - .toDataSet[Row] - val expected = "2,2,2,2.0,2.0,2.0" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testNumericAutoCastInComparison(): Unit = { - - // don't test everything, just some common cast directions - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements( - (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d), - (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f) - .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1) - .toDataSet[Row] - val expected = "2,2,2,2,2.0,2.0" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testCastFromString: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("1", "true", "2.0", - "2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775")) - .toTable - .select( - '_1.cast(BasicTypeInfo.BYTE_TYPE_INFO), - '_1.cast(BasicTypeInfo.SHORT_TYPE_INFO), - '_1.cast(BasicTypeInfo.INT_TYPE_INFO), - '_1.cast(BasicTypeInfo.LONG_TYPE_INFO), - '_3.cast(BasicTypeInfo.DOUBLE_TYPE_INFO), - '_3.cast(BasicTypeInfo.FLOAT_TYPE_INFO), - '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO), - '_4.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO), - '_5.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO), - '_6.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO), - '_7.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO)) - .toDataSet[Row] - val expected = "1,1,1,1,2.0,2.0,true," + - "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," + - "1970-01-17 17:47:53.775\n" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testCastDateToStringAndLong { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000")) - val result = ds.toTable - .select('_1.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f0), - '_2.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f1)) - .select('f0.cast(BasicTypeInfo.STRING_TYPE_INFO), - 'f0.cast(BasicTypeInfo.LONG_TYPE_INFO), - 'f1.cast(BasicTypeInfo.STRING_TYPE_INFO), - 'f1.cast(BasicTypeInfo.LONG_TYPE_INFO)) - .toDataSet[Row] - .collect - val expected = "2011-05-03 15:51:36.000,1304437896000," + - "2011-05-03 15:51:36.000,1304437896000\n" - TestBaseUtils.compareResultAsText(result.asJava, expected) - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala deleted file mode 100644 index 017cbf1..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import java.util.Date - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.{Row, ExpressionException} -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test - def testArithmetic(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((5, 10)).as('a, 'b) - .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a).toDataSet[Row] - val expected = "0,10,2,10,1,-5" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testLogic(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((5, true)).as('a, 'b) - .select('b && true, 'b && false, 'b || false, !'b).toDataSet[Row] - val expected = "true,false,true,false" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testComparisons(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c) - .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull).toDataSet[Row] - val expected = "true,true,false,false,true" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testBitwiseOperations(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - - val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 'b) - .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row] - val expected = "1,7,6,-4" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testBitwiseWithAutocast(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - - val ds = env.fromElements((3, 5.toByte)).as('a, 'b) - .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row] - val expected = "1,7,6,-4" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testBitwiseWithNonWorkingAutocast(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - - val ds = env.fromElements((3.0, 5)).as('a, 'b) - .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a).toDataSet[Row] - val expected = "1,7,6,-4" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testCaseInsensitiveForAs(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - - val ds = env.fromElements((3, 5.toByte)).as('a, 'b) - .groupBy("a").select("a, a.count As cnt").toDataSet[Row] - val expected = "3,1" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testDateLiteral(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - - val ds = env.fromElements((0L, "test")).as('a, 'b) - .select('a, - Literal(new Date(0)).cast(BasicTypeInfo.STRING_TYPE_INFO), - 'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO)) - .toDataSet[Row] - val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala deleted file mode 100644 index c0b86f8..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - - -@RunWith(classOf[Parameterized]) -class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test - def testAllRejectingFilter(): Unit = { - /* - * Test all-rejecting filter. - */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - - val filterDs = ds.filter( Literal(false) ) - val expected = "\n" - val results = filterDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testAllPassingFilter(): Unit = { - /* - * Test all-passing filter. - */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - - val filterDs = ds.filter( Literal(true) ) - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - val results = filterDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testFilterOnStringTupleField(): Unit = { - /* - * Test filter on String tuple field. - */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env) - val filterDs = ds.filter( _._3.contains("world") ) - val expected = "(3,2,Hello world)\n" + "(4,3,Hello world, how are you?)\n" - val results = filterDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testFilterOnIntegerTupleField(): Unit = { - /* - * Test filter on Integer tuple field. - */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - - val filterDs = ds.filter( 'a % 2 === 0 ) - val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + - "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" + - "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + - "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n" - val results = filterDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - // These two not yet done, but are planned - - @Ignore - @Test - def testFilterBasicType(): Unit = { - /* - * Test filter on basic type - */ - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.getStringDataSet(env) - - val filterDs = ds.filter( _.startsWith("H") ) - val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" - val results = filterDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Ignore - @Test - def testFilterOnCustomType(): Unit = { - /* - * Test filter on custom type - */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.getCustomTypeDataSet(env) - val filterDs = ds.filter( _.myString.contains("a") ) - val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" - val results = filterDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala deleted file mode 100644 index fb76507..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import org.apache.flink.api.table.{Row, ExpressionException} -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test(expected = classOf[ExpressionException]) - def testGroupingOnNonExistentField(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .groupBy('_foo) - .select('a.avg).toDataSet[Row] - val expected = "" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupedAggregate(): Unit = { - - // the grouping key needs to be forwarded to the intermediate DataSet, even - // if we don't want the key in the output - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .groupBy('b) - .select('b, 'a.sum).toDataSet[Row] - val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupingKeyForwardIfNotUsed(): Unit = { - - // the grouping key needs to be forwarded to the intermediate DataSet, even - // if we don't want the key in the output - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .groupBy('b) - .select('a.sum).toDataSet[Row] - val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testSQLStyleAggregations(): Unit = { - - // the grouping key needs to be forwarded to the intermediate DataSet, even - // if we don't want the key in the output - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .select( - """Sum( a) as a1, a.sum as a2, - |Min (a) as b1, a.min as b2, - |Max (a ) as c1, a.max as c2, - |Avg ( a ) as d1, a.avg as d2, - |Count(a) as e1, a.count as e2 - """.stripMargin).toDataSet[Row] - val expected = "231,231,1,1,21,21,11,11,21,21" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupNoAggregation(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env) - .as('a, 'b, 'c) - .groupBy('b) - .select('a.sum as 'd, 'b) - .groupBy('b, 'd) - .select('b) - .toDataSet[Row] - - val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala deleted file mode 100644 index e12c9d6..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import org.apache.flink.api.table.{Row, ExpressionException} -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test - def testJoin(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g).toDataSet[Row] - val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" - val results = joinDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testJoinWithFilter(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g).toDataSet[Row] - val expected = "Hi,Hallo\n" - val results = joinDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testJoinWithMultipleKeys(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g).toDataSet[Row] - val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" + - "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n" - val results = joinDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testJoinNonExistingKey(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g).toDataSet[Row] - val expected = "" - val results = joinDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testJoinWithNonMatchingKeyTypes(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g).toDataSet[Row] - val expected = "" - val results = joinDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testJoinWithAmbiguousFields(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c) - - val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g).toDataSet[Row] - val expected = "" - val results = joinDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testJoinWithAggregation(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('a === 'd).select('g.count).toDataSet[Row] - val expected = "6" - val results = joinDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala deleted file mode 100644 index fa3f283..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import org.apache.flink.api.table.{Row, ExpressionException} -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test - def testSimpleSelectAll(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3) - .toDataSet[Row] - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testSimpleSelectAllWithAs(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c) - .toDataSet[Row] - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testSimpleSelectWithNaming(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).toTable - .select('_1 as 'a, '_2 as 'b) - .select('a, 'b).toDataSet[Row] - val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithToFewFields(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b).toDataSet[Row] - val expected = "no" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithToManyFields(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd).toDataSet[Row] - val expected = "no" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithAmbiguousFields(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b).toDataSet[Row] - val expected = "no" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - - @Test(expected = classOf[ExpressionException]) - def testOnlyFieldRefInAs(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd).toDataSet[Row] - val expected = "no" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala deleted file mode 100644 index bead02f..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ - -import org.junit._ -import org.junit.Assert.assertEquals - -case class WC(count: Int, word: String) - -class SqlExplainITCase { - - val testFilePath = SqlExplainITCase.this.getClass.getResource("/").getFile - - @Test - def testGroupByWithoutExtended() : Unit = { - val env = ExecutionEnvironment.createLocalEnvironment() - val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b) - val result = expr.filter("a % 2 = 0").explain() - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testFilter0.out").mkString - assertEquals(result, source) - } - - @Test - def testGroupByWithExtended() : Unit = { - val env = ExecutionEnvironment.createLocalEnvironment() - val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b) - val result = expr.filter("a % 2 = 0").explain(true) - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testFilter1.out").mkString - assertEquals(result, source) - } - - @Test - def testJoinWithoutExtended() : Unit = { - val env = ExecutionEnvironment.createLocalEnvironment() - val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b) - val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd) - val result = expr1.join(expr2).where("b = d").select("a, c").explain() - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testJoin0.out").mkString - assertEquals(result, source) - } - - @Test - def testJoinWithExtended() : Unit = { - val env = ExecutionEnvironment.createLocalEnvironment() - val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b) - val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd) - val result = expr1.join(expr2).where("b = d").select("a, c").explain(true) - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testJoin1.out").mkString - assertEquals(result, source) - } - - @Test - def testUnionWithoutExtended() : Unit = { - val env = ExecutionEnvironment.createLocalEnvironment() - val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable - val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable - val result = expr1.unionAll(expr2).explain() - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testUnion0.out").mkString - assertEquals(result, source) - } - - @Test - def testUnionWithExtended() : Unit = { - val env = ExecutionEnvironment.createLocalEnvironment() - val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable - val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable - val result = expr1.unionAll(expr2).explain(true) - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testUnion1.out").mkString - assertEquals(result, source) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala deleted file mode 100644 index 10bc8fd..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import org.apache.flink.api.table.{Row, ExpressionException} -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test - def testSubstring(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b) - .select('a.substring(0, 'b)).toDataSet[Row] - val expected = "AA\nB" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testSubstringWithMaxEnd(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b) - .select('a.substring('b)).toDataSet[Row] - val expected = "CD\nBCD" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testNonWorkingSubstring1(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b) - .select('a.substring(0, 'b)).toDataSet[Row] - val expected = "AAA\nBB" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testNonWorkingSubstring2(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b) - .select('a.substring('b, 15)).toDataSet[Row] - val expected = "AAA\nBB" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala deleted file mode 100644 index a47d4b7..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{ExpressionException, Row} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test - def testUnion(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - - val unionDs = ds1.unionAll(ds2).select('c) - - val results = unionDs.toDataSet[Row].collect() - val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testUnionWithFilter(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e) - - val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c) - - val results = joinDs.toDataSet[Row].collect() - val expected = "Hi\n" + "Hallo\n" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testUnionFieldsNameNotOverlap1(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e) - - val unionDs = ds1.unionAll(ds2) - - val results = unionDs.toDataSet[Row].collect() - val expected = "" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testUnionFieldsNameNotOverlap2(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'c, 'd, 'e).select('a, 'b, 'c) - - val unionDs = ds1.unionAll(ds2) - - val results = unionDs.toDataSet[Row].collect() - val expected = "" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testUnionWithAggregation(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e) - - val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).select('c.count) - - val results = unionDs.toDataSet[Row].collect() - val expected = "18" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala deleted file mode 100644 index ef616a9..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.typeinfo - -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.util.TestLogger -import org.junit.Test -import org.scalatest.junit.JUnitSuiteLike - -class RenamingProxyTypeInfoTest extends TestLogger with JUnitSuiteLike { - - @Test - def testRenamingProxyTypeEquality(): Unit = { - val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo]) - .asInstanceOf[CompositeType[TestPojo]] - - val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo]( - pojoTypeInfo1, - Array("someInt", "aString", "doubleArray")) - - val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo]( - pojoTypeInfo1, - Array("someInt", "aString", "doubleArray")) - - assert(tpeInfo1.equals(tpeInfo2)) - assert(tpeInfo1.hashCode() == tpeInfo2.hashCode()) - } - - @Test - def testRenamingProxyTypeInequality(): Unit = { - val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo]) - .asInstanceOf[CompositeType[TestPojo]] - - val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo]( - pojoTypeInfo1, - Array("someInt", "aString", "doubleArray")) - - val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo]( - pojoTypeInfo1, - Array("foobar", "aString", "doubleArray")) - - assert(!tpeInfo1.equals(tpeInfo2)) - } -} - -final class TestPojo { - var someInt: Int = 0 - private var aString: String = null - var doubleArray: Array[Double] = null - - def setaString(aString: String) { - this.aString = aString - } - - def getaString: String = { - return aString - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/resources/testFilter0.out ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/resources/testFilter0.out b/flink-staging/flink-table/src/test/scala/resources/testFilter0.out deleted file mode 100644 index 062fc90..0000000 --- a/flink-staging/flink-table/src/test/scala/resources/testFilter0.out +++ /dev/null @@ -1,28 +0,0 @@ -== Abstract Syntax Tree == -Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0) - -== Physical Execution Plan == -Stage 3 : Data Source - content : collect elements with CollectionInputFormat - Partitioning : RANDOM_PARTITIONED - - Stage 2 : Map - content : Map at select('count as 'count,'word as 'word) - ship_strategy : Forward - exchange_mode : PIPELINED - driver_strategy : Map - Partitioning : RANDOM_PARTITIONED - - Stage 1 : Filter - content : ('a * 2) === 0 - ship_strategy : Forward - exchange_mode : PIPELINED - driver_strategy : FlatMap - Partitioning : RANDOM_PARTITIONED - - Stage 0 : Data Sink - content : org.apache.flink.api.java.io.DiscardingOutputFormat - ship_strategy : Forward - exchange_mode : PIPELINED - Partitioning : RANDOM_PARTITIONED - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/resources/testFilter1.out ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/resources/testFilter1.out b/flink-staging/flink-table/src/test/scala/resources/testFilter1.out deleted file mode 100644 index 83378e6..0000000 --- a/flink-staging/flink-table/src/test/scala/resources/testFilter1.out +++ /dev/null @@ -1,96 +0,0 @@ -== Abstract Syntax Tree == -Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0) - -== Physical Execution Plan == -Stage 3 : Data Source - content : collect elements with CollectionInputFormat - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : 0.0 - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - - Stage 2 : Map - content : Map at select('count as 'count,'word as 'word) - ship_strategy : Forward - exchange_mode : PIPELINED - driver_strategy : Map - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : 0.0 - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - - Stage 1 : Filter - content : ('a * 2) === 0 - ship_strategy : Forward - exchange_mode : PIPELINED - driver_strategy : FlatMap - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : 0.0 - Est. Cardinality : 0.0 - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : 0.0 - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - - Stage 0 : Data Sink - content : org.apache.flink.api.java.io.DiscardingOutputFormat - ship_strategy : Forward - exchange_mode : PIPELINED - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : 0.0 - Est. Cardinality : 0.0 - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : 0.0 - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/resources/testJoin0.out ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/resources/testJoin0.out b/flink-staging/flink-table/src/test/scala/resources/testJoin0.out deleted file mode 100644 index e6e30be..0000000 --- a/flink-staging/flink-table/src/test/scala/resources/testJoin0.out +++ /dev/null @@ -1,39 +0,0 @@ -== Abstract Syntax Tree == -Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c) - -== Physical Execution Plan == -Stage 3 : Data Source - content : collect elements with CollectionInputFormat - Partitioning : RANDOM_PARTITIONED - - Stage 2 : Map - content : Map at select('count as 'count,'word as 'word) - ship_strategy : Forward - exchange_mode : PIPELINED - driver_strategy : Map - Partitioning : RANDOM_PARTITIONED - -Stage 5 : Data Source - content : collect elements with CollectionInputFormat - Partitioning : RANDOM_PARTITIONED - - Stage 4 : Map - content : Map at select('count as 'count,'word as 'word) - ship_strategy : Forward - exchange_mode : PIPELINED - driver_strategy : Map - Partitioning : RANDOM_PARTITIONED - - Stage 1 : Join - content : Join at 'b === 'd - ship_strategy : Hash Partition on [1] - exchange_mode : PIPELINED - driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word)) - Partitioning : RANDOM_PARTITIONED - - Stage 0 : Data Sink - content : org.apache.flink.api.java.io.DiscardingOutputFormat - ship_strategy : Forward - exchange_mode : PIPELINED - Partitioning : RANDOM_PARTITIONED - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/resources/testJoin1.out ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/resources/testJoin1.out b/flink-staging/flink-table/src/test/scala/resources/testJoin1.out deleted file mode 100644 index a8f05dd..0000000 --- a/flink-staging/flink-table/src/test/scala/resources/testJoin1.out +++ /dev/null @@ -1,141 +0,0 @@ -== Abstract Syntax Tree == -Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c) - -== Physical Execution Plan == -Stage 3 : Data Source - content : collect elements with CollectionInputFormat - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : 0.0 - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - - Stage 2 : Map - content : Map at select('count as 'count,'word as 'word) - ship_strategy : Forward - exchange_mode : PIPELINED - driver_strategy : Map - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : 0.0 - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - -Stage 5 : Data Source - content : collect elements with CollectionInputFormat - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : 0.0 - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - - Stage 4 : Map - content : Map at select('count as 'count,'word as 'word) - ship_strategy : Forward - exchange_mode : PIPELINED - driver_strategy : Map - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : 0.0 - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - - Stage 1 : Join - content : Join at 'b === 'd - ship_strategy : Hash Partition on [1] - exchange_mode : PIPELINED - driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word)) - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : (unknown) - Disk I/O : (unknown) - CPU : (unknown) - Cumulative Network : (unknown) - Cumulative Disk I/O : (unknown) - Cumulative CPU : (unknown) - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - - Stage 0 : Data Sink - content : org.apache.flink.api.java.io.DiscardingOutputFormat - ship_strategy : Forward - exchange_mode : PIPELINED - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : (unknown) - Cumulative Disk I/O : (unknown) - Cumulative CPU : (unknown) - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/resources/testUnion0.out ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/resources/testUnion0.out b/flink-staging/flink-table/src/test/scala/resources/testUnion0.out deleted file mode 100644 index db9d2f9..0000000 --- a/flink-staging/flink-table/src/test/scala/resources/testUnion0.out +++ /dev/null @@ -1,38 +0,0 @@ -== Abstract Syntax Tree == -Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String)))) - -== Physical Execution Plan == -Stage 3 : Data Source - content : collect elements with CollectionInputFormat - Partitioning : RANDOM_PARTITIONED - - Stage 2 : Map - content : Map at select('count as 'count,'word as 'word) - ship_strategy : Forward - exchange_mode : PIPELINED - driver_strategy : Map - Partitioning : RANDOM_PARTITIONED - -Stage 5 : Data Source - content : collect elements with CollectionInputFormat - Partitioning : RANDOM_PARTITIONED - - Stage 4 : Map - content : Map at select('count as 'count,'word as 'word) - ship_strategy : Forward - exchange_mode : PIPELINED - driver_strategy : Map - Partitioning : RANDOM_PARTITIONED - - Stage 1 : Union - content : - ship_strategy : Redistribute - exchange_mode : PIPELINED - Partitioning : RANDOM_PARTITIONED - - Stage 0 : Data Sink - content : org.apache.flink.api.java.io.DiscardingOutputFormat - ship_strategy : Forward - exchange_mode : PIPELINED - Partitioning : RANDOM_PARTITIONED - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/resources/testUnion1.out ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/resources/testUnion1.out b/flink-staging/flink-table/src/test/scala/resources/testUnion1.out deleted file mode 100644 index 8dc1e53..0000000 --- a/flink-staging/flink-table/src/test/scala/resources/testUnion1.out +++ /dev/null @@ -1,140 +0,0 @@ -== Abstract Syntax Tree == -Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String)))) - -== Physical Execution Plan == -Stage 3 : Data Source - content : collect elements with CollectionInputFormat - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : 0.0 - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - - Stage 2 : Map - content : Map at select('count as 'count,'word as 'word) - ship_strategy : Forward - exchange_mode : PIPELINED - driver_strategy : Map - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : 0.0 - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - -Stage 5 : Data Source - content : collect elements with CollectionInputFormat - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : 0.0 - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - - Stage 4 : Map - content : Map at select('count as 'count,'word as 'word) - ship_strategy : Forward - exchange_mode : PIPELINED - driver_strategy : Map - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : 0.0 - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - - Stage 1 : Union - content : - ship_strategy : Redistribute - exchange_mode : PIPELINED - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : (unknown) - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - - Stage 0 : Data Sink - content : org.apache.flink.api.java.io.DiscardingOutputFormat - ship_strategy : Forward - exchange_mode : PIPELINED - Partitioning : RANDOM_PARTITIONED - Partitioning Order : (none) - Uniqueness : not unique - Order : (none) - Grouping : not grouped - Uniqueness : not unique - Est. Output Size : (unknown) - Est. Cardinality : (unknown) - Network : 0.0 - Disk I/O : 0.0 - CPU : 0.0 - Cumulative Network : (unknown) - Cumulative Disk I/O : 0.0 - Cumulative CPU : 0.0 - Output Size (bytes) : (none) - Output Cardinality : (none) - Avg. Output Record Size (bytes) : (none) - Filter Factor : (none) - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/pom.xml b/flink-staging/flink-tez/pom.xml deleted file mode 100644 index 6083f7f..0000000 --- a/flink-staging/flink-tez/pom.xml +++ /dev/null @@ -1,224 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-staging</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-tez</artifactId> - <name>flink-tez</name> - - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-optimizer</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-examples-batch</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.tez</groupId> - <artifactId>tez-api</artifactId> - <version>${tez.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.tez</groupId> - <artifactId>tez-common</artifactId> - <version>${tez.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.tez</groupId> - <artifactId>tez-dag</artifactId> - <version>${tez.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.tez</groupId> - <artifactId>tez-runtime-library</artifactId> - <version>${tez.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-client</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - <version>1.4</version> - </dependency> - - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.4.1</version> - <configuration> - <descriptors> - <descriptor>${basedir}/src/assembly/flink-fat-jar.xml</descriptor> - </descriptors> - <archive> - <manifest> - <mainClass>org.apache.flink.tez.examples.ExampleDriver</mainClass> - </manifest> - </archive> - </configuration> - <executions> - <execution> - <!--<id>assemble-all</id>--> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml b/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml deleted file mode 100644 index 504761a..0000000 --- a/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml +++ /dev/null @@ -1,42 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>flink-fat-jar</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <dependencySets> - <dependencySet> - <outputDirectory>/</outputDirectory> - <useProjectArtifact>true</useProjectArtifact> - <!--<excludes> - <exclude>org.apache.flink:*</exclude> - </excludes>--> - <useTransitiveFiltering>true</useTransitiveFiltering> - <unpack>true</unpack> - <scope>runtime</scope> - <excludes> - <exclude>com.google.guava:guava</exclude> - </excludes> - </dependencySet> - </dependencySets> -</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java deleted file mode 100644 index 4c091e5..0000000 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.client; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironmentFactory; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.Optimizer; -import org.apache.flink.optimizer.costs.DefaultCostEstimator; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; - -public class LocalTezEnvironment extends ExecutionEnvironment { - - TezExecutor executor; - Optimizer compiler; - - private LocalTezEnvironment() { - compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration()); - executor = new TezExecutor(compiler, this.getParallelism()); - } - - public static LocalTezEnvironment create() { - return new LocalTezEnvironment(); - } - - @Override - public JobExecutionResult execute(String jobName) throws Exception { - TezConfiguration tezConf = new TezConfiguration(); - tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); - tezConf.set("fs.defaultFS", "file:///"); - tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); - executor.setConfiguration(tezConf); - return executor.executePlan(createProgramPlan(jobName)); - } - - @Override - public String getExecutionPlan() throws Exception { - Plan p = createProgramPlan(null, false); - return executor.getOptimizerPlanAsJSON(p); - } - - public void setAsContext() { - ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() { - @Override - public ExecutionEnvironment createExecutionEnvironment() { - return LocalTezEnvironment.this; - } - }; - initializeContextEnvironment(factory); - } - - @Override - public void startNewSession() throws Exception { - throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez."); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java deleted file mode 100644 index 131937e..0000000 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.client; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.Optimizer; -import org.apache.flink.optimizer.costs.DefaultCostEstimator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.ClassUtil; -import org.apache.hadoop.util.ToolRunner; - - -public class RemoteTezEnvironment extends ExecutionEnvironment { - - private static final Log LOG = LogFactory.getLog(RemoteTezEnvironment.class); - - private Optimizer compiler; - private TezExecutor executor; - private Path jarPath = null; - - - public void registerMainClass (Class mainClass) { - jarPath = new Path(ClassUtil.findContainingJar(mainClass)); - LOG.info ("Registering main class " + mainClass.getName() + " contained in " + jarPath.toString()); - } - - @Override - public JobExecutionResult execute(String jobName) throws Exception { - TezExecutorTool tool = new TezExecutorTool(executor, createProgramPlan()); - if (jarPath != null) { - tool.setJobJar(jarPath); - } - try { - int executionResult = ToolRunner.run(new Configuration(), tool, new String[]{jobName}); - } - finally { - return new JobExecutionResult(null, -1, null); - } - - } - - @Override - public String getExecutionPlan() throws Exception { - Plan p = createProgramPlan(null, false); - return executor.getOptimizerPlanAsJSON(p); - } - - public static RemoteTezEnvironment create () { - return new RemoteTezEnvironment(); - } - - public RemoteTezEnvironment() { - compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration()); - executor = new TezExecutor(compiler, getParallelism()); - } - - @Override - public void startNewSession() throws Exception { - throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez."); - } -}