leonardBang commented on code in PR #19741:
URL: https://github.com/apache/flink/pull/19741#discussion_r894249878


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,19 +144,23 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String catalog, String schema, String table)

Review Comment:
   `catalog` in the database (JDBC) sense means something different from 
`catalog` in Flink SQL. We should make the distinction clearer here; it may 
help to look at how other popular projects handle it.
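
   One possible way to make the distinction visible is through parameter naming and Javadoc. The following is only a sketch, not the change made in this PR: the class name `JdbcPrimaryKeyLookup` and the parameter names `jdbcCatalog` and `jdbcSchema` are hypothetical. Only `DatabaseMetaData#getPrimaryKeys` is the real JDBC call. As far as I know, MySQL Connector/J maps the JDBC catalog to a MySQL database in its default configuration, which is where the clash with Flink's use of the word comes from.

```java
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Sketch only: names that keep the JDBC notion of a catalog apart from Flink's. */
final class JdbcPrimaryKeyLookup {

    /**
     * Looks up primary key metadata for one table.
     *
     * @param jdbcCatalog catalog in the java.sql.DatabaseMetaData sense, NOT a Flink
     *     catalog; null means the catalog name is not used to narrow the search
     * @param jdbcSchema schema in the JDBC sense; null means the schema name is not
     *     used to narrow the search
     * @param table table name as stored in the database
     */
    static ResultSet primaryKeys(
            DatabaseMetaData metaData, String jdbcCatalog, String jdbcSchema, String table)
            throws SQLException {
        // Pass the JDBC catalog through instead of hard-coding null, so the
        // lookup is restricted to the intended database.
        return metaData.getPrimaryKeys(jdbcCatalog, jdbcSchema, table);
    }
}
```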



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java:
##########
@@ -23,30 +23,34 @@
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Test base for {@link MySqlCatalog}. */
 public class MySqlCatalogTestBase {
 
     public static final Logger LOG = LoggerFactory.getLogger(MySqlCatalogTestBase.class);
 
-    protected static final DockerImageName MYSQL_57_IMAGE = DockerImageName.parse("mysql:5.7.34");
+    protected static final List<String> DOCKER_IMAGE_NAMES =
+            Arrays.asList("mysql:5.7.34", "mysql:8.0.16");

Review Comment:
   Could you also add a test for a 5.6.x version?



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,19 +144,23 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String catalog, String schema, String table)
+            throws SQLException {
 
         // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        ResultSet rs = metaData.getPrimaryKeys(catalog, schema, table);
 
         Map<Integer, String> keySeqColumnName = new HashMap<>();
         String pkName = null;
         while (rs.next()) {
             String columnName = rs.getString("COLUMN_NAME");
             pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
             int keySeq = rs.getInt("KEY_SEQ");
+            Preconditions.checkState(
+                    !keySeqColumnName.containsKey(keySeq - 1),
+                    "The PK constraint should be unique.");

Review Comment:
   Could we improve the exception message? It looks like we are encountering 
duplicate records with the same primary key.
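
   As an illustration of what a more descriptive message could look like, here is a minimal, self-contained sketch. It is not the code in this PR: `PrimaryKeyOrdering`, `PkRow`, and `orderByKeySeq` are hypothetical names, plain `IllegalStateException` stands in for `Preconditions.checkState`, and the hint that the metadata query matched more than one table is an assumption about the likely cause. The sketch orders the columns by `KEY_SEQ` and, on a duplicate, names the table, the constraint, and both columns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch only: orders primary key columns by KEY_SEQ and reports duplicates descriptively. */
final class PrimaryKeyOrdering {

    /** Hypothetical holder for one row returned by DatabaseMetaData#getPrimaryKeys. */
    static final class PkRow {
        final String columnName;
        final String pkName;
        final int keySeq; // 1-based position of the column within the key

        PkRow(String columnName, String pkName, int keySeq) {
            this.columnName = columnName;
            this.pkName = pkName;
            this.keySeq = keySeq;
        }
    }

    /** Returns the column names sorted by KEY_SEQ; fails if two rows share a KEY_SEQ. */
    static List<String> orderByKeySeq(String table, List<PkRow> rows) {
        // TreeMap keeps the entries sorted by KEY_SEQ.
        Map<Integer, String> columnByKeySeq = new TreeMap<>();
        for (PkRow row : rows) {
            String previous = columnByKeySeq.putIfAbsent(row.keySeq, row.columnName);
            if (previous != null) {
                throw new IllegalStateException(
                        String.format(
                                "Found columns '%s' and '%s' at the same KEY_SEQ %d in "
                                        + "primary key '%s' of table '%s'. The metadata "
                                        + "query may have matched more than one table.",
                                previous, row.columnName, row.keySeq, row.pkName, table));
            }
        }
        return new ArrayList<>(columnByKeySeq.values());
    }
}
```

   With a message like this, whoever hits the failure can tell which table and key sequence are affected without attaching a debugger.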



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
