http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java new file mode 100644 index 0000000..89ec2e5 --- /dev/null +++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.expressions.test; + +import org.apache.flink.api.expressions.ExpressionException; +import org.apache.flink.api.expressions.ExpressionOperation; +import org.apache.flink.api.expressions.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.expressions.ExpressionUtil; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.scala.expressions.JavaBatchTranslator; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class SelectITCase extends MultipleProgramsTestBase { + + + public SelectITCase(TestExecutionMode mode) { + super(mode); + } + + private String resultPath; + private String expected = ""; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testSimpleSelectAllWithAs() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + + ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a,b,c"); + + ExpressionOperation<JavaBatchTranslator> result = in + .select("a, b, c"); + + DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class); + resultSet.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + + } + + @Test + public void testSimpleSelectWithNaming() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + + ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds); + + ExpressionOperation<JavaBatchTranslator> result = in + .select("f0 as a, f1 as b") + .select("a, b"); + + DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class); + resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; + } + + @Test(expected = ExpressionException.class) + public void testAsWithToFewFields() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + + ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b"); + + DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class); + resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = " sorry dude "; + } + + @Test(expected = ExpressionException.class) + public void 
testAsWithToManyFields() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + + ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b, c, d"); + + DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class); + resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = " sorry dude "; + } + + @Test(expected = ExpressionException.class) + public void testAsWithAmbiguousFields() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + + ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b, c, b"); + + DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class); + resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = " today's not your day "; + } + + @Test(expected = ExpressionException.class) + public void testOnlyFieldRefInAs() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + + ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b as c, d"); + + DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class); + resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "sorry bro"; + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java new file mode 100644 index 0000000..f9f1c6b --- /dev/null +++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.expressions.test; + +import org.apache.flink.api.expressions.ExpressionException; +import org.apache.flink.api.expressions.ExpressionOperation; +import org.apache.flink.api.expressions.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.expressions.ExpressionUtil; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.scala.expressions.JavaBatchTranslator; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class StringExpressionsITCase extends MultipleProgramsTestBase { + + + public StringExpressionsITCase(TestExecutionMode mode) { + super(mode); + } + + private String resultPath; + private String expected = ""; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testSubstring() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple2<String, Integer>> ds = env.fromElements( + new Tuple2<String, Integer>("AAAA", 2), + new Tuple2<String, Integer>("BBBB", 1)); + + ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b"); + + ExpressionOperation<JavaBatchTranslator> result = in + .select("a.substring(0, b)"); + + DataSet<Row> resultSet = 
ExpressionUtil.toSet(result, Row.class); + resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "AA\nB"; + } + + @Test + public void testSubstringWithMaxEnd() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple2<String, Integer>> ds = env.fromElements( + new Tuple2<String, Integer>("ABCD", 2), + new Tuple2<String, Integer>("ABCD", 1)); + + ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b"); + + ExpressionOperation<JavaBatchTranslator> result = in + .select("a.substring(b)"); + + DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class); + resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "CD\nBCD"; + } + + @Test(expected = ExpressionException.class) + public void testNonWorkingSubstring1() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple2<String, Float>> ds = env.fromElements( + new Tuple2<String, Float>("ABCD", 2.0f), + new Tuple2<String, Float>("ABCD", 1.0f)); + + ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b"); + + ExpressionOperation<JavaBatchTranslator> result = in + .select("a.substring(0, b)"); + + DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class); + resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test(expected = ExpressionException.class) + public void testNonWorkingSubstring2() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple2<String, String>> ds = env.fromElements( + new Tuple2<String, String>("ABCD", "a"), + new Tuple2<String, String>("ABCD", "b")); + + ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b"); + + ExpressionOperation<JavaBatchTranslator> result = in + .select("a.substring(b, 15)"); + + 
DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class); + resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java new file mode 100644 index 0000000..b75a2ee --- /dev/null +++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java @@ -0,0 +1,100 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.flink.api.scala.expressions.test; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.examples.scala.PageRankExpression; +import org.apache.flink.test.testdata.PageRankData; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + +@RunWith(Parameterized.class) +public class PageRankExpressionITCase extends JavaProgramTestBase { + + private static int NUM_PROGRAMS = 2; + + private int curProgId = config.getInteger("ProgramId", -1); + + private String verticesPath; + private String edgesPath; + private String resultPath; + private String expectedResult; + + public PageRankExpressionITCase(Configuration config) { + super(config); + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES); + edgesPath = createTempFile("edges.txt", PageRankData.EDGES); + } + + @Override + protected void testProgram() throws Exception { + expectedResult = runProgram(curProgId); + } + + @Override + protected void postSubmit() throws Exception { + compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01); + } + + @Parameters + public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + + LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + + for(int i=1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + config.setInteger("ProgramId", i); + tConfigs.add(config); + } + + return toParameterList(tConfigs); + } + + + public String runProgram(int progId) throws Exception { + + switch(progId) { + case 1: { + PageRankExpression.main(new String[]{verticesPath, edgesPath, 
resultPath, PageRankData + .NUM_VERTICES + "", "3"}); + return PageRankData.RANKS_AFTER_3_ITERATIONS; + } + case 2: { + // start with a very high number of iteration such that the dynamic convergence criterion must handle termination + PageRankExpression.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"}); + return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE; + } + + default: + throw new IllegalArgumentException("Invalid program id"); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala deleted file mode 100644 index fee7ac8..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testAggregationTypes: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).toExpression - .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "231,1,21,21,11" - } - - @Test(expected = classOf[ExpressionException]) - def testAggregationOnNonExistingField: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).toExpression - .select('foo.avg) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - @Test - def testWorkingAggregationDataTypes: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements( - (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), - (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toExpression - .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, 
'_6.avg, '_7.count) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,1,1,1,1.5,1.5,2" - } - - @Test - def testAggregationWithArithmetic: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toExpression - .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT") - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "5.5,2 THE COUNT" - } - - @Test(expected = classOf[ExpressionException]) - def testNonWorkingAggregationDataTypes: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("Hello", 1)).toExpression - .select('_1.sum) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - @Test(expected = classOf[ExpressionException]) - def testNoNestedAggregations: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("Hello", 1)).toExpression - .select('_2.sum.sum) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala deleted file mode 100644 index 8921e89..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.common.InvalidProgramException -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testAs: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello 
world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithToFewFields: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithToManyFields: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithAmbiguousFields: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithNonFieldReference1: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - // as can only have field references - val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithNonFieldReference2: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - // as can only have field references - val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b) - - 
ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala deleted file mode 100644 index b3f8ef3..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testAutoCastToString: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression - .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d") - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1b,1s,1i,1L,1.0f,1.0d" - } - - @Test - def testNumericAutoCastInArithmetic: Unit = { - - // don't test everything, just some common cast directions - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression - .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "2,2,2,2.0,2.0,2.0" - } - - @Test - def testNumericAutoCastInComparison: Unit = { - - // don't test everything, just some common cast directions - - val env = 
ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements( - (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d), - (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f) - .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "2,2,2,2,2.0,2.0" - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala deleted file mode 100644 index de41f65..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testArithmetic: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((5, 10)).as('a, 'b) - .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "0,10,2,10,1,-5" - } - - @Test - def testLogic: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((5, true)).as('a, 'b) - .select('b && true, 'b && false, 'b || false, !'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "true,false,true,false" - } - - @Test - def testComparisons: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c) - .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "true,true,false,false,true" - } - - @Test - def testBitwiseOperations: Unit = { - - val env = 
ExecutionEnvironment.getExecutionEnvironment - - val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 'b) - .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,7,6,-4" - } - - @Test - def testBitwiseWithAutocast: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - - val ds = env.fromElements((3, 5.toByte)).as('a, 'b) - .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,7,6,-4" - } - - @Test(expected = classOf[ExpressionException]) - def testBitwiseWithNonWorkingAutocast: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - - val ds = env.fromElements((3.0, 5)).as('a, 'b) - .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,7,6,-4" - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala deleted file mode 100644 index 4b46458..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.expressions.tree.Literal -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.junit._ - - -@RunWith(classOf[Parameterized]) -class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = null - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testAllRejectingFilter: Unit = { - /* - * Test all-rejecting filter. - */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - - val filterDs = ds.filter( Literal(false) ) - - filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "\n" - } - - @Test - def testAllPassingFilter: Unit = { - /* - * Test all-passing filter. 
- */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - - val filterDs = ds.filter( Literal(true) ) - - filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - } - - @Test - def testFilterOnStringTupleField: Unit = { - /* - * Test filter on String tuple field. - */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env) - val filterDs = ds.filter( _._3.contains("world") ) - filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n" - } - - @Test - def testFilterOnIntegerTupleField: Unit = { - /* - * Test filter on Integer tuple field. 
- */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - - val filterDs = ds.filter( 'a % 2 === 0 ) - - filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + - "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + - "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n" - } - - // These two not yet done, but are planned - - @Ignore - @Test - def testFilterBasicType: Unit = { - /* - * Test filter on basic type - */ - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.getStringDataSet(env) - - val filterDs = ds.filter( _.startsWith("H") ) - - filterDs.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" - } - - @Ignore - @Test - def testFilterOnCustomType: Unit = { - /* - * Test filter on custom type - */ - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.getCustomTypeDataSet(env) - val filterDs = ds.filter( _.myString.contains("a") ) - filterDs.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala deleted file mode 100644 index 06f61db..0000000 --- 
a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - 
@Test(expected = classOf[ExpressionException]) - def testGroupingOnNonExistentField: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .groupBy('_foo) - .select('a.avg) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - @Test - def testGroupedAggregate: Unit = { - - // the grouping key needs to be forwarded to the intermediate DataSet, even - // if we don't want the key in the output - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .groupBy('b) - .select('b, 'a.sum) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n" - } - - @Test - def testGroupingKeyForwardIfNotUsed: Unit = { - - // the grouping key needs to be forwarded to the intermediate DataSet, even - // if we don't want the key in the output - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .groupBy('b) - .select('a.sum) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n" - } - - - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala deleted file mode 100644 index d52acf6..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.common.InvalidProgramException -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testJoin: Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" - } - - @Test - def testJoinWithFilter: Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "Hi,Hallo\n" - } - - @Test - def testJoinWithMultipleKeys: Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" + - "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n" - } - - @Test(expected = classOf[ExpressionException]) - def testJoinNonExistingKey: Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - @Test(expected = classOf[ExpressionException]) - def testJoinWithNonMatchingKeyTypes: Unit = 
{ - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "" - } - - @Test - def testJoinWithAggregation: Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) - - val joinDs = ds1.join(ds2).where('a === 'd).select('g.count) - - joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) - env.execute() - expected = "6" - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java deleted file mode 100644 index aefc8cf..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java +++ /dev/null @@ -1,100 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. 
You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.flink.api.scala.expressions; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.examples.scala.PageRankExpression; -import org.apache.flink.test.testdata.PageRankData; -import org.apache.flink.test.util.JavaProgramTestBase; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - -@RunWith(Parameterized.class) -public class PageRankExpressionITCase extends JavaProgramTestBase { - - private static int NUM_PROGRAMS = 2; - - private int curProgId = config.getInteger("ProgramId", -1); - - private String verticesPath; - private String edgesPath; - private String resultPath; - private String expectedResult; - - public PageRankExpressionITCase(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES); - edgesPath = createTempFile("edges.txt", PageRankData.EDGES); - } - - @Override - protected void testProgram() throws Exception { - expectedResult = runProgram(curProgId); - } - - @Override - protected void postSubmit() throws Exception { - compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01); - } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - - LinkedList<Configuration> 
tConfigs = new LinkedList<Configuration>(); - - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); - } - - - public String runProgram(int progId) throws Exception { - - switch(progId) { - case 1: { - PageRankExpression.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData - .NUM_VERTICES + "", "3"}); - return PageRankData.RANKS_AFTER_3_ITERATIONS; - } - case 2: { - // start with a very high number of iteration such that the dynamic convergence criterion must handle termination - PageRankExpression.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"}); - return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE; - } - - default: - throw new IllegalArgumentException("Invalid program id"); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala deleted file mode 100644 index b286421..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testSimpleSelectAll: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).toExpression.select('_1, '_2, '_3) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - } - - @Test - def testSimpleSelectAllWithAs: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - } - - @Test - def testSimpleSelectWithNaming: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).toExpression - .select('_1 as 'a, '_2 as 'b) - .select('a, 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithToFewFields: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithToManyFields: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = 
CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } - - @Test(expected = classOf[ExpressionException]) - def testAsWithAmbiguousFields: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c as 'b) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "no" - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala deleted file mode 100644 index 3a7ad02..0000000 --- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.scala._ -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null - private var expected: String = "" - private val _tempFolder = new TemporaryFolder() - - @Rule - def tempFolder = _tempFolder - - @Before - def before(): Unit = { - resultPath = tempFolder.newFile().toURI.toString - } - - @After - def after: Unit = { - compareResultsByLinesInMemory(expected, resultPath) - } - - @Test - def testSubstring: Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b) - .select('a.substring(0, 'b)) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "AA\nB" - } - - @Test - def testSubstringWithMaxEnd: Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b) - .select('a.substring('b)) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "CD\nBCD" - } - - @Test(expected = classOf[ExpressionException]) - def testNonWorkingSubstring1: Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b) - .select('a.substring(0, 'b)) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "AAA\nBB" - } - - @Test(expected = classOf[ExpressionException]) - def testNonWorkingSubstring2: Unit = { - - val env = 
ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b) - .select('a.substring('b, 15)) - - ds.writeAsText(resultPath, WriteMode.OVERWRITE) - env.execute() - expected = "AAA\nBB" - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala new file mode 100644 index 0000000..4a358bc --- /dev/null +++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/**
+ * Integration tests for the aggregations (sum, min, max, count, avg) that can be used
+ * inside `select` of the expression API.
+ *
+ * Each test writes its result to a temporary file and stores the lines it expects in
+ * `expected`; `after` compares the two once the test body has finished.
+ */
+@RunWith(classOf[Parameterized])
+class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  // URI string of the file the current test writes its result to.
+  private var resultPath: String = null
+  // Lines the current test expects to find in the result file.
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  /** Hands the temporary folder to JUnit as a rule. */
+  @Rule
+  def tempFolder = _tempFolder
+
+  /** Creates a fresh result file for the test that is about to run. */
+  @Before
+  def before(): Unit = {
+    val resultFile = tempFolder.newFile()
+    resultPath = resultFile.toURI.toString
+  }
+
+  /** Compares the content written to `resultPath` with `expected`. */
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  /** Applies sum, min, max, count and avg to the first field of the 3-tuple data set. */
+  @Test
+  def testAggregationTypes: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tuples = CollectionDataSets.get3TupleDataSet(execEnv).toExpression
+    val aggregated = tuples.select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
+
+    aggregated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "231,1,21,21,11"
+  }
+
+  /** Aggregating a field name that the input does not have must raise an ExpressionException. */
+  @Test(expected = classOf[ExpressionException])
+  def testAggregationOnNonExistingField: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tuples = CollectionDataSets.get3TupleDataSet(execEnv).toExpression
+    val aggregated = tuples.select('foo.avg)
+
+    aggregated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = ""
+  }
+
+  /** Computes avg over Byte, Short, Int, Long, Float and Double fields and count over a String. */
+  @Test
+  def testWorkingAggregationDataTypes: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val rows = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toExpression
+    val aggregated = rows
+      .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
+
+    aggregated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "1,1,1,1,1.5,1.5,2"
+  }
+
+  /** Mixes aggregations with arithmetic and string concatenation in one select. */
+  @Test
+  def testAggregationWithArithmetic: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val rows = execEnv.fromElements((1f, "Hello"), (2f, "Ciao")).toExpression
+    val aggregated = rows.select(('_1 + 2).avg + 2, '_2.count + " THE COUNT")
+
+    aggregated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "5.5,2 THE COUNT"
+  }
+
+  /** Calling sum on a String field must raise an ExpressionException. */
+  @Test(expected = classOf[ExpressionException])
+  def testNonWorkingAggregationDataTypes: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val rows = execEnv.fromElements(("Hello", 1)).toExpression
+    val aggregated = rows.select('_1.sum)
+
+    aggregated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = ""
+  }
+
+  /** Applying an aggregation to another aggregation must raise an ExpressionException. */
+  @Test(expected = classOf[ExpressionException])
+  def testNoNestedAggregations: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val rows = execEnv.fromElements(("Hello", 1)).toExpression
+    val aggregated = rows.select('_2.sum.sum)
+
+    aggregated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = ""
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala
new file mode 100644
index 0000000..18d7b09
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/**
+ * Integration tests for `as`, which assigns field names to the fields of a data set.
+ *
+ * The positive test writes its result to a temporary file and stores the lines it
+ * expects in `expected`; `after` compares the two. The negative tests are declared to
+ * fail with an ExpressionException.
+ */
+@RunWith(classOf[Parameterized])
+class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  // URI string of the file the current test writes its result to.
+  private var resultPath: String = null
+  // Lines the current test expects to find in the result file.
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  /** Hands the temporary folder to JUnit as a rule. */
+  @Rule
+  def tempFolder = _tempFolder
+
+  /** Creates a fresh result file for the test that is about to run. */
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  /** Compares the content written to `resultPath` with `expected`. */
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  /** Naming all three fields leaves the records of the 3-tuple data set unchanged. */
+  @Test
+  def testAs: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    // One literal per expected record.
+    expected = "1,1,Hi\n" +
+      "2,2,Hello\n" +
+      "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" +
+      "5,3,I am fine.\n" +
+      "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" +
+      "8,4,Comment#2\n" +
+      "9,4,Comment#3\n" +
+      "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" +
+      "12,5,Comment#6\n" +
+      "13,5,Comment#7\n" +
+      "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" +
+      "16,6,Comment#10\n" +
+      "17,6,Comment#11\n" +
+      "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" +
+      "20,6,Comment#14\n" +
+      "21,6,Comment#15\n"
+  }
+
+  /** Two names for a three-field input must raise an ExpressionException. */
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToFewFields: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  /** Four names for a three-field input must raise an ExpressionException. */
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToManyFields: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  /** Using the same name for two fields must raise an ExpressionException. */
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithAmbiguousFields: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  /** An arithmetic expression in place of a field name must raise an ExpressionException. */
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithNonFieldReference1: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    // as can only have field references; the remaining names are distinct so that the
+    // arithmetic expression is the only thing wrong with this call
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'c)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  /** An alias expression in place of a field name must raise an ExpressionException. */
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithNonFieldReference2: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    // as can only have field references; the remaining names are distinct so that the
+    // alias expression is the only thing wrong with this call
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'c)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala
new file mode 100644
index 0000000..599ef6b
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/**
+ * Integration tests for the automatic casts applied when expressions combine operands
+ * of different types.
+ *
+ * Each test writes its result to a temporary file and stores the lines it expects in
+ * `expected`; `after` compares the two once the test body has finished.
+ */
+@RunWith(classOf[Parameterized])
+class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  // URI string of the file the current test writes its result to.
+  private var resultPath: String = null
+  // Lines the current test expects to find in the result file.
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  /** Hands the temporary folder to JUnit as a rule. */
+  @Rule
+  def tempFolder = _tempFolder
+
+  /** Creates a fresh result file for the test that is about to run. */
+  @Before
+  def before(): Unit = {
+    val resultFile = tempFolder.newFile()
+    resultPath = resultFile.toURI.toString
+  }
+
+  /** Compares the content written to `resultPath` with `expected`. */
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  /** Adds a String literal to one field of every numeric type. */
+  @Test
+  def testAutoCastToString: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val numbers = execEnv.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression
+    val concatenated = numbers
+      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d")
+
+    concatenated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "1b,1s,1i,1L,1.0f,1.0d"
+  }
+
+  /** Adds numeric literals whose type differs from the type of the field they are added to. */
+  @Test
+  def testNumericAutoCastInArithmetic: Unit = {
+    // only a few common cast directions are covered, not every combination
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val numbers = execEnv.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression
+    val summed = numbers
+      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
+
+    summed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "2,2,2,2.0,2.0,2.0"
+  }
+
+  /** Compares fields with literals whose type differs from the type of the field. */
+  @Test
+  def testNumericAutoCastInComparison: Unit = {
+    // only a few common cast directions are covered, not every combination
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val rows = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
+    val matching = rows
+      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1)
+
+    matching.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "2,2,2,2,2.0,2.0"
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala
new file mode 100644
index 0000000..9d37f70
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/**
+ * Integration tests for arithmetic, logic, comparison and bitwise expressions used
+ * inside `select`.
+ *
+ * Each test writes its result to a temporary file and stores the lines it expects in
+ * `expected`; `after` compares the two once the test body has finished.
+ */
+@RunWith(classOf[Parameterized])
+class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  // URI string of the file the current test writes its result to.
+  private var resultPath: String = null
+  // Lines the current test expects to find in the result file.
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  /** Hands the temporary folder to JUnit as a rule. */
+  @Rule
+  def tempFolder = _tempFolder
+
+  /** Creates a fresh result file for the test that is about to run. */
+  @Before
+  def before(): Unit = {
+    val resultFile = tempFolder.newFile()
+    resultPath = resultFile.toURI.toString
+  }
+
+  /** Compares the content written to `resultPath` with `expected`. */
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  /** Subtraction, addition, division, multiplication, modulo and unary minus on an Int field. */
+  @Test
+  def testArithmetic: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val input = execEnv.fromElements((5, 10)).as('a, 'b)
+    val computed = input.select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
+
+    computed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "0,10,2,10,1,-5"
+  }
+
+  /** Conjunction, disjunction and negation on a Boolean field. */
+  @Test
+  def testLogic: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val input = execEnv.fromElements((5, true)).as('a, 'b)
+    val computed = input.select('b && true, 'b && false, 'b || false, !'b)
+
+    computed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "true,false,true,false"
+  }
+
+  /** Greater-than, greater-or-equal, less-than and the two null checks on Int fields. */
+  @Test
+  def testComparisons: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val input = execEnv.fromElements((5, 5, 4)).as('a, 'b, 'c)
+    val computed = input.select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
+
+    computed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "true,true,false,false,true"
+  }
+
+  /** Bitwise and, or, xor and complement on two Byte fields. */
+  @Test
+  def testBitwiseOperations: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val input = execEnv.fromElements((3.toByte, 5.toByte)).as('a, 'b)
+    val computed = input.select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+
+    computed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "1,7,6,-4"
+  }
+
+  /** The same bitwise operations with one Int and one Byte operand. */
+  @Test
+  def testBitwiseWithAutocast: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val input = execEnv.fromElements((3, 5.toByte)).as('a, 'b)
+    val computed = input.select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+
+    computed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "1,7,6,-4"
+  }
+
+  /** Bitwise operations with a Double operand must raise an ExpressionException. */
+  @Test(expected = classOf[ExpressionException])
+  def testBitwiseWithNonWorkingAutocast: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val input = execEnv.fromElements((3.0, 5)).as('a, 'b)
+    val computed = input.select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+
+    computed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "1,7,6,-4"
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala
new file mode 100644
index 0000000..2841534
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.tree.Literal
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+
+/**
+ * Integration tests for `filter`.
+ *
+ * Each test writes its result to a temporary file and stores the lines it expects in
+ * `expected`; `after` compares the two once the test body has finished.
+ */
+@RunWith(classOf[Parameterized])
+class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  // URI string of the file the current test writes its result to.
+  private var resultPath: String = null
+  // Lines the current test expects to find in the result file. Starts out empty rather
+  // than null so that `after` always has a String to compare against, even when a test
+  // aborts before assigning it.
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  /** Hands the temporary folder to JUnit as a rule. */
+  @Rule
+  def tempFolder = _tempFolder
+
+  /** Creates a fresh result file for the test that is about to run. */
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  /** Compares the content written to `resultPath` with `expected`. */
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testAllRejectingFilter: Unit = {
+    /*
+     * Test all-rejecting filter.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(false) )
+
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "\n"
+  }
+
+  @Test
+  def testAllPassingFilter: Unit = {
+    /*
+     * Test all-passing filter.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(true) )
+
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    // One literal per expected record.
+    expected = "1,1,Hi\n" +
+      "2,2,Hello\n" +
+      "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" +
+      "5,3,I am fine.\n" +
+      "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" +
+      "8,4,Comment#2\n" +
+      "9,4,Comment#3\n" +
+      "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" +
+      "12,5,Comment#6\n" +
+      "13,5,Comment#7\n" +
+      "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" +
+      "16,6,Comment#10\n" +
+      "17,6,Comment#11\n" +
+      "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" +
+      "20,6,Comment#14\n" +
+      "21,6,Comment#15\n"
+  }
+
+  @Test
+  def testFilterOnStringTupleField: Unit = {
+    /*
+     * Test filter on String tuple field.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    // The predicate here is a Scala function on the tuple, not an expression.
+    val filterDs = ds.filter( _._3.contains("world") )
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n"
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField: Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 === 0 )
+
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    // One literal per expected record.
+    expected = "2,2,Hello\n" +
+      "4,3,Hello world, how are you?\n" +
+      "6,3,Luke Skywalker\n" +
+      "8,4,Comment#2\n" +
+      "10,4,Comment#4\n" +
+      "12,5,Comment#6\n" +
+      "14,5,Comment#8\n" +
+      "16,6,Comment#10\n" +
+      "18,6,Comment#12\n" +
+      "20,6,Comment#14\n"
+  }
+
+  // The two tests below are not done yet but are planned; until then they stay
+  // disabled through @Ignore.
+
+  @Ignore
+  @Test
+  def testFilterBasicType: Unit = {
+    /*
+     * Test filter on basic type
+     */
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getStringDataSet(env)
+
+    val filterDs = ds.filter( _.startsWith("H") )
+
+    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi\n" +
+      "Hello\n" +
+      "Hello world\n" +
+      "Hello world, how are you?\n"
+  }
+
+  @Ignore
+  @Test
+  def testFilterOnCustomType: Unit = {
+    /*
+     * Test filter on custom type
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val filterDs = ds.filter( _.myString.contains("a") )
+    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "3,3,Hello world, how are you?\n" +
+      "3,4,I am fine.\n" +
+      "3,5,Luke Skywalker\n"
+  }
+
+}