http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java deleted file mode 100644 index bdebfb1..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.table.test; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import org.apache.flink.api.table.ExpressionException; -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class AggregationsITCase extends MultipleProgramsTestBase { - - - public AggregationsITCase(TestExecutionMode mode){ - super(mode); - } - - @Test - public void testAggregationTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env)); - - Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "231,1,21,21,11"; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testAggregationOnNonExistingField() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env)); - - Table result = - table.select("foo.avg"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test - public void testWorkingAggregationDataTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = - env.fromElements( - new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"), - new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = - table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1,1,1,1,1.5,1.5,2"; - compareResultAsText(results, expected); - } - - @Test - public void testAggregationWithArithmetic() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple2<Float, String>> input = - env.fromElements( - new Tuple2<>(1f, "Hello"), - new Tuple2<>(2f, "Ciao")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = - table.select("(f0 + 2).avg + 2, f1.count + \" THE COUNT\""); - - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "5.5,2 THE COUNT"; - compareResultAsText(results, expected); - } - - @Test - public void testAggregationWithTwoCount() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple2<Float, String>> input = - env.fromElements( - new Tuple2<>(1f, "Hello"), - new Tuple2<>(2f, "Ciao")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = - table.select("f0.count, f1.count"); - - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "2,2"; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testNonWorkingDataTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = - table.select("f1.sum"); - - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testNoNestedAggregation() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = - table.select("f0.sum.sum"); - - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - -} -
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java deleted file mode 100644 index f6ab54e..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table.test; - -import org.apache.flink.api.table.ExpressionException; -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class AsITCase extends MultipleProgramsTestBase { - - - public AsITCase(TestExecutionMode mode){ - super(mode); - } - - @Test - public void testAs() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c"); - - DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); - List<Row> results = ds.collect(); - String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testAsWithToFewFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b"); - - DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testAsWithToManyFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d"); - - DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testAsWithAmbiguousFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b"); - - DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testAsWithNonFieldReference1() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c"); - - DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testAsWithNonFieldReference2() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," + - " c"); - - DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java deleted file mode 100644 index 7e9e3dc..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table.test; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class CastingITCase extends MultipleProgramsTestBase { - - - public CastingITCase(TestExecutionMode mode){ - super(mode); - } - - @Test - public void testAutoCastToString() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = - env.fromElements(new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = table.select( - "f0 + 'b', f1 + 's', f2 + 'i', f3 + 'L', f4 + 'f', f5 + \"d\""); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1b,1s,1i,1L,1.0f,1.0d"; - compareResultAsText(results, expected); - } - - @Test - public void testNumericAutocastInArithmetic() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = - env.fromElements(new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = table.select("f0 + 1, f1 +" + - " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "2,2,2,2.0,2.0,2.0"; - compareResultAsText(results, expected); - } - - @Test - public void testNumericAutocastInComparison() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = - env.fromElements( - new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"), - new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Hello")); - - Table table = - tableEnv.fromDataSet(input, "a,b,c,d,e,f,g"); - - Table result = table - .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "2,2,2,2,2.0,2.0,Hello"; - compareResultAsText(results, expected); - } - - @Test - public void testCastFromString() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple3<String, String, String>> input = - env.fromElements(new Tuple3<>("1", "true", "2.0")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = table.select( - "f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1,1,1,1,2.0,2.0,true\n"; - compareResultAsText(results, expected); - } - - @Test - public void testCastDateFromString() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple4<String, String, String, String>> input = - env.fromElements(new Tuple4<>("2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = table - .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1, f2.cast(DATE) AS f2, f3.cast(DATE) AS f3") - .select("f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," + - "1970-01-17 17:47:53.775\n"; - compareResultAsText(results, expected); - } - - @Test - public void testCastDateToStringAndLong() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple2<String, String>> input = - env.fromElements(new Tuple2<>("2011-05-03 15:51:36.000", "1304437896000")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = table - .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1") - .select("f0.cast(STRING), f0.cast(LONG), f1.cast(STRING), f1.cast(LONG)"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "2011-05-03 15:51:36.000,1304437896000,2011-05-03 15:51:36.000,1304437896000\n"; - compareResultAsText(results, expected); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java deleted file mode 100644 index c9bba62..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table.test; - -import org.apache.flink.api.table.ExpressionException; -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class ExpressionsITCase extends MultipleProgramsTestBase { - - - public ExpressionsITCase(TestExecutionMode mode){ - super(mode); - } - - @Test - public void testArithmetic() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple2<Integer, Integer>> input = - env.fromElements(new Tuple2<>(5, 10)); - - Table table = - tableEnv.fromDataSet(input, "a, b"); - - Table result = table.select( - "a - 5, a + 5, a / 2, a * 2, a % 2, -a"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "0,10,2,10,1,-5"; - compareResultAsText(results, expected); - } - - @Test - public void testLogic() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple2<Integer, Boolean>> input = - env.fromElements(new Tuple2<>(5, true)); - - Table table = - tableEnv.fromDataSet(input, "a, b"); - - Table result = table.select( - "b && true, b && false, b || false, !b"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "true,false,true,false"; - compareResultAsText(results, expected); - } - - @Test - public void testComparisons() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple3<Integer, Integer, Integer>> input = - env.fromElements(new Tuple3<>(5, 5, 4)); - - Table table = - tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table.select( - "a > c, a >= b, a < c, a.isNull, a.isNotNull"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "true,true,false,false,true"; - compareResultAsText(results, expected); - } - - @Test - public void testBitwiseOperation() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple2<Byte, Byte>> input = - env.fromElements(new Tuple2<>((byte) 3, (byte) 5)); - - Table table = - tableEnv.fromDataSet(input, "a, b"); - - Table result = table.select( - "a & b, a | b, a ^ b, ~a"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1,7,6,-4"; - compareResultAsText(results, expected); - } - - @Test - public void testBitwiseWithAutocast() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple2<Integer, Byte>> input = - env.fromElements(new Tuple2<>(3, (byte) 5)); - - Table table = - tableEnv.fromDataSet(input, "a, b"); - - Table result = table.select( - "a & b, a | b, a ^ b, ~a"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1,7,6,-4"; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testBitwiseWithNonWorkingAutocast() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSource<Tuple2<Float, Byte>> input = - env.fromElements(new Tuple2<>(3.0f, (byte) 5)); - - Table table = - tableEnv.fromDataSet(input, "a, b"); - - Table result = - table.select("a & b, a | b, a ^ b, ~a"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java deleted file mode 100644 index 44e0def..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table.test; - -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class FilterITCase extends MultipleProgramsTestBase { - - - public FilterITCase(TestExecutionMode mode){ - super(mode); - } - - @Test - public void testAllRejectingFilter() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - Table table = - tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .filter("false"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "\n"; - compareResultAsText(results, expected); - } - - @Test - public void testAllPassingFilter() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - Table table = - tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .filter("true"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; - compareResultAsText(results, expected); - } - - @Test - public void testFilterOnIntegerTupleField() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - Table table = - tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .filter(" a % 2 = 0 "); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + - "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + - "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"; - compareResultAsText(results, expected); - } - - @Test - public void testNotEquals() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - Table table = - tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .filter("!( a % 2 <> 0 ) "); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + - "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + - "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"; - compareResultAsText(results, expected); - } - - @Test - public void testIntegerBiggerThan128() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello")); - - Table table = tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table.filter("a = 300 "); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "300,1,Hello\n"; - compareResultAsText(results, expected); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java deleted file mode 100644 index f5c9185..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table.test; - -import org.apache.flink.api.table.ExpressionException; -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class GroupedAggregationsITCase extends MultipleProgramsTestBase { - - - public GroupedAggregationsITCase(TestExecutionMode mode){ - super(mode); - } - - @Test(expected = ExpressionException.class) - public void testGroupingOnNonExistentField() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - Table table = - tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .groupBy("foo").select("a.avg"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test - public void testGroupedAggregate() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - Table table = - tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .groupBy("b").select("b, a.sum"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"; - compareResultAsText(results, expected); - } - - @Test - public void testGroupingKeyForwardIfNotUsed() throws Exception { - - // the grouping key needs to be forwarded to the intermediate DataSet, even - // if we don't want the key in the output - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - Table table = - tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .groupBy("b").select("a.sum"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"; - compareResultAsText(results, expected); - } - - @Test - public void testGroupNoAggregation() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - Table table = - tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - - String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"; - List<Row> results = ds.collect(); - compareResultAsText(results, expected); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java deleted file mode 100644 index 428aec5..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table.test; - -import org.apache.flink.api.table.ExpressionException; -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class JoinITCase extends MultipleProgramsTestBase { - - - public JoinITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testJoin() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table result = in1.join(in2).where("b === e").select("c, g"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"; - compareResultAsText(results, expected); - } - - @Test - public void testJoinWithFilter() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table result = in1.join(in2).where("b === e && b < 2").select("c, g"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "Hi,Hallo\n"; - compareResultAsText(results, expected); - } - - @Test - public void testJoinWithMultipleKeys() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table result = in1.join(in2).where("a === d && b === h").select("c, g"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" + - "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testJoinNonExistingKey() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table result = in1.join(in2).where("foo === e").select("c, g"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testJoinWithNonMatchingKeyTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table result = in1 - .join(in2).where("a === g").select("c, g"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testJoinWithAmbiguousFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, c"); - - Table result = in1 - .join(in2).where("a === d").select("c, g"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test - public void testJoinWithAggregation() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table result = in1 - .join(in2).where("a === d").select("g.count"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "6"; - compareResultAsText(results, expected); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java deleted file mode 100644 index d61912b..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table.test; - -import java.io.Serializable; -import java.util.List; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.table.Table; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class PojoGroupingITCase extends MultipleProgramsTestBase { - - public PojoGroupingITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testPojoGrouping() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<String, Double, String>> data = env.fromElements( - new Tuple3<>("A", 23.0, "Z"), - new Tuple3<>("A", 24.0, "Y"), - new Tuple3<>("B", 1.0, "Z")); - - TableEnvironment tableEnv = new TableEnvironment(); - - Table table = tableEnv - .fromDataSet(data, "groupMe, value, name") - .select("groupMe, value, name") - .where("groupMe != 'B'"); - - DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class); - - DataSet<MyPojo> result = myPojos.groupBy("groupMe") - .sortGroup("value", Order.DESCENDING) - .first(1); - List<MyPojo> resultList = result.collect(); - - compareResultAsText(resultList, "A,24.0,Y"); - } - - public static class MyPojo implements Serializable { - private static final long serialVersionUID = 8741918940120107213L; - - public String groupMe; - public double value; - public String name; - - public MyPojo() { - // for serialization - } - - public MyPojo(String groupMe, double value, String name) { - this.groupMe = groupMe; - this.value = value; - this.name = name; - } - - @Override - public String toString() { - return groupMe + "," + value + "," + name; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java deleted file mode 100644 index 9e42f53..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table.test; - -import org.apache.flink.api.table.ExpressionException; -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class SelectITCase extends MultipleProgramsTestBase { - - - public SelectITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testSimpleSelectAllWithAs() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - Table in = tableEnv.fromDataSet(ds, "a,b,c"); - - Table result = in - .select("a, b, c"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; - compareResultAsText(results, expected); - - } - - @Test - public void testSimpleSelectWithNaming() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - Table in = tableEnv.fromDataSet(ds); - - Table result = in - .select("f0 as a, f1 as b") - .select("a, b"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testAsWithToFewFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - Table in = tableEnv.fromDataSet(ds, "a, b"); - - DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class); - List<Row> results = resultSet.collect(); - String expected = " sorry dude "; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testAsWithToManyFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - Table in = tableEnv.fromDataSet(ds, "a, b, c, d"); - - DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class); - List<Row> results = resultSet.collect(); - String expected = " sorry dude "; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testAsWithAmbiguousFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - Table in = tableEnv.fromDataSet(ds, "a, b, c, b"); - - DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class); - List<Row> results = resultSet.collect(); - String expected = " today's not your day "; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testOnlyFieldRefInAs() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - Table in = tableEnv.fromDataSet(ds, "a, b as c, d"); - - DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class); - List<Row> results = resultSet.collect(); - String expected = "sorry bro"; - compareResultAsText(results, expected); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java deleted file mode 100644 index e73b5a2..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table.test; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.api.table.Table; -import org.junit.Test; - -import java.io.File; -import java.util.Scanner; - -import static org.junit.Assert.assertEquals; - -public class SqlExplainITCase { - - private static String testFilePath = SqlExplainITCase.class.getResource("/").getFile(); - - public static class WC { - public String word; - public int count; - - // Public constructor to make it a Flink POJO - public WC() {} - - public WC(int count, String word) { - this.word = word; - this.count = count; - } - } - - @Test - public void testGroupByWithoutExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<WC> input = env.fromElements( - new WC(1,"d"), - new WC(2,"d"), - new WC(3,"d")); - - Table table = tableEnv.fromDataSet(input).as("a, b"); - - String result = table - .filter("a % 2 = 0") - .explain(); - String source = new Scanner(new File(testFilePath + - "../../src/test/scala/resources/testFilter0.out")) - .useDelimiter("\\A").next(); - assertEquals(result, source); - } - - @Test - public void testGroupByWithExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<WC> input = env.fromElements( - new WC(1, "d"), - new WC(2, "d"), - new WC(3, "d")); - - Table table = tableEnv.fromDataSet(input).as("a, b"); - - String result = table - .filter("a % 2 = 0") - .explain(true); - String source = new Scanner(new File(testFilePath + - "../../src/test/scala/resources/testFilter1.out")) - .useDelimiter("\\A").next(); - assertEquals(result, source); - } - - @Test - public void testJoinWithoutExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<WC> input1 = env.fromElements( - new WC(1, "d"), - new WC(1, "d"), - new WC(1, "d")); - - Table table1 = tableEnv.fromDataSet(input1).as("a, b"); - - DataSet<WC> input2 = env.fromElements( - new WC(1,"d"), - new WC(1,"d"), - new WC(1,"d")); - - Table table2 = tableEnv.fromDataSet(input2).as("c, d"); - - String result = table1 - .join(table2) - .where("b = d") - .select("a, c") - .explain(); - String source = new Scanner(new File(testFilePath + - "../../src/test/scala/resources/testJoin0.out")) - .useDelimiter("\\A").next(); - assertEquals(result, source); - } - - @Test - public void testJoinWithExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<WC> input1 = env.fromElements( - new WC(1, "d"), - new WC(1, "d"), - new WC(1, "d")); - - Table table1 = tableEnv.fromDataSet(input1).as("a, b"); - - DataSet<WC> input2 = env.fromElements( - new WC(1, "d"), - new WC(1, "d"), - new WC(1, "d")); - - Table table2 = tableEnv.fromDataSet(input2).as("c, d"); - - String result = table1 - .join(table2) - .where("b = d") - .select("a, c") - .explain(true); - String source = new Scanner(new File(testFilePath + - "../../src/test/scala/resources/testJoin1.out")) - .useDelimiter("\\A").next(); - assertEquals(result, source); - } - - @Test - public void testUnionWithoutExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<WC> input1 = env.fromElements( - new WC(1, "d"), - new WC(1, "d"), - new WC(1, "d")); - - Table table1 = tableEnv.fromDataSet(input1); - - DataSet<WC> input2 = env.fromElements( - new WC(1, "d"), - new WC(1, "d"), - new WC(1, "d")); - - Table table2 = tableEnv.fromDataSet(input2); - - String result = table1 - .unionAll(table2) - .explain(); - String source = new Scanner(new File(testFilePath + - "../../src/test/scala/resources/testUnion0.out")) - .useDelimiter("\\A").next(); - assertEquals(result, source); - } - - @Test - public void testUnionWithExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<WC> input1 = env.fromElements( - new WC(1, "d"), - new WC(1, "d"), - new WC(1, "d")); - - Table table1 = tableEnv.fromDataSet(input1); - - DataSet<WC> input2 = env.fromElements( - new WC(1, "d"), - new WC(1, "d"), - new WC(1, "d")); - - Table table2 = tableEnv.fromDataSet(input2); - - String result = table1 - .unionAll(table2) - .explain(true); - String source = new Scanner(new File(testFilePath + - "../../src/test/scala/resources/testUnion1.out")) - .useDelimiter("\\A").next(); - assertEquals(result, source); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java deleted file mode 100644 index 7936f8c..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table.test; - -import org.apache.flink.api.table.ExpressionException; -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class StringExpressionsITCase extends MultipleProgramsTestBase { - - - public StringExpressionsITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testSubstring() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple2<String, Integer>> ds = env.fromElements( - new Tuple2<>("AAAA", 2), - new Tuple2<>("BBBB", 1)); - - Table in = tableEnv.fromDataSet(ds, "a, b"); - - Table result = in - .select("a.substring(0, b)"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "AA\nB"; - compareResultAsText(results, expected); - } - - @Test - public void testSubstringWithMaxEnd() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple2<String, Integer>> ds = env.fromElements( - new Tuple2<>("ABCD", 2), - new Tuple2<>("ABCD", 1)); - - Table in = tableEnv.fromDataSet(ds, "a, b"); - - Table result = in - .select("a.substring(b)"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "CD\nBCD"; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testNonWorkingSubstring1() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple2<String, Float>> ds = env.fromElements( - new Tuple2<>("ABCD", 2.0f), - new Tuple2<>("ABCD", 1.0f)); - - Table in = tableEnv.fromDataSet(ds, "a, b"); - - Table result = in - .select("a.substring(0, b)"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testNonWorkingSubstring2() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple2<String, String>> ds = env.fromElements( - new Tuple2<>("ABCD", "a"), - new Tuple2<>("ABCD", "b")); - - Table in = tableEnv.fromDataSet(ds, "a, b"); - - Table result = in - .select("a.substring(b, 15)"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = ""; - compareResultAsText(results, expected); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java deleted file mode 100644 index 7fd3a28..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table.test; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.table.ExpressionException; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.Table; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class UnionITCase extends MultipleProgramsTestBase { - - - public UnionITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testUnion() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "a, b, c"); - - Table selected = in1.unionAll(in2).select("c"); - DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class); - List<Row> results = ds.collect(); - - String expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"; - compareResultAsText(results, expected); - } - - @Test - public void testUnionWithFilter() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, e").select("a, b, c"); - - Table selected = in1.unionAll(in2).where("b < 2").select("c"); - DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class); - List<Row> results = ds.collect(); - - String expected = "Hi\n" + "Hallo\n"; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testUnionFieldsNameNotOverlap1() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table selected = in1.unionAll(in2); - DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class); - List<Row> results = ds.collect(); - - String expected = ""; - compareResultAsText(results, expected); - } - - @Test(expected = ExpressionException.class) - public void testUnionFieldsNameNotOverlap2() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "a, b, c, d, e").select("a, b, c"); - - Table selected = in1.unionAll(in2); - DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class); - List<Row> results = ds.collect(); - - String expected = ""; - compareResultAsText(results, expected); - } - - @Test - public void testUnionWithAggregation() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, e").select("a, b, c"); - - Table selected = in1.unionAll(in2).select("c.count"); - DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class); - List<Row> results = ds.collect(); - - String expected = "18"; - compareResultAsText(results, expected); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java deleted file mode 100644 index 1816614..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java +++ /dev/null @@ -1,100 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.flink.api.scala.table.test; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.examples.scala.PageRankTable; -import org.apache.flink.test.testdata.PageRankData; -import org.apache.flink.test.util.JavaProgramTestBase; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - -@RunWith(Parameterized.class) -public class PageRankTableITCase extends JavaProgramTestBase { - - private static int NUM_PROGRAMS = 2; - - private int curProgId = config.getInteger("ProgramId", -1); - - private String verticesPath; - private String edgesPath; - private String resultPath; - private String expectedResult; - - public PageRankTableITCase(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES); - edgesPath = createTempFile("edges.txt", PageRankData.EDGES); - } - - @Override - protected void testProgram() throws Exception { - expectedResult = runProgram(curProgId); - } - - @Override - protected void postSubmit() throws Exception { - compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.01); - } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); - } - - - public String runProgram(int progId) throws Exception { - - switch(progId) { - case 1: { - PageRankTable.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData - .NUM_VERTICES + "", "3"}); - return PageRankData.RANKS_AFTER_3_ITERATIONS; - } - case 2: { - // start with a very high number of iteration such that the dynamic convergence criterion must handle termination - PageRankTable.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"}); - return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE; - } - - default: - throw new IllegalArgumentException("Invalid program id"); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala deleted file mode 100644 index acb7ded..0000000 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.flink.api.scala.table.test - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.ExpressionException -import org.junit.Test - -class TypeExceptionTest { - - @Test(expected = classOf[ExpressionException]) - def testInnerCaseClassException(): Unit = { - case class WC(word: String, count: Int) - - val env = ExecutionEnvironment.getExecutionEnvironment - val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) - val expr = input.toTable // this should fail - val result = expr - .groupBy('word) - .select('word, 'count.sum as 'count) - .toDataSet[WC] - - result.print() - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala deleted file mode 100644 index ee5d9e8..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import org.apache.flink.api.table.{Row, ExpressionException} -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test - def testAggregationTypes(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).toTable - .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg).toDataSet[Row] - val results = ds.collect() - val expected = "231,1,21,21,11" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testAggregationOnNonExistingField(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).toTable - .select('foo.avg).toDataSet[Row] - val expected = "" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testWorkingAggregationDataTypes(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements( - (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), - (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable - .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count) - .toDataSet[Row] - val expected = "1,1,1,1,1.5,1.5,2" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testAggregationWithArithmetic(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable - .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT").toDataSet[Row] - val expected = "5.5,2 THE COUNT" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testAggregationWithTwoCount(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable - .select('_1.count, '_2.count).toDataSet[Row] - val expected = "2,2" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testNonWorkingAggregationDataTypes(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("Hello", 1)).toTable - .select('_1.sum).toDataSet[Row] - val expected = "" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testNoNestedAggregations(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("Hello", 1)).toTable - .select('_2.sum.sum).toDataSet[Row] - val expected = "" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala deleted file mode 100644 index 59573eb..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import org.apache.flink.api.table.{Row, ExpressionException} -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test - def testAs(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).toDataSet[Row] - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithToFewFields(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b).toDataSet[Row] - val expected = "no" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithToManyFields(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd).toDataSet[Row] - val expected = "no" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithAmbiguousFields(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b).toDataSet[Row] - val expected = "no" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithNonFieldReference1(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - // as can only have field references - val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b).toDataSet[Row] - val expected = "no" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithNonFieldReference2(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - // as can only have field references - val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b).toDataSet[Row] - val expected = "no" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } -}