lincoln-lil commented on code in PR #21507:
URL: https://github.com/apache/flink/pull/21507#discussion_r1096998249


##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java:
##########
@@ -80,4 +83,20 @@ public static void validateAndChangeColumnNullability(
             }
         }
     }
+
+    /** Check table/column constraint. */

Review Comment:
   nit: remove '/column' since only the table constraint is being validated?



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java:
##########
@@ -80,4 +83,20 @@ public static void validateAndChangeColumnNullability(
             }
         }
     }
+
+    /** Check table/column constraint. */
+    private static void validate(SqlTableConstraint constraint) throws SqlValidateException {
+        if (constraint.isUnique()) {
+            throw new SqlValidateException(
+                    constraint.getParserPosition(), "UNIQUE constraint is not supported yet");
+        }
+        if (constraint.isEnforced()) {
+            throw new SqlValidateException(
+                    constraint.getParserPosition(),
+                    "Flink doesn't support ENFORCED mode for "
+                            + "PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the incoming/outgoing data. "
+                            + "Flink does not own the data therefore the only supported mode "
+                            + "is the NOT ENFORCED mode");

Review Comment:
   nit: re-split the message string so that each line is of similar length
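
   For illustration, one possible re-split is sketched below. The class and constant names are hypothetical and exist only to make the snippet compile on its own; the message text is unchanged from the hunk above, and only the break points move.

```java
public class EnforcedMessageSketch {

    // Same text as in the quoted hunk, split so that each source line
    // has a similar length. Only the break points differ.
    static final String MESSAGE =
            "Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. "
                    + "ENFORCED/NOT ENFORCED controls if the constraint checks are "
                    + "performed on the incoming/outgoing data. Flink does not own "
                    + "the data therefore the only supported mode is the NOT ENFORCED mode";

    public static void main(String[] args) {
        System.out.println(MESSAGE);
    }
}
```

   Because only the concatenation points change, the resulting string stays identical to the current one.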



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java:
##########
@@ -52,21 +52,24 @@ public static List<SqlTableConstraint> getFullConstraints(
         return ret;
     }
 
-    /** Check duplicate constraints and change the nullability of primary key columns. */
+    /**
+     * Check constraints and change the nullability of primary key columns.
+     *
+     * @throws SqlValidateException if encountered duplicate primary key constraints, or the
+     *     constraint is enforced or unique.
+     */
     public static void validateAndChangeColumnNullability(
             List<SqlTableConstraint> tableConstraints, SqlNodeList columnList)
             throws SqlValidateException {
-        List<SqlTableConstraint> constraints =
-                getFullConstraints(tableConstraints, columnList).stream()
-                        .filter(SqlTableConstraint::isPrimaryKey)
-                        .collect(Collectors.toList());
-
-        if (constraints.size() > 1) {
+        List<SqlTableConstraint> fullConstraints = getFullConstraints(tableConstraints, columnList);
+        if (fullConstraints.stream().filter(SqlTableConstraint::isPrimaryKey).count() > 1) {
             throw new SqlValidateException(
-                    constraints.get(1).getParserPosition(), "Duplicate primary key definition");
-        } else if (constraints.size() == 1) {
+                    fullConstraints.get(1).getParserPosition(), "Duplicate primary key definition");
+        }
+        for (SqlTableConstraint constraint : fullConstraints) {
+            validate(constraint);
             Set<String> primaryKeyColumns =
-                    Arrays.stream(constraints.get(0).getColumnNames()).collect(Collectors.toSet());
+                    Arrays.stream(constraint.getColumnNames()).collect(Collectors.toSet());

Review Comment:
   Add a comment here stating that this block rewrites the nullability, or extract these lines into a single method `rewriteNullability`?
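
   A rough sketch of what the extraction could look like is below. It is not the PR's code: the rest of the loop body is not shown in the quoted hunk, so this assumes it marks each primary key column as NOT NULL, as the Javadoc suggests. `Column` is a hypothetical stand-in for the parser's column node, used only so the snippet runs on its own.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class RewriteNullabilitySketch {

    /** Hypothetical stand-in for a column definition; not a Flink class. */
    static final class Column {
        final String name;
        boolean nullable = true;

        Column(String name) {
            this.name = name;
        }
    }

    /** Marks every column that is part of the primary key as NOT NULL. */
    static void rewriteNullability(String[] primaryKeyColumnNames, List<Column> columns) {
        Set<String> primaryKeyColumns =
                Arrays.stream(primaryKeyColumnNames).collect(Collectors.toSet());
        for (Column column : columns) {
            if (primaryKeyColumns.contains(column.name)) {
                column.nullable = false;
            }
        }
    }

    public static void main(String[] args) {
        List<Column> columns = Arrays.asList(new Column("id"), new Column("name"));
        rewriteNullability(new String[] {"id"}, columns);
        for (Column column : columns) {
            System.out.println(column.name + " nullable=" + column.nullable);
        }
    }
}
```

   With a helper like this, the loop in `validateAndChangeColumnNullability` would reduce to a `validate(constraint)` call followed by a `rewriteNullability(...)` call, which makes the two responsibilities easier to tell apart.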


