Repository: flink Updated Branches: refs/heads/master 885fbaf66 -> 343d05a40
[hotfix] [tableAPI] Moved tests to correct package. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/343d05a4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/343d05a4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/343d05a4 Branch: refs/heads/master Commit: 343d05a4067dd154b45e58b6bfce0ae6a4ebd5f4 Parents: 885fbaf Author: Fabian Hueske <fhue...@apache.org> Authored: Sun May 22 19:32:01 2016 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Mon May 23 18:44:02 2016 +0200 ---------------------------------------------------------------------- .../scala/expression/ScalarFunctionsTest.scala | 490 ------------------- .../expression/utils/ExpressionEvaluator.scala | 119 ----- .../api/scala/typeutils/RowComparatorTest.scala | 136 ----- .../api/scala/typeutils/RowSerializerTest.scala | 194 -------- .../table/expressions/ScalarFunctionsTest.scala | 490 +++++++++++++++++++ .../expressions/utils/ExpressionEvaluator.scala | 119 +++++ .../api/table/typeutils/RowComparatorTest.scala | 136 +++++ .../api/table/typeutils/RowSerializerTest.scala | 194 ++++++++ 8 files changed, 939 insertions(+), 939 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/ScalarFunctionsTest.scala deleted file mode 100644 index 8d1cfa2..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/ScalarFunctionsTest.scala +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expression - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala.expression.utils.ExpressionEvaluator -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.junit.Assert.assertEquals -import org.junit.Test - -class ScalarFunctionsTest { - - // ---------------------------------------------------------------------------------------------- - // String functions - // ---------------------------------------------------------------------------------------------- - - @Test - def testSubstring(): Unit = { - testFunction( - 'f0.substring(2), - "f0.substring(2)", - "SUBSTRING(f0, 2)", - "his is a test String.") - - testFunction( - 'f0.substring(2, 5), - "f0.substring(2, 5)", - "SUBSTRING(f0, 2, 5)", - "his i") - - testFunction( - 'f0.substring(1, 'f7), - "f0.substring(1, f7)", - "SUBSTRING(f0, 1, f7)", - "Thi") - } - - @Test - def testTrim(): Unit = { - testFunction( - 'f8.trim(), - "f8.trim()", - "TRIM(f8)", - "This is a test String.") - - testFunction( - 'f8.trim(removeLeading = true, removeTrailing = true, " "), - "trim(f8)", - "TRIM(f8)", - "This is a test String.") - - testFunction( - 'f8.trim(removeLeading = false, removeTrailing = true, " "), - "f8.trim(TRAILING, ' ')", - "TRIM(TRAILING FROM f8)", - " This is a test String.") - - testFunction( - 'f0.trim(removeLeading = true, removeTrailing = true, "."), - "trim(BOTH, '.', f0)", - "TRIM(BOTH '.' FROM f0)", - "This is a test String") - } - - @Test - def testCharLength(): Unit = { - testFunction( - 'f0.charLength(), - "f0.charLength()", - "CHAR_LENGTH(f0)", - "22") - - testFunction( - 'f0.charLength(), - "charLength(f0)", - "CHARACTER_LENGTH(f0)", - "22") - } - - @Test - def testUpperCase(): Unit = { - testFunction( - 'f0.upperCase(), - "f0.upperCase()", - "UPPER(f0)", - "THIS IS A TEST STRING.") - } - - @Test - def testLowerCase(): Unit = { - testFunction( - 'f0.lowerCase(), - "f0.lowerCase()", - "LOWER(f0)", - "this is a test string.") - } - - @Test - def testInitCap(): Unit = { - testFunction( - 'f0.initCap(), - "f0.initCap()", - "INITCAP(f0)", - "This Is A Test String.") - } - - @Test - def testConcat(): Unit = { - testFunction( - 'f0 + 'f0, - "f0 + f0", - "f0||f0", - "This is a test String.This is a test String.") - } - - @Test - def testLike(): Unit = { - testFunction( - 'f0.like("Th_s%"), - "f0.like('Th_s%')", - "f0 LIKE 'Th_s%'", - "true") - - testFunction( - 'f0.like("%is a%"), - "f0.like('%is a%')", - "f0 LIKE '%is a%'", - "true") - } - - @Test - def testNotLike(): Unit = { - testFunction( - !'f0.like("Th_s%"), - "!f0.like('Th_s%')", - "f0 NOT LIKE 'Th_s%'", - "false") - - testFunction( - !'f0.like("%is a%"), - "!f0.like('%is a%')", - "f0 NOT LIKE '%is a%'", - "false") - } - - @Test - def testSimilar(): Unit = { - testFunction( - 'f0.similar("_*"), - "f0.similar('_*')", - "f0 SIMILAR TO '_*'", - "true") - - testFunction( - 'f0.similar("This (is)? a (test)+ Strin_*"), - "f0.similar('This (is)? a (test)+ Strin_*')", - "f0 SIMILAR TO 'This (is)? a (test)+ Strin_*'", - "true") - } - - @Test - def testNotSimilar(): Unit = { - testFunction( - !'f0.similar("_*"), - "!f0.similar('_*')", - "f0 NOT SIMILAR TO '_*'", - "false") - - testFunction( - !'f0.similar("This (is)? a (test)+ Strin_*"), - "!f0.similar('This (is)? a (test)+ Strin_*')", - "f0 NOT SIMILAR TO 'This (is)? a (test)+ Strin_*'", - "false") - } - - @Test - def testMod(): Unit = { - testFunction( - 'f4.mod('f7), - "f4.mod(f7)", - "MOD(f4, f7)", - "2") - - testFunction( - 'f4.mod(3), - "mod(f4, 3)", - "MOD(f4, 3)", - "2") - - testFunction( - 'f4 % 3, - "mod(44, 3)", - "MOD(44, 3)", - "2") - - } - - @Test - def testExp(): Unit = { - testFunction( - 'f2.exp(), - "f2.exp()", - "EXP(f2)", - math.exp(42.toByte).toString) - - testFunction( - 'f3.exp(), - "f3.exp()", - "EXP(f3)", - math.exp(43.toShort).toString) - - testFunction( - 'f4.exp(), - "f4.exp()", - "EXP(f4)", - math.exp(44.toLong).toString) - - testFunction( - 'f5.exp(), - "f5.exp()", - "EXP(f5)", - math.exp(4.5.toFloat).toString) - - testFunction( - 'f6.exp(), - "f6.exp()", - "EXP(f6)", - math.exp(4.6).toString) - - testFunction( - 'f7.exp(), - "exp(3)", - "EXP(3)", - math.exp(3).toString) - } - - @Test - def testLog10(): Unit = { - testFunction( - 'f2.log10(), - "f2.log10()", - "LOG10(f2)", - math.log10(42.toByte).toString) - - testFunction( - 'f3.log10(), - "f3.log10()", - "LOG10(f3)", - math.log10(43.toShort).toString) - - testFunction( - 'f4.log10(), - "f4.log10()", - "LOG10(f4)", - math.log10(44.toLong).toString) - - testFunction( - 'f5.log10(), - "f5.log10()", - "LOG10(f5)", - math.log10(4.5.toFloat).toString) - - testFunction( - 'f6.log10(), - "f6.log10()", - "LOG10(f6)", - math.log10(4.6).toString) - } - - @Test - def testPower(): Unit = { - testFunction( - 'f2.power('f7), - "f2.power(f7)", - "POWER(f2, f7)", - math.pow(42.toByte, 3).toString) - - testFunction( - 'f3.power('f6), - "f3.power(f6)", - "POWER(f3, f6)", - math.pow(43.toShort, 4.6D).toString) - - testFunction( - 'f4.power('f5), - "f4.power(f5)", - "POWER(f4, f5)", - math.pow(44.toLong, 4.5.toFloat).toString) - } - - @Test - def testLn(): Unit = { - testFunction( - 'f2.ln(), - "f2.ln()", - "LN(f2)", - math.log(42.toByte).toString) - - testFunction( - 'f3.ln(), - "f3.ln()", - "LN(f3)", - math.log(43.toShort).toString) - - testFunction( - 'f4.ln(), - "f4.ln()", - "LN(f4)", - math.log(44.toLong).toString) - - testFunction( - 'f5.ln(), - "f5.ln()", - "LN(f5)", - math.log(4.5.toFloat).toString) - - testFunction( - 'f6.ln(), - "f6.ln()", - "LN(f6)", - math.log(4.6).toString) - } - - @Test - def testAbs(): Unit = { - testFunction( - 'f2.abs(), - "f2.abs()", - "ABS(f2)", - "42") - - testFunction( - 'f3.abs(), - "f3.abs()", - "ABS(f3)", - "43") - - testFunction( - 'f4.abs(), - "f4.abs()", - "ABS(f4)", - "44") - - testFunction( - 'f5.abs(), - "f5.abs()", - "ABS(f5)", - "4.5") - - testFunction( - 'f6.abs(), - "f6.abs()", - "ABS(f6)", - "4.6") - - testFunction( - 'f9.abs(), - "f9.abs()", - "ABS(f9)", - "42") - - testFunction( - 'f10.abs(), - "f10.abs()", - "ABS(f10)", - "43") - - testFunction( - 'f11.abs(), - "f11.abs()", - "ABS(f11)", - "44") - - testFunction( - 'f12.abs(), - "f12.abs()", - "ABS(f12)", - "4.5") - - testFunction( - 'f13.abs(), - "f13.abs()", - "ABS(f13)", - "4.6") - } - - @Test - def testArithmeticFloorCeil(): Unit = { - testFunction( - 'f5.floor(), - "f5.floor()", - "FLOOR(f5)", - "4.0") - - testFunction( - 'f5.ceil(), - "f5.ceil()", - "CEIL(f5)", - "5.0") - - testFunction( - 'f3.floor(), - "f3.floor()", - "FLOOR(f3)", - "43") - - testFunction( - 'f3.ceil(), - "f3.ceil()", - "CEIL(f3)", - "43") - } - - // ---------------------------------------------------------------------------------------------- - - def testFunction( - expr: Expression, - exprString: String, - sqlExpr: String, - expected: String): Unit = { - val testData = new Row(15) - testData.setField(0, "This is a test String.") - testData.setField(1, true) - testData.setField(2, 42.toByte) - testData.setField(3, 43.toShort) - testData.setField(4, 44.toLong) - testData.setField(5, 4.5.toFloat) - testData.setField(6, 4.6) - testData.setField(7, 3) - testData.setField(8, " This is a test String. ") - testData.setField(9, -42.toByte) - testData.setField(10, -43.toShort) - testData.setField(11, -44.toLong) - testData.setField(12, -4.5.toFloat) - testData.setField(13, -4.6) - testData.setField(14, -3) - - val typeInfo = new RowTypeInfo(Seq( - STRING_TYPE_INFO, - BOOLEAN_TYPE_INFO, - BYTE_TYPE_INFO, - SHORT_TYPE_INFO, - LONG_TYPE_INFO, - FLOAT_TYPE_INFO, - DOUBLE_TYPE_INFO, - INT_TYPE_INFO, - STRING_TYPE_INFO, - BYTE_TYPE_INFO, - SHORT_TYPE_INFO, - LONG_TYPE_INFO, - FLOAT_TYPE_INFO, - DOUBLE_TYPE_INFO, - INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]] - - val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr) - assertEquals(expected, exprResult) - - val exprStringResult = ExpressionEvaluator.evaluate( - testData, - typeInfo, - ExpressionParser.parseExpression(exprString)) - assertEquals(expected, exprStringResult) - - val exprSqlResult = ExpressionEvaluator.evaluate(testData, typeInfo, sqlExpr) - assertEquals(expected, exprSqlResult) - } - - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala deleted file mode 100644 index fe606e0..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expression.utils - -import org.apache.calcite.rel.logical.LogicalProject -import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR -import org.apache.calcite.tools.{Frameworks, RelBuilder} -import org.apache.flink.api.common.functions.{Function, MapFunction} -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.{DataSet => JDataSet} -import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} -import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction} -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.table.runtime.FunctionCompiler -import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableEnvironment} -import org.mockito.Mockito._ - -/** - * Utility to translate and evaluate an RexNode or Table API expression to a String. - */ -object ExpressionEvaluator { - - // TestCompiler that uses current class loader - class TestCompiler[T <: Function] extends FunctionCompiler[T] { - def compile(genFunc: GeneratedFunction[T]): Class[T] = - compile(getClass.getClassLoader, genFunc.name, genFunc.code) - } - - private def prepareTable( - typeInfo: TypeInformation[Any]): (String, RelBuilder, TableEnvironment) = { - - // create DataSetTable - val dataSetMock = mock(classOf[DataSet[Any]]) - val jDataSetMock = mock(classOf[JDataSet[Any]]) - when(dataSetMock.javaSet).thenReturn(jDataSetMock) - when(jDataSetMock.getType).thenReturn(typeInfo) - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val tableName = "myTable" - tEnv.registerDataSet(tableName, dataSetMock) - - // prepare RelBuilder - val relBuilder = tEnv.getRelBuilder - relBuilder.scan(tableName) - - (tableName, relBuilder, tEnv) - } - - def evaluate(data: Any, typeInfo: TypeInformation[Any], sqlExpr: String): String = { - // create DataSetTable - val table = prepareTable(typeInfo) - - // create RelNode from SQL expression - val planner = Frameworks.getPlanner(table._3.getFrameworkConfig) - val parsed = planner.parse("SELECT " + sqlExpr + " FROM " + table._1) - val validated = planner.validate(parsed) - val converted = planner.rel(validated) - - val expr: RexNode = converted.rel.asInstanceOf[LogicalProject].getChildExps.get(0) - - evaluate(data, typeInfo, table._2, expr) - } - - def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): String = { - val table = prepareTable(typeInfo) - val env = table._3 - val resolvedExpr = - env.asInstanceOf[BatchTableEnvironment].scan("myTable").select(expr). - getRelNode.asInstanceOf[LogicalProject].getChildExps.get(0) - evaluate(data, typeInfo, table._2, resolvedExpr) - } - - def evaluate( - data: Any, - typeInfo: TypeInformation[Any], - relBuilder: RelBuilder, - rexNode: RexNode): String = { - // generate code for Mapper - val config = new TableConfig() - val generator = new CodeGenerator(config, false, typeInfo) - val genExpr = generator.generateExpression(relBuilder.cast(rexNode, VARCHAR)) // cast to String - val bodyCode = - s""" - |${genExpr.code} - |return ${genExpr.resultTerm}; - |""".stripMargin - val genFunc = generator.generateFunction[MapFunction[Any, String]]( - "TestFunction", - classOf[MapFunction[Any, String]], - bodyCode, - STRING_TYPE_INFO.asInstanceOf[TypeInformation[Any]]) - - // compile and evaluate - val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc) - val mapper = clazz.newInstance() - mapper.map(data) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowComparatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowComparatorTest.scala deleted file mode 100644 index 9ceb9d2..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowComparatorTest.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.typeutils - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer} -import org.apache.flink.api.java.tuple -import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} -import org.apache.flink.api.scala.typeutils.RowComparatorTest.MyPojo -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.junit.Assert._ - -class RowComparatorTest extends ComparatorTestBase[Row] { - - val typeInfo = new RowTypeInfo( - Array( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - new TupleTypeInfo[tuple.Tuple2[Int, Boolean]]( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.SHORT_TYPE_INFO), - TypeExtractor.createTypeInfo(classOf[MyPojo]))) - - val testPojo1 = new MyPojo() - // TODO we cannot test null here as PojoComparator has no support for null keys - testPojo1.name = "" - val testPojo2 = new MyPojo() - testPojo2.name = "Test1" - val testPojo3 = new MyPojo() - testPojo3.name = "Test2" - - val data: Array[Row] = Array( - createRow(null, null, null, null, null), - createRow(0, null, null, null, null), - createRow(0, 0.0, null, null, null), - createRow(0, 0.0, "a", null, null), - createRow(1, 0.0, "a", null, null), - createRow(1, 1.0, "a", null, null), - createRow(1, 1.0, "b", null, null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3) - ) - - override protected def deepEquals(message: String, should: Row, is: Row): Unit = { - val arity = should.productArity - assertEquals(message, arity, is.productArity) - var index = 0 - while (index < arity) { - val copiedValue: Any = should.productElement(index) - val element: Any = is.productElement(index) - assertEquals(message, element, copiedValue) - index += 1 - } - } - - override protected def createComparator(ascending: Boolean): TypeComparator[Row] = { - typeInfo.createComparator( - Array(0, 1, 2, 3, 4, 5, 6), - Array(ascending, ascending, ascending, ascending, ascending, ascending, ascending), - 0, - new ExecutionConfig()) - } - - override protected def createSerializer(): TypeSerializer[Row] = { - typeInfo.createSerializer(new ExecutionConfig()) - } - - override protected def getSortedTestData: Array[Row] = { - data - } - - override protected def supportsNullKeys: Boolean = true - - def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = { - val r: Row = new Row(5) - r.setField(0, f0) - r.setField(1, f1) - r.setField(2, f2) - r.setField(3, f3) - r.setField(4, f4) - r - } -} - -object RowComparatorTest { - - class MyPojo() extends Serializable with Comparable[MyPojo] { - // we cannot use null because the PojoComparator does not support null properly - var name: String = "" - - override def compareTo(o: MyPojo): Int = { - if (name == null && o.name == null) { - 0 - } - else if (name == null) { - -1 - } - else if (o.name == null) { - 1 - } - else { - name.compareTo(o.name) - } - } - - override def equals(other: Any): Boolean = other match { - case that: MyPojo => compareTo(that) == 0 - case _ => false - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowSerializerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowSerializerTest.scala deleted file mode 100644 index 95a1bb5..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowSerializerTest.scala +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.typeutils - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer} -import org.apache.flink.api.java.tuple -import org.apache.flink.api.java.typeutils.{TypeExtractor, TupleTypeInfo} -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.typeutils.RowSerializerTest.MyPojo -import org.junit.Assert._ -import org.junit.Test - -class RowSerializerTest { - - class RowSerializerTestInstance( - serializer: TypeSerializer[Row], - testData: Array[Row]) - extends SerializerTestInstance[Row](serializer, classOf[Row], -1, testData: _*) { - - override protected def deepEquals(message: String, should: Row, is: Row): Unit = { - val arity = should.productArity - assertEquals(message, arity, is.productArity) - var index = 0 - while (index < arity) { - val copiedValue: Any = should.productElement(index) - val element: Any = is.productElement(index) - assertEquals(message, element, copiedValue) - index += 1 - } - } - } - - @Test - def testRowSerializer(): Unit = { - val rowInfo: TypeInformation[Row] = new RowTypeInfo( - Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)) - - val row1 = new Row(2) - row1.setField(0, 1) - row1.setField(1, "a") - - val row2 = new Row(2) - row2.setField(0, 2) - row2.setField(1, null) - - val testData: Array[Row] = Array(row1, row2) - - val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig) - - val testInstance = new RowSerializerTestInstance(rowSerializer, testData) - - testInstance.testAll() - } - - @Test - def testLargeRowSerializer(): Unit = { - val rowInfo: TypeInformation[Row] = new RowTypeInfo(Seq( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO)) - - val row = new Row(13) - row.setField(0, 2) - row.setField(1, null) - row.setField(3, null) - row.setField(4, null) - row.setField(5, null) - row.setField(6, null) - row.setField(7, null) - row.setField(8, null) - row.setField(9, null) - row.setField(10, null) - row.setField(11, null) - row.setField(12, "Test") - - val testData: Array[Row] = Array(row) - - val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig) - - val testInstance = new RowSerializerTestInstance(rowSerializer, testData) - - testInstance.testAll() - } - - @Test - def testRowSerializerWithComplexTypes(): Unit = { - val rowInfo = new RowTypeInfo( - Array( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - new TupleTypeInfo[tuple.Tuple2[Int, Boolean]]( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.SHORT_TYPE_INFO), - TypeExtractor.createTypeInfo(classOf[MyPojo]))) - - val testPojo1 = new MyPojo() - testPojo1.name = null - val testPojo2 = new MyPojo() - testPojo2.name = "Test1" - val testPojo3 = new MyPojo() - testPojo3.name = "Test2" - - val testData: Array[Row] = Array( - createRow(null, null, null, null, null), - createRow(0, null, null, null, null), - createRow(0, 0.0, null, null, null), - createRow(0, 0.0, "a", null, null), - createRow(1, 0.0, "a", null, null), - createRow(1, 1.0, "a", null, null), - createRow(1, 1.0, "b", null, null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3) - ) - - val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig) - - val testInstance = new RowSerializerTestInstance(rowSerializer, testData) - - testInstance.testAll() - } - - // ---------------------------------------------------------------------------------------------- - - def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = { - val r: Row = new Row(5) - r.setField(0, f0) - r.setField(1, f1) - r.setField(2, f2) - r.setField(3, f3) - r.setField(4, f4) - r - } -} - -object RowSerializerTest { - class MyPojo() extends Serializable with Comparable[MyPojo] { - var name: String = null - - override def compareTo(o: MyPojo): Int = { - if (name == null && o.name == null) { - 0 - } - else if (name == null) { - -1 - } - else if (o.name == null) { - 1 - } - else { - name.compareTo(o.name) - } - } - - override def equals(other: Any): Boolean = other match { - case that: MyPojo => compareTo(that) == 0 - case _ => false - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala new file mode 100644 index 0000000..a3b67f5 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.expressions.utils.ExpressionEvaluator +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.junit.Assert.assertEquals +import org.junit.Test + +class ScalarFunctionsTest { + + // ---------------------------------------------------------------------------------------------- + // String functions + // ---------------------------------------------------------------------------------------------- + + @Test + def testSubstring(): Unit = { + testFunction( + 'f0.substring(2), + "f0.substring(2)", + "SUBSTRING(f0, 2)", + "his is a test String.") + + testFunction( + 'f0.substring(2, 5), + "f0.substring(2, 5)", + "SUBSTRING(f0, 2, 5)", + "his i") + + testFunction( + 'f0.substring(1, 'f7), + "f0.substring(1, f7)", + "SUBSTRING(f0, 1, f7)", + "Thi") + } + + @Test + def testTrim(): Unit = { + testFunction( + 'f8.trim(), + "f8.trim()", + "TRIM(f8)", + "This is a test String.") + + testFunction( + 'f8.trim(removeLeading = true, removeTrailing = true, " "), + "trim(f8)", + "TRIM(f8)", + "This is a test String.") + + testFunction( + 'f8.trim(removeLeading = false, removeTrailing = true, " "), + "f8.trim(TRAILING, ' ')", + "TRIM(TRAILING FROM f8)", + " This is a test String.") + + testFunction( + 'f0.trim(removeLeading = true, removeTrailing = true, "."), + "trim(BOTH, '.', f0)", + "TRIM(BOTH '.' FROM f0)", + "This is a test String") + } + + @Test + def testCharLength(): Unit = { + testFunction( + 'f0.charLength(), + "f0.charLength()", + "CHAR_LENGTH(f0)", + "22") + + testFunction( + 'f0.charLength(), + "charLength(f0)", + "CHARACTER_LENGTH(f0)", + "22") + } + + @Test + def testUpperCase(): Unit = { + testFunction( + 'f0.upperCase(), + "f0.upperCase()", + "UPPER(f0)", + "THIS IS A TEST STRING.") + } + + @Test + def testLowerCase(): Unit = { + testFunction( + 'f0.lowerCase(), + "f0.lowerCase()", + "LOWER(f0)", + "this is a test string.") + } + + @Test + def testInitCap(): Unit = { + testFunction( + 'f0.initCap(), + "f0.initCap()", + "INITCAP(f0)", + "This Is A Test String.") + } + + @Test + def testConcat(): Unit = { + testFunction( + 'f0 + 'f0, + "f0 + f0", + "f0||f0", + "This is a test String.This is a test String.") + } + + @Test + def testLike(): Unit = { + testFunction( + 'f0.like("Th_s%"), + "f0.like('Th_s%')", + "f0 LIKE 'Th_s%'", + "true") + + testFunction( + 'f0.like("%is a%"), + "f0.like('%is a%')", + "f0 LIKE '%is a%'", + "true") + } + + @Test + def testNotLike(): Unit = { + testFunction( + !'f0.like("Th_s%"), + "!f0.like('Th_s%')", + "f0 NOT LIKE 'Th_s%'", + "false") + + testFunction( + !'f0.like("%is a%"), + "!f0.like('%is a%')", + "f0 NOT LIKE '%is a%'", + "false") + } + + @Test + def testSimilar(): Unit = { + testFunction( + 'f0.similar("_*"), + "f0.similar('_*')", + "f0 SIMILAR TO '_*'", + "true") + + testFunction( + 'f0.similar("This (is)? a (test)+ Strin_*"), + "f0.similar('This (is)? a (test)+ Strin_*')", + "f0 SIMILAR TO 'This (is)? a (test)+ Strin_*'", + "true") + } + + @Test + def testNotSimilar(): Unit = { + testFunction( + !'f0.similar("_*"), + "!f0.similar('_*')", + "f0 NOT SIMILAR TO '_*'", + "false") + + testFunction( + !'f0.similar("This (is)? a (test)+ Strin_*"), + "!f0.similar('This (is)? a (test)+ Strin_*')", + "f0 NOT SIMILAR TO 'This (is)? a (test)+ Strin_*'", + "false") + } + + @Test + def testMod(): Unit = { + testFunction( + 'f4.mod('f7), + "f4.mod(f7)", + "MOD(f4, f7)", + "2") + + testFunction( + 'f4.mod(3), + "mod(f4, 3)", + "MOD(f4, 3)", + "2") + + testFunction( + 'f4 % 3, + "mod(44, 3)", + "MOD(44, 3)", + "2") + + } + + @Test + def testExp(): Unit = { + testFunction( + 'f2.exp(), + "f2.exp()", + "EXP(f2)", + math.exp(42.toByte).toString) + + testFunction( + 'f3.exp(), + "f3.exp()", + "EXP(f3)", + math.exp(43.toShort).toString) + + testFunction( + 'f4.exp(), + "f4.exp()", + "EXP(f4)", + math.exp(44.toLong).toString) + + testFunction( + 'f5.exp(), + "f5.exp()", + "EXP(f5)", + math.exp(4.5.toFloat).toString) + + testFunction( + 'f6.exp(), + "f6.exp()", + "EXP(f6)", + math.exp(4.6).toString) + + testFunction( + 'f7.exp(), + "exp(3)", + "EXP(3)", + math.exp(3).toString) + } + + @Test + def testLog10(): Unit = { + testFunction( + 'f2.log10(), + "f2.log10()", + "LOG10(f2)", + math.log10(42.toByte).toString) + + testFunction( + 'f3.log10(), + "f3.log10()", + "LOG10(f3)", + math.log10(43.toShort).toString) + + testFunction( + 'f4.log10(), + "f4.log10()", + "LOG10(f4)", + math.log10(44.toLong).toString) + + testFunction( + 'f5.log10(), + "f5.log10()", + "LOG10(f5)", + math.log10(4.5.toFloat).toString) + + testFunction( + 'f6.log10(), + "f6.log10()", + "LOG10(f6)", + math.log10(4.6).toString) + } + + @Test + def testPower(): Unit = { + testFunction( + 'f2.power('f7), + "f2.power(f7)", + "POWER(f2, f7)", + math.pow(42.toByte, 3).toString) + + testFunction( + 'f3.power('f6), + "f3.power(f6)", + "POWER(f3, f6)", + math.pow(43.toShort, 4.6D).toString) + + testFunction( + 'f4.power('f5), + "f4.power(f5)", + "POWER(f4, f5)", + math.pow(44.toLong, 4.5.toFloat).toString) + } + + @Test + def testLn(): Unit = { + testFunction( + 'f2.ln(), + "f2.ln()", + "LN(f2)", + math.log(42.toByte).toString) + + testFunction( + 'f3.ln(), + "f3.ln()", + "LN(f3)", + math.log(43.toShort).toString) + + testFunction( + 'f4.ln(), + "f4.ln()", + "LN(f4)", + math.log(44.toLong).toString) + + testFunction( + 'f5.ln(), + "f5.ln()", + "LN(f5)", + math.log(4.5.toFloat).toString) + + testFunction( + 'f6.ln(), + "f6.ln()", + "LN(f6)", + math.log(4.6).toString) + } + + @Test + def testAbs(): Unit = { + testFunction( + 'f2.abs(), + "f2.abs()", + "ABS(f2)", + "42") + + testFunction( + 'f3.abs(), + "f3.abs()", + "ABS(f3)", + "43") + + testFunction( + 'f4.abs(), + "f4.abs()", + "ABS(f4)", + "44") + + testFunction( + 'f5.abs(), + "f5.abs()", + "ABS(f5)", + "4.5") + + testFunction( + 'f6.abs(), + "f6.abs()", + "ABS(f6)", + "4.6") + + testFunction( + 'f9.abs(), + "f9.abs()", + "ABS(f9)", + "42") + + testFunction( + 'f10.abs(), + "f10.abs()", + "ABS(f10)", + "43") + + testFunction( + 'f11.abs(), + "f11.abs()", + "ABS(f11)", + "44") + + testFunction( + 'f12.abs(), + "f12.abs()", + "ABS(f12)", + "4.5") + + testFunction( + 'f13.abs(), + "f13.abs()", + "ABS(f13)", + "4.6") + } + + @Test + def testArithmeticFloorCeil(): Unit = { + testFunction( + 'f5.floor(), + "f5.floor()", + "FLOOR(f5)", + "4.0") + + testFunction( + 'f5.ceil(), + "f5.ceil()", + "CEIL(f5)", + "5.0") + + testFunction( + 'f3.floor(), + "f3.floor()", + "FLOOR(f3)", + "43") + + testFunction( + 'f3.ceil(), + "f3.ceil()", + "CEIL(f3)", + "43") + } + + // ---------------------------------------------------------------------------------------------- + + def testFunction( + expr: Expression, + exprString: String, + sqlExpr: String, + expected: String): Unit = { + val testData = new Row(15) + testData.setField(0, "This is a test String.") + testData.setField(1, true) + testData.setField(2, 42.toByte) + testData.setField(3, 43.toShort) + testData.setField(4, 44.toLong) + testData.setField(5, 4.5.toFloat) + testData.setField(6, 4.6) + testData.setField(7, 3) + testData.setField(8, " This is a test String. ") + testData.setField(9, -42.toByte) + testData.setField(10, -43.toShort) + testData.setField(11, -44.toLong) + testData.setField(12, -4.5.toFloat) + testData.setField(13, -4.6) + testData.setField(14, -3) + + val typeInfo = new RowTypeInfo(Seq( + STRING_TYPE_INFO, + BOOLEAN_TYPE_INFO, + BYTE_TYPE_INFO, + SHORT_TYPE_INFO, + LONG_TYPE_INFO, + FLOAT_TYPE_INFO, + DOUBLE_TYPE_INFO, + INT_TYPE_INFO, + STRING_TYPE_INFO, + BYTE_TYPE_INFO, + SHORT_TYPE_INFO, + LONG_TYPE_INFO, + FLOAT_TYPE_INFO, + DOUBLE_TYPE_INFO, + INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]] + + val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr) + assertEquals(expected, exprResult) + + val exprStringResult = ExpressionEvaluator.evaluate( + testData, + typeInfo, + ExpressionParser.parseExpression(exprString)) + assertEquals(expected, exprStringResult) + + val exprSqlResult = ExpressionEvaluator.evaluate(testData, typeInfo, sqlExpr) + assertEquals(expected, exprSqlResult) + } + + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionEvaluator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionEvaluator.scala new file mode 100644 index 0000000..0b5a2de --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionEvaluator.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions.utils + +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR +import org.apache.calcite.tools.{Frameworks, RelBuilder} +import org.apache.flink.api.common.functions.{Function, MapFunction} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.{DataSet => JDataSet} +import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction} +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.table.runtime.FunctionCompiler +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableEnvironment} +import org.mockito.Mockito._ + +/** + * Utility to translate and evaluate an RexNode or Table API expression to a String. + */ +object ExpressionEvaluator { + + // TestCompiler that uses current class loader + class TestCompiler[T <: Function] extends FunctionCompiler[T] { + def compile(genFunc: GeneratedFunction[T]): Class[T] = + compile(getClass.getClassLoader, genFunc.name, genFunc.code) + } + + private def prepareTable( + typeInfo: TypeInformation[Any]): (String, RelBuilder, TableEnvironment) = { + + // create DataSetTable + val dataSetMock = mock(classOf[DataSet[Any]]) + val jDataSetMock = mock(classOf[JDataSet[Any]]) + when(dataSetMock.javaSet).thenReturn(jDataSetMock) + when(jDataSetMock.getType).thenReturn(typeInfo) + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val tableName = "myTable" + tEnv.registerDataSet(tableName, dataSetMock) + + // prepare RelBuilder + val relBuilder = tEnv.getRelBuilder + relBuilder.scan(tableName) + + (tableName, relBuilder, tEnv) + } + + def evaluate(data: Any, typeInfo: TypeInformation[Any], sqlExpr: String): String = { + // create DataSetTable + val table = prepareTable(typeInfo) + + // create RelNode from SQL expression + val planner = Frameworks.getPlanner(table._3.getFrameworkConfig) + val parsed = planner.parse("SELECT " + sqlExpr + " FROM " + table._1) + val validated = planner.validate(parsed) + val converted = planner.rel(validated) + + val expr: RexNode = converted.rel.asInstanceOf[LogicalProject].getChildExps.get(0) + + evaluate(data, typeInfo, table._2, expr) + } + + def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): String = { + val table = prepareTable(typeInfo) + val env = table._3 + val resolvedExpr = + env.asInstanceOf[BatchTableEnvironment].scan("myTable").select(expr). + getRelNode.asInstanceOf[LogicalProject].getChildExps.get(0) + evaluate(data, typeInfo, table._2, resolvedExpr) + } + + def evaluate( + data: Any, + typeInfo: TypeInformation[Any], + relBuilder: RelBuilder, + rexNode: RexNode): String = { + // generate code for Mapper + val config = new TableConfig() + val generator = new CodeGenerator(config, false, typeInfo) + val genExpr = generator.generateExpression(relBuilder.cast(rexNode, VARCHAR)) // cast to String + val bodyCode = + s""" + |${genExpr.code} + |return ${genExpr.resultTerm}; + |""".stripMargin + val genFunc = generator.generateFunction[MapFunction[Any, String]]( + "TestFunction", + classOf[MapFunction[Any, String]], + bodyCode, + STRING_TYPE_INFO.asInstanceOf[TypeInformation[Any]]) + + // compile and evaluate + val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc) + val mapper = clazz.newInstance() + mapper.map(data) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala new file mode 100644 index 0000000..557db3a --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.typeutils + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer} +import org.apache.flink.api.java.tuple +import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.typeutils.RowComparatorTest.MyPojo +import org.junit.Assert._ + +class RowComparatorTest extends ComparatorTestBase[Row] { + + val typeInfo = new RowTypeInfo( + Array( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + new TupleTypeInfo[tuple.Tuple2[Int, Boolean]]( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.SHORT_TYPE_INFO), + TypeExtractor.createTypeInfo(classOf[MyPojo]))) + + val testPojo1 = new MyPojo() + // TODO we cannot test null here as PojoComparator has no support for null keys + testPojo1.name = "" + val testPojo2 = new MyPojo() + testPojo2.name = "Test1" + val testPojo3 = new MyPojo() + testPojo3.name = "Test2" + + val data: Array[Row] = Array( + createRow(null, null, null, null, null), + createRow(0, null, null, null, null), + createRow(0, 0.0, null, null, null), + createRow(0, 0.0, "a", null, null), + createRow(1, 0.0, "a", null, null), + createRow(1, 1.0, "a", null, null), + createRow(1, 1.0, "b", null, null), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3) + ) + + override protected def deepEquals(message: String, should: Row, is: Row): Unit = { + val arity = should.productArity + assertEquals(message, arity, is.productArity) + var index = 0 + while (index < arity) { + val copiedValue: Any = should.productElement(index) + val element: Any = is.productElement(index) + assertEquals(message, element, copiedValue) + index += 1 + } + } + + override protected def createComparator(ascending: Boolean): TypeComparator[Row] = { + typeInfo.createComparator( + Array(0, 1, 2, 3, 4, 5, 6), + Array(ascending, ascending, ascending, ascending, ascending, ascending, ascending), + 0, + new ExecutionConfig()) + } + + override protected def createSerializer(): TypeSerializer[Row] = { + typeInfo.createSerializer(new ExecutionConfig()) + } + + override protected def getSortedTestData: Array[Row] = { + data + } + + override protected def supportsNullKeys: Boolean = true + + def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = { + val r: Row = new Row(5) + r.setField(0, f0) + r.setField(1, f1) + r.setField(2, f2) + r.setField(3, f3) + r.setField(4, f4) + r + } +} + +object RowComparatorTest { + + class MyPojo() extends Serializable with Comparable[MyPojo] { + // we cannot use null because the PojoComparator does not support null properly + var name: String = "" + + override def compareTo(o: MyPojo): Int = { + if (name == null && o.name == null) { + 0 + } + else if (name == null) { + -1 + } + else if (o.name == null) { + 1 + } + else { + name.compareTo(o.name) + } + } + + override def equals(other: Any): Boolean = other match { + case that: MyPojo => compareTo(that) == 0 + case _ => false + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala new file mode 100644 index 0000000..b52dd4d --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.typeutils + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer} +import org.apache.flink.api.java.tuple +import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.typeutils.RowSerializerTest.MyPojo +import org.junit.Assert._ +import org.junit.Test + +class RowSerializerTest { + + class RowSerializerTestInstance( + serializer: TypeSerializer[Row], + testData: Array[Row]) + extends SerializerTestInstance[Row](serializer, classOf[Row], -1, testData: _*) { + + override protected def deepEquals(message: String, should: Row, is: Row): Unit = { + val arity = should.productArity + assertEquals(message, arity, is.productArity) + var index = 0 + while (index < arity) { + val copiedValue: Any = should.productElement(index) + val element: Any = is.productElement(index) + assertEquals(message, element, copiedValue) + index += 1 + } + } + } + + @Test + def testRowSerializer(): Unit = { + val rowInfo: TypeInformation[Row] = new RowTypeInfo( + Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)) + + val row1 = new Row(2) + row1.setField(0, 1) + row1.setField(1, "a") + + val row2 = new Row(2) + row2.setField(0, 2) + row2.setField(1, null) + + val testData: Array[Row] = Array(row1, row2) + + val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig) + + val testInstance = new RowSerializerTestInstance(rowSerializer, testData) + + testInstance.testAll() + } + + @Test + def testLargeRowSerializer(): Unit = { + val rowInfo: TypeInformation[Row] = new RowTypeInfo(Seq( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO)) + + val row = new Row(13) + row.setField(0, 2) + row.setField(1, null) + row.setField(3, null) + row.setField(4, null) + row.setField(5, null) + row.setField(6, null) + row.setField(7, null) + row.setField(8, null) + row.setField(9, null) + row.setField(10, null) + row.setField(11, null) + row.setField(12, "Test") + + val testData: Array[Row] = Array(row) + + val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig) + + val testInstance = new RowSerializerTestInstance(rowSerializer, testData) + + testInstance.testAll() + } + + @Test + def testRowSerializerWithComplexTypes(): Unit = { + val rowInfo = new RowTypeInfo( + Array( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + new TupleTypeInfo[tuple.Tuple2[Int, Boolean]]( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.SHORT_TYPE_INFO), + TypeExtractor.createTypeInfo(classOf[MyPojo]))) + + val testPojo1 = new MyPojo() + testPojo1.name = null + val testPojo2 = new MyPojo() + testPojo2.name = "Test1" + val testPojo3 = new MyPojo() + testPojo3.name = "Test2" + + val testData: Array[Row] = Array( + createRow(null, null, null, null, null), + createRow(0, null, null, null, null), + createRow(0, 0.0, null, null, null), + createRow(0, 0.0, "a", null, null), + createRow(1, 0.0, "a", null, null), + createRow(1, 1.0, "a", null, null), + createRow(1, 1.0, "b", null, null), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2), + createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3) + ) + + val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig) + + val testInstance = new RowSerializerTestInstance(rowSerializer, testData) + + testInstance.testAll() + } + + // ---------------------------------------------------------------------------------------------- + + def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = { + val r: Row = new Row(5) + r.setField(0, f0) + r.setField(1, f1) + r.setField(2, f2) + r.setField(3, f3) + r.setField(4, f4) + r + } +} + +object RowSerializerTest { + class MyPojo() extends Serializable with Comparable[MyPojo] { + var name: String = null + + override def compareTo(o: MyPojo): Int = { + if (name == null && o.name == null) { + 0 + } + else if (name == null) { + -1 + } + else if (o.name == null) { + 1 + } + else { + name.compareTo(o.name) + } + } + + override def equals(other: Any): Boolean = other match { + case that: MyPojo => compareTo(that) == 0 + case _ => false + } + } +}