danny0405 commented on a change in pull request #9952: 
[FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
URL: https://github.com/apache/flink/pull/9952#discussion_r338883419
 
 

 ##########
 File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
 ##########
 @@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) {
                public List<SqlNode> columnList = new ArrayList<>();
                public SqlNodeList primaryKeyList = SqlNodeList.EMPTY;
                public List<SqlNodeList> uniqueKeysList = new ArrayList<>();
+               @Nullable public SqlWatermark watermark;
        }
 
        public String[] fullTableName() {
                return tableName.names.toArray(new String[0]);
        }
+
+       // -------------------------------------------------------------------------------------
+
+       private static final class ColumnValidator {
+
+               private final Set<String> allColumnNames = new HashSet<>();
+
+               /**
+                * Adds column name to the registered column set. This will add nested column names recursive.
+                * Nested column names are qualified using "." separator.
+                */
+               public void addColumn(SqlNode column) throws SqlValidateException {
+                       String columnName;
+                       if (column instanceof SqlTableColumn) {
+                               SqlTableColumn tableColumn = (SqlTableColumn) column;
+                               columnName = tableColumn.getName().getSimple();
+                               addNestedColumn(columnName, tableColumn.getType());
+                       } else if (column instanceof SqlBasicCall) {
+                               SqlBasicCall tableColumn = (SqlBasicCall) column;
+                               columnName = tableColumn.getOperands()[1].toString();
+                       } else {
+                               throw new UnsupportedOperationException("Unsupported column:" + column);
+                       }
+
+                       addColumnName(columnName, column.getParserPosition());
+               }
+
+               /**
+                * Returns true if the column name is existed in the registered column set.
+                * This supports qualified column name using "." separator.
+                */
+               public boolean contains(String columnName) {
+                       return allColumnNames.contains(columnName);
+               }
+
+               private void addNestedColumn(String columnName, SqlDataTypeSpec columnType) throws SqlValidateException {
+                       SqlTypeNameSpec typeName = columnType.getTypeNameSpec();
+                       // validate composite type
+                       if (typeName instanceof ExtendedSqlRowTypeNameSpec) {
 
 Review comment:
   By "does not exist", do you mean the value is null? A record type field
can also have a null value.
   
   BTW, the user does know whether `array[1]` exists, so I don't think we
should forbid the syntax just for a "safe" reason. Actually, in standard SQL,
the `ARRAY` type has a fixed length, which means its structure is fixed.
   
   For the `MAP` type, I think it is equivalent to a Java POJO or record.
Supporting the record but not the map does not make sense to me.
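
   To make the point about nested names concrete, here is a minimal sketch of
registering column names recursively with the "." separator, as the Javadoc in
the diff describes. `NestedColumnNames`, `FieldType`, `AtomicType` and `RowType`
are hypothetical stand-ins, not Flink or Calcite classes, and rejecting
duplicate names is an assumption about what `addColumnName` does.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class NestedColumnNames {

    /** Hypothetical stand-in for a column type; not a Flink or Calcite class. */
    interface FieldType {}

    /** A leaf type such as INT or TIMESTAMP(3). */
    static final class AtomicType implements FieldType {
        final String name;
        AtomicType(String name) { this.name = name; }
    }

    /** A ROW type whose field names are declared in the DDL. */
    static final class RowType implements FieldType {
        final Map<String, FieldType> fields = new LinkedHashMap<>();
        RowType field(String name, FieldType type) {
            fields.put(name, type);
            return this;
        }
    }

    private final Set<String> allColumnNames = new LinkedHashSet<>();

    /** Registers the column and, recursively, every nested ROW field as "parent.child". */
    public void addColumn(String name, FieldType type) {
        // Assumption: a duplicate name is an error.
        if (!allColumnNames.add(name)) {
            throw new IllegalArgumentException("Duplicate column name: " + name);
        }
        if (type instanceof RowType) {
            for (Map.Entry<String, FieldType> e : ((RowType) type).fields.entrySet()) {
                addColumn(name + "." + e.getKey(), e.getValue());
            }
        }
    }

    /** Returns true if the (possibly qualified) name was registered. */
    public boolean contains(String columnName) {
        return allColumnNames.contains(columnName);
    }

    public static void main(String[] args) {
        NestedColumnNames validator = new NestedColumnNames();
        validator.addColumn("id", new AtomicType("INT"));
        validator.addColumn("f0", new RowType()
            .field("ts", new AtomicType("TIMESTAMP(3)"))
            .field("inner", new RowType().field("x", new AtomicType("INT"))));
        System.out.println(validator.contains("f0.ts"));
        System.out.println(validator.contains("f0.inner.x"));
        System.out.println(validator.contains("f0.missing"));
    }
}
```

   In this sketch only ROW fields are expanded; any other type, including an
array or a map, contributes just its top-level column name, so a reference such
as `array[1]` would need separate handling.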

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
