[FLINK-6377] [table] Support map types in the Table / SQL API

This closes #3767.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b6e71ce Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b6e71ce Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b6e71ce Branch: refs/heads/table-retraction Commit: 5b6e71ceb12d1fdf7d09d70744f3c0a8a4722768 Parents: 0a33431 Author: Haohui Mai <whe...@apache.org> Authored: Mon Apr 24 23:12:29 2017 -0700 Committer: twalthr <twal...@apache.org> Committed: Sun Apr 30 18:59:05 2017 +0200 ---------------------------------------------------------------------- .../flink/table/calcite/FlinkTypeFactory.scala | 13 ++++-- .../flink/table/codegen/CodeGenerator.scala | 22 ++++++--- .../table/codegen/calls/ScalarOperators.scala | 34 +++++++++++++- .../flink/table/plan/nodes/FlinkRelNode.scala | 2 +- .../table/plan/schema/MapRelDataType.scala | 49 ++++++++++++++++++++ .../table/api/java/batch/sql/SqlITCase.java | 33 +++++++++++++ 6 files changed, 140 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 22a5c9f..7762ff8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -28,13 +28,12 @@ import org.apache.calcite.sql.parser.SqlParserPos import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, 
TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo} +import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo} import org.apache.flink.api.java.typeutils.ValueTypeInfo._ import org.apache.flink.table.api.TableException -import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType} +import org.apache.flink.table.plan.schema.{ArrayRelDataType, CompositeRelDataType, GenericRelDataType, MapRelDataType} import org.apache.flink.table.typeutils.TimeIntervalTypeInfo import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple -import org.apache.flink.table.plan.schema.ArrayRelDataType import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName import org.apache.flink.types.Row @@ -123,6 +122,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp case oa: ObjectArrayTypeInfo[_, _] => new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true) + case mp: MapTypeInfo[_, _] => + new MapRelDataType(mp, createTypeFromTypeInfo(mp.getKeyTypeInfo), + createTypeFromTypeInfo(mp.getValueTypeInfo), true) + case ti: TypeInformation[_] => new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem]) @@ -226,6 +229,10 @@ object FlinkTypeFactory { val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType] arrayRelDataType.typeInfo + case MAP if relDataType.isInstanceOf[MapRelDataType] => + val mapRelDataType = relDataType.asInstanceOf[MapRelDataType] + mapRelDataType.typeInfo + case _@t => throw TableException(s"Type is not supported: $t") } http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 298fb70..648efe6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -28,9 +28,9 @@ import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.fun.SqlStdOperatorTable._ import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.io.GenericInputFormat -import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.{AtomicType, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, RowTypeInfo, TupleTypeInfo} +import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.configuration.Configuration import org.apache.flink.table.api.TableConfig @@ -1414,11 +1414,19 @@ class CodeGenerator( generateArray(this, resultType, operands) case ITEM => - val array = operands.head - val index = operands(1) - requireArray(array) - requireInteger(index) - generateArrayElementAt(this, array, index) + operands.head.resultType match { + case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => + val array = operands.head + val index = operands(1) + requireInteger(index) + generateArrayElementAt(this, array, index) + + case map: MapTypeInfo[_, _] => + val key = operands(1) + generateMapGet(this, operands.head, key) + + case _ => throw new CodeGenException("Expect an array or a map.") + } case CARDINALITY => val array = operands.head 
http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala index 47a81ab..0c5baa6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala @@ -22,9 +22,9 @@ import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange} import org.apache.calcite.util.BuiltInMethod import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.codegen.CodeGenUtils._ -import org.apache.flink.table.codegen.{CodeGenerator, CodeGenException, GeneratedExpression} +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression} import org.apache.flink.table.typeutils.TimeIntervalTypeInfo import org.apache.flink.table.typeutils.TypeCheckUtils._ @@ -911,6 +911,36 @@ object ScalarOperators { } } + def generateMapGet( + codeGenerator: CodeGenerator, + map: GeneratedExpression, + key: GeneratedExpression) + : GeneratedExpression = { + + val resultTerm = newName("result") + val nullTerm = newName("isNull") + val ty = map.resultType.asInstanceOf[MapTypeInfo[_,_]] + val resultType = ty.getValueTypeInfo + val resultTypeTerm = boxedTypeTermForTypeInfo(ty.getValueTypeInfo) + val accessCode = if (codeGenerator.nullCheck) { + s""" + 
|${map.code} + |${key.code} + |boolean $nullTerm = (${map.nullTerm} || ${key.nullTerm}); + |$resultTypeTerm $resultTerm = $nullTerm ? + | null : ($resultTypeTerm) ${map.resultTerm}.get(${key.resultTerm}); + |""".stripMargin + } else { + s""" + |${map.code} + |${key.code} + |$resultTypeTerm $resultTerm = ($resultTypeTerm) + | ${map.resultTerm}.get(${key.resultTerm}); + |""".stripMargin + } + GeneratedExpression(resultTerm, nullTerm, accessCode, resultType) + } + // ---------------------------------------------------------------------------------------------- private def generateUnaryOperatorIfNotNull( http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala index ccdddef..7554ea9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala @@ -93,7 +93,7 @@ trait FlinkRelNode extends RelNode { case SqlTypeName.ARRAY => // 16 is an arbitrary estimate estimateDataTypeSize(t.getComponentType) * 16 - case SqlTypeName.ANY => 128 // 128 is an arbitrary estimate + case SqlTypeName.ANY | SqlTypeName.MAP => 128 // 128 is an arbitrary estimate case _ => throw TableException(s"Unsupported data type encountered: $t") } } http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala new file mode 100644 index 0000000..b3ff99f --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.schema + +import com.google.common.base.Objects +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.sql.`type`.MapSqlType +import org.apache.flink.api.common.typeinfo.TypeInformation + +class MapRelDataType( + val typeInfo: TypeInformation[_], + val keyType: RelDataType, + val valueType: RelDataType, + isNullable: Boolean) extends MapSqlType(keyType, valueType, isNullable) { + + override def toString: String = s"MAP($typeInfo)" + + def canEqual(other: Any): Boolean = other.isInstanceOf[MapRelDataType] + + override def equals(other: Any): Boolean = other match { + case that: MapRelDataType => + super.equals(that) && + (that canEqual this) && + keyType == that.keyType && + valueType == that.valueType && + isNullable == that.isNullable + case _ => false + } + + override def hashCode(): Int = { + Objects.hashCode(keyType, valueType) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java index 5ba67dd..114226c 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java @@ -18,9 +18,14 @@ package org.apache.flink.table.api.java.batch.sql; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import 
org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.types.Row; @@ -32,7 +37,10 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; @RunWith(Parameterized.class) public class SqlITCase extends TableProgramsCollectionTestBase { @@ -138,4 +146,29 @@ public class SqlITCase extends TableProgramsCollectionTestBase { String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"; compareResultAsText(results, expected); } + + @Test + public void testMap() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + List<Tuple2<Integer, Map<String, String>>> rows = new ArrayList<>(); + rows.add(new Tuple2<>(1, Collections.singletonMap("foo", "bar"))); + rows.add(new Tuple2<>(2, Collections.singletonMap("foo", "spam"))); + + TypeInformation<Tuple2<Integer, Map<String, String>>> ty = new TupleTypeInfo<>( + BasicTypeInfo.INT_TYPE_INFO, + new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)); + + DataSet<Tuple2<Integer, Map<String, String>>> ds1 = env.fromCollection(rows, ty); + tableEnv.registerDataSet("t1", ds1, "a, b"); + + String sqlQuery = "SELECT b['foo'] FROM t1"; + Table result = tableEnv.sql(sqlQuery); + + DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); + List<Row> results = resultSet.collect(); + String expected = "bar\n" + "spam\n"; + compareResultAsText(results, expected); + } }