[FLINK-6377] [table] Add additional map tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d49efbd2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d49efbd2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d49efbd2 Branch: refs/heads/table-retraction Commit: d49efbd222c1aa963f3f9a7fb3cf359071d1bbd3 Parents: 5b6e71c Author: twalthr <twal...@apache.org> Authored: Wed Apr 26 18:04:00 2017 +0200 Committer: twalthr <twal...@apache.org> Committed: Sun Apr 30 18:59:05 2017 +0200 ---------------------------------------------------------------------- docs/dev/table_api.md | 1 + .../org/apache/flink/table/api/Types.scala | 12 +++- .../flink/table/codegen/ExpressionReducer.scala | 7 +- .../flink/table/plan/nodes/FlinkRelNode.scala | 5 +- .../flink/table/expressions/MapTypeTest.scala | 72 ++++++++++++++++++++ 5 files changed, 93 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 022a73d..2b777c6 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1747,6 +1747,7 @@ The Table API is built on top of Flink's DataSet and DataStream API. Internally, | `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long` | | `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]` | | `Types.OBJECT_ARRAY` | `ARRAY` | e.g. `java.lang.Byte[]`| +| `Types.MAP` | `MAP` | `java.util.HashMap` | Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and array types (object or primitive arrays) can be fields of a row. http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala index f22fa32..2152b72 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.api import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.typeutils.TimeIntervalTypeInfo import org.apache.flink.types.Row @@ -100,4 +100,14 @@ object Types { def OBJECT_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = { ObjectArrayTypeInfo.getInfoFor(elementType) } + + /** + * Generates type information for a Java HashMap. + * + * @param keyType type of the keys of the map e.g. Types.STRING + * @param valueType type of the values of the map e.g. Types.STRING + */ + def MAP(keyType: TypeInformation[_], valueType: TypeInformation[_]): TypeInformation[_] = { + new MapTypeInfo(keyType, valueType) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala index 3fcbdc1..b7e1335 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala @@ -65,7 +65,10 @@ class ExpressionReducer(config: TableConfig) ) // we don't support object literals yet, we skip those constant expressions - case (SqlTypeName.ANY, _) | (SqlTypeName.ROW, _) | (SqlTypeName.ARRAY, _) => None + case (SqlTypeName.ANY, _) | + (SqlTypeName.ROW, _) | + (SqlTypeName.ARRAY, _) | + (SqlTypeName.MAP, _) => None case (_, e) => Some(e) } @@ -103,7 +106,7 @@ class ExpressionReducer(config: TableConfig) val unreduced = constExprs.get(i) unreduced.getType.getSqlTypeName match { // we insert the original expression for object literals - case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY => + case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY | SqlTypeName.MAP => reducedValues.add(unreduced) case _ => val reducedValue = reduced.getField(reducedIdx) http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala index 7554ea9..0b244e9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala @@ -93,7 +93,10 @@ trait FlinkRelNode extends RelNode { case SqlTypeName.ARRAY => // 16 is an arbitrary estimate estimateDataTypeSize(t.getComponentType) * 16 - case SqlTypeName.ANY | SqlTypeName.MAP => 128 // 128 is an arbitrary estimate + case SqlTypeName.MAP => + // 16 is an arbitrary estimate + (estimateDataTypeSize(t.getKeyType) + estimateDataTypeSize(t.getValueType)) * 16 + case SqlTypeName.ANY => 128 // 128 is an arbitrary estimate case _ => throw TableException(s"Unsupported data type encountered: $t") } } http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala new file mode 100644 index 0000000..ca80737 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions + +import java.util.{HashMap => JHashMap} + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{MapTypeInfo, RowTypeInfo} +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.expressions.utils.ExpressionTestBase +import org.apache.flink.types.Row +import org.junit.Test + +class MapTypeTest extends ExpressionTestBase { + + @Test(expected = classOf[ValidationException]) + def testWrongKeyType(): Unit = { + testSqlApi("f4[12]", "FAIL") + } + + @Test + def testItem(): Unit = { + testSqlApi("f0['map is null']", "null") + testSqlApi("f1['map is empty']", "null") + testSqlApi("f2['b']", "13") + testSqlApi("f3[1]", "null") + testSqlApi("f3[12]", "a") + } + + // ---------------------------------------------------------------------------------------------- + + override def testData: Any = { + val testData = new Row(4) + testData.setField(0, null) + testData.setField(1, new JHashMap[String, Int]()) + val map = new JHashMap[String, Int]() + map.put("a", 12) + map.put("b", 13) + testData.setField(2, map) + val map2 = new JHashMap[Int, String]() + map2.put(12, "a") + map2.put(13, "b") + testData.setField(3, map2) + testData + } + + override def typeInfo: TypeInformation[Any] = { + new RowTypeInfo( + new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), + new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), + new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), + new MapTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) + ).asInstanceOf[TypeInformation[Any]] + } + +}