This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.2 by this push:
     new 908949bc7 [FLINK-36326][source-connector/mysql] Fix auto scan newly-added table failure
908949bc7 is described below

commit 908949bc7220693f1edb358f13b009e6d2a016db
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Mon Nov 18 19:13:12 2024 +0800

    [FLINK-36326][source-connector/mysql] Fix auto scan newly-added table failure
    
     This closes #3661.
    
    Co-authored-by: Hang Ruan <ruanhang1...@hotmail.com>
---
 .../source/enumerator/MySqlSourceEnumerator.java   |   6 +-
 .../mysql/source/NewlyAddedTableITCase.java        | 147 +++++++++++++++++++++
 2 files changed, 152 insertions(+), 1 deletion(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
index 3ab2ab509..3b603d202 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
@@ -81,6 +81,8 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
 
     @Nullable private Integer binlogSplitTaskId;
 
+    private boolean isBinlogSplitUpdateRequestAlreadySent = false;
+
     public MySqlSourceEnumerator(
             SplitEnumeratorContext<MySqlSplit> context,
             MySqlSourceConfig sourceConfig,
@@ -273,7 +275,9 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
     }
 
     private void requestBinlogSplitUpdateIfNeed() {
-        if (isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
+        if (!isBinlogSplitUpdateRequestAlreadySent
+                && isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
+            isBinlogSplitUpdateRequestAlreadySent = true;
             for (int subtaskId : getRegisteredReader()) {
                 LOG.info(
                        "The enumerator requests subtask {} to update the binlog split after newly added table.",
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
index ac23918d2..d1eba52ef 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
@@ -78,6 +78,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.lang.String.format;
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** IT tests to cover various newly added tables during capture process. */
@@ -511,6 +512,12 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
         temporaryFolder.delete();
     }
 
+    @Test
+    public void testNewlyAddedEmptyTableAndInsertAfterJobStart() throws Exception {
+        testNewlyAddedTableOneByOneWithCreateBeforeStart(
+                1, new HashMap<>(), "address_hangzhou", "address_beijing");
+    }
+
     /** Add a collect sink in the job. */
     protected CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stream) {
         TypeSerializer<RowData> serializer =
@@ -1108,4 +1115,144 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
             }
         }
     }
+
+    private void testNewlyAddedTableOneByOneWithCreateBeforeStart(
+            int parallelism, Map<String, String> sourceOptions, String... captureAddressTables)
+            throws Exception {
+        final TemporaryFolder temporaryFolder = new TemporaryFolder();
+        temporaryFolder.create();
+        final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
+        String finishedSavePointPath = null;
+        List<String> fetchedDataList = new ArrayList<>();
+        for (int round = 0; round < captureAddressTables.length; round++) {
+            boolean insertData = round == 0;
+            initialAddressTables(getConnection(), captureAddressTables, round, insertData);
+            String[] captureTablesThisRound =
+                    Arrays.asList(captureAddressTables)
+                            .subList(0, round + 1)
+                            .toArray(new String[0]);
+            String newlyAddedTable = captureAddressTables[round];
+            StreamExecutionEnvironment env =
+                    getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
+            env.setRestartStrategy(noRestart());
+            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+            String createTableStatement =
+                    getCreateTableStatement(sourceOptions, captureTablesThisRound);
+            tEnv.executeSql(createTableStatement);
+            tEnv.executeSql(
+                    "CREATE TABLE sink ("
+                            + " table_name STRING,"
+                            + " id BIGINT,"
+                            + " country STRING,"
+                            + " city STRING,"
+                            + " detail_address STRING,"
+                            + " primary key (city, id) not enforced"
+                            + ") WITH ("
+                            + " 'connector' = 'values',"
+                            + " 'sink-insert-only' = 'false'"
+                            + ")");
+            TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
+            JobClient jobClient = tableResult.getJobClient().get();
+            Thread.sleep(3_000);
+            String tableName = captureAddressTables[round];
+            if (!insertData) {
+                insertData(
+                        getConnection(),
+                        customDatabase.getDatabaseName() + "." + tableName,
+                        tableName.split("_")[1]);
+            }
+            // step 2: assert fetched snapshot data in this round
+            String cityName = newlyAddedTable.split("_")[1];
+            List<String> expectedSnapshotDataThisRound =
+                    Arrays.asList(
+                            format(
+                                    "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
+                                    newlyAddedTable, cityName, cityName),
+                            format(
+                                    "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
+                                    newlyAddedTable, cityName, cityName),
+                            format(
+                                    "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
+                                    newlyAddedTable, cityName, cityName));
+            fetchedDataList.addAll(expectedSnapshotDataThisRound);
+            waitForUpsertSinkSize("sink", fetchedDataList.size());
+            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
+            // step 3: make some binlog data for this round
+            makeFirstPartBinlogForAddressTable(getConnection(), newlyAddedTable);
+            makeSecondPartBinlogForAddressTable(getConnection(), newlyAddedTable);
+            // step 4: assert fetched binlog data in this round
+            // retract the old data with id 416874195632735147
+            fetchedDataList =
+                    fetchedDataList.stream()
+                            .filter(
+                                    r ->
+                                            !r.contains(
+                                                    format(
+                                                            "%s, 416874195632735147",
+                                                            newlyAddedTable)))
+                            .collect(Collectors.toList());
+            List<String> expectedBinlogUpsertDataThisRound =
+                    Arrays.asList(
+                            // add the new data with id 416874195632735147
+                            format(
+                                    "+I[%s, 416874195632735147, CHINA, %s, %s West Town address 1]",
+                                    newlyAddedTable, cityName, cityName),
+                            format(
+                                    "+I[%s, 417022095255614380, China, %s, %s West Town address 4]",
+                                    newlyAddedTable, cityName, cityName));
+            // step 5: assert fetched binlog data in this round
+            fetchedDataList.addAll(expectedBinlogUpsertDataThisRound);
+            waitForUpsertSinkSize("sink", fetchedDataList.size());
+            // the result size of sink may arrive fetchedDataList.size() with old data, wait one
+            // checkpoint to wait retract old record and send new record
+            Thread.sleep(1000);
+            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
+            // step 6: trigger savepoint
+            if (round != captureAddressTables.length - 1) {
+                finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
+            }
+            jobClient.cancel().get();
+        }
+    }
+
+    private void initialAddressTables(
+            JdbcConnection connection, String[] addressTables, int round, boolean insertData)
+            throws SQLException {
+        try {
+            connection.setAutoCommit(false);
+            String tableName = addressTables[round];
+            String tableId = customDatabase.getDatabaseName() + "." + tableName;
+            String cityName = tableName.split("_")[1];
+            connection.execute(
+                    "CREATE TABLE "
+                            + tableId
+                            + "("
+                            + "  id BIGINT UNSIGNED NOT NULL PRIMARY KEY,"
+                            + "  country VARCHAR(255) NOT NULL,"
+                            + "  city VARCHAR(255) NOT NULL,"
+                            + "  detail_address VARCHAR(1024)"
+                            + ");");
+            if (insertData) {
+                insertData(connection, tableId, cityName);
+            }
+            connection.commit();
+        } finally {
+            connection.close();
+        }
+    }
+
+    private void insertData(JdbcConnection connection, String tableId, String cityName)
+            throws SQLException {
+        try {
+            connection.execute(
+                    format(
+                            "INSERT INTO  %s "
+                                    + "VALUES (416874195632735147, 'China', '%s', '%s West Town address 1'),"
+                                    + "       (416927583791428523, 'China', '%s', '%s West Town address 2'),"
+                                    + "       (417022095255614379, 'China', '%s', '%s West Town address 3');",
+                            tableId, cityName, cityName, cityName, cityName, cityName, cityName));
+        } finally {
+            connection.close();
+        }
+    }
 }

Reply via email to