Copilot commented on code in PR #4255:
URL: https://github.com/apache/flink-cdc/pull/4255#discussion_r2762591029


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java:
##########
@@ -968,6 +968,115 @@ void testUpsertMode(boolean parallelismSnapshot) throws 
Exception {
         result.getJobClient().get().cancel().get();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testArrayTypes(boolean parallelismSnapshot) throws Throwable {
+        setup(parallelismSnapshot);
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE array_types ("
+                                + "    id INTEGER NOT NULL,"
+                                + "    text_a1 ARRAY<STRING>,"
+                                + "    int_a1 ARRAY<INT>,"
+                                + "    int_s1 ARRAY<INT>,"
+                                + "    uuid_a1 ARRAY<STRING>"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
+                                + " 'slot.name' = '%s'"
+                                + ")",
+                        POSTGIS_CONTAINER.getHost(),
+                        POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+                        POSTGIS_CONTAINER.getUsername(),
+                        POSTGIS_CONTAINER.getPassword(),
+                        POSTGIS_CONTAINER.getDatabaseName(),
+                        "inventory",
+                        "array_types",
+                        parallelismSnapshot,
+                        getSlotName());
+
+        tEnv.executeSql(sourceDDL);
+
+        String sinkDDL =
+                "CREATE TABLE array_sink ("
+                        + "    id INTEGER NOT NULL,"
+                        + "    text_a1 ARRAY<STRING>,"
+                        + "    int_a1 ARRAY<INT>,"
+                        + "    int_s1 ARRAY<INT>,"
+                        + "    uuid_a1 ARRAY<STRING>,"
+                        + "    PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ("
+                        + "  'connector' = 'values',"
+                        + "  'sink-insert-only' = 'false'"
+                        + ")";
+        tEnv.executeSql(sinkDDL);
+
+        // async submit job
+        TableResult tableResult =
+                tEnv.executeSql("INSERT INTO array_sink SELECT * FROM 
array_types");
+
+        // wait for snapshot to complete
+        waitForSinkSize("array_sink", 1);
+
+        // verify snapshot data
+        List<String> snapshotResults = 
TestValuesTableFactory.getRawResultsAsStrings("array_sink");
+        Assertions.assertThat(snapshotResults).hasSize(1);
+
+        // verify snapshot contains expected array data patterns (insert 
record)
+        String snapshotRow = snapshotResults.get(0);
+        Assertions.assertThat(snapshotRow).startsWith("+I(");
+        Assertions.assertThat(snapshotRow).contains("electronics");
+        Assertions.assertThat(snapshotRow).contains("gadget");
+        Assertions.assertThat(snapshotRow).contains("sale");
+        Assertions.assertThat(snapshotRow).contains("85");
+        Assertions.assertThat(snapshotRow).contains("90");
+        Assertions.assertThat(snapshotRow).contains("78");
+        Assertions.assertThat(snapshotRow).contains("42");

Review Comment:
   The test verifies the text_a1, int_a1, and int_s1 arrays during both the 
snapshot and incremental phases, but it doesn't explicitly verify that the 
UUID array (uuid_a1) values are correctly captured and deserialized. Consider 
adding explicit assertions for the UUID array values to ensure they are 
properly handled in both the snapshot and WAL paths, similar to how the other 
array elements are verified.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java:
##########
@@ -968,6 +968,115 @@ void testUpsertMode(boolean parallelismSnapshot) throws 
Exception {
         result.getJobClient().get().cancel().get();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testArrayTypes(boolean parallelismSnapshot) throws Throwable {
+        setup(parallelismSnapshot);
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE array_types ("
+                                + "    id INTEGER NOT NULL,"
+                                + "    text_a1 ARRAY<STRING>,"
+                                + "    int_a1 ARRAY<INT>,"
+                                + "    int_s1 ARRAY<INT>,"
+                                + "    uuid_a1 ARRAY<STRING>"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
+                                + " 'slot.name' = '%s'"
+                                + ")",
+                        POSTGIS_CONTAINER.getHost(),
+                        POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+                        POSTGIS_CONTAINER.getUsername(),
+                        POSTGIS_CONTAINER.getPassword(),
+                        POSTGIS_CONTAINER.getDatabaseName(),
+                        "inventory",
+                        "array_types",
+                        parallelismSnapshot,
+                        getSlotName());
+
+        tEnv.executeSql(sourceDDL);
+
+        String sinkDDL =
+                "CREATE TABLE array_sink ("
+                        + "    id INTEGER NOT NULL,"
+                        + "    text_a1 ARRAY<STRING>,"
+                        + "    int_a1 ARRAY<INT>,"
+                        + "    int_s1 ARRAY<INT>,"
+                        + "    uuid_a1 ARRAY<STRING>,"
+                        + "    PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ("
+                        + "  'connector' = 'values',"
+                        + "  'sink-insert-only' = 'false'"
+                        + ")";
+        tEnv.executeSql(sinkDDL);
+
+        // async submit job
+        TableResult tableResult =
+                tEnv.executeSql("INSERT INTO array_sink SELECT * FROM 
array_types");
+
+        // wait for snapshot to complete
+        waitForSinkSize("array_sink", 1);
+
+        // verify snapshot data
+        List<String> snapshotResults = 
TestValuesTableFactory.getRawResultsAsStrings("array_sink");
+        Assertions.assertThat(snapshotResults).hasSize(1);
+
+        // verify snapshot contains expected array data patterns (insert 
record)
+        String snapshotRow = snapshotResults.get(0);
+        Assertions.assertThat(snapshotRow).startsWith("+I(");
+        Assertions.assertThat(snapshotRow).contains("electronics");
+        Assertions.assertThat(snapshotRow).contains("gadget");
+        Assertions.assertThat(snapshotRow).contains("sale");
+        Assertions.assertThat(snapshotRow).contains("85");
+        Assertions.assertThat(snapshotRow).contains("90");
+        Assertions.assertThat(snapshotRow).contains("78");
+        Assertions.assertThat(snapshotRow).contains("42");
+
+        // wait a bit to make sure the replication slot is ready
+        Thread.sleep(5000);
+
+        // Test incremental (WAL) path - UPDATE array data
+        try (Connection connection = getJdbcConnection(POSTGIS_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "UPDATE inventory.array_types SET text_a1=ARRAY['updated', 
'array'], "
+                            + "int_a1='{100, 200}' WHERE id=1;");

Review Comment:
   The UPDATE statement doesn't include the uuid_a1 column, so UUID array 
behavior during incremental updates is not tested. Consider updating the test 
to also modify the uuid_a1 column and verify that UUID array updates are 
properly captured through the WAL replication path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to