[ https://issues.apache.org/jira/browse/FLINK-34869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840201#comment-17840201 ]
Josh Mahonin commented on FLINK-34869:
--------------------------------------

We seem to be hitting this issue as well. [~loserwang1024], has this issue been fixed by another commit, or is it still outstanding? If it is still outstanding, we could attempt a patch with some guidance. Thanks.

> [Bug][mysql] Remove all previous table and add new added table will throw Exception.
> ------------------------------------------------------------------------------------
>
> Key: FLINK-34869
> URL: https://issues.apache.org/jira/browse/FLINK-34869
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Flink CDC Issue Import
> Priority: Major
> Labels: github-import
>
> ### Search before asking
> - [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.
> ### Flink version
> 1.18
> ### Flink CDC version
> 3.0.1
> ### Database and its version
> Any
> ### Minimal reproduce step
> 1. Stop the job with a savepoint.
> 2. Set 'scan.incremental.snapshot.enabled' = 'true', and set the table list to tables that were not included in the previous run.
> 3. Restart the job from the savepoint: the split assignment state becomes inconsistent.
> For example, take the following test case:
> ```java
> public class NewlyAddedTableITCase extends MySqlSourceTestBase {
>
>     @Test
>     public void testRemoveAndAddTablesOneByOne() throws Exception {
>         testRemoveAndAddTablesOneByOne(
>                 1, "address_hangzhou", "address_beijing", "address_shanghai");
>     }
>
>     private void testRemoveAndAddTablesOneByOne(int parallelism, String...
>                     captureAddressTables)
>             throws Exception {
>         MySqlConnection connection = getConnection();
>         // step 1: create mysql tables with all tables included
>         initialAddressTables(connection, captureAddressTables);
>
>         final TemporaryFolder temporaryFolder = new TemporaryFolder();
>         temporaryFolder.create();
>         final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
>
>         // get all expected data
>         List<String> fetchedDataList = new ArrayList<>();
>
>         String finishedSavePointPath = null;
>         // test removing and adding table one by one
>         for (int round = 0; round < captureAddressTables.length; round++) {
>             String captureTableThisRound = captureAddressTables[round];
>             String cityName = captureTableThisRound.split("_")[1];
>             StreamExecutionEnvironment env =
>                     getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
>             StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>             String createTableStatement =
>                     getCreateTableStatement(new HashMap<>(), captureTableThisRound);
>             tEnv.executeSql(createTableStatement);
>             tEnv.executeSql(
>                     "CREATE TABLE sink ("
>                             + " table_name STRING,"
>                             + " id BIGINT,"
>                             + " country STRING,"
>                             + " city STRING,"
>                             + " detail_address STRING,"
>                             + " primary key (table_name,id) not enforced"
>                             + ") WITH ("
>                             + " 'connector' = 'values',"
>                             + " 'sink-insert-only' = 'false'"
>                             + ")");
>             TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
>             JobClient jobClient = tableResult.getJobClient().get();
>
>             // this round's snapshot data
>             fetchedDataList.addAll(
>                     Arrays.asList(
>                             format(
>                                     "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
>                                     captureTableThisRound, cityName, cityName),
>                             format(
>                                     "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
>                                     captureTableThisRound, cityName, cityName),
>                             format(
>                                     "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
>                                     captureTableThisRound, cityName, cityName)));
>             waitForSinkSize("sink", fetchedDataList.size());
>             assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
>             // only this round table's data is captured.
>
>             // step 3: make binlog data for all tables before this round (also includes this round)
>             for (int i = 0; i <= round; i++) {
>                 String tableName = captureAddressTables[i];
>                 makeBinlogForAddressTable(connection, tableName, round);
>             }
>
>             // this round's binlog data
>             fetchedDataList.addAll(
>                     Arrays.asList(
>                             format(
>                                     "-U[%s, 416874195632735147, China, %s, %s West Town address 1]",
>                                     captureTableThisRound, cityName, cityName),
>                             format(
>                                     "+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]",
>                                     captureTableThisRound, round, cityName, cityName),
>                             format(
>                                     "+I[%s, %d, China, %s, %s West Town address 4]",
>                                     captureTableThisRound,
>                                     417022095255614380L + round,
>                                     cityName,
>                                     cityName)));
>
>             // step 4: assert fetched binlog data in this round
>             waitForSinkSize("sink", fetchedDataList.size());
>             assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
>
>             // step 5: trigger savepoint
>             finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
>             jobClient.cancel().get();
>         }
>     }
>
>     // setting primary key as id rather than <id, city> is more realistic.
>     private String getCreateTableStatement(
>             Map<String, String> otherOptions, String...
>                     captureTableNames) {
>         return format(
>                 "CREATE TABLE address ("
>                         + " table_name STRING METADATA VIRTUAL,"
>                         + " id BIGINT NOT NULL,"
>                         + " country STRING,"
>                         + " city STRING,"
>                         + " detail_address STRING,"
>                         + " primary key (id) not enforced"
>                         + ") WITH ("
>                         + " 'connector' = 'mysql-cdc',"
>                         + " 'scan.incremental.snapshot.enabled' = 'true',"
>                         + " 'hostname' = '%s',"
>                         + " 'port' = '%s',"
>                         + " 'username' = '%s',"
>                         + " 'password' = '%s',"
>                         + " 'database-name' = '%s',"
>                         + " 'table-name' = '%s',"
>                         + " 'scan.incremental.snapshot.chunk.size' = '2',"
>                         + " 'server-time-zone' = 'UTC',"
>                         + " 'server-id' = '%s',"
>                         + " 'scan.newly-added-table.enabled' = 'true'"
>                         + " %s"
>                         + ")",
>                 MYSQL_CONTAINER.getHost(),
>                 MYSQL_CONTAINER.getDatabasePort(),
>                 customDatabase.getUsername(),
>                 customDatabase.getPassword(),
>                 customDatabase.getDatabaseName(),
>                 getTableNameRegex(captureTableNames),
>                 getServerId(),
>                 otherOptions.isEmpty()
>                         ? ""
>                         : ","
>                                 + otherOptions.entrySet().stream()
>                                         .map(
>                                                 e ->
>                                                         String.format(
>                                                                 "'%s'='%s'",
>                                                                 e.getKey(), e.getValue()))
>                                         .collect(Collectors.joining(",")));
>     }
> }
> ```
> ### What did you expect to see?
> The test passes.
> ### What did you see instead?
> An exception occurs:
> ```java
> org.apache.flink.util.FlinkRuntimeException: The assigner is not ready to offer finished split information, this should not be called
>     at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.getFinishedSplitInfos(MySqlSnapshotSplitAssigner.java:379) ~[classes/:?]
>     at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getFinishedSplitInfos(MySqlHybridSplitAssigner.java:139) ~[classes/:?]
>     at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleLatestFinishedSplitNumberRequest(MySqlSourceEnumerator.java:324) ~[classes/:?]
>     at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSourceEvent(MySqlSourceEnumerator.java:170) ~[classes/:?]
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleSourceEvent(SourceCoordinator.java:590) ~[flink-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:297) ~[flink-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:469) ~[flink-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-core-1.18.0.jar:1.18.0]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_362]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_362]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_362]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_362]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_362]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_362]
>     at java.lang.Thread.run(Thread.java:750) [?:1.8.0_362]
> ```
> ### Reason
> When the job is restarted with a newly added table, MySqlBinlogSplit#filterOutdatedSplitInfos filters out the FinishedSnapshotSplitInfo entries of the previous tables, because those tables are no longer captured. When all previous tables have been removed, as in this case, the list of FinishedSnapshotSplitInfo ends up empty.
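
The following self-contained Java model makes the filtering step concrete. It is a sketch under stated assumptions, not the connector's code. SplitInfo, filterOutdated and treatedAsBinlogOnly are hypothetical simplifications of FinishedSnapshotSplitInfo, MySqlBinlogSplit#filterOutdatedSplitInfos and the isEmpty() check in BinlogSplitReader#configureFilter. The db.address_* table ids are made up. The model shows that once every previously captured table is dropped from the table list, filtering leaves no split infos. An empty list is then indistinguishable from a binlog-only start.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class OutdatedSplitInfoFilterSketch {

    // Hypothetical stand-in for FinishedSnapshotSplitInfo: keeps only a table id and a split id.
    static final class SplitInfo {
        final String tableId;
        final String splitId;

        SplitInfo(String tableId, String splitId) {
            this.tableId = tableId;
            this.splitId = splitId;
        }
    }

    // Simplified model of the filtering step: keep only split infos whose table
    // is still in the currently captured table set.
    static List<SplitInfo> filterOutdated(List<SplitInfo> restored, Set<String> capturedTables) {
        List<SplitInfo> kept = new ArrayList<>();
        for (SplitInfo info : restored) {
            if (capturedTables.contains(info.tableId)) {
                kept.add(info);
            }
        }
        return kept;
    }

    // Models the isEmpty() branch in the reader: an empty list selects the
    // "specific offset" path, i.e. the job is handled as if it were binlog-only.
    static boolean treatedAsBinlogOnly(List<SplitInfo> finishedSplitInfos) {
        return finishedSplitInfos.isEmpty();
    }

    public static void main(String[] args) {
        // Restored from the savepoint: finished snapshot splits of the previously captured table.
        List<SplitInfo> restored =
                Arrays.asList(
                        new SplitInfo("db.address_hangzhou", "address_hangzhou:0"),
                        new SplitInfo("db.address_hangzhou", "address_hangzhou:1"));
        // After the restart, the previous table is removed and only a new table is captured.
        Set<String> captured = Collections.singleton("db.address_beijing");

        List<SplitInfo> remaining = filterOutdated(restored, captured);
        System.out.println("remaining split infos: " + remaining.size()); // prints "remaining split infos: 0"
        System.out.println("treated as binlog-only: " + treatedAsBinlogOnly(remaining)); // prints "treated as binlog-only: true"
    }
}
```

In this model, the emptiness check carries two meanings: "no snapshot phase ever ran" and "every snapshot split belonged to a table that is now removed". The check cannot tell them apart.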
> Then, when the binlog split is added back to the split reader, com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader#configureFilter and BinlogSplitReader#shouldEmit see the empty list of FinishedSnapshotSplitInfo and treat the job as a binlog-only job:
> ```java
> // specific offset mode
> if (finishedSplitInfos.isEmpty()) {
>     for (TableId tableId : currentBinlogSplit.getTableSchemas().keySet()) {
>         tableIdBinlogPositionMap.put(tableId, currentBinlogSplit.getStartingOffset());
>     }
> }
> // initial mode
> else {
>     for (FinishedSnapshotSplitInfo finishedSplitInfo : finishedSplitInfos) {
>         TableId tableId = finishedSplitInfo.getTableId();
>         List<FinishedSnapshotSplitInfo> list =
>                 splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
>         list.add(finishedSplitInfo);
>         splitsInfoMap.put(tableId, list);
>
>         BinlogOffset highWatermark = finishedSplitInfo.getHighWatermark();
>         BinlogOffset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
>         if (maxHighWatermark == null || highWatermark.isAfter(maxHighWatermark)) {
>             tableIdBinlogPositionMap.put(tableId, highWatermark);
>         }
>     }
> }
> ```
> ### Are you willing to submit a PR?
> - [x] I'm willing to submit a PR!
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/apache/flink-cdc/issues/3051
> Created by: [loserwang1024|https://github.com/loserwang1024]
> Labels: bug,
> Created at: Wed Jan 31 17:12:21 CST 2024
> State: open

--
This message was sent by Atlassian Jira
(v8.20.10#820010)