chl-wxp commented on code in PR #10236:
URL: https://github.com/apache/seatunnel/pull/10236#discussion_r2647455158


##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java:
##########
@@ -247,4 +257,121 @@ public List<ConstraintKey> getUniqueKeys(JdbcConnection 
jdbcConnection, TableId
             return new ArrayList<ConstraintKey>();
         }
     }
+
+    private class TestSourceDialectWithUniqueKey implements 
JdbcDataSourceDialect {

Review Comment:
   I suggest adding more boundary condition test cases. In order to reduce test 
code redundancy, you can consider introducing an abstract class
   `BaseJdbcDataSourceDialectTest` implements `JdbcDataSourceDialect`,
   This only requires overriding `getPrimaryKey()` and `getUniqueKeys()` in 
each test dialog.



##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java:
##########
@@ -247,4 +257,121 @@ public List<ConstraintKey> getUniqueKeys(JdbcConnection 
jdbcConnection, TableId
             return new ArrayList<ConstraintKey>();
         }
     }
+
+    private class TestSourceDialectWithUniqueKey implements 
JdbcDataSourceDialect {
+
+        @Override
+        public String getName() {
+            return null;
+        }
+
+        @Override
+        public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig 
sourceConfig) {
+            return false;
+        }
+
+        @Override
+        public ChunkSplitter createChunkSplitter(JdbcSourceConfig 
sourceConfig) {
+            return null;
+        }
+
+        @Override
+        public List<TableId> discoverDataCollections(JdbcSourceConfig 
sourceConfig) {
+            return null;
+        }
+
+        @Override
+        public JdbcConnection openJdbcConnection(JdbcSourceConfig 
sourceConfig) {
+            return null;
+        }
+
+        @Override
+        public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
+            return null;
+        }
+
+        @Override
+        public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, 
TableId tableId) {
+
+            Table table =
+                    Table.editor()
+                            .tableId(tableId)
+                            .addColumns(
+                                    Column.editor()
+                                            .name("string_col")
+                                            .jdbcType(Types.VARCHAR)
+                                            .type("varchar")
+                                            .create(),
+                                    Column.editor()
+                                            .name("smallint")
+                                            .jdbcType(Types.SMALLINT)
+                                            .type("smallint")
+                                            .create(),
+                                    Column.editor()
+                                            .name("int")
+                                            .jdbcType(Types.INTEGER)
+                                            .type("int")
+                                            .create(),
+                                    Column.editor()
+                                            .name("decimal")
+                                            .jdbcType(Types.DECIMAL)
+                                            .type("decimal")
+                                            .create(),
+                                    Column.editor()
+                                            .name("tinyint_col")
+                                            .jdbcType(Types.TINYINT)
+                                            .type("tinyint")
+                                            .create(),
+                                    Column.editor()
+                                            .name("bigint_col")
+                                            .jdbcType(Types.BIGINT)
+                                            .type("bigint")
+                                            .create())
+                            .create();
+            return new 
TableChanges.TableChange(TableChanges.TableChangeType.CREATE, table);
+        }
+
+        @Override
+        public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase 
sourceSplitBase) {
+            return null;
+        }
+
+        @Override
+        public JdbcSourceFetchTaskContext createFetchTaskContext(
+                SourceSplitBase sourceSplitBase, JdbcSourceConfig 
taskSourceConfig) {
+            return null;
+        }
+
+        @Override
+        public Optional<PrimaryKey> getPrimaryKey(JdbcConnection 
jdbcConnection, TableId tableId)
+                throws SQLException {
+            return Optional.of(PrimaryKey.of("pkName", 
Arrays.asList("bigint_col")));
+        }
+
+        @Override
+        public List<ConstraintKey> getUniqueKeys(JdbcConnection 
jdbcConnection, TableId tableId)
+                throws SQLException {
+            List<ConstraintKey> keys = new ArrayList<>();
+
+            keys.add(
+                    ConstraintKey.of(
+                            ConstraintKey.ConstraintType.UNIQUE_KEY,
+                            "uk_1",
+                            Arrays.asList(
+                                    ConstraintKey.ConstraintKeyColumn.of(
+                                            "string_col", 
ConstraintKey.ColumnSortType.ASC))));
+
+            keys.add(
+                    ConstraintKey.of(
+                            ConstraintKey.ConstraintType.UNIQUE_KEY,
+                            "uk_2",
+                            Arrays.asList(
+                                    ConstraintKey.ConstraintKeyColumn.of(
+                                            "int", 
ConstraintKey.ColumnSortType.ASC),
+                                    ConstraintKey.ConstraintKeyColumn.of(
+                                            "smallint", 
ConstraintKey.ColumnSortType.ASC))));
+

Review Comment:
   The tests should first cover the exact case from the issue.
   In #9949, the table definition is essentially:
       primary key: (addr_id BIGINT)
       unique key: (address VARCHAR, coin_type INT/SMALLINT)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to