asfgit closed pull request #6578: [FLINK-10170] [table] Support string representation for map and array types in descriptor-based Table API URL: https://github.com/apache/flink/pull/6578
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md index 1bfff420d62..4def3a485c3 100644 --- a/docs/dev/table/connect.md +++ b/docs/dev/table/connect.md @@ -417,13 +417,19 @@ DECIMAL DATE TIME TIMESTAMP -ROW(fieldtype, ...) # unnamed row; e.g. ROW(VARCHAR, INT) that is mapped to Flink's RowTypeInfo +MAP<fieldtype, fieldtype> # generic map; e.g. MAP<VARCHAR, INT> that is mapped to Flink's MapTypeInfo +MULTISET<fieldtype> # multiset; e.g. MULTISET<VARCHAR> that is mapped to Flink's MultisetTypeInfo +PRIMITIVE_ARRAY<fieldtype> # primitive array; e.g. PRIMITIVE_ARRAY<INT> that is mapped to Flink's PrimitiveArrayTypeInfo +BASIC_ARRAY<fieldtype> # basic array; e.g. BASIC_ARRAY<INT> that is mapped to Flink's BasicArrayTypeInfo +OBJECT_ARRAY<fieldtype> # object array; e.g. OBJECT_ARRAY<POJO(org.mycompany.MyPojoClass)> that is mapped to + # Flink's ObjectArrayTypeInfo +ROW<fieldtype, ...> # unnamed row; e.g. ROW<VARCHAR, INT> that is mapped to Flink's RowTypeInfo # with indexed fields names f0, f1, ... -ROW(fieldname fieldtype, ...) 
# named row; e.g., ROW(myField VARCHAR, myOtherField INT) that +ROW<fieldname fieldtype, ...> # named row; e.g., ROW<myField VARCHAR, myOtherField INT> that # is mapped to Flink's RowTypeInfo -POJO(class) # e.g., POJO(org.mycompany.MyPojoClass) that is mapped to Flink's PojoTypeInfo -ANY(class) # e.g., ANY(org.mycompany.MyClass) that is mapped to Flink's GenericTypeInfo -ANY(class, serialized) # used for type information that is not supported by Flink's Table & SQL API +POJO<class> # e.g., POJO<org.mycompany.MyPojoClass> that is mapped to Flink's PojoTypeInfo +ANY<class> # e.g., ANY<org.mycompany.MyClass> that is mapped to Flink's GenericTypeInfo +ANY<class, serialized> # used for type information that is not supported by Flink's Table & SQL API {% endhighlight %} {% top %} @@ -1046,4 +1052,4 @@ table.writeToSink(sink) </div> </div> -{% top %} \ No newline at end of file +{% top %} diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java index 6e370a02c13..ac6ff11c370 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java @@ -106,7 +106,7 @@ public void testDuplicateSchema() { final Map<String, String> props3 = new HashMap<>(); props3.put("format.type", "json"); props3.put("format.property-version", "1"); - props3.put("format.schema", "ROW(test1 VARCHAR, test2 TIMESTAMP)"); + props3.put("format.schema", "ROW<test1 VARCHAR, test2 TIMESTAMP>"); props3.put("format.fail-on-missing-field", "true"); final Map<String, String> props4 = new HashMap<>(); diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala index afc6506cbef..9e96ea5a535 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.typeutils import java.io.Serializable +import java.lang.reflect.Array import org.apache.commons.codec.binary.Base64 import org.apache.commons.lang3.StringEscapeUtils @@ -67,6 +68,11 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers { lazy val ROW: Keyword = Keyword("ROW") lazy val ANY: Keyword = Keyword("ANY") lazy val POJO: Keyword = Keyword("POJO") + lazy val MAP: Keyword = Keyword("MAP") + lazy val MULTISET: Keyword = Keyword("MULTISET") + lazy val PRIMITIVE_ARRAY: Keyword = Keyword("PRIMITIVE_ARRAY") + lazy val BASIC_ARRAY: Keyword = Keyword("BASIC_ARRAY") + lazy val OBJECT_ARRAY: Keyword = Keyword("OBJECT_ARRAY") lazy val qualifiedName: Parser[String] = """\p{javaJavaIdentifierStart}[\p{javaJavaIdentifierPart}.]*""".r @@ -74,6 +80,13 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers { lazy val base64Url: Parser[String] = """[A-Za-z0-9_-]*""".r + // keep parenthesis to ensure backward compatibility + lazy val leftBracket: PackratParser[(String)] = + "(" | "<" + + lazy val rightBracket: PackratParser[(String)] = + ")" | ">" + lazy val atomic: PackratParser[TypeInformation[_]] = (VARCHAR | STRING) ^^ { e => Types.STRING } | BOOLEAN ^^ { e => Types.BOOLEAN } | @@ -101,34 +114,35 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers { } lazy val namedRow: PackratParser[TypeInformation[_]] = - ROW ~ "(" ~> rep1sep(field, ",") <~ ")" ^^ { + ROW ~ leftBracket ~> rep1sep(field, ", ") <~ rightBracket ^^ { fields => Types.ROW(fields.map(_._1).toArray, fields.map(_._2).toArray) } | failure("Named row type expected.") lazy val unnamedRow: PackratParser[TypeInformation[_]] = - ROW ~ "(" ~> rep1sep(typeInfo, ",") <~ ")" ^^ { + ROW ~ leftBracket ~> rep1sep(typeInfo, ", 
") <~ rightBracket ^^ { types => Types.ROW(types: _*) } | failure("Unnamed row type expected.") lazy val generic: PackratParser[TypeInformation[_]] = - ANY ~ "(" ~> qualifiedName <~ ")" ^^ { + ANY ~ leftBracket ~> qualifiedName <~ rightBracket ^^ { typeClass => val clazz = loadClass(typeClass) new GenericTypeInfo[AnyRef](clazz.asInstanceOf[Class[AnyRef]]) } - lazy val pojo: PackratParser[TypeInformation[_]] = POJO ~ "(" ~> qualifiedName <~ ")" ^^ { - typeClass => - val clazz = loadClass(typeClass) - val info = TypeExtractor.createTypeInfo(clazz) - if (!info.isInstanceOf[PojoTypeInfo[_]]) { - throw new ValidationException(s"Class '$typeClass'is not a POJO type.") - } - info - } + lazy val pojo: PackratParser[TypeInformation[_]] = + POJO ~ leftBracket ~> qualifiedName <~ rightBracket ^^ { + typeClass => + val clazz = loadClass(typeClass) + val info = TypeExtractor.createTypeInfo(clazz) + if (!info.isInstanceOf[PojoTypeInfo[_]]) { + throw new ValidationException(s"Class '$typeClass'is not a POJO type.") + } + info + } lazy val any: PackratParser[TypeInformation[_]] = - ANY ~ "(" ~ qualifiedName ~ "," ~ base64Url ~ ")" ^^ { + ANY ~ leftBracket ~ qualifiedName ~ "," ~ base64Url ~ rightBracket ^^ { case _ ~ _ ~ typeClass ~ _ ~ serialized ~ _=> val clazz = loadClass(typeClass) val typeInfo = deserialize(serialized) @@ -140,8 +154,45 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers { typeInfo } + lazy val genericMap: PackratParser[TypeInformation[_]] = + MAP ~ leftBracket ~ typeInfo ~ "," ~ typeInfo ~ rightBracket ^^ { + case _ ~ _ ~ keyTypeInfo ~ _ ~ valueTypeInfo ~ _=> + Types.MAP(keyTypeInfo, valueTypeInfo) + } + + lazy val multiSet: PackratParser[TypeInformation[_]] = + MULTISET ~ leftBracket ~ typeInfo ~ rightBracket ^^ { + case _ ~ _ ~ elementTypeInfo ~ _ => + Types.MULTISET(elementTypeInfo) + } + + lazy val primitiveArray: PackratParser[TypeInformation[_]] = + PRIMITIVE_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ { + case _ ~ _ ~ 
componentTypeInfo ~ _ => + Types.PRIMITIVE_ARRAY(componentTypeInfo) + } + + lazy val basicArray: PackratParser[TypeInformation[_]] = + BASIC_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ { + case _ ~ _ ~ componentTypeInfo ~ _ => + BasicArrayTypeInfo.getInfoFor(Array.newInstance( + componentTypeInfo.getTypeClass, 0).getClass) + } + + lazy val objectArray: PackratParser[TypeInformation[_]] = + OBJECT_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ { + case _ ~ _ ~ componentTypeInfo ~ _ => + Types.OBJECT_ARRAY(componentTypeInfo) + } + + lazy val map: PackratParser[TypeInformation[_]] = + genericMap | multiSet + + lazy val array: PackratParser[TypeInformation[_]] = + primitiveArray | basicArray | objectArray + lazy val typeInfo: PackratParser[TypeInformation[_]] = - namedRow | unnamedRow | any | generic | pojo | atomic | failure("Invalid type.") + namedRow | unnamedRow | any | generic | pojo | atomic | map | array | failure("Invalid type.") def readTypeInfo(typeString: String): TypeInformation[_] = { parseAll(typeInfo, typeString) match { @@ -182,10 +233,10 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers { s"$name ${normalizeTypeInfo(f._2)}" } - s"${ROW.key}(${normalizedFields.mkString(", ")})" + s"${ROW.key}<${normalizedFields.mkString(", ")}>" case generic: GenericTypeInfo[_] => - s"${ANY.key}(${generic.getTypeClass.getName})" + s"${ANY.key}<${generic.getTypeClass.getName}>" case pojo: PojoTypeInfo[_] => // we only support very simple POJOs that only contain extracted fields @@ -196,7 +247,7 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers { case _: InvalidTypesException => None } extractedInfo match { - case Some(ei) if ei == pojo => s"${POJO.key}(${pojo.getTypeClass.getName})" + case Some(ei) if ei == pojo => s"${POJO.key}<${pojo.getTypeClass.getName}>" case _ => throw new TableException( "A string representation for custom POJO types is not supported yet.") @@ -205,15 +256,25 @@ object TypeStringUtils extends 
JavaTokenParsers with PackratParsers { case _: CompositeType[_] => throw new TableException("A string representation for composite types is not supported yet.") - case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] | - _: PrimitiveArrayTypeInfo[_] => - throw new TableException("A string representation for array types is not supported yet.") + case array: PrimitiveArrayTypeInfo[_] => + s"${PRIMITIVE_ARRAY.key}<${normalizeTypeInfo(array.getComponentType)}>" + + case array: BasicArrayTypeInfo[_, _] => + s"${BASIC_ARRAY.key}<${normalizeTypeInfo(array.getComponentInfo)}>" + + case array: ObjectArrayTypeInfo[_, _] => + s"${OBJECT_ARRAY.key}<${normalizeTypeInfo(array.getComponentInfo)}>" + + case set: MultisetTypeInfo[_] => + s"${MULTISET.key}<${normalizeTypeInfo(set.getElementTypeInfo)}>" - case _: MapTypeInfo[_, _] | _: MultisetTypeInfo[_] => - throw new TableException("A string representation for map types is not supported yet.") + case map: MapTypeInfo[_, _] => + val normalizedKey = normalizeTypeInfo(map.getKeyTypeInfo) + val normalizedValue = normalizeTypeInfo(map.getValueTypeInfo) + s"${MAP.key}<${normalizedKey}, ${normalizedValue}>" case any: TypeInformation[_] => - s"${ANY.key}(${any.getTypeClass.getName}, ${serialize(any)})" + s"${ANY.key}<${any.getTypeClass.getName}, ${serialize(any)}>" } // ---------------------------------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala index 0c2e8069474..8d01b8be3c3 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala @@ -78,9 +78,9 @@ class CsvTest extends DescriptorTestBase { "format.fields.1.name" -> "field2", "format.fields.1.type" -> "TIMESTAMP", 
"format.fields.2.name" -> "field3", - "format.fields.2.type" -> "ANY(java.lang.Class)", + "format.fields.2.type" -> "ANY<java.lang.Class>", "format.fields.3.name" -> "field4", - "format.fields.3.type" -> "ROW(test INT, row VARCHAR)", + "format.fields.3.type" -> "ROW<test INT, row VARCHAR>", "format.line-delimiter" -> "^") val props2 = Map( diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala index ccac3170cbd..00e3a21c9c2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala @@ -18,7 +18,9 @@ package org.apache.flink.table.descriptors +import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.table.api.Types +import org.apache.flink.table.runtime.utils.CommonTestData.Person import org.apache.flink.table.utils.TableTestBase import org.junit.Assert.assertEquals import org.junit.Test @@ -45,6 +47,10 @@ class TableDescriptorTest extends TableTestBase { val schema = Schema() .field("myfield", Types.STRING) .field("myfield2", Types.INT) + .field("myfield3", Types.MAP(Types.STRING, Types.INT)) + .field("myfield4", Types.MULTISET(Types.LONG)) + .field("myfield5", Types.PRIMITIVE_ARRAY(Types.SHORT)) + .field("myfield6", Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person]))) // CSV table source and sink do not support proctime yet //if (isStreaming) { // schema.field("proctime", Types.SQL_TIMESTAMP).proctime() @@ -56,6 +62,10 @@ class TableDescriptorTest extends TableTestBase { val format = Csv() .field("myfield", Types.STRING) .field("myfield2", Types.INT) + .field("myfield3", Types.MAP(Types.STRING, Types.INT)) + .field("myfield4", Types.MULTISET(Types.LONG)) + .field("myfield5", 
Types.PRIMITIVE_ARRAY(Types.SHORT)) + .field("myfield6", Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person]))) .fieldDelimiter("#") val descriptor: RegistrableDescriptor = if (isStreaming) { @@ -84,11 +94,29 @@ class TableDescriptorTest extends TableTestBase { "format.fields.0.type" -> "VARCHAR", "format.fields.1.name" -> "myfield2", "format.fields.1.type" -> "INT", + "format.fields.2.name" -> "myfield3", + "format.fields.2.type" -> "MAP<VARCHAR, INT>", + "format.fields.3.name" -> "myfield4", + "format.fields.3.type" -> "MULTISET<BIGINT>", + "format.fields.4.name" -> "myfield5", + "format.fields.4.type" -> "PRIMITIVE_ARRAY<SMALLINT>", + "format.fields.5.name" -> "myfield6", + "format.fields.5.type" -> + "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>", "format.field-delimiter" -> "#", "schema.0.name" -> "myfield", "schema.0.type" -> "VARCHAR", "schema.1.name" -> "myfield2", - "schema.1.type" -> "INT" + "schema.1.type" -> "INT", + "schema.2.name" -> "myfield3", + "schema.2.type" -> "MAP<VARCHAR, INT>", + "schema.3.name" -> "myfield4", + "schema.3.type" -> "MULTISET<BIGINT>", + "schema.4.name" -> "myfield5", + "schema.4.type" -> "PRIMITIVE_ARRAY<SMALLINT>", + "schema.5.name" -> "myfield6", + "schema.5.type" -> + "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>" ) val expectedProperties = if (isStreaming) { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala index 9ea8be08620..56e971b964a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala @@ -18,14 +18,12 @@ package org.apache.flink.table.typeutils -import java.util - -import 
org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, BasicArrayTypeInfo, BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.{RowTypeInfo, TypeExtractor} import org.apache.flink.table.api.Types import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person} import org.junit.Assert.{assertEquals, assertTrue} -import org.junit.{Assert, Test} +import org.junit.{Test} /** * Tests for string-based representation of [[TypeInformation]]. @@ -49,7 +47,7 @@ class TypeStringUtilsTest { // unsupported type information testReadAndWrite( - "ANY(java.lang.Void, " + + "ANY<java.lang.Void, " + "rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZWluZm8uQmFzaWNUeXBlSW5mb_oE8IKl" + "ad0GAgAETAAFY2xhenp0ABFMamF2YS9sYW5nL0NsYXNzO0wAD2NvbXBhcmF0b3JDbGFzc3EAfgABWwAXcG9z" + "c2libGVDYXN0VGFyZ2V0VHlwZXN0ABJbTGphdmEvbGFuZy9DbGFzcztMAApzZXJpYWxpemVydAA2TG9yZy9h" + @@ -59,31 +57,62 @@ class TypeStringUtilsTest { "cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlZvaWRTZXJpYWxpemVyAAAA" + "AAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJp" + "YWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1" + - "dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA)", + "dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA>", BasicTypeInfo.VOID_TYPE_INFO) } @Test def testWriteComplexTypes(): Unit = { testReadAndWrite( - "ROW(f0 DECIMAL, f1 TINYINT)", + "ROW<f0 DECIMAL, f1 TINYINT>", Types.ROW(Types.DECIMAL, Types.BYTE)) testReadAndWrite( - "ROW(hello DECIMAL, world TINYINT)", + "ROW<hello DECIMAL, world TINYINT>", Types.ROW( Array[String]("hello", "world"), Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE))) testReadAndWrite( - "POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)", + "POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>", TypeExtractor.createTypeInfo(classOf[Person])) 
testReadAndWrite( - "ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)", + "ANY<org.apache.flink.table.runtime.utils.CommonTestData$NonPojo>", TypeExtractor.createTypeInfo(classOf[NonPojo])) + testReadAndWrite( + "MAP<VARCHAR, ROW<f0 DECIMAL, f1 TINYINT>>", + Types.MAP(Types.STRING, Types.ROW(Types.DECIMAL, Types.BYTE)) + ) + + testReadAndWrite( + "MULTISET<ROW<f0 DECIMAL, f1 TINYINT>>", + Types.MULTISET(Types.ROW(Types.DECIMAL, Types.BYTE)) + ) + + testReadAndWrite( + "PRIMITIVE_ARRAY<TINYINT>", + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO + ) + + testReadAndWrite( + "BASIC_ARRAY<INT>", + BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO + ) + + testReadAndWrite( + "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>", + Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person])) + ) + // test escaping + assertTrue( + TypeStringUtils.readTypeInfo("ROW<\"he \\nllo\" DECIMAL, world TINYINT>") + .asInstanceOf[RowTypeInfo].getFieldNames + .sameElements(Array[String]("he \nllo", "world"))) + + // test backward compatibility with brackets () assertTrue( TypeStringUtils.readTypeInfo("ROW(\"he \\nllo\" DECIMAL, world TINYINT)") .asInstanceOf[RowTypeInfo].getFieldNames ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services