[FLINK-6377] [table] Support map types in the Table / SQL API

This closes #3767.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b6e71ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b6e71ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b6e71ce

Branch: refs/heads/table-retraction
Commit: 5b6e71ceb12d1fdf7d09d70744f3c0a8a4722768
Parents: 0a33431
Author: Haohui Mai <whe...@apache.org>
Authored: Mon Apr 24 23:12:29 2017 -0700
Committer: twalthr <twal...@apache.org>
Committed: Sun Apr 30 18:59:05 2017 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 13 ++++--
 .../flink/table/codegen/CodeGenerator.scala     | 22 ++++++---
 .../table/codegen/calls/ScalarOperators.scala   | 34 +++++++++++++-
 .../flink/table/plan/nodes/FlinkRelNode.scala   |  2 +-
 .../table/plan/schema/MapRelDataType.scala      | 49 ++++++++++++++++++++
 .../table/api/java/batch/sql/SqlITCase.java     | 33 +++++++++++++
 6 files changed, 140 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 22a5c9f..7762ff8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -28,13 +28,12 @@ import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType}
+import org.apache.flink.table.plan.schema.{ArrayRelDataType, CompositeRelDataType, GenericRelDataType, MapRelDataType}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
-import org.apache.flink.table.plan.schema.ArrayRelDataType
 import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
 import org.apache.flink.types.Row
 
@@ -123,6 +122,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     case oa: ObjectArrayTypeInfo[_, _] =>
       new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
 
+    case mp: MapTypeInfo[_, _] =>
+      new MapRelDataType(mp, createTypeFromTypeInfo(mp.getKeyTypeInfo),
+        createTypeFromTypeInfo(mp.getValueTypeInfo), true)
+
     case ti: TypeInformation[_] =>
       new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
 
@@ -226,6 +229,10 @@ object FlinkTypeFactory {
       val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
       arrayRelDataType.typeInfo
 
+    case MAP if relDataType.isInstanceOf[MapRelDataType] =>
+      val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
+      mapRelDataType.typeInfo
+
     case _@t =>
       throw TableException(s"Type is not supported: $t")
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 298fb70..648efe6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -28,9 +28,9 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.io.GenericInputFormat
-import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{AtomicType, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.api.TableConfig
@@ -1414,11 +1414,19 @@ class CodeGenerator(
         generateArray(this, resultType, operands)
 
       case ITEM =>
-        val array = operands.head
-        val index = operands(1)
-        requireArray(array)
-        requireInteger(index)
-        generateArrayElementAt(this, array, index)
+        operands.head.resultType match {
+          case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
+            val array = operands.head
+            val index = operands(1)
+            requireInteger(index)
+            generateArrayElementAt(this, array, index)
+
+          case map: MapTypeInfo[_, _] =>
+            val key = operands(1)
+            generateMapGet(this, operands.head, key)
+
+          case _ => throw new CodeGenException("Expect an array or a map.")
+        }
 
       case CARDINALITY =>
         val array = operands.head

http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 47a81ab..0c5baa6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -22,9 +22,9 @@ import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
 import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
 import org.apache.flink.table.codegen.CodeGenUtils._
-import org.apache.flink.table.codegen.{CodeGenerator, CodeGenException, GeneratedExpression}
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 
@@ -911,6 +911,36 @@ object ScalarOperators {
     }
   }
 
+  def generateMapGet(
+      codeGenerator: CodeGenerator,
+      map: GeneratedExpression,
+      key: GeneratedExpression)
+  : GeneratedExpression = {
+
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val ty = map.resultType.asInstanceOf[MapTypeInfo[_,_]]
+    val resultType = ty.getValueTypeInfo
+    val resultTypeTerm = boxedTypeTermForTypeInfo(ty.getValueTypeInfo)
+    val accessCode = if (codeGenerator.nullCheck) {
+          s"""
+             |${map.code}
+             |${key.code}
+             |boolean $nullTerm = (${map.nullTerm} || ${key.nullTerm});
+             |$resultTypeTerm $resultTerm = $nullTerm ?
+             |  null : ($resultTypeTerm) ${map.resultTerm}.get(${key.resultTerm});
+             |""".stripMargin
+        } else {
+          s"""
+             |${map.code}
+             |${key.code}
+             |$resultTypeTerm $resultTerm = ($resultTypeTerm)
+             | ${map.resultTerm}.get(${key.resultTerm});
+             |""".stripMargin
+        }
+    GeneratedExpression(resultTerm, nullTerm, accessCode, resultType)
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   private def generateUnaryOperatorIfNotNull(

http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
index ccdddef..7554ea9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -93,7 +93,7 @@ trait FlinkRelNode extends RelNode {
     case SqlTypeName.ARRAY =>
       // 16 is an arbitrary estimate
       estimateDataTypeSize(t.getComponentType) * 16
-    case SqlTypeName.ANY => 128 // 128 is an arbitrary estimate
+    case SqlTypeName.ANY | SqlTypeName.MAP => 128 // 128 is an arbitrary estimate
     case _ => throw TableException(s"Unsupported data type encountered: $t")
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala
new file mode 100644
index 0000000..b3ff99f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.schema
+
+import com.google.common.base.Objects
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.MapSqlType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+class MapRelDataType(
+   val typeInfo: TypeInformation[_],
+   val keyType: RelDataType,
+   val valueType: RelDataType,
+   isNullable: Boolean) extends MapSqlType(keyType, valueType, isNullable) {
+
+  override def toString: String = s"MAP($typeInfo)"
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[MapRelDataType]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: MapRelDataType =>
+      super.equals(that) &&
+        (that canEqual this) &&
+        keyType == that.keyType &&
+        valueType == that.valueType &&
+        isNullable == that.isNullable
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hashCode(keyType, valueType)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
index 5ba67dd..114226c 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -18,9 +18,14 @@
 
 package org.apache.flink.table.api.java.batch.sql;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.types.Row;
@@ -32,7 +37,10 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 @RunWith(Parameterized.class)
 public class SqlITCase extends TableProgramsCollectionTestBase {
@@ -138,4 +146,29 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
                String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
                compareResultAsText(results, expected);
        }
+
+       @Test
+       public void testMap() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+               List<Tuple2<Integer, Map<String, String>>> rows = new ArrayList<>();
+               rows.add(new Tuple2<>(1, Collections.singletonMap("foo", "bar")));
+               rows.add(new Tuple2<>(2, Collections.singletonMap("foo", "spam")));
+
+               TypeInformation<Tuple2<Integer, Map<String, String>>> ty = new TupleTypeInfo<>(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+
+               DataSet<Tuple2<Integer, Map<String, String>>> ds1 = env.fromCollection(rows, ty);
+               tableEnv.registerDataSet("t1", ds1, "a, b");
+
+               String sqlQuery = "SELECT b['foo'] FROM t1";
+               Table result = tableEnv.sql(sqlQuery);
+
+               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = resultSet.collect();
+               String expected = "bar\n" + "spam\n";
+               compareResultAsText(results, expected);
+       }
 }

Reply via email to