danny0405 commented on a change in pull request #9952: 
[FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
URL: https://github.com/apache/flink/pull/9952#discussion_r338371878
 
 

 ##########
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
 ##########
 @@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) {
                public List<SqlNode> columnList = new ArrayList<>();
                public SqlNodeList primaryKeyList = SqlNodeList.EMPTY;
                public List<SqlNodeList> uniqueKeysList = new ArrayList<>();
+               @Nullable public SqlWatermark watermark;
        }
 
        public String[] fullTableName() {
                return tableName.names.toArray(new String[0]);
        }
+
+       // 
-------------------------------------------------------------------------------------
+
+       private static final class ColumnValidator {
+
+               private final Set<String> allColumnNames = new HashSet<>();
+
+               /**
+                * Adds column name to the registered column set. This will add 
nested column names recursive.
+                * Nested column names are qualified using "." separator.
+                */
+               public void addColumn(SqlNode column) throws 
SqlValidateException {
+                       String columnName;
+                       if (column instanceof SqlTableColumn) {
+                               SqlTableColumn tableColumn = (SqlTableColumn) 
column;
+                               columnName = tableColumn.getName().getSimple();
+                               addNestedColumn(columnName, 
tableColumn.getType());
+                       } else if (column instanceof SqlBasicCall) {
+                               SqlBasicCall tableColumn = (SqlBasicCall) 
column;
+                               columnName = 
tableColumn.getOperands()[1].toString();
+                       } else {
+                               throw new 
UnsupportedOperationException("Unsupported column:" + column);
+                       }
+
+                       addColumnName(columnName, column.getParserPosition());
+               }
+
+               /**
+                * Returns true if the column name is existed in the registered 
column set.
+                * This supports qualified column name using "." separator.
+                */
+               public boolean contains(String columnName) {
+                       return allColumnNames.contains(columnName);
+               }
+
+               private void addNestedColumn(String columnName, SqlDataTypeSpec 
columnType) throws SqlValidateException {
+                       SqlTypeNameSpec typeName = columnType.getTypeNameSpec();
+                       // validate composite type
+                       if (typeName instanceof ExtendedSqlRowTypeNameSpec) {
 
 Review comment:
   If that is the case, we should match the `ARRAY` and `MULTISET` types 
explicitly in the `ColumnValidator` and throw a readable error message.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to