Flink CDC Issue Import created FLINK-34869:
----------------------------------------------
             Summary: [Bug][mysql] Removing all previously captured tables and adding newly added tables throws an exception
                 Key: FLINK-34869
                 URL: https://issues.apache.org/jira/browse/FLINK-34869
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.

### Flink version

1.18

### Flink CDC version

3.0.1

### Database and its version

Any

### Minimal reproduce step

1. Stop the job with a savepoint.
2. Keep 'scan.incremental.snapshot.enabled' = 'true' and set the table list to tables that were not included in the previous run.
3. Restore from the savepoint: the assigner state becomes inconsistent.

Take this test case as an example:

```java
public class NewlyAddedTableITCase extends MySqlSourceTestBase {

    @Test
    public void testRemoveAndAddTablesOneByOne() throws Exception {
        testRemoveAndAddTablesOneByOne(
                1, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    private void testRemoveAndAddTablesOneByOne(int parallelism, String... captureAddressTables)
            throws Exception {
        MySqlConnection connection = getConnection();
        // step 1: create mysql tables with all tables included
        initialAddressTables(connection, captureAddressTables);

        final TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();

        // get all expected data
        List<String> fetchedDataList = new ArrayList<>();

        String finishedSavePointPath = null;
        // test removing and adding table one by one
        for (int round = 0; round < captureAddressTables.length; round++) {
            String captureTableThisRound = captureAddressTables[round];
            String cityName = captureTableThisRound.split("_")[1];
            StreamExecutionEnvironment env =
                    getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            String createTableStatement =
                    getCreateTableStatement(new HashMap<>(), captureTableThisRound);
            tEnv.executeSql(createTableStatement);
            tEnv.executeSql(
                    "CREATE TABLE sink ("
                            + " table_name STRING,"
                            + " id BIGINT,"
                            + " country STRING,"
                            + " city STRING,"
                            + " detail_address STRING,"
                            + " primary key (table_name,id) not enforced"
                            + ") WITH ("
                            + " 'connector' = 'values',"
                            + " 'sink-insert-only' = 'false'"
                            + ")");
            TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
            JobClient jobClient = tableResult.getJobClient().get();

            // this round's snapshot data
            fetchedDataList.addAll(
                    Arrays.asList(
                            format(
                                    "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
                                    captureTableThisRound, cityName, cityName),
                            format(
                                    "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
                                    captureTableThisRound, cityName, cityName),
                            format(
                                    "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
                                    captureTableThisRound, cityName, cityName)));
            waitForSinkSize("sink", fetchedDataList.size());
            // only this round's table data is captured
            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
            // step 3: make binlog data for all tables before this round (also includes this round)
            for (int i = 0; i <= round; i++) {
                String tableName = captureAddressTables[i];
                makeBinlogForAddressTable(connection, tableName, round);
            }
            // this round's binlog data
            fetchedDataList.addAll(
                    Arrays.asList(
                            format(
                                    "-U[%s, 416874195632735147, China, %s, %s West Town address 1]",
                                    captureTableThisRound, cityName, cityName),
                            format(
                                    "+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]",
                                    captureTableThisRound, round, cityName, cityName),
                            format(
                                    "+I[%s, %d, China, %s, %s West Town address 4]",
                                    captureTableThisRound,
                                    417022095255614380L + round,
                                    cityName,
                                    cityName)));

            // step 4: assert fetched binlog data in this round
            waitForSinkSize("sink", fetchedDataList.size());
            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));

            // step 5: trigger savepoint
            finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
            jobClient.cancel().get();
        }
    }

    // setting the primary key as id rather than <id, city> is more realistic
    private String getCreateTableStatement(
            Map<String, String> otherOptions, String... captureTableNames) {
        return format(
                "CREATE TABLE address ("
                        + " table_name STRING METADATA VIRTUAL,"
                        + " id BIGINT NOT NULL,"
                        + " country STRING,"
                        + " city STRING,"
                        + " detail_address STRING,"
                        + " primary key (id) not enforced"
                        + ") WITH ("
                        + " 'connector' = 'mysql-cdc',"
                        + " 'scan.incremental.snapshot.enabled' = 'true',"
                        + " 'hostname' = '%s',"
                        + " 'port' = '%s',"
                        + " 'username' = '%s',"
                        + " 'password' = '%s',"
                        + " 'database-name' = '%s',"
                        + " 'table-name' = '%s',"
                        + " 'scan.incremental.snapshot.chunk.size' = '2',"
                        + " 'server-time-zone' = 'UTC',"
                        + " 'server-id' = '%s',"
                        + " 'scan.newly-added-table.enabled' = 'true'"
                        + " %s"
                        + ")",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                customDatabase.getUsername(),
                customDatabase.getPassword(),
                customDatabase.getDatabaseName(),
                getTableNameRegex(captureTableNames),
                getServerId(),
                otherOptions.isEmpty()
                        ? ""
                        : ","
                                + otherOptions.entrySet().stream()
                                        .map(
                                                e ->
                                                        String.format(
                                                                "'%s'='%s'",
                                                                e.getKey(), e.getValue()))
                                        .collect(Collectors.joining(",")));
    }
}
```

### What did you expect to see?

The test passes: after restoring from the savepoint, the newly added table is captured normally.

### What did you see instead?

An exception occurs:

```java
org.apache.flink.util.FlinkRuntimeException: The assigner is not ready to offer finished split information, this should not be called
	at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.getFinishedSplitInfos(MySqlSnapshotSplitAssigner.java:379) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getFinishedSplitInfos(MySqlHybridSplitAssigner.java:139) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleLatestFinishedSplitNumberRequest(MySqlSourceEnumerator.java:324) ~[classes/:?]
	at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSourceEvent(MySqlSourceEnumerator.java:170) ~[classes/:?]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleSourceEvent(SourceCoordinator.java:590) ~[flink-runtime-1.18.0.jar:1.18.0]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:297) ~[flink-runtime-1.18.0.jar:1.18.0]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:469) ~[flink-runtime-1.18.0.jar:1.18.0]
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-core-1.18.0.jar:1.18.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_362]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_362]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_362]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_362]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_362]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_362]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_362]
```

### Reason

When the job is restarted with a newly added table, MySqlBinlogSplit#filterOutdatedSplitInfos filters out the FinishedSnapshotSplitInfo entries of the previously captured tables, so the list of FinishedSnapshotSplitInfo becomes empty. When the binlog split is then added back to the split reader, com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader#configureFilter and BinlogSplitReader#shouldEmit treat the empty list as if the job were binlog-only (see the sketch after the snippet):

```java
// specific offset mode
if (finishedSplitInfos.isEmpty()) {
    for (TableId tableId : currentBinlogSplit.getTableSchemas().keySet()) {
        tableIdBinlogPositionMap.put(tableId, currentBinlogSplit.getStartingOffset());
    }
}
// initial mode
else {
    for (FinishedSnapshotSplitInfo finishedSplitInfo : finishedSplitInfos) {
        TableId tableId = finishedSplitInfo.getTableId();
        List<FinishedSnapshotSplitInfo> list =
                splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
        list.add(finishedSplitInfo);
        splitsInfoMap.put(tableId, list);

        BinlogOffset highWatermark = finishedSplitInfo.getHighWatermark();
        BinlogOffset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
        if (maxHighWatermark == null || highWatermark.isAfter(maxHighWatermark)) {
            tableIdBinlogPositionMap.put(tableId, highWatermark);
        }
    }
}
```
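For illustration, here is a minimal, self-contained sketch of that failure mode. Everything in it is a simplified stand-in rather than flink-cdc code: the class EmptySplitInfoSketch, the FinishedSplitInfo type, and the totalFinishedSplitSize guard are all hypothetical. It only demonstrates that an empty FinishedSnapshotSplitInfo list by itself cannot distinguish a genuine binlog-only job from an initial-mode job whose split infos were all filtered out on restore:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Simplified model of the bug; every type here is a stand-in, not flink-cdc code. */
public class EmptySplitInfoSketch {

    /** Stand-in for FinishedSnapshotSplitInfo: a table id plus its high watermark. */
    static class FinishedSplitInfo {
        final String tableId;
        final long highWatermark;

        FinishedSplitInfo(String tableId, long highWatermark) {
            this.tableId = tableId;
            this.highWatermark = highWatermark;
        }
    }

    /** Models filterOutdatedSplitInfos: keep only infos of currently captured tables. */
    static List<FinishedSplitInfo> filterOutdatedSplitInfos(
            List<FinishedSplitInfo> splitInfos, Set<String> capturedTables) {
        return splitInfos.stream()
                .filter(info -> capturedTables.contains(info.tableId))
                .collect(Collectors.toList());
    }

    /** Models the branch in configureFilter: an empty list is read as binlog-only. */
    static String classify(List<FinishedSplitInfo> finishedSplitInfos) {
        return finishedSplitInfos.isEmpty() ? "specific offset mode" : "initial mode";
    }

    /**
     * Hypothetical guard: also consult how many snapshot splits were finished
     * before the restart. A job that finished snapshot splits stays in initial
     * mode even if all of its split infos were filtered out on restore.
     */
    static String classifyWithGuard(
            List<FinishedSplitInfo> finishedSplitInfos, int totalFinishedSplitSize) {
        return finishedSplitInfos.isEmpty() && totalFinishedSplitSize == 0
                ? "specific offset mode"
                : "initial mode";
    }

    public static void main(String[] args) {
        // State restored from the savepoint: address_hangzhou was snapshotted.
        List<FinishedSplitInfo> restored =
                Arrays.asList(new FinishedSplitInfo("db.address_hangzhou", 100L));

        // After the restart only address_beijing is captured, so every
        // previously captured table's split info is filtered out.
        List<FinishedSplitInfo> filtered =
                filterOutdatedSplitInfos(restored, Collections.singleton("db.address_beijing"));

        System.out.println(classify(filtered)); // "specific offset mode" -- the bug
        System.out.println(classifyWithGuard(filtered, restored.size())); // "initial mode"
    }
}
```

Whether the actual fix belongs in filterOutdatedSplitInfos or in configureFilter is left to the PR; the sketch only shows that the two situations need an extra signal to be told apart.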
### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/3051
Created by: [loserwang1024|https://github.com/loserwang1024]
Labels: bug
Created at: Wed Jan 31 17:12:21 CST 2024
State: open


--
This message was sent by Atlassian Jira
(v8.20.10#820010)