danny0405 commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL URL: https://github.com/apache/flink/pull/9952#discussion_r338371878
########## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java ########## @@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) { public List<SqlNode> columnList = new ArrayList<>(); public SqlNodeList primaryKeyList = SqlNodeList.EMPTY; public List<SqlNodeList> uniqueKeysList = new ArrayList<>(); + @Nullable public SqlWatermark watermark; } public String[] fullTableName() { return tableName.names.toArray(new String[0]); } + + // ------------------------------------------------------------------------------------- + + private static final class ColumnValidator { + + private final Set<String> allColumnNames = new HashSet<>(); + + /** + * Adds column name to the registered column set. This will add nested column names recursive. + * Nested column names are qualified using "." separator. + */ + public void addColumn(SqlNode column) throws SqlValidateException { + String columnName; + if (column instanceof SqlTableColumn) { + SqlTableColumn tableColumn = (SqlTableColumn) column; + columnName = tableColumn.getName().getSimple(); + addNestedColumn(columnName, tableColumn.getType()); + } else if (column instanceof SqlBasicCall) { + SqlBasicCall tableColumn = (SqlBasicCall) column; + columnName = tableColumn.getOperands()[1].toString(); + } else { + throw new UnsupportedOperationException("Unsupported column:" + column); + } + + addColumnName(columnName, column.getParserPosition()); + } + + /** + * Returns true if the column name is existed in the registered column set. + * This supports qualified column name using "." separator. + */ + public boolean contains(String columnName) { + return allColumnNames.contains(columnName); + } + + private void addNestedColumn(String columnName, SqlDataTypeSpec columnType) throws SqlValidateException { + SqlTypeNameSpec typeName = columnType.getTypeNameSpec(); + // validate composite type + if (typeName instanceof ExtendedSqlRowTypeNameSpec) { Review comment: If that is the case, we should match the `ARRAY` and `MULTISET` type explicitly in the `ColumnValidator`, and throws a message that is readable. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services