This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a3fd687 [FLINK-13335][sql-parser] Bring the SQL CREATE TABLE DDL closer to FLIP-37 a3fd687 is described below commit a3fd687a515af66df30ce4ee265e0af3c5663820 Author: yuzhao.cyz <yuzhao....@alibaba-inc.com> AuthorDate: Fri Jul 26 22:37:53 2019 +0800 [FLINK-13335][sql-parser] Bring the SQL CREATE TABLE DDL closer to FLIP-37 This brings the SQL DDL closer to FLIP-37. However, there are a couple of known limitations. Currently unsupported features: - INTERVAL - ROW with comments - ANY - NULL - NOT NULL/NULL for top-level types - ignoring collation/charset - VARCHAR without length (=VARCHAR(1)) - TIMESTAMP WITH TIME ZONE - user-defined types - data types in non-DDL parts (e.g. CAST(f AS STRING)) This closes #9243. --- docs/dev/table/sql.md | 2 +- .../src/main/codegen/data/Parser.tdd | 27 +- .../src/main/codegen/includes/parserImpls.ftl | 294 +++++++++++++- .../flink/sql/parser/FlinkSqlDataTypeSpec.java | 325 +++++++++++++++ .../type/{SqlMapType.java => SqlBytesType.java} | 31 +- .../apache/flink/sql/parser/type/SqlMapType.java | 6 +- .../type/{SqlMapType.java => SqlMultisetType.java} | 28 +- .../apache/flink/sql/parser/type/SqlRowType.java | 34 +- .../type/{SqlMapType.java => SqlStringType.java} | 31 +- .../apache/flink/sql/parser/type/SqlTimeType.java | 73 ++++ .../flink/sql/parser/type/SqlTimestampType.java | 73 ++++ .../java/org/apache/flink/sql/parser/Fixture.java | 115 ++++++ .../flink/sql/parser/FlinkDDLDataTypeTest.java | 446 +++++++++++++++++++++ .../flink/sql/parser/FlinkSqlParserImplTest.java | 35 +- .../runtime/stream/sql/WindowAggregateITCase.scala | 10 +- .../runtime/stream/TimeAttributesITCase.scala | 2 +- 16 files changed, 1394 insertions(+), 138 deletions(-) diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 3155915..e607716 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -1085,7 +1085,7 @@ Although not every SQL feature is implemented yet, some string combinations are {% highlight sql %} -A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHA [...] +A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, BYTES, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENG [...] {% endhighlight %} diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 77a8e58..5cefc93 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -31,10 +31,16 @@ "org.apache.flink.sql.parser.dml.RichSqlInsert", "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword", "org.apache.flink.sql.parser.type.SqlArrayType", + "org.apache.flink.sql.parser.type.SqlBytesType", "org.apache.flink.sql.parser.type.SqlMapType", + "org.apache.flink.sql.parser.type.SqlMultisetType", "org.apache.flink.sql.parser.type.SqlRowType", + "org.apache.flink.sql.parser.type.SqlStringType", + "org.apache.flink.sql.parser.type.SqlTimeType", + "org.apache.flink.sql.parser.type.SqlTimestampType", "org.apache.flink.sql.parser.utils.SqlTimeUnit", "org.apache.flink.sql.parser.validate.FlinkSqlConformance", + "org.apache.flink.sql.parser.FlinkSqlDataTypeSpec", "org.apache.flink.sql.parser.SqlProperty", "org.apache.calcite.sql.SqlDrop", "org.apache.calcite.sql.SqlCreate", @@ -53,7 +59,9 @@ "FROM_SOURCE", "BOUNDED", "DELAY", - "OVERWRITE" + "OVERWRITE", + "STRING", + "BYTES" ] # List of keywords from "keywords" section that are not reserved. @@ -384,13 +392,24 @@ literalParserMethods: [ ] - # List of methods for parsing custom data types. + # List of methods for parsing ddl supported data types. # Return type of method implementation should be "SqlIdentifier". # Example: SqlParseTimeStampZ(). - dataTypeParserMethods: [ + flinkDataTypeParserMethods: [ "SqlArrayType()", + "SqlMultisetType()", "SqlMapType()", - "SqlRowType()" + "SqlRowType()", + "SqlStringType()", + "SqlBytesType()", + "SqlTimestampType()", + "SqlTimeType()" + ] + + # List of methods for parsing custom data types. + # Return type of method implementation should be "SqlIdentifier". + # Example: SqlParseTimeStampZ(). + dataTypeParserMethods: [ ] # List of methods for parsing builtin function calls. diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 95621fe..ae66846 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -15,6 +15,20 @@ // limitations under the License. --> +/** +* Parse a nullable option, default to be nullable. +*/ +boolean NullableOpt() : +{ +} +{ + <NULL> { return true; } +| + <NOT> <NULL> { return false; } +| + { return true; } +} + void TableColumn(TableCreationContext context) : { } @@ -55,14 +69,8 @@ void TableColumn2(List<SqlNode> list) : } { name = SimpleIdentifier() - type = DataType() - ( - <NULL> { type = type.withNullable(true); } - | - <NOT> <NULL> { type = type.withNullable(false); } - | - { type = type.withNullable(true); } - ) + <#-- #FlinkDataType already takes care of the nullable attribute. --> + type = FlinkDataType() [ <COMMENT> <QUOTED_STRING> { String p = SqlParserUtil.parseString(token.image); comment = SqlLiteral.createCharString(p, getPos()); @@ -367,53 +375,295 @@ SqlDrop SqlDropView(Span s, boolean replace) : } } +SqlIdentifier FlinkCollectionsTypeName() : +{ +} +{ + LOOKAHEAD(2) + <MULTISET> { + return new SqlIdentifier(SqlTypeName.MULTISET.name(), getPos()); + } +| + <ARRAY> { + return new SqlIdentifier(SqlTypeName.ARRAY.name(), getPos()); + } +} + +SqlIdentifier FlinkTypeName() : +{ + final SqlTypeName sqlTypeName; + final SqlIdentifier typeName; + final Span s = Span.of(); +} +{ + ( +<#-- additional types are included here --> +<#-- make custom data types in front of Calcite core data types --> +<#list parser.flinkDataTypeParserMethods as method> + <#if (method?index > 0)> + | + </#if> + LOOKAHEAD(2) + typeName = ${method} +</#list> + | + LOOKAHEAD(2) + sqlTypeName = SqlTypeName(s) { + typeName = new SqlIdentifier(sqlTypeName.name(), s.end(this)); + } + | + LOOKAHEAD(2) + typeName = FlinkCollectionsTypeName() + | + typeName = CompoundIdentifier() { + throw new ParseException("UDT in DDL is not supported yet."); + } + ) + { + return typeName; + } +} + +/** +* Parse a Flink data type with nullable options, NULL -> nullable, NOT NULL -> not nullable. +* Default to be nullable. +*/ +SqlDataTypeSpec FlinkDataType() : +{ + final SqlIdentifier typeName; + SqlIdentifier collectionTypeName = null; + int scale = -1; + int precision = -1; + String charSetName = null; + final Span s; + boolean nullable = true; + boolean elementNullable = true; +} +{ + typeName = FlinkTypeName() { + s = span(); + } + [ + <LPAREN> + precision = UnsignedIntLiteral() + [ + <COMMA> + scale = UnsignedIntLiteral() + ] + <RPAREN> + ] + elementNullable = NullableOpt() + [ + collectionTypeName = FlinkCollectionsTypeName() + nullable = NullableOpt() + ] + { + if (null != collectionTypeName) { + return new FlinkSqlDataTypeSpec( + collectionTypeName, + typeName, + precision, + scale, + charSetName, + nullable, + elementNullable, + s.end(collectionTypeName)); + } + nullable = elementNullable; + return new FlinkSqlDataTypeSpec(typeName, + precision, + scale, + charSetName, + null, + nullable, + elementNullable, + s.end(this)); + } +} + +SqlIdentifier SqlStringType() : +{ +} +{ + <STRING> { return new SqlStringType(getPos()); } +} + +SqlIdentifier SqlBytesType() : +{ +} +{ + <BYTES> { return new SqlBytesType(getPos()); } +} + +boolean WithLocalTimeZone() : +{ +} +{ + <WITHOUT> <TIME> <ZONE> { return false; } +| + <WITH> + ( + <LOCAL> <TIME> <ZONE> { return true; } + | + <TIME> <ZONE> { + throw new ParseException("'WITH TIME ZONE' is not supported yet, options: " + + "'WITHOUT TIME ZONE', 'WITH LOCAL TIME ZONE'."); + } + ) +| + { return false; } +} + +SqlIdentifier SqlTimeType() : +{ + int precision = -1; + boolean withLocalTimeZone = false; +} +{ + <TIME> + ( + <LPAREN> precision = UnsignedIntLiteral() <RPAREN> + | + { precision = -1; } + ) + withLocalTimeZone = WithLocalTimeZone() + { return new SqlTimeType(getPos(), precision, withLocalTimeZone); } +} + +SqlIdentifier SqlTimestampType() : +{ + int precision = -1; + boolean withLocalTimeZone = false; +} +{ + <TIMESTAMP> + ( + <LPAREN> precision = UnsignedIntLiteral() <RPAREN> + | + { precision = -1; } + ) + withLocalTimeZone = WithLocalTimeZone() + { return new SqlTimestampType(getPos(), precision, withLocalTimeZone); } +} + SqlIdentifier SqlArrayType() : { SqlParserPos pos; SqlDataTypeSpec elementType; + boolean nullable = true; } { <ARRAY> { pos = getPos(); } - <LT> elementType = DataType() + <LT> + elementType = FlinkDataType() <GT> { return new SqlArrayType(pos, elementType); } } +SqlIdentifier SqlMultisetType() : +{ + SqlParserPos pos; + SqlDataTypeSpec elementType; + boolean nullable = true; +} +{ + <MULTISET> { pos = getPos(); } + <LT> + elementType = FlinkDataType() + <GT> + { + return new SqlMultisetType(pos, elementType); + } +} + SqlIdentifier SqlMapType() : { SqlDataTypeSpec keyType; SqlDataTypeSpec valType; + boolean nullable = true; } { <MAP> - <LT> keyType = DataType() - <COMMA> valType = DataType() + <LT> + keyType = FlinkDataType() + <COMMA> + valType = FlinkDataType() <GT> { return new SqlMapType(getPos(), keyType, valType); } } +/** +* Parse a "name1 type1 ['i'm a comment'], name2 type2 ..." list. +*/ +void FieldNameTypeCommaList( + List<SqlIdentifier> fieldNames, + List<SqlDataTypeSpec> fieldTypes, + List<SqlCharStringLiteral> comments) : +{ + SqlIdentifier fName; + SqlDataTypeSpec fType; +} +{ + [ + fName = SimpleIdentifier() + fType = FlinkDataType() + { + fieldNames.add(fName); + fieldTypes.add(fType); + } + ( + <QUOTED_STRING> { + String p = SqlParserUtil.parseString(token.image); + comments.add(SqlLiteral.createCharString(p, getPos())); + } + | + { comments.add(null); } + ) + ] + ( + <COMMA> + fName = SimpleIdentifier() + fType = FlinkDataType() + { + fieldNames.add(fName); + fieldTypes.add(fType); + } + ( + <QUOTED_STRING> { + String p = SqlParserUtil.parseString(token.image); + comments.add(SqlLiteral.createCharString(p, getPos())); + } + | + { comments.add(null); } + ) + )* +} + +/** +* Parse Row type, we support both Row(name1 type1, name2 type2) and Row<name1 type1, name2 type2>. +* Every item type can have suffix of `NULL` or `NOT NULL` to indicate if this type is nullable. +* i.e. Row(f0 int not null, f1 varchar null). +*/ SqlIdentifier SqlRowType() : { - SqlParserPos pos; List<SqlIdentifier> fieldNames = new ArrayList<SqlIdentifier>(); List<SqlDataTypeSpec> fieldTypes = new ArrayList<SqlDataTypeSpec>(); + List<SqlCharStringLiteral> comments = new ArrayList<SqlCharStringLiteral>(); } { - <ROW> { pos = getPos(); SqlIdentifier fName; SqlDataTypeSpec fType;} - <LT> - fName = SimpleIdentifier() <COLON> fType = DataType() - { fieldNames.add(fName); fieldTypes.add(fType); } + <ROW> ( - <COMMA> - fName = SimpleIdentifier() <COLON> fType = DataType() - { fieldNames.add(fName); fieldTypes.add(fType); } - )* - <GT> + <NE> + | + <LT> FieldNameTypeCommaList(fieldNames, fieldTypes, comments) <GT> + | + <LPAREN> FieldNameTypeCommaList(fieldNames, fieldTypes, comments) <RPAREN> + ) { - return new SqlRowType(pos, fieldNames, fieldTypes); + return new SqlRowType(getPos(), fieldNames, fieldTypes, comments); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlDataTypeSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlDataTypeSpec.java new file mode 100644 index 0000000..a3797d7 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlDataTypeSpec.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser; + +import org.apache.flink.sql.parser.type.ExtendedSqlType; +import org.apache.flink.sql.parser.type.SqlArrayType; +import org.apache.flink.sql.parser.type.SqlBytesType; +import org.apache.flink.sql.parser.type.SqlMapType; +import org.apache.flink.sql.parser.type.SqlMultisetType; +import org.apache.flink.sql.parser.type.SqlRowType; +import org.apache.flink.sql.parser.type.SqlStringType; +import org.apache.flink.sql.parser.type.SqlTimeType; +import org.apache.flink.sql.parser.type.SqlTimestampType; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlCollation; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlUtil; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.calcite.util.Util; + +import java.nio.charset.Charset; +import java.util.Objects; +import java.util.TimeZone; +import java.util.stream.Collectors; + +/** + * Represents a SQL data type specification in a parse tree. + * + * <p>A <code>SqlDataTypeSpec</code> is immutable; once created, you cannot + * change any of the fields.</p> + * + * <p>This class is an extension to {@link SqlDataTypeSpec}, we support + * complex type expressions like:</p> + * + * <blockquote><code>ROW(<br> + * foo NUMBER(5, 2) NOT NULL,<br> + * rec ROW(b BOOLEAN, i MyUDT NOT NULL))</code></blockquote> + * + * <p>Until <a href="https://issues.apache.org/jira/browse/CALCITE-3213">CALCITE-3213</a> + * is resolved, we can remove this class. + */ +public class FlinkSqlDataTypeSpec extends SqlDataTypeSpec { + // Flag saying if the element type is nullable if this type is a collection type. + // For collection type, we mean ARRAY and MULTISET type now. + private Boolean elementNullable; + + public FlinkSqlDataTypeSpec( + SqlIdentifier collectionsTypeName, + SqlIdentifier typeName, + int precision, + int scale, + String charSetName, + Boolean nullable, + Boolean elementNullable, + SqlParserPos pos) { + super(collectionsTypeName, typeName, precision, scale, + charSetName, null, nullable, pos); + this.elementNullable = elementNullable; + } + + public FlinkSqlDataTypeSpec( + SqlIdentifier collectionsTypeName, + SqlIdentifier typeName, + int precision, + int scale, + String charSetName, + TimeZone timeZone, + Boolean nullable, + Boolean elementNullable, + SqlParserPos pos) { + super(collectionsTypeName, typeName, precision, scale, + charSetName, timeZone, nullable, pos); + this.elementNullable = elementNullable; + } + + public FlinkSqlDataTypeSpec( + SqlIdentifier typeName, + int precision, + int scale, + String charSetName, + TimeZone timeZone, + Boolean nullable, + Boolean elementNullable, + SqlParserPos pos) { + super(null, typeName, precision, scale, + charSetName, timeZone, nullable, pos); + this.elementNullable = elementNullable; + } + + @Override + public SqlNode clone(SqlParserPos pos) { + return (getCollectionsTypeName() != null) + ? new FlinkSqlDataTypeSpec(getCollectionsTypeName(), getTypeName(), getPrecision(), + getScale(), getCharSetName(), getNullable(), this.elementNullable, pos) + : new FlinkSqlDataTypeSpec(getTypeName(), getPrecision(), getScale(), + getCharSetName(), getTimeZone(), getNullable(), this.elementNullable, pos); + } + + /** Returns a copy of this data type specification with a given + * nullability. */ + @Override + public SqlDataTypeSpec withNullable(Boolean nullable) { + if (Objects.equals(nullable, this.getNullable())) { + return this; + } + return new FlinkSqlDataTypeSpec(getCollectionsTypeName(), getTypeName(), + getPrecision(), getScale(), getCharSetName(), getTimeZone(), nullable, + this.elementNullable, getParserPosition()); + } + + @Override + public RelDataType deriveType(RelDataTypeFactory typeFactory) { + // Default to be nullable. + return this.deriveType(typeFactory, true); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + final SqlIdentifier typeName = getTypeName(); + String name = typeName.getSimple(); + if (typeName instanceof ExtendedSqlType) { + typeName.unparse(writer, leftPrec, rightPrec); + } else if (SqlTypeName.get(name) != null) { + SqlTypeName sqlTypeName = SqlTypeName.get(name); + writer.keyword(name); + if (sqlTypeName.allowsPrec() && this.getPrecision() >= 0) { + SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")"); + writer.print(this.getPrecision()); + if (sqlTypeName.allowsScale() && this.getScale() >= 0) { + writer.sep(",", true); + writer.print(this.getScale()); + } + + writer.endList(frame); + } + + if (this.getCharSetName() != null) { + writer.keyword("CHARACTER SET"); + writer.identifier(this.getCharSetName(), false); + } + + if (this.getCollectionsTypeName() != null) { + // Fix up nullable attribute if this is a collection type. + if (elementNullable != null && !elementNullable) { + writer.keyword("NOT NULL"); + } + writer.keyword(this.getCollectionsTypeName().getSimple()); + } + } else if (name.startsWith("_")) { + writer.keyword(name.substring(1)); + } else { + this.getTypeName().unparse(writer, leftPrec, rightPrec); + } + if (getNullable() != null && !getNullable()) { + writer.keyword("NOT NULL"); + } + } + + @Override + public RelDataType deriveType(RelDataTypeFactory typeFactory, boolean nullable) { + final SqlIdentifier typeName = getTypeName(); + if (!typeName.isSimple()) { + return null; + } + final String name = typeName.getSimple(); + final SqlTypeName sqlTypeName = SqlTypeName.get(name); + // Try to get Flink custom data type first. + RelDataType type = getExtendedType(typeFactory, typeName); + if (type == null) { + if (sqlTypeName == null) { + return null; + } else { + // NOTE jvs 15-Jan-2009: earlier validation is supposed to + // have caught these, which is why it's OK for them + // to be assertions rather than user-level exceptions. + final int precision = getPrecision(); + final int scale = getScale(); + if ((precision >= 0) && (scale >= 0)) { + assert sqlTypeName.allowsPrecScale(true, true); + type = typeFactory.createSqlType(sqlTypeName, precision, scale); + } else if (precision >= 0) { + assert sqlTypeName.allowsPrecNoScale(); + type = typeFactory.createSqlType(sqlTypeName, precision); + } else { + assert sqlTypeName.allowsNoPrecNoScale(); + type = typeFactory.createSqlType(sqlTypeName); + } + } + } + + if (SqlTypeUtil.inCharFamily(type)) { + // Applying Syntax rule 10 from SQL:99 spec section 6.22 "If TD is a + // fixed-length, variable-length or large object character string, + // then the collating sequence of the result of the <cast + // specification> is the default collating sequence for the + // character repertoire of TD and the result of the <cast + // specification> has the Coercible coercibility characteristic." + SqlCollation collation = SqlCollation.COERCIBLE; + + Charset charset; + final String charSetName = getCharSetName(); + if (null == charSetName) { + charset = typeFactory.getDefaultCharset(); + } else { + String javaCharSetName = + Objects.requireNonNull( + SqlUtil.translateCharacterSetName(charSetName), charSetName); + charset = Charset.forName(javaCharSetName); + } + type = + typeFactory.createTypeWithCharsetAndCollation( + type, + charset, + collation); + } + + final SqlIdentifier collectionsTypeName = getCollectionsTypeName(); + if (null != collectionsTypeName) { + // Fix the nullability of the element type first. + boolean elementNullable = true; + if (this.elementNullable != null) { + elementNullable = this.elementNullable; + } + type = typeFactory.createTypeWithNullability(type, elementNullable); + + final String collectionName = collectionsTypeName.getSimple(); + final SqlTypeName collectionsSqlTypeName = + Objects.requireNonNull(SqlTypeName.get(collectionName), + collectionName); + + switch (collectionsSqlTypeName) { + case MULTISET: + type = typeFactory.createMultisetType(type, -1); + break; + case ARRAY: + type = typeFactory.createArrayType(type, -1); + break; + default: + throw Util.unexpected(collectionsSqlTypeName); + } + } + + // Fix the nullability of this type. + if (this.getNullable() != null) { + nullable = this.getNullable(); + } + type = typeFactory.createTypeWithNullability(type, nullable); + + return type; + } + + private RelDataType getExtendedType(RelDataTypeFactory typeFactory, SqlIdentifier typeName) { + // quick check. + if (!(typeName instanceof ExtendedSqlType)) { + return null; + } + if (typeName instanceof SqlBytesType) { + return typeFactory.createSqlType(SqlTypeName.VARBINARY, Integer.MAX_VALUE); + } else if (typeName instanceof SqlStringType) { + return typeFactory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE); + } else if (typeName instanceof SqlArrayType) { + final SqlArrayType arrayType = (SqlArrayType) typeName; + return typeFactory.createArrayType(arrayType.getElementType() + .deriveType(typeFactory), -1); + } else if (typeName instanceof SqlMultisetType) { + final SqlMultisetType multiSetType = (SqlMultisetType) typeName; + return typeFactory.createMultisetType(multiSetType.getElementType() + .deriveType(typeFactory), -1); + } else if (typeName instanceof SqlMapType) { + final SqlMapType mapType = (SqlMapType) typeName; + return typeFactory.createMapType( + mapType.getKeyType().deriveType(typeFactory), + mapType.getValType().deriveType(typeFactory)); + } else if (typeName instanceof SqlRowType) { + final SqlRowType rowType = (SqlRowType) typeName; + return typeFactory.createStructType( + rowType.getFieldTypes().stream().map(ft -> ft.deriveType(typeFactory)) + .collect(Collectors.toList()), + rowType.getFieldNames().stream().map(SqlIdentifier::getSimple) + .collect(Collectors.toList())); + } else if (typeName instanceof SqlTimeType) { + final SqlTimeType zonedTimeType = (SqlTimeType) typeName; + if (zonedTimeType.getPrecision() >= 0) { + return typeFactory.createSqlType(zonedTimeType.getSqlTypeName(), + zonedTimeType.getPrecision()); + } else { + // Use default precision. + return typeFactory.createSqlType(zonedTimeType.getSqlTypeName()); + } + } else if (typeName instanceof SqlTimestampType) { + final SqlTimestampType zonedTimestampType = (SqlTimestampType) typeName; + if (zonedTimestampType.getPrecision() >= 0) { + return typeFactory.createSqlType(zonedTimestampType.getSqlTypeName(), + zonedTimestampType.getPrecision()); + } else { + // Use default precision. + return typeFactory.createSqlType(zonedTimestampType.getSqlTypeName()); + } + } + return null; + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlBytesType.java similarity index 55% copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlBytesType.java index 588c993..dfc2d1f 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlBytesType.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,40 +18,21 @@ package org.apache.flink.sql.parser.type; -import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.type.SqlTypeName; /** - * Parse column of Map type. + * Parse type "BYTES" which is a synonym of VARBINARY(INT_MAX). */ -public class SqlMapType extends SqlIdentifier implements ExtendedSqlType { +public class SqlBytesType extends SqlIdentifier implements ExtendedSqlType { - private final SqlDataTypeSpec keyType; - private final SqlDataTypeSpec valType; - - public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) { - super(SqlTypeName.MAP.getName(), pos); - this.keyType = keyType; - this.valType = valType; - } - - public SqlDataTypeSpec getKeyType() { - return keyType; - } - - public SqlDataTypeSpec getValType() { - return valType; + public SqlBytesType(SqlParserPos pos) { + super("BYTES", pos); } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { - writer.keyword("MAP<"); - ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec); - writer.sep(","); - ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec); - writer.keyword(">"); + writer.keyword("BYTES"); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java index 588c993..fd071c0 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java @@ -48,10 +48,12 @@ public class SqlMapType extends SqlIdentifier implements ExtendedSqlType { @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { - writer.keyword("MAP<"); + writer.keyword("MAP"); + SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">"); + writer.sep(","); ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec); writer.sep(","); ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec); - writer.keyword(">"); + writer.endList(frame); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMultisetType.java similarity index 64% copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMultisetType.java index 588c993..3b6f19c 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMultisetType.java @@ -25,33 +25,25 @@ import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.type.SqlTypeName; /** - * Parse column of Map type. + * Parse column of MULTISET type. */ -public class SqlMapType extends SqlIdentifier implements ExtendedSqlType { +public class SqlMultisetType extends SqlIdentifier implements ExtendedSqlType { - private final SqlDataTypeSpec keyType; - private final SqlDataTypeSpec valType; + private final SqlDataTypeSpec elementType; - public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) { - super(SqlTypeName.MAP.getName(), pos); - this.keyType = keyType; - this.valType = valType; + public SqlMultisetType(SqlParserPos pos, SqlDataTypeSpec elementType) { + super(SqlTypeName.MULTISET.getName(), pos); + this.elementType = elementType; } - public SqlDataTypeSpec getKeyType() { - return keyType; - } - - public SqlDataTypeSpec getValType() { - return valType; + public SqlDataTypeSpec getElementType() { + return elementType; } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { - writer.keyword("MAP<"); - ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec); - writer.sep(","); - ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec); + writer.keyword("MULTISET<"); + ExtendedSqlType.unparseType(this.elementType, writer, leftPrec, rightPrec); writer.keyword(">"); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java index e77529e..886125c 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java @@ -18,6 +18,7 @@ package org.apache.flink.sql.parser.type; +import org.apache.calcite.sql.SqlCharStringLiteral; import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlWriter; @@ -34,13 +35,16 @@ public class SqlRowType extends SqlIdentifier implements ExtendedSqlType { private final List<SqlIdentifier> fieldNames; private final List<SqlDataTypeSpec> fieldTypes; + private final List<SqlCharStringLiteral> comments; public SqlRowType(SqlParserPos pos, List<SqlIdentifier> fieldNames, - List<SqlDataTypeSpec> fieldTypes) { + List<SqlDataTypeSpec> fieldTypes, + List<SqlCharStringLiteral> comments) { super(SqlTypeName.ROW.getName(), pos); this.fieldNames = fieldNames; this.fieldTypes = fieldTypes; + this.comments = comments; } public List<SqlIdentifier> getFieldNames() { @@ -51,6 +55,10 @@ public class SqlRowType extends SqlIdentifier implements ExtendedSqlType { return fieldTypes; } + public List<SqlCharStringLiteral> getComments() { + return comments; + } + public int getArity() { return fieldNames.size(); } @@ -65,14 +73,22 @@ public class SqlRowType extends SqlIdentifier implements ExtendedSqlType { @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { - writer.keyword("ROW"); - SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">"); - for (Pair<SqlIdentifier, SqlDataTypeSpec> p : Pair.zip(this.fieldNames, this.fieldTypes)) { - writer.sep(",", false); - p.left.unparse(writer, 0, 0); - writer.sep(":"); - ExtendedSqlType.unparseType(p.right, writer, leftPrec, rightPrec); + writer.print("ROW"); + if (getFieldNames().size() == 0) { + writer.print("<>"); + } else { + SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">"); + int i = 0; + for (Pair<SqlIdentifier, SqlDataTypeSpec> p : Pair.zip(this.fieldNames, this.fieldTypes)) { + writer.sep(",", false); + p.left.unparse(writer, 0, 0); + ExtendedSqlType.unparseType(p.right, writer, leftPrec, rightPrec); + if (comments.get(i) != null) { + comments.get(i).unparse(writer, leftPrec, rightPrec); + } + i += 1; + } + writer.endList(frame); } - writer.endList(frame); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlStringType.java similarity index 55% copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlStringType.java index 588c993..a134b13 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlStringType.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,40 +18,21 @@ package org.apache.flink.sql.parser.type; -import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.type.SqlTypeName; /** - * Parse column of Map type. + * Parse type "STRING" which is a synonym of VARCHAR(INT_MAX). */ -public class SqlMapType extends SqlIdentifier implements ExtendedSqlType { +public class SqlStringType extends SqlIdentifier implements ExtendedSqlType { - private final SqlDataTypeSpec keyType; - private final SqlDataTypeSpec valType; - - public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) { - super(SqlTypeName.MAP.getName(), pos); - this.keyType = keyType; - this.valType = valType; - } - - public SqlDataTypeSpec getKeyType() { - return keyType; - } - - public SqlDataTypeSpec getValType() { - return valType; + public SqlStringType(SqlParserPos pos) { + super("STRING", pos); } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { - writer.keyword("MAP<"); - ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec); - writer.sep(","); - ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec); - writer.keyword(">"); + writer.keyword("STRING"); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimeType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimeType.java new file mode 100644 index 0000000..23855ce --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimeType.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.type; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Parse type "TIME WITHOUT TIME ZONE", "TIME(3) WITHOUT TIME ZONE", "TIME WITH LOCAL TIME ZONE", + * or "TIME(3) WITH LOCAL TIME ZONE". + */ +public class SqlTimeType extends SqlIdentifier implements ExtendedSqlType { + private final int precision; + private final boolean withLocalTimeZone; + + public SqlTimeType(SqlParserPos pos, int precision, boolean withLocalTimeZone) { + super(getTypeName(withLocalTimeZone), pos); + this.precision = precision; + this.withLocalTimeZone = withLocalTimeZone; + } + + private static String getTypeName(boolean withLocalTimeZone) { + if (withLocalTimeZone) { + return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE.name(); + } else { + return SqlTypeName.TIME.name(); + } + } + + public SqlTypeName getSqlTypeName() { + if (withLocalTimeZone) { + return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE; + } else { + return SqlTypeName.TIME; + } + } + + public int getPrecision() { + return this.precision; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword(SqlTypeName.TIME.name()); + if (this.precision >= 0) { + final SqlWriter.Frame frame = + writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")"); + writer.print(precision); + writer.endList(frame); + } + if (this.withLocalTimeZone) { + writer.keyword("WITH LOCAL TIME ZONE"); + } + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimestampType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimestampType.java new file mode 100644 index 0000000..09e2d08 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimestampType.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.type; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Parse type "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP(3) WITHOUT TIME ZONE", + * "TIMESTAMP WITH LOCAL TIME ZONE", or "TIMESTAMP(3) WITH LOCAL TIME ZONE". + */ +public class SqlTimestampType extends SqlIdentifier implements ExtendedSqlType { + private final int precision; + private final boolean withLocalTimeZone; + + public SqlTimestampType(SqlParserPos pos, int precision, boolean withLocalTimeZone) { + super(getTypeName(withLocalTimeZone), pos); + this.precision = precision; + this.withLocalTimeZone = withLocalTimeZone; + } + + private static String getTypeName(boolean withLocalTimeZone) { + if (withLocalTimeZone) { + return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.name(); + } else { + return SqlTypeName.TIMESTAMP.name(); + } + } + + public SqlTypeName getSqlTypeName() { + if (withLocalTimeZone) { + return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE; + } else { + return SqlTypeName.TIMESTAMP; + } + } + + public int getPrecision() { + return this.precision; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword(SqlTypeName.TIMESTAMP.name()); + if (this.precision >= 0) { + final SqlWriter.Frame frame = + writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")"); + writer.print(precision); + writer.endList(frame); + } + if (this.withLocalTimeZone) { + writer.keyword("WITH LOCAL TIME ZONE"); + } + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java new file mode 100644 index 0000000..671958e --- /dev/null +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.type.SqlTypeName; + +import java.util.List; + +/** + * Type used during tests. + */ +public class Fixture { + private final RelDataTypeFactory typeFactory; + + final RelDataType char1Type; + final RelDataType char33Type; + final RelDataType varcharType; + final RelDataType varchar33Type; + final RelDataType booleanType; + final RelDataType binaryType; + final RelDataType binary33Type; + final RelDataType varbinaryType; + final RelDataType varbinary33Type; + final RelDataType decimalType; + final RelDataType decimalP10S0Type; + final RelDataType decimalP10S3Type; + final RelDataType tinyintType; + final RelDataType smallintType; + final RelDataType intType; + final RelDataType bigintType; + final RelDataType floatType; + final RelDataType doubleType; + final RelDataType dateType; + final RelDataType timeType; + final RelDataType time3Type; + final RelDataType timestampType; + final RelDataType timestamp3Type; + final RelDataType timestampWithLocalTimeZoneType; + final RelDataType timestamp3WithLocalTimeZoneType; + final RelDataType nullType; + + Fixture(RelDataTypeFactory typeFactory) { + this.typeFactory = typeFactory; + this.char1Type = typeFactory.createSqlType(SqlTypeName.CHAR); + this.char33Type = typeFactory.createSqlType(SqlTypeName.CHAR, 33); + this.varcharType = typeFactory.createSqlType(SqlTypeName.VARCHAR); + this.varchar33Type = typeFactory.createSqlType(SqlTypeName.VARCHAR, 33); + this.booleanType = typeFactory.createSqlType(SqlTypeName.BOOLEAN); + this.binaryType = typeFactory.createSqlType(SqlTypeName.BINARY); + this.binary33Type = typeFactory.createSqlType(SqlTypeName.BINARY, 33); + this.varbinaryType = typeFactory.createSqlType(SqlTypeName.VARBINARY); + this.varbinary33Type = typeFactory.createSqlType(SqlTypeName.VARBINARY, 33); + this.decimalType = typeFactory.createSqlType(SqlTypeName.DECIMAL); + this.decimalP10S0Type = typeFactory.createSqlType(SqlTypeName.DECIMAL, 10); + this.decimalP10S3Type = typeFactory.createSqlType(SqlTypeName.DECIMAL, 10, 3); + this.tinyintType = typeFactory.createSqlType(SqlTypeName.TINYINT); + this.smallintType = typeFactory.createSqlType(SqlTypeName.SMALLINT); + this.intType = typeFactory.createSqlType(SqlTypeName.INTEGER); + this.bigintType = typeFactory.createSqlType(SqlTypeName.BIGINT); + this.floatType = typeFactory.createSqlType(SqlTypeName.FLOAT); + this.doubleType = typeFactory.createSqlType(SqlTypeName.DOUBLE); + this.dateType = typeFactory.createSqlType(SqlTypeName.DATE); + this.timeType = typeFactory.createSqlType(SqlTypeName.TIME); + this.time3Type = typeFactory.createSqlType(SqlTypeName.TIME, 3); + this.timestampType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP); + this.timestamp3Type = typeFactory.createSqlType(SqlTypeName.TIMESTAMP, 3); + this.timestampWithLocalTimeZoneType = + typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE); + this.timestamp3WithLocalTimeZoneType = + typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3); + this.nullType = typeFactory.createSqlType(SqlTypeName.NULL); + } + + public RelDataType createSqlType(SqlTypeName sqlTypeName, int precision) { + return typeFactory.createSqlType(sqlTypeName, precision); + } + + public RelDataType createArrayType(RelDataType elementType) { + return typeFactory.createArrayType(elementType, -1); + } + + public RelDataType createMultisetType(RelDataType elementType) { + return typeFactory.createMultisetType(elementType, -1); + } + + public RelDataType createMapType(RelDataType keyType, RelDataType valType) { + return typeFactory.createMapType(keyType, valType); + } + + public RelDataType createStructType(List<RelDataType> keyTypes, List<String> names) { + return typeFactory.createStructType(keyTypes, names); + } + + public RelDataType nullable(RelDataType type) { + return typeFactory.createTypeWithNullability(type, true); + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java new file mode 100644 index 0000000..4b4499f --- /dev/null +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser; + +import org.apache.flink.sql.parser.ddl.SqlCreateTable; +import org.apache.flink.sql.parser.ddl.SqlTableColumn; +import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; +import org.apache.flink.sql.parser.validate.FlinkSqlConformance; + +import org.apache.calcite.avatica.util.Casing; +import org.apache.calcite.avatica.util.Quoting; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.dialect.CalciteSqlDialect; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.parser.SqlParserTest; +import org.apache.calcite.sql.parser.SqlParserUtil; +import org.apache.calcite.sql.pretty.SqlPrettyWriter; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.test.SqlValidatorTestCase; +import org.apache.calcite.util.Util; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for all the sup[ported flink DDL data types. + */ +@RunWith(Parameterized.class) +public class FlinkDDLDataTypeTest { + private FlinkSqlConformance conformance = FlinkSqlConformance.DEFAULT; + private static final RelDataTypeFactory TYPE_FACTORY = + new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + private static final Fixture FIXTURE = new Fixture(TYPE_FACTORY); + private static final String DDL_FORMAT = "create table t1 (\n" + + " f0 %s\n" + + ") with (\n" + + " k1 = 'v1'\n" + + ")"; + + @Parameterized.Parameters(name = "{index}: {0}") + public static List<TestItem> testData() { + return Arrays.asList( + createTestItem("CHAR", nullable(FIXTURE.char1Type), "CHAR"), + createTestItem("CHAR NOT NULL", FIXTURE.char1Type, "CHAR NOT NULL"), + createTestItem("CHAR NOT \t\nNULL", FIXTURE.char1Type, "CHAR NOT NULL"), + createTestItem("char not null", FIXTURE.char1Type, "CHAR NOT NULL"), + createTestItem("CHAR NULL", nullable(FIXTURE.char1Type), "CHAR"), + createTestItem("CHAR(33)", nullable(FIXTURE.char33Type), "CHAR(33)"), + createTestItem("VARCHAR", nullable(FIXTURE.varcharType), "VARCHAR"), + createTestItem("VARCHAR(33)", nullable(FIXTURE.varchar33Type), "VARCHAR(33)"), + createTestItem("STRING", + nullable(FIXTURE.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE)), "STRING"), + createTestItem("BOOLEAN", nullable(FIXTURE.booleanType), "BOOLEAN"), + createTestItem("BINARY", nullable(FIXTURE.binaryType), "BINARY"), + createTestItem("BINARY(33)", nullable(FIXTURE.binary33Type), "BINARY(33)"), + createTestItem("VARBINARY", nullable(FIXTURE.varbinaryType), "VARBINARY"), + createTestItem("VARBINARY(33)", nullable(FIXTURE.varbinary33Type), + "VARBINARY(33)"), + createTestItem("BYTES", + nullable(FIXTURE.createSqlType(SqlTypeName.VARBINARY, Integer.MAX_VALUE)), + "BYTES"), + createTestItem("DECIMAL", nullable(FIXTURE.decimalType), "DECIMAL"), + createTestItem("DEC", nullable(FIXTURE.decimalType), "DECIMAL"), + createTestItem("NUMERIC", nullable(FIXTURE.decimalType), "DECIMAL"), + createTestItem("DECIMAL(10)", nullable(FIXTURE.decimalP10S0Type), "DECIMAL(10)"), + createTestItem("DEC(10)", nullable(FIXTURE.decimalP10S0Type), "DECIMAL(10)"), + createTestItem("NUMERIC(10)", nullable(FIXTURE.decimalP10S0Type), "DECIMAL(10)"), + createTestItem("DECIMAL(10, 3)", nullable(FIXTURE.decimalP10S3Type), + "DECIMAL(10, 3)"), + createTestItem("DEC(10, 3)", nullable(FIXTURE.decimalP10S3Type), + "DECIMAL(10, 3)"), + createTestItem("NUMERIC(10, 3)", nullable(FIXTURE.decimalP10S3Type), + "DECIMAL(10, 3)"), + createTestItem("TINYINT", nullable(FIXTURE.tinyintType), "TINYINT"), + createTestItem("SMALLINT", nullable(FIXTURE.smallintType), "SMALLINT"), + createTestItem("INTEGER", nullable(FIXTURE.intType), "INTEGER"), + createTestItem("INT", nullable(FIXTURE.intType), "INTEGER"), + createTestItem("BIGINT", nullable(FIXTURE.bigintType), "BIGINT"), + createTestItem("FLOAT", nullable(FIXTURE.floatType), "FLOAT"), + createTestItem("DOUBLE", nullable(FIXTURE.doubleType), "DOUBLE"), + createTestItem("DOUBLE PRECISION", nullable(FIXTURE.doubleType), "DOUBLE"), + createTestItem("DATE", nullable(FIXTURE.dateType), "DATE"), + createTestItem("TIME", nullable(FIXTURE.timeType), "TIME"), + createTestItem("TIME WITHOUT TIME ZONE", nullable(FIXTURE.timeType), "TIME"), + createTestItem("TIME(3)", nullable(FIXTURE.time3Type), "TIME(3)"), + createTestItem("TIME(3) WITHOUT TIME ZONE", nullable(FIXTURE.time3Type), + "TIME(3)"), + createTestItem("TIMESTAMP", nullable(FIXTURE.timestampType), "TIMESTAMP"), + createTestItem("TIMESTAMP WITHOUT TIME ZONE", nullable(FIXTURE.timestampType), + "TIMESTAMP"), + createTestItem("TIMESTAMP(3)", nullable(FIXTURE.timestamp3Type), "TIMESTAMP(3)"), + createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE", + nullable(FIXTURE.timestamp3Type), "TIMESTAMP(3)"), + createTestItem("TIMESTAMP WITH LOCAL TIME ZONE", + nullable(FIXTURE.timestampWithLocalTimeZoneType), + "TIMESTAMP WITH LOCAL TIME ZONE"), + createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE", + nullable(FIXTURE.timestamp3WithLocalTimeZoneType), + "TIMESTAMP(3) WITH LOCAL TIME ZONE"), + createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>", + nullable(FIXTURE.createArrayType(nullable(FIXTURE.timestamp3WithLocalTimeZoneType))), + "ARRAY< TIMESTAMP(3) WITH LOCAL TIME ZONE >"), + createTestItem("ARRAY<INT NOT NULL>", + nullable(FIXTURE.createArrayType(FIXTURE.intType)), + "ARRAY< INTEGER NOT NULL >"), + createTestItem("INT ARRAY", + nullable(FIXTURE.createArrayType(nullable(FIXTURE.intType))), + "INTEGER ARRAY"), + createTestItem("INT NOT NULL ARRAY", + nullable(FIXTURE.createArrayType(FIXTURE.intType)), + "INTEGER NOT NULL ARRAY"), + createTestItem("INT ARRAY NOT NULL", + FIXTURE.createArrayType(nullable(FIXTURE.intType)), + "INTEGER ARRAY NOT NULL"), + createTestItem("MULTISET<INT NOT NULL>", + nullable(FIXTURE.createMultisetType(FIXTURE.intType)), + "MULTISET< INTEGER NOT NULL >"), + createTestItem("INT MULTISET", + nullable(FIXTURE.createMultisetType(nullable(FIXTURE.intType))), + "INTEGER MULTISET"), + createTestItem("INT NOT NULL MULTISET", + nullable(FIXTURE.createMultisetType(FIXTURE.intType)), + "INTEGER NOT NULL MULTISET"), + createTestItem("INT MULTISET NOT NULL", + FIXTURE.createMultisetType(nullable(FIXTURE.intType)), + "INTEGER MULTISET NOT NULL"), + createTestItem("MAP<BIGINT, BOOLEAN>", + nullable(FIXTURE.createMapType( + nullable(FIXTURE.bigintType), + nullable(FIXTURE.booleanType))), + "MAP< BIGINT, BOOLEAN >"), + createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>", + nullable(FIXTURE.createStructType( + Arrays.asList(FIXTURE.intType, nullable(FIXTURE.booleanType)), + Arrays.asList("f0", "f1"))), + "ROW< `f0` INTEGER NOT NULL, `f1` BOOLEAN >"), + createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)", + nullable(FIXTURE.createStructType( + Arrays.asList(FIXTURE.intType, nullable(FIXTURE.booleanType)), + Arrays.asList("f0", "f1"))), + "ROW< `f0` INTEGER NOT NULL, `f1` BOOLEAN >"), + createTestItem("ROW<`f0` INT>", + nullable(FIXTURE.createStructType( + Collections.singletonList(nullable(FIXTURE.intType)), + Collections.singletonList("f0"))), + "ROW< `f0` INTEGER >"), + createTestItem("ROW(`f0` INT)", + nullable(FIXTURE.createStructType( + Collections.singletonList(nullable(FIXTURE.intType)), + Collections.singletonList("f0"))), + "ROW< `f0` INTEGER >"), + createTestItem("ROW<>", + nullable(FIXTURE.createStructType( + Collections.emptyList(), + Collections.emptyList())), + "ROW<>"), + createTestItem("ROW()", + nullable(FIXTURE.createStructType( + Collections.emptyList(), + Collections.emptyList())), + "ROW<>"), + createTestItem("ROW<f0 INT NOT NULL 'This is a comment.', " + + "f1 BOOLEAN 'This as well.'>", + nullable(FIXTURE.createStructType( + Arrays.asList(FIXTURE.intType, nullable(FIXTURE.booleanType)), + Arrays.asList("f0", "f1"))), + "ROW< `f0` INTEGER NOT NULL 'This is a comment.', " + + "`f1` BOOLEAN 'This as well.' >"), + + // test parse throws error. + createTestItem("TIMESTAMP WITH TIME ZONE", + "'WITH TIME ZONE' is not supported yet, options: " + + "'WITHOUT TIME ZONE', 'WITH LOCAL TIME ZONE'."), + createTestItem("TIMESTAMP(3) WITH TIME ZONE", + "'WITH TIME ZONE' is not supported yet, options: " + + "'WITHOUT TIME ZONE', 'WITH LOCAL TIME ZONE'."), + createTestItem("^NULL^", + "(?s).*Encountered \"NULL\" at line 2, column 6..*"), + createTestItem("cat.db.MyType", + "(?s).*UDT in DDL is not supported yet..*"), + createTestItem("`db`.`MyType`", + "(?s).*UDT in DDL is not supported yet..*"), + createTestItem("MyType", + "(?s).*UDT in DDL is not supported yet..*"), + createTestItem("ARRAY<MyType>", + "(?s).*UDT in DDL is not supported yet..*"), + createTestItem("ROW<f0 MyType, f1 `c`.`d`.`t`>", + "(?s).*UDT in DDL is not supported yet..*"), + createTestItem("^INTERVAL^ YEAR", + "(?s).*Encountered \"INTERVAL\" at line 2, column 6..*"), + createTestItem("ANY(^'unknown.class'^, '')", + "(?s).*Encountered \"\\\\'unknown.class\\\\'\" at line 2, column 10.\n.*" + + "Was expecting:\n" + + " <UNSIGNED_INTEGER_LITERAL> ...\n" + + ".*")); + } + + private static TestItem createTestItem(Object... args) { + assert args.length >= 2; + final String testExpr = (String) args[0]; + TestItem testItem = TestItem.fromTestExpr(testExpr); + if (args[1] instanceof String) { + testItem.withExpectedError((String) args[1]); + } else if (args[1] instanceof RelDataType) { + testItem.withExpectedType((RelDataType) args[1]); + } + if (args.length == 3) { + testItem.withExpectedUnparsed((String) args[2]); + } + return testItem; + } + + @Parameterized.Parameter + public TestItem testItem; + + @Test + public void testDataTypeParsing() { + if (testItem.expectedType != null) { + checkType(testItem.testExpr, testItem.expectedType); + } + } + + @Test + public void testThrowsError() { + if (testItem.expectedError != null) { + checkFails(testItem.testExpr, testItem.expectedError); + } + } + + @Test + public void testDataTypeUnparsing() { + if (testItem.expectedUnparsed != null) { + checkUnparseTo(testItem.testExpr, testItem.expectedUnparsed); + } + } + + private static RelDataType nullable(RelDataType type) { + return FIXTURE.nullable(type); + } + + private void checkType(String typeExpr, RelDataType expectedType) { + this.sql(String.format(DDL_FORMAT, typeExpr)).checkType(expectedType); + } + + private void checkFails(String typeExpr, String expectedMsgPattern) { + sql(String.format(DDL_FORMAT, typeExpr)).fails(expectedMsgPattern); + } + + private void checkUnparseTo(String typeExpr, String expectedUnparsed) { + sql(String.format(DDL_FORMAT, typeExpr)).unparsedTo(expectedUnparsed); + } + + private Tester getTester() { + return new TesterImpl(); + } + + private Sql sql(String sql) { + return new Sql(sql); + } + + //~ Inner Classes ---------------------------------------------------------- + + private static class TestItem { + private final String testExpr; + @Nullable + private RelDataType expectedType; + @Nullable + private String expectedError; + @Nullable + private String expectedUnparsed; + + private TestItem(String testExpr) { + this.testExpr = testExpr; + } + + static TestItem fromTestExpr(String testExpr) { + return new TestItem(testExpr); + } + + TestItem withExpectedType(RelDataType expectedType) { + this.expectedType = expectedType; + return this; + } + + TestItem withExpectedError(String expectedError) { + this.expectedError = expectedError; + return this; + } + + TestItem withExpectedUnparsed(String expectedUnparsed) { + this.expectedUnparsed = expectedUnparsed; + return this; + } + + @Override + public String toString() { + return this.testExpr; + } + } + + private class Sql { + private final String sql; + + Sql(String sql) { + this.sql = sql; + } + + public Sql checkType(RelDataType type) { + getTester().checkType(this.sql, type); + return this; + } + + public Sql fails(String expectedMsgPattern) { + getTester().checkFails(this.sql, expectedMsgPattern); + return this; + } + + public Sql unparsedTo(String expectedUnparsed) { + getTester().checkUnparsed(this.sql, expectedUnparsed); + return this; + } + } + + /** + * Callback to control how test actions are performed. + */ + protected interface Tester { + void checkType(String sql, RelDataType type); + + void checkFails(String sql, String expectedMsgPattern); + + void checkUnparsed(String sql, String expectedUnparsed); + } + + /** + * Default implementation of {@link SqlParserTest.Tester}. + */ + protected class TesterImpl implements Tester { + private SqlParser getSqlParser(String sql) { + return SqlParser.create(sql, + SqlParser.configBuilder() + .setParserFactory(FlinkSqlParserImpl.FACTORY) + .setQuoting(Quoting.BACK_TICK) + .setUnquotedCasing(Casing.UNCHANGED) + .setQuotedCasing(Casing.UNCHANGED) + .setConformance(conformance) + .build()); + } + + private SqlDialect getSqlDialect() { + return new CalciteSqlDialect(SqlDialect.EMPTY_CONTEXT + .withQuotedCasing(Casing.UNCHANGED) + .withConformance(conformance) + .withUnquotedCasing(Casing.UNCHANGED) + .withIdentifierQuoteString("`")); + } + + public void checkType(String sql, RelDataType type) { + final SqlNode sqlNode = parseStmtAndHandleEx(sql); + assert sqlNode instanceof SqlCreateTable; + final SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode; + SqlNodeList columns = sqlCreateTable.getColumnList(); + assert columns.size() == 1; + RelDataType columnType = ((SqlTableColumn) columns.get(0)).getType() + .deriveType(TYPE_FACTORY); + assertEquals(type, columnType); + } + + private SqlNode parseStmtAndHandleEx(String sql) { + final SqlNode sqlNode; + try { + sqlNode = getSqlParser(sql).parseStmt(); + } catch (SqlParseException e) { + throw new RuntimeException("Error while parsing SQL: " + sql, e); + } + return sqlNode; + } + + public void checkFails( + String sql, + String expectedMsgPattern) { + SqlParserUtil.StringAndPos sap = SqlParserUtil.findPos(sql); + Throwable thrown = null; + try { + final SqlNode sqlNode; + sqlNode = getSqlParser(sap.sql).parseStmt(); + Util.discard(sqlNode); + } catch (Throwable ex) { + thrown = ex; + } + + checkEx(expectedMsgPattern, sap, thrown); + } + + public void checkUnparsed(String sql, String expectedUnparsed) { + final SqlNode sqlNode = parseStmtAndHandleEx(sql); + assert sqlNode instanceof SqlCreateTable; + final SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode; + SqlNodeList columns = sqlCreateTable.getColumnList(); + assert columns.size() == 1; + SqlDataTypeSpec dataTypeSpec = ((SqlTableColumn) columns.get(0)).getType(); + SqlWriter sqlWriter = new SqlPrettyWriter(getSqlDialect(), false); + dataTypeSpec.unparse(sqlWriter, 0, 0); + assertEquals(expectedUnparsed, sqlWriter.toSqlString().getSql()); + } + + private void checkEx(String expectedMsgPattern, + SqlParserUtil.StringAndPos sap, + Throwable thrown) { + SqlValidatorTestCase.checkEx(thrown, expectedMsgPattern, sap); + } + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index f045817..cc892ab 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -366,7 +366,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest { check("CREATE TABLE tbl1 (\n" + " a ARRAY<bigint>, \n" + " b MAP<int, varchar>,\n" + - " c ROW<cc0:int, cc1: float, cc2: varchar>,\n" + + " c ROW<cc0 int, cc1 float, cc2 varchar>,\n" + + " d MULTISET<varchar>,\n" + " PRIMARY KEY (a, b) \n" + ") with (\n" + " x = 'y', \n" + @@ -374,28 +375,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest { ")\n", "CREATE TABLE `TBL1` (\n" + " `A` ARRAY< BIGINT >,\n" + " `B` MAP< INTEGER, VARCHAR >,\n" + - " `C` ROW< `CC0` : INTEGER, `CC1` : FLOAT, `CC2` : VARCHAR >,\n" + - " PRIMARY KEY (`A`, `B`)\n" + - ") WITH (\n" + - " `X` = 'y',\n" + - " `ASD` = 'data'\n" + - ")"); - } - - @Test - public void testCreateTableWithDecimalType() { - check("CREATE TABLE tbl1 (\n" + - " a decimal, \n" + - " b decimal(10, 0),\n" + - " c decimal(38, 38),\n" + - " PRIMARY KEY (a, b) \n" + - ") with (\n" + - " x = 'y', \n" + - " asd = 'data'\n" + - ")\n", "CREATE TABLE `TBL1` (\n" + - " `A` DECIMAL,\n" + - " `B` DECIMAL(10, 0),\n" + - " `C` DECIMAL(38, 38),\n" + + " `C` ROW< `CC0` INTEGER, `CC1` FLOAT, `CC2` VARCHAR >,\n" + + " `D` MULTISET< VARCHAR >,\n" + " PRIMARY KEY (`A`, `B`)\n" + ") WITH (\n" + " `X` = 'y',\n" + @@ -408,7 +389,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest { check("CREATE TABLE tbl1 (\n" + " a ARRAY<ARRAY<bigint>>, \n" + " b MAP<MAP<int, varchar>, ARRAY<varchar>>,\n" + - " c ROW<cc0:ARRAY<int>, cc1: float, cc2: varchar>,\n" + + " c ROW<cc0 ARRAY<int>, cc1 float, cc2 varchar>,\n" + + " d MULTISET<ARRAY<int>>,\n" + " PRIMARY KEY (a, b) \n" + ") with (\n" + " x = 'y', \n" + @@ -416,7 +398,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest { ")\n", "CREATE TABLE `TBL1` (\n" + " `A` ARRAY< ARRAY< BIGINT > >,\n" + " `B` MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > >,\n" + - " `C` ROW< `CC0` : ARRAY< INTEGER >, `CC1` : FLOAT, `CC2` : VARCHAR >,\n" + + " `C` ROW< `CC0` ARRAY< INTEGER >, `CC1` FLOAT, `CC2` VARCHAR >,\n" + + " `D` MULTISET< ARRAY< INTEGER > >,\n" + " PRIMARY KEY (`A`, `B`)\n" + ") WITH (\n" + " `X` = 'y',\n" + @@ -437,7 +420,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { ")\n", "(?s).*Encountered \"\\(\" at line 4, column 14.\n" + "Was expecting one of:\n" + " \"AS\" ...\n" + - " \"CHARACTER\" ...\n" + + " \"ARRAY\" ...\n" + ".*"); } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala index 132755f..077fdfd 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala @@ -70,7 +70,7 @@ class WindowAggregateITCase(mode: StateBackendMode) val sql = """ |SELECT - | string, + | `string`, | HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND), | HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND), | COUNT(1), @@ -79,7 +79,7 @@ class WindowAggregateITCase(mode: StateBackendMode) | COUNT(DISTINCT `float`), | concat_distinct_agg(name) |FROM T1 - |GROUP BY string, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND) + |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND) """.stripMargin val sink = new TestingAppendSink @@ -122,7 +122,7 @@ class WindowAggregateITCase(mode: StateBackendMode) val sql = """ |SELECT - | string, + | `string`, | SESSION_START(rowtime, INTERVAL '0.005' SECOND), | SESSION_ROWTIME(rowtime, INTERVAL '0.005' SECOND), | COUNT(1), @@ -131,7 +131,7 @@ class WindowAggregateITCase(mode: StateBackendMode) | SUM(`int`), | COUNT(DISTINCT name) |FROM T1 - |GROUP BY string, SESSION(rowtime, INTERVAL '0.005' SECOND) + |GROUP BY `string`, SESSION(rowtime, INTERVAL '0.005' SECOND) """.stripMargin val sink = new TestingAppendSink @@ -171,7 +171,7 @@ class WindowAggregateITCase(mode: StateBackendMode) val sql = """ |SELECT - | string, + | `string`, | TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start, | TUMBLE_END(rowtime, INTERVAL '0.005' SECOND), | COUNT(DISTINCT `long`), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index 01a55f1..b332818 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -512,7 +512,7 @@ class TimeAttributesITCase extends AbstractTestBase { .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) tEnv.registerTable("T1", table) - val querySql = "select rowtime as ts, string as msg from T1" + val querySql = "select rowtime as ts, `string` as msg from T1" val results = tEnv.sqlQuery(querySql).toAppendStream[Pojo1] results.addSink(new StreamITCase.StringSink[Pojo1])