wuchong commented on a change in pull request #11906:
URL: https://github.com/apache/flink/pull/11906#discussion_r427022575



##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -116,6 +122,36 @@ public String getBaseUrl() {
                return baseUrl;
        }
 
+       // ------ Postgres default objects that shouldn't be exposed to users ------
+
+       /**
+        * Retrieve the list of system schemas to ignore.
+        */
+       public abstract Set<String> getBuiltinSchemas();
+
+       /**
+        * Retrieve the list of system database to ignore.
+        */
+       public abstract Set<String> getBuiltinDatabases();
+
+       // ------ retrieve PK constraint ------
+
+       protected Map.Entry<String, List<String>> getPrimaryKey(DatabaseMetaData metaData, PostgresTablePath pgPath) throws SQLException {

Review comment:
       If we want to move this method into the base class, I would suggest 
changing the signature to 
   
   ```java
   UniqueConstraint getPrimaryKey(DatabaseMetaData metaData, String schema, 
String table)
   ```
   
   1. `UniqueConstraint` is the standard public API for describing a primary 
key, and it is exposed by the Table API.
   2. `PostgresTablePath` is a Postgres-specific class that shouldn't be in 
the base class.

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -116,6 +122,36 @@ public String getBaseUrl() {
                return baseUrl;
        }
 
+       // ------ Postgres default objects that shouldn't be exposed to users ------
+
+       /**
+        * Retrieve the list of system schemas to ignore.
+        */
+       public abstract Set<String> getBuiltinSchemas();
+
+       /**
+        * Retrieve the list of system database to ignore.
+        */
+       public abstract Set<String> getBuiltinDatabases();

Review comment:
       I don't think we need to move these methods into the base class and 
`JdbcCatalog`, because they are not needed in `getPrimaryKey`.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/Constraint.java
##########
@@ -61,6 +61,6 @@
         */
        enum ConstraintType {
                PRIMARY_KEY,
-               UNIQUE_KEY
+               UNIQUE

Review comment:
       Could you revert this? We can discuss it and change it when we support 
UNIQUE.

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -116,6 +122,36 @@ public String getBaseUrl() {
                return baseUrl;
        }
 
+       // ------ Postgres default objects that shouldn't be exposed to users ------
+
+       /**
+        * Retrieve the list of system schemas to ignore.
+        */
+       public abstract Set<String> getBuiltinSchemas();
+
+       /**
+        * Retrieve the list of system database to ignore.
+        */
+       public abstract Set<String> getBuiltinDatabases();
+
+       // ------ retrieve PK constraint ------
+
+       protected Map.Entry<String, List<String>> getPrimaryKey(DatabaseMetaData metaData, PostgresTablePath pgPath) throws SQLException {
+               ResultSet rs = metaData.getPrimaryKeys(null, pgPath.getPgSchemaName(), pgPath.getPgTableName());
+               Map.Entry<String, List<String>> ret = null;
+               while (rs.next()) {
+                       String schema = rs.getString("table_schem");
+                       String columnName = rs.getString("column_name");
+                       String pkName = rs.getString("pk_name");

Review comment:
       Use upper case for these column labels (e.g. `PK_NAME`)?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -116,6 +122,36 @@ public String getBaseUrl() {
                return baseUrl;
        }
 
+       // ------ Postgres default objects that shouldn't be exposed to users ------
+
+       /**
+        * Retrieve the list of system schemas to ignore.
+        */
+       public abstract Set<String> getBuiltinSchemas();
+
+       /**
+        * Retrieve the list of system database to ignore.
+        */
+       public abstract Set<String> getBuiltinDatabases();
+
+       // ------ retrieve PK constraint ------
+
+       protected Map.Entry<String, List<String>> getPrimaryKey(DatabaseMetaData metaData, PostgresTablePath pgPath) throws SQLException {
+               ResultSet rs = metaData.getPrimaryKeys(null, pgPath.getPgSchemaName(), pgPath.getPgTableName());
+               Map.Entry<String, List<String>> ret = null;
+               while (rs.next()) {
+                       String schema = rs.getString("table_schem");
+                       String columnName = rs.getString("column_name");
+                       String pkName = rs.getString("pk_name");
+                       if (!getBuiltinSchemas().contains(schema)) {
+                               if (ret == null) {
+                                       ret = new AbstractMap.SimpleEntry<>(pkName, new ArrayList<>());
+                               }
+                               ret.getValue().add(columnName);

Review comment:
       According to the Javadoc of `java.sql.DatabaseMetaData#getPrimaryKeys`, 
the returned primary key columns are ordered by COLUMN_NAME, not KEY_SEQ. We 
may need to sort them again by KEY_SEQ. I know that Postgres returns 
rows ordered by KEY_SEQ, but not all dialects do.
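
A minimal sketch of the re-sorting suggested above, assuming plain JDBC and the `(metaData, schema, table)` parameter list proposed earlier in this review. The class and method names (`PrimaryKeyColumns`, `read`, `inKeySeqOrder`) are hypothetical and not part of the PR; the column labels `KEY_SEQ` and `COLUMN_NAME` are the ones listed in the `getPrimaryKeys` Javadoc.

```java
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, for illustration only.
final class PrimaryKeyColumns {

    /** Reads the primary key columns of schema.table and returns them in KEY_SEQ order. */
    static List<String> read(DatabaseMetaData metaData, String schema, String table)
            throws SQLException {
        Map<Integer, String> columnsBySeq = new HashMap<>();
        try (ResultSet rs = metaData.getPrimaryKeys(null, schema, table)) {
            while (rs.next()) {
                // KEY_SEQ is the 1-based position of the column within the key.
                columnsBySeq.put((int) rs.getShort("KEY_SEQ"), rs.getString("COLUMN_NAME"));
            }
        }
        return inKeySeqOrder(columnsBySeq);
    }

    /** Orders column names by their KEY_SEQ, regardless of the order the driver returned them in. */
    static List<String> inKeySeqOrder(Map<Integer, String> columnsBySeq) {
        // TreeMap iterates in ascending key order, so values() comes out sorted by KEY_SEQ.
        return new ArrayList<>(new TreeMap<>(columnsBySeq).values());
    }
}
```

With this approach the result does not depend on whether a given driver happens to return rows already ordered by KEY_SEQ.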

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -116,6 +122,36 @@ public String getBaseUrl() {
                return baseUrl;
        }
 
+       // ------ Postgres default objects that shouldn't be exposed to users ------
+
+       /**
+        * Retrieve the list of system schemas to ignore.
+        */
+       public abstract Set<String> getBuiltinSchemas();
+
+       /**
+        * Retrieve the list of system database to ignore.
+        */
+       public abstract Set<String> getBuiltinDatabases();
+
+       // ------ retrieve PK constraint ------
+
+       protected Map.Entry<String, List<String>> getPrimaryKey(DatabaseMetaData metaData, PostgresTablePath pgPath) throws SQLException {
+               ResultSet rs = metaData.getPrimaryKeys(null, pgPath.getPgSchemaName(), pgPath.getPgTableName());
+               Map.Entry<String, List<String>> ret = null;
+               while (rs.next()) {
+                       String schema = rs.getString("table_schem");
+                       String columnName = rs.getString("column_name");
+                       String pkName = rs.getString("pk_name");
+                       if (!getBuiltinSchemas().contains(schema)) {

Review comment:
       There is no need to call `getBuiltinSchemas().contains(..)`, because 
`schema` cannot be one of the builtin schemas; it must be the schema of the 
user's table.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

