[FLINK-8038] [table] Support map value constructor, cardinality, and item. This closes #5015.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5f5615c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5f5615c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5f5615c Branch: refs/heads/master Commit: c5f5615cf84026039614701b5e6b3b0e003eada0 Parents: 52599ff Author: Rong Rong <ro...@uber.com> Authored: Tue Nov 14 10:48:16 2017 -0800 Committer: twalthr <twal...@apache.org> Committed: Tue Nov 21 17:09:02 2017 +0100 ---------------------------------------------------------------------- docs/dev/table/sql.md | 45 +++++ docs/dev/table/tableApi.md | 48 ++++- .../flink/table/api/scala/expressionDsl.scala | 25 ++- .../flink/table/calcite/FlinkTypeFactory.scala | 11 ++ .../flink/table/codegen/CodeGenUtils.scala | 5 + .../flink/table/codegen/CodeGenerator.scala | 38 +++- .../table/codegen/calls/ScalarOperators.scala | 64 +++++++ .../table/expressions/ExpressionParser.scala | 2 + .../table/expressions/ExpressionUtils.scala | 4 + .../apache/flink/table/expressions/array.scala | 60 ------- .../flink/table/expressions/cardinality.scala | 50 ++++++ .../apache/flink/table/expressions/item.scala | 76 ++++++++ .../apache/flink/table/expressions/map.scala | 76 ++++++++ .../flink/table/plan/ProjectionTranslator.scala | 7 + .../flink/table/typeutils/TypeCheckUtils.scala | 5 +- .../flink/table/validate/FunctionCatalog.scala | 16 +- .../flink/table/expressions/ArrayTypeTest.scala | 9 + .../flink/table/expressions/MapTypeTest.scala | 173 ++++++++++++++++++- .../table/expressions/SqlExpressionTest.scala | 2 + .../expressions/utils/MapTypeTestBase.scala | 23 ++- .../validation/MapTypeValidationTest.scala | 2 +- 21 files changed, 652 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/docs/dev/table/sql.md ---------------------------------------------------------------------- diff --git 
a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 3097d9e..50aa933 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -2310,6 +2310,51 @@ ELEMENT(ARRAY) </tbody> </table> +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 40%">Map functions</th> + <th class="text-center">Description</th> + </tr> + </thead> + + <tbody> + + <tr> + <td> + {% highlight text %} +MAP ‘[’ key, value [, key, value ]* ‘]’ +{% endhighlight %} + </td> + <td> + <p>Creates a map from a list of key-value pairs.</p> + </td> + </tr> + + <tr> + <td> + {% highlight text %} +CARDINALITY(MAP) +{% endhighlight %} + </td> + <td> + <p>Returns the number of entries of a map.</p> + </td> + </tr> + + <tr> + <td> + {% highlight text %} +map ‘[’ key ‘]’ +{% endhighlight %} + </td> + <td> + <p>Returns the value specified by a particular key in a map.</p> + </td> + </tr> + </tbody> +</table> + ### Unsupported Functions The following functions are not supported yet: http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/docs/dev/table/tableApi.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index f5a2059..498dbbc 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -1610,7 +1610,7 @@ rowInterval = composite , "." 
, "rows" ; cast = composite , ".cast(" , dataType , ")" ; -dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ; +dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "MAP" , "(" , dataType , "," , dataType , ")" ) | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ; as = composite , ".as(" , fieldReference , ")" ; @@ -2950,6 +2950,52 @@ ARRAY.element() <table class="table table-bordered"> <thead> <tr> + <th class="text-left" style="width: 40%">Map functions</th> + <th class="text-center">Description</th> + </tr> + </thead> + + <tbody> + + <tr> + <td> + {% highlight java %} +map(ANY, ANY [, ANY, ANY ]*) +{% endhighlight %} + </td> + <td> + <p>Creates a map from a list of key-value pairs.</p> + </td> + </tr> + + <tr> + <td> + {% highlight java %} +MAP.cardinality() +{% endhighlight %} + </td> + <td> + <p>Returns the number of entries of a map.</p> + </td> + </tr> + + <tr> + <td> + {% highlight java %} +MAP.at(ANY) +{% endhighlight %} + </td> + <td> + <p>Returns the value specified by a particular key in a map.</p> + </td> + </tr> + + </tbody> +</table> + +<table class="table table-bordered"> + <thead> + <tr> <th class="text-left" style="width: 40%">Auxiliary functions</th> <th class="text-center">Description</th> </tr> http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index b62e142..72a5561 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -683,19 +683,19 @@ trait ImplicitExpressionOperations { def flatten() = Flattening(expr) /** - * Accesses the element of an array based on an index (starting at 1). + * Accesses the element of an array or map based on a key or an index (starting at 1). * - * @param index position of the element (starting at 1) + * @param index key or position of the element (array index starting at 1) * @return value of the element */ - def at(index: Expression) = ArrayElementAt(expr, index) + def at(index: Expression) = ItemAt(expr, index) /** - * Returns the number of elements of an array. + * Returns the number of elements of an array or number of entries of a map. * - * @return number of elements + * @return number of elements or entries */ - def cardinality() = ArrayCardinality(expr) + def cardinality() = Cardinality(expr) /** * Returns the sole element of an array with a single element. Returns null if the array is @@ -960,6 +960,19 @@ object array { } /** + * Creates a map of literals. The map will be a map between two objects (not primitives). + */ +object map { + + /** + * Creates a map of literals. The map will be a map between two objects (not primitives). + */ + def apply(key: Expression, value: Expression, tail: Expression*): Expression = { + MapConstructor(Seq(key, value) ++ tail.toSeq) + } +} + +/** * Returns a value that is closer than any other value to pi. 
*/ object pi { http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 7bcdc0f..448029b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -222,6 +222,17 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp canonize(relType) } + override def createMapType(keyType: RelDataType, valueType: RelDataType): RelDataType = { + val relType = new MapRelDataType( + new MapTypeInfo( + FlinkTypeFactory.toTypeInfo(keyType), + FlinkTypeFactory.toTypeInfo(valueType)), + keyType, + valueType, + isNullable = false) + this.canonize(relType) + } + override def createMultisetType(elementType: RelDataType, maxCardinality: Long): RelDataType = { val relType = new MultisetRelDataType( MultisetTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)), http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala index 161f9a3..d4ba902 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala @@ -198,6 +198,11 @@ 
object CodeGenUtils { throw new CodeGenException("Array expression type expected.") } + def requireMap(genExpr: GeneratedExpression): Unit = + if (!TypeCheckUtils.isMap(genExpr.resultType)) { + throw new CodeGenException("Map expression type expected.") + } + def requireInteger(genExpr: GeneratedExpression): Unit = if (!TypeCheckUtils.isInteger(genExpr.resultType)) { throw new CodeGenException("Integer expression type expected.") http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index a794b08..b51cdbe 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -957,6 +957,10 @@ abstract class CodeGenerator( case ARRAY_VALUE_CONSTRUCTOR => generateArray(this, resultType, operands) + // maps + case MAP_VALUE_CONSTRUCTOR => + generateMap(this, resultType, operands) + case ITEM => operands.head.resultType match { case _: ObjectArrayTypeInfo[_, _] | @@ -975,9 +979,21 @@ abstract class CodeGenerator( } case CARDINALITY => - val array = operands.head - requireArray(array) - generateArrayCardinality(nullCheck, array) + operands.head.resultType match { + case _: ObjectArrayTypeInfo[_, _] | + _: BasicArrayTypeInfo[_, _] | + _: PrimitiveArrayTypeInfo[_] => + val array = operands.head + requireArray(array) + generateArrayCardinality(nullCheck, array) + + case _: MapTypeInfo[_, _] => + val map = operands.head + requireMap(map) + generateMapCardinality(nullCheck, map) + + case _ => throw new CodeGenException("Expect an array or a map.") + } case ELEMENT => val 
array = operands.head @@ -1562,6 +1578,22 @@ abstract class CodeGenerator( } /** + * Adds a reusable hash map to the member area of the generated [[Function]]. + */ + def addReusableMap(clazz: Class[_]): String = { + val fieldTerm = newName("map") + val classQualifier = "java.util.Map" + val initMap = "java.util.HashMap()" + val fieldMap = + s""" + |final $classQualifier $fieldTerm = + | new $initMap; + |""".stripMargin + reusableMemberStatements.add(fieldMap) + fieldTerm + } + + /** * Adds a reusable timestamp to the beginning of the SAM of the generated [[Function]]. */ def addReusableTimestamp(): String = { http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala index bd5b1f7..522d826 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala @@ -188,6 +188,13 @@ object ScalarOperators { (leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, $rightTerm)" } } + // map types + else if (isMap(left.resultType) && + left.resultType.getTypeClass == right.resultType.getTypeClass) { + generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { + (leftTerm, rightTerm) => s"java.util.Map.equals($leftTerm, $rightTerm)" + } + } // comparable types of same type else if (isComparable(left.resultType) && left.resultType == right.resultType) { generateComparison("==", nullCheck, left, right) @@ -229,6 +236,13 @@ object ScalarOperators { (leftTerm, rightTerm) => s"!java.util.Arrays.equals($leftTerm, $rightTerm)" 
} } + // map types + else if (isMap(left.resultType) && + left.resultType.getTypeClass == right.resultType.getTypeClass) { + generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { + (leftTerm, rightTerm) => s"!java.util.Map.equals($leftTerm, $rightTerm)" + } + } // comparable types else if (isComparable(left.resultType) && left.resultType == right.resultType) { generateComparison("!=", nullCheck, left, right) @@ -1061,6 +1075,47 @@ object ScalarOperators { GeneratedExpression(resultTerm, nullTerm, operatorCode, Types.STRING) } + def generateMap( + codeGenerator: CodeGenerator, + resultType: TypeInformation[_], + elements: Seq[GeneratedExpression]) + : GeneratedExpression = { + val mapTerm = codeGenerator.addReusableMap(resultType.getTypeClass) + + val boxedElements: Seq[GeneratedExpression] = resultType match { + case mti: MapTypeInfo[_, _] => + // we box the elements to also represent null values + val boxedKeyTypeTerm = boxedTypeTermForTypeInfo(mti.getKeyTypeInfo) + val boxedValueTypeTerm = boxedTypeTermForTypeInfo(mti.getValueTypeInfo) + + elements.zipWithIndex.map { case (element, idx) => + val boxedExpr = codeGenerator.generateOutputFieldBoxing(element) + val exprOrNull: String = if (codeGenerator.nullCheck) { + if (idx % 2 == 0) { + s"${boxedExpr.nullTerm} ? null : ($boxedKeyTypeTerm) ${boxedExpr.resultTerm}" + } else { + s"${boxedExpr.nullTerm} ? 
null : ($boxedValueTypeTerm) ${boxedExpr.resultTerm}" + } + } else { + boxedExpr.resultTerm + } + boxedExpr.copy(resultTerm = exprOrNull) + } + } + + val code = boxedElements.grouped(2) + .map { case Seq(key, value) => + s""" + |${key.code} + |${value.code} + |$mapTerm.put(${key.resultTerm}, ${value.resultTerm}); + |""".stripMargin + } + .mkString("\n") + + GeneratedExpression(mapTerm, GeneratedExpression.NEVER_NULL, code, resultType) + } + def generateMapGet( codeGenerator: CodeGenerator, map: GeneratedExpression, @@ -1091,6 +1146,15 @@ object ScalarOperators { GeneratedExpression(resultTerm, nullTerm, accessCode, resultType) } + def generateMapCardinality( + nullCheck: Boolean, + map: GeneratedExpression) + : GeneratedExpression = { + generateUnaryOperatorIfNotNull(nullCheck, INT_TYPE_INFO, map) { + (operandTerm) => s"${map.resultTerm}.size" + } + } + // ---------------------------------------------------------------------------------------------- private def generateUnaryOperatorIfNotNull( http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala index 201679b..aa82464 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala @@ -86,6 +86,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val FALSE: Keyword = Keyword("false") lazy val PRIMITIVE_ARRAY: Keyword = Keyword("PRIMITIVE_ARRAY") lazy val OBJECT_ARRAY: Keyword = Keyword("OBJECT_ARRAY") + lazy val MAP: Keyword = Keyword("MAP") lazy 
val BYTE: Keyword = Keyword("BYTE") lazy val SHORT: Keyword = Keyword("SHORT") lazy val INTERVAL_MONTHS: Keyword = Keyword("INTERVAL_MONTHS") @@ -141,6 +142,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val dataType: PackratParser[TypeInformation[_]] = PRIMITIVE_ARRAY ~ "(" ~> dataType <~ ")" ^^ { ct => Types.PRIMITIVE_ARRAY(ct) } | OBJECT_ARRAY ~ "(" ~> dataType <~ ")" ^^ { ct => Types.OBJECT_ARRAY(ct) } | + MAP ~ "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ { mt => Types.MAP(mt._1._1, mt._2)} | BYTE ^^ { e => Types.BYTE } | SHORT ^^ { e => Types.SHORT } | INTERVAL_MONTHS ^^ { e => Types.INTERVAL_MONTHS } | http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala index 08abc8f..3b52ab4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala @@ -138,6 +138,10 @@ object ExpressionUtils { } } + private[flink] def convertMap(map: Map[Expression, Expression]): Expression = { + MapConstructor(map.flatMap(entry => Seq(entry._1, entry._2)).toSeq) + } + // ---------------------------------------------------------------------------------------------- // RexNode conversion functions (see org.apache.calcite.sql2rel.StandardConvertletTable) // ---------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala index 3288478..c43bddd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala @@ -62,66 +62,6 @@ case class ArrayConstructor(elements: Seq[Expression]) extends Expression { } } -case class ArrayElementAt(array: Expression, index: Expression) extends Expression { - - override private[flink] def children: Seq[Expression] = Seq(array, index) - - override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - relBuilder - .getRexBuilder - .makeCall(SqlStdOperatorTable.ITEM, array.toRexNode, index.toRexNode) - } - - override def toString = s"($array).at($index)" - - override private[flink] def resultType = array.resultType match { - case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo - case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo - case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType - } - - override private[flink] def validateInput(): ValidationResult = { - array.resultType match { - case ati: TypeInformation[_] if isArray(ati) => - if (index.resultType == INT_TYPE_INFO) { - // check for common user mistake - index match { - case Literal(value: Int, INT_TYPE_INFO) if value < 1 => - ValidationFailure( - s"Array element access needs an index starting at 1 but was $value.") - case _ => ValidationSuccess - } - } else { - ValidationFailure( - s"Array element access needs an integer index but was '${index.resultType}'.") - } - case other@_ => ValidationFailure(s"Array expected but was '$other'.") - } - } -} - -case class ArrayCardinality(array: Expression) extends Expression { - - override private[flink] def 
children: Seq[Expression] = Seq(array) - - override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - relBuilder - .getRexBuilder - .makeCall(SqlStdOperatorTable.CARDINALITY, array.toRexNode) - } - - override def toString = s"($array).cardinality()" - - override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO - - override private[flink] def validateInput(): ValidationResult = { - array.resultType match { - case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess - case other@_ => ValidationFailure(s"Array expected but was '$other'.") - } - } -} - case class ArrayElement(array: Expression) extends Expression { override private[flink] def children: Seq[Expression] = Seq(array) http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala new file mode 100644 index 0000000..aaf52b0 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isMap} +import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +case class Cardinality(container: Expression) extends Expression { + + override private[flink] def children: Seq[Expression] = Seq(container) + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder + .getRexBuilder + .makeCall(SqlStdOperatorTable.CARDINALITY, container.toRexNode) + } + + override def toString = s"($container).cardinality()" + + override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO + + override private[flink] def validateInput(): ValidationResult = { + container.resultType match { + case mti: TypeInformation[_] if isMap(mti) => ValidationSuccess + case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess + case other@_ => ValidationFailure(s"Array expected but was '$other'.") + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala new file mode 100644 index 0000000..75a1224 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo} +import org.apache.flink.table.typeutils.TypeCheckUtils.isArray +import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +case class ItemAt(container: Expression, key: Expression) extends Expression { + + override private[flink] def children: Seq[Expression] = Seq(container, key) + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder + .getRexBuilder + .makeCall(SqlStdOperatorTable.ITEM, container.toRexNode, key.toRexNode) + } + + override def toString = s"($container).at($key)" + + override private[flink] def resultType = container.resultType match { + case mti: MapTypeInfo[_, _] => mti.getValueTypeInfo + case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo + case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo + case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType + } + + override private[flink] def validateInput(): ValidationResult = { + container.resultType match { + case ati: TypeInformation[_] if isArray(ati) => + if (key.resultType == INT_TYPE_INFO) { + // check for common user mistake + key match { + case Literal(value: Int, INT_TYPE_INFO) if value < 1 => + ValidationFailure( + s"Array element access needs an index starting at 1 but was $value.") + case _ => ValidationSuccess + } + } else { + ValidationFailure( + s"Array element access needs an integer index but was '${key.resultType}'.") + } + case mti: MapTypeInfo[_, _] => + if (key.resultType == mti.getKeyTypeInfo) { + ValidationSuccess + } else { + 
ValidationFailure( + s"Map key-value access needs a valid key of type " + + s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.") + } + case other@_ => ValidationFailure(s"Array or map expected but was '$other'.") + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala new file mode 100644 index 0000000..bf71401 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo} +import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} +import org.apache.flink.table.plan.schema.MapRelDataType +import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +import scala.collection.JavaConverters._ + +case class MapConstructor(elements: Seq[Expression]) extends Expression { + override private[flink] def children: Seq[Expression] = elements + + private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo( + new GenericTypeInfo[AnyRef](classOf[AnyRef]), + new GenericTypeInfo[AnyRef](classOf[AnyRef])) + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory + val relDataType = typeFactory.createMapType( + typeFactory.createTypeFromTypeInfo(elements.head.resultType, isNullable = true), + typeFactory.createTypeFromTypeInfo(elements.last.resultType, isNullable = true) + ) + val values = elements.map(_.toRexNode).toList.asJava + relBuilder + .getRexBuilder + .makeCall(relDataType, SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR, values) + } + + override def toString = s"map(${elements + .grouped(2) + .map(x => s"[${x.mkString(": ")}]").mkString(", ")})" + + override private[flink] def resultType: TypeInformation[_] = new MapTypeInfo( + elements.head.resultType, + elements.last.resultType + ) + + override private[flink] def validateInput(): ValidationResult = { + if (elements.isEmpty) { + return ValidationFailure("Empty maps are not supported yet.") + } + if (elements.size % 2 != 0) { + return ValidationFailure("Maps must have even number of elements to form key value 
pairs.") + } + if (!elements.grouped(2).forall(_.head.resultType == elements.head.resultType)) { + return ValidationFailure("Not all key elements of the map literal have the same type.") + } + if (!elements.grouped(2).forall(_.last.resultType == elements.last.resultType)) { + return ValidationFailure("Not all value elements of the map literal have the same type.") + } + ValidationSuccess + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala index dfb44b1..97cd9cc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala @@ -174,6 +174,13 @@ object ProjectionTranslator { replaceAggregationsAndProperties(exp, tableEnv, aggNames, propNames, projectedNames)) c.makeCopy(Array(newArgs)) + // map constructor + case c @ MapConstructor(args) => + val newArgs = c.elements + .map((exp: Expression) => + replaceAggregationsAndProperties(exp, tableEnv, aggNames, propNames, projectedNames)) + c.makeCopy(Array(newArgs)) + // General expression case e: Expression => val newArgs = e.productIterator.map { http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala index 
00627ad..fa7abab 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.typeutils import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo._ -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.typeutils.TimeIntervalTypeInfo.{INTERVAL_MILLIS, INTERVAL_MONTHS} import org.apache.flink.table.validate._ @@ -79,6 +79,9 @@ object TypeCheckUtils { case _ => false } + def isMap(dataType: TypeInformation[_]): Boolean = + dataType.isInstanceOf[MapTypeInfo[_, _]] + def isComparable(dataType: TypeInformation[_]): Boolean = classOf[Comparable[_]].isAssignableFrom(dataType.getTypeClass) && !isArray(dataType) http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 535fd6e..120bf54 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -237,12 +237,19 @@ object FunctionCatalog { "dateTimePlus" -> classOf[Plus], "dateFormat" -> classOf[DateFormat], + // item + "at" -> classOf[ItemAt], + + // cardinality + "cardinality" -> classOf[Cardinality], + // array "array" -> classOf[ArrayConstructor], - "cardinality" -> classOf[ArrayCardinality], - "at" -> classOf[ArrayElementAt], 
"element" -> classOf[ArrayElement], + // map + "map" -> classOf[MapConstructor], + // window properties "start" -> classOf[WindowStart], "end" -> classOf[WindowEnd], @@ -330,9 +337,12 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.VAR_SAMP, // ARRAY OPERATORS SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, + SqlStdOperatorTable.ELEMENT, + // MAP OPERATORS + SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR, + // ARRAY MAP SHARED OPERATORS SqlStdOperatorTable.ITEM, SqlStdOperatorTable.CARDINALITY, - SqlStdOperatorTable.ELEMENT, // SPECIAL OPERATORS SqlStdOperatorTable.ROW, SqlStdOperatorTable.OVERLAPS, http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala index 536f6ba..28023dd 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala @@ -295,4 +295,13 @@ class ArrayTypeTest extends ArrayTypeTestBase { "f11 <> f9", "false") } + + @Test + def testArrayTypeCasting(): Unit = { + testTableApi( + 'f3.cast(Types.OBJECT_ARRAY(Types.SQL_DATE)), + "f3.cast(OBJECT_ARRAY(SQL_DATE))", + "[1984-03-12, 1984-02-10]" + ) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala index b2b8016..a2f2a0b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala @@ -18,17 +18,178 @@ package org.apache.flink.table.expressions +import java.sql.Date + +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils.MapTypeTestBase import org.junit.Test class MapTypeTest extends MapTypeTestBase { @Test - def testItem(): Unit = { - testSqlApi("f0['map is null']", "null") - testSqlApi("f1['map is empty']", "null") - testSqlApi("f2['b']", "13") - testSqlApi("f3[1]", "null") - testSqlApi("f3[12]", "a") + def testMapLiteral(): Unit = { + // primitive literals + testAllApis(map(1, 1), "map(1, 1)", "MAP[1, 1]", "{1=1}") + + testAllApis( + map(true, true), + "map(true, true)", + "map[TRUE, TRUE]", + "{true=true}") + + // object literals + testTableApi(map(BigDecimal(1), BigDecimal(1)), "map(1p, 1p)", "{1=1}") + + testAllApis( + map(map(1, 2), map(3, 4)), + "map(map(1, 2), map(3, 4))", + "MAP[MAP[1, 2], MAP[3, 4]]", + "{{1=2}={3=4}}") + + testAllApis( + map(1 + 2, 3 * 3, 6 / 3, 4 - 2), + "map(1 + 2, 3 * 3, 6 / 3, 4 - 2)", + "map[1 + 2, 3 * 3, 6 / 3, 4 - 2]", + "{2=2, 3=9}") + + testAllApis( + map(1, Null(Types.INT)), + "map(1, Null(INT))", + "map[1, NULLIF(1,1)]", + "{1=null}") + + // explicit conversion + testAllApis( + map(1, 2L , 3, 4L), + "map(1, 2L, 3, 4L)", + "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)]", + "{1=2, 3=4}") + + testAllApis( + map(Date.valueOf("1985-04-11"), 1), + "map('1985-04-11'.toDate, 1)", + "MAP[DATE '1985-04-11', 1]", + "{1985-04-11=1}") + + testAllApis( + map(BigDecimal(2.0002), BigDecimal(2.0003)), + "map(2.0002p, 2.0003p)", + "MAP[CAST(2.0002 AS DECIMAL), CAST(2.0003 AS DECIMAL)]", + "{2.0002=2.0003}") + } + + 
@Test + def testMapField(): Unit = { + testAllApis( + map('f4, 'f5), + "map(f4, f5)", + "MAP[f4, f5]", + "{foo=12}") + + testAllApis( + map('f4, 'f1), + "map(f4, f1)", + "MAP[f4, f1]", + "{foo={}}") + + testAllApis( + map('f2, 'f3), + "map(f2, f3)", + "MAP[f2, f3]", + "{{a=12, b=13}={12=a, 13=b}}") + + testAllApis( + map('f1.at("a"), 'f5), + "map(f1.at('a'), f5)", + "MAP[f1['a'], f5]", + "{null=12}") + + testAllApis( + 'f1, + "f1", + "f1", + "{}") + + testAllApis( + 'f2, + "f2", + "f2", + "{a=12, b=13}") + + testAllApis( + 'f2.at("a"), + "f2.at('a')", + "f2['a']", + "12") + + testAllApis( + 'f3.at(12), + "f3.at(12)", + "f3[12]", + "a") + + testAllApis( + map('f4, 'f3).at("foo").at(13), + "map(f4, f3).at('foo').at(13)", + "MAP[f4, f3]['foo'][13]", + "b") + } + + @Test + def testMapOperations(): Unit = { + + // comparison + testAllApis( + 'f5 === 'f2.at("a"), + "f5 === f2.at('a')", + "f5 = f2['a']", + "true") + + // comparison + testAllApis( + 'f5 === 'f2.at("a"), + "f5 === f2.at('a')", + "f5 = f2['a']", + "true") + + testAllApis( + 'f0.at("map is null"), + "f0.at('map is null')", + "f0['map is null']", + "null") + + testAllApis( + 'f1.at("map is empty"), + "f1.at('map is empty')", + "f1['map is empty']", + "null") + + testAllApis( + 'f2.at("b"), + "f2.at('b')", + "f2['b']", + "13") + + testAllApis( + 'f3.at(1), + "f3.at(1)", + "f3[1]", + "null") + + testAllApis( + 'f3.at(12), + "f3.at(12)", + "f3[12]", + "a") + } + + @Test + def testMapTypeCasting(): Unit = { + testTableApi( + 'f2.cast(Types.MAP(Types.STRING, Types.INT)), + "f2.cast(MAP(STRING, INT))", + "{a=12, b=13}" + ) } } http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala index c631212..7bdfee0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala @@ -156,6 +156,8 @@ class SqlExpressionTest extends ExpressionTestBase { // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0 testSqlApi("ARRAY[TRUE, FALSE][2]", "false") testSqlApi("ARRAY[TRUE, TRUE]", "[true, true]") + testSqlApi("MAP['k1', 'v1', 'k2', 'v2']['k2']", "v2") + testSqlApi("MAP['k1', CAST(true AS VARCHAR(256)), 'k2', 'foo']['k1']", "true") } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala index d90d9df..a5df7ec 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala @@ -22,22 +22,26 @@ import java.util.{HashMap => JHashMap} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.{MapTypeInfo, RowTypeInfo} +import org.apache.flink.table.api.Types import org.apache.flink.types.Row class MapTypeTestBase extends ExpressionTestBase { override def testData: Any = { - val testData = new Row(4) - testData.setField(0, null) - testData.setField(1, new JHashMap[String, Int]()) - val map = new JHashMap[String, Int]() - map.put("a", 12) - 
map.put("b", 13) - testData.setField(2, map) + val map1 = new JHashMap[String, Int]() + map1.put("a", 12) + map1.put("b", 13) val map2 = new JHashMap[Int, String]() map2.put(12, "a") map2.put(13, "b") + val testData = new Row(7) + testData.setField(0, null) + testData.setField(1, new JHashMap[String, Int]()) + testData.setField(2, map1) testData.setField(3, map2) + testData.setField(4, "foo") + testData.setField(5, 12) + testData.setField(6, Array(1.2, 1.3)) testData } @@ -46,7 +50,10 @@ class MapTypeTestBase extends ExpressionTestBase { new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), - new MapTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) + new MapTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), + Types.STRING, + Types.INT, + Types.OBJECT_ARRAY(Types.DOUBLE) ).asInstanceOf[TypeInformation[Any]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala index 3f8306d..ae85b0d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala @@ -26,6 +26,6 @@ class MapTypeValidationTest extends MapTypeTestBase { @Test(expected = classOf[ValidationException]) def testWrongKeyType(): Unit = { - testSqlApi("f4[12]", "FAIL") 
+ testSqlApi("f2[12]", "FAIL") } }