This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/release-3.2 by this push: new 908949bc7 [FLINK-36326][source-connector/mysql] Fix auto scan newly-added table failure 908949bc7 is described below commit 908949bc7220693f1edb358f13b009e6d2a016db Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com> AuthorDate: Mon Nov 18 19:13:12 2024 +0800 [FLINK-36326][source-connector/mysql] Fix auto scan newly-added table failure This closes #3661. Co-authored-by: Hang Ruan <ruanhang1...@hotmail.com> --- .../source/enumerator/MySqlSourceEnumerator.java | 6 +- .../mysql/source/NewlyAddedTableITCase.java | 147 +++++++++++++++++++++ 2 files changed, 152 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java index 3ab2ab509..3b603d202 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java @@ -81,6 +81,8 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin @Nullable private Integer binlogSplitTaskId; + private boolean isBinlogSplitUpdateRequestAlreadySent = false; + public MySqlSourceEnumerator( SplitEnumeratorContext<MySqlSplit> context, MySqlSourceConfig sourceConfig, @@ -273,7 +275,9 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin } private void requestBinlogSplitUpdateIfNeed() { - if (isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) { + if (!isBinlogSplitUpdateRequestAlreadySent + && isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) { + isBinlogSplitUpdateRequestAlreadySent = true; for (int subtaskId : getRegisteredReader()) { LOG.info( "The enumerator requests subtask {} to update the binlog split after newly added table.", diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java index ac23918d2..d1eba52ef 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java @@ -78,6 +78,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static java.lang.String.format; +import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart; import static org.apache.flink.util.Preconditions.checkState; /** IT tests to cover various newly added tables during capture process. */ @@ -511,6 +512,12 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { temporaryFolder.delete(); } + @Test + public void testNewlyAddedEmptyTableAndInsertAfterJobStart() throws Exception { + testNewlyAddedTableOneByOneWithCreateBeforeStart( + 1, new HashMap<>(), "address_hangzhou", "address_beijing"); + } + /** Add a collect sink in the job. */ protected CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stream) { TypeSerializer<RowData> serializer = @@ -1108,4 +1115,144 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { } } } + + private void testNewlyAddedTableOneByOneWithCreateBeforeStart( + int parallelism, Map<String, String> sourceOptions, String... captureAddressTables) + throws Exception { + final TemporaryFolder temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + final String savepointDirectory = temporaryFolder.newFolder().toURI().toString(); + String finishedSavePointPath = null; + List<String> fetchedDataList = new ArrayList<>(); + for (int round = 0; round < captureAddressTables.length; round++) { + boolean insertData = round == 0; + initialAddressTables(getConnection(), captureAddressTables, round, insertData); + String[] captureTablesThisRound = + Arrays.asList(captureAddressTables) + .subList(0, round + 1) + .toArray(new String[0]); + String newlyAddedTable = captureAddressTables[round]; + StreamExecutionEnvironment env = + getStreamExecutionEnvironment(finishedSavePointPath, parallelism); + env.setRestartStrategy(noRestart()); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + String createTableStatement = + getCreateTableStatement(sourceOptions, captureTablesThisRound); + tEnv.executeSql(createTableStatement); + tEnv.executeSql( + "CREATE TABLE sink (" + + " table_name STRING," + + " id BIGINT," + + " country STRING," + + " city STRING," + + " detail_address STRING," + + " primary key (city, id) not enforced" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"); + TableResult tableResult = tEnv.executeSql("insert into sink select * from address"); + JobClient jobClient = tableResult.getJobClient().get(); + Thread.sleep(3_000); + String tableName = captureAddressTables[round]; + if (!insertData) { + insertData( + getConnection(), + customDatabase.getDatabaseName() + "." + tableName, + tableName.split("_")[1]); + } + // step 2: assert fetched snapshot data in this round + String cityName = newlyAddedTable.split("_")[1]; + List<String> expectedSnapshotDataThisRound = + Arrays.asList( + format( + "+I[%s, 416874195632735147, China, %s, %s West Town address 1]", + newlyAddedTable, cityName, cityName), + format( + "+I[%s, 416927583791428523, China, %s, %s West Town address 2]", + newlyAddedTable, cityName, cityName), + format( + "+I[%s, 417022095255614379, China, %s, %s West Town address 3]", + newlyAddedTable, cityName, cityName)); + fetchedDataList.addAll(expectedSnapshotDataThisRound); + waitForUpsertSinkSize("sink", fetchedDataList.size()); + assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink")); + // step 3: make some binlog data for this round + makeFirstPartBinlogForAddressTable(getConnection(), newlyAddedTable); + makeSecondPartBinlogForAddressTable(getConnection(), newlyAddedTable); + // step 4: assert fetched binlog data in this round + // retract the old data with id 416874195632735147 + fetchedDataList = + fetchedDataList.stream() + .filter( + r -> + !r.contains( + format( + "%s, 416874195632735147", + newlyAddedTable))) + .collect(Collectors.toList()); + List<String> expectedBinlogUpsertDataThisRound = + Arrays.asList( + // add the new data with id 416874195632735147 + format( + "+I[%s, 416874195632735147, CHINA, %s, %s West Town address 1]", + newlyAddedTable, cityName, cityName), + format( + "+I[%s, 417022095255614380, China, %s, %s West Town address 4]", + newlyAddedTable, cityName, cityName)); + // step 5: assert fetched binlog data in this round + fetchedDataList.addAll(expectedBinlogUpsertDataThisRound); + waitForUpsertSinkSize("sink", fetchedDataList.size()); + // the result size of sink may arrive fetchedDataList.size() with old data, wait one + // checkpoint to wait retract old record and send new record + Thread.sleep(1000); + assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink")); + // step 6: trigger savepoint + if (round != captureAddressTables.length - 1) { + finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); + } + jobClient.cancel().get(); + } + } + + private void initialAddressTables( + JdbcConnection connection, String[] addressTables, int round, boolean insertData) + throws SQLException { + try { + connection.setAutoCommit(false); + String tableName = addressTables[round]; + String tableId = customDatabase.getDatabaseName() + "." + tableName; + String cityName = tableName.split("_")[1]; + connection.execute( + "CREATE TABLE " + + tableId + + "(" + + " id BIGINT UNSIGNED NOT NULL PRIMARY KEY," + + " country VARCHAR(255) NOT NULL," + + " city VARCHAR(255) NOT NULL," + + " detail_address VARCHAR(1024)" + + ");"); + if (insertData) { + insertData(connection, tableId, cityName); + } + connection.commit(); + } finally { + connection.close(); + } + } + + private void insertData(JdbcConnection connection, String tableId, String cityName) + throws SQLException { + try { + connection.execute( + format( + "INSERT INTO %s " + + "VALUES (416874195632735147, 'China', '%s', '%s West Town address 1')," + + " (416927583791428523, 'China', '%s', '%s West Town address 2')," + + " (417022095255614379, 'China', '%s', '%s West Town address 3');", + tableId, cityName, cityName, cityName, cityName, cityName, cityName)); + } finally { + connection.close(); + } + } }