fpompermaier commented on a change in pull request #11906: URL: https://github.com/apache/flink/pull/11906#discussion_r427826635
########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java ########## @@ -126,31 +124,33 @@ public String getBaseUrl() { // ------ retrieve PK constraint ------ - protected UniqueConstraint getPrimaryKey(DatabaseMetaData metaData, String schema, String table) throws SQLException { + protected Optional<UniqueConstraint> getPrimaryKey(DatabaseMetaData metaData, String schema, String table) throws SQLException { // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys, // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ. // We need to sort them based on the KEY_SEQ value. ResultSet rs = metaData.getPrimaryKeys(null, schema, table); - List<Map.Entry<Integer, String>> columnsWithIndex = null; + Map<Integer, String> keySeqColumnName = new HashMap<>(); String pkName = null; - while (rs.next()) { + while (rs.next()) { String columnName = rs.getString("COLUMN_NAME"); - pkName = rs.getString("PK_NAME"); + pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same int keySeq = rs.getInt("KEY_SEQ"); - if (columnsWithIndex == null) { - columnsWithIndex = new ArrayList<>(); - } - columnsWithIndex.add(new AbstractMap.SimpleEntry<>(Integer.valueOf(keySeq), columnName)); + keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index } - if (columnsWithIndex != null) { - // sort columns by KEY_SEQ - columnsWithIndex.sort(Comparator.comparingInt(Map.Entry::getKey)); - List<String> cols = columnsWithIndex.stream().map(Map.Entry::getValue).collect(Collectors.toList()); - return UniqueConstraint.primaryKey(pkName, cols); + List<String> pkFields = Arrays.asList(new String[keySeqColumnName.size()]); // initialize size + keySeqColumnName.forEach(pkFields::set); + if (!pkFields.isEmpty()) { + if (pkName == null) { + // PK_NAME maybe null according to the javadoc, + // generate an unique name for the primary key + pkName = "pk_" + 
String.join("_", pkFields); + } + return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields)); + } else { Review comment: This else could be removed; we could keep just the return statement. Or do the code style guidelines require proceeding this way? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org