Flink CDC Issue Import created FLINK-34869:
----------------------------------------------

             Summary: [Bug][mysql] Removing all previous tables and adding newly added tables will throw an exception.
                 Key: FLINK-34869
                 URL: https://issues.apache.org/jira/browse/FLINK-34869
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.


### Flink version

1.18

### Flink CDC version

3.0.1

### Database and its version

Any

### Minimal reproduce step

1. Stop the job with a savepoint.
2. Set 'scan.incremental.snapshot.enabled' = 'true' and restart from the savepoint with a tableList that contains only tables that were not included last time.
3. The assigner state then becomes inconsistent.
Take the following test case as an example:
```java
public class NewlyAddedTableITCase extends MySqlSourceTestBase {
    @Test
    public void testRemoveAndAddTablesOneByOne() throws Exception {
        testRemoveAndAddTablesOneByOne(
                1, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    private void testRemoveAndAddTablesOneByOne(int parallelism, String... captureAddressTables)
            throws Exception {

        MySqlConnection connection = getConnection();
        // step 1: create mysql tables with all tables included
        initialAddressTables(connection, captureAddressTables);

        final TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();

        // get all expected data
        List<String> fetchedDataList = new ArrayList<>();

        String finishedSavePointPath = null;
        // test removing and adding table one by one
        for (int round = 0; round < captureAddressTables.length; round++) {
            String captureTableThisRound = captureAddressTables[round];
            String cityName = captureTableThisRound.split("_")[1];
            StreamExecutionEnvironment env =
                    getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            String createTableStatement =
                    getCreateTableStatement(new HashMap<>(), captureTableThisRound);
            tEnv.executeSql(createTableStatement);
            tEnv.executeSql(
                    "CREATE TABLE sink ("
                            + " table_name STRING,"
                            + " id BIGINT,"
                            + " country STRING,"
                            + " city STRING,"
                            + " detail_address STRING,"
                            + " primary key (table_name,id) not enforced"
                            + ") WITH ("
                            + " 'connector' = 'values',"
                            + " 'sink-insert-only' = 'false'"
                            + ")");
            TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
            JobClient jobClient = tableResult.getJobClient().get();

            // this round's snapshot data
            fetchedDataList.addAll(
                    Arrays.asList(
                            format(
                                    "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
                                    captureTableThisRound, cityName, cityName),
                            format(
                                    "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
                                    captureTableThisRound, cityName, cityName),
                            format(
                                    "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
                                    captureTableThisRound, cityName, cityName)));
            waitForSinkSize("sink", fetchedDataList.size());
            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));

            // only this round table's data is captured.
            // step 3: make binlog data for all tables before this round (also includes this round)
            for (int i = 0; i <= round; i++) {
                String tableName = captureAddressTables[i];
                makeBinlogForAddressTable(connection, tableName, round);
            }
            // this round's binlog data
            fetchedDataList.addAll(
                    Arrays.asList(
                            format(
                                    "-U[%s, 416874195632735147, China, %s, %s West Town address 1]",
                                    captureTableThisRound, cityName, cityName),
                            format(
                                    "+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]",
                                    captureTableThisRound, round, cityName, cityName),
                            format(
                                    "+I[%s, %d, China, %s, %s West Town address 4]",
                                    captureTableThisRound,
                                    417022095255614380L + round,
                                    cityName,
                                    cityName)));

            // step 4: assert fetched binlog data in this round
            waitForSinkSize("sink", fetchedDataList.size());

            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
            // step 5: trigger savepoint
            finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
            jobClient.cancel().get();
        }
    }

    // setting primary key as id rather than <id, city> is more realistic.
    private String getCreateTableStatement(
            Map<String, String> otherOptions, String... captureTableNames) {
        return format(
                "CREATE TABLE address ("
                        + " table_name STRING METADATA VIRTUAL,"
                        + " id BIGINT NOT NULL,"
                        + " country STRING,"
                        + " city STRING,"
                        + " detail_address STRING,"
                        + " primary key (id) not enforced"
                        + ") WITH ("
                        + " 'connector' = 'mysql-cdc',"
                        + " 'scan.incremental.snapshot.enabled' = 'true',"
                        + " 'hostname' = '%s',"
                        + " 'port' = '%s',"
                        + " 'username' = '%s',"
                        + " 'password' = '%s',"
                        + " 'database-name' = '%s',"
                        + " 'table-name' = '%s',"
                        + " 'scan.incremental.snapshot.chunk.size' = '2',"
                        + " 'server-time-zone' = 'UTC',"
                        + " 'server-id' = '%s',"
                        + " 'scan.newly-added-table.enabled' = 'true'"
                        + " %s"
                        + ")",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                customDatabase.getUsername(),
                customDatabase.getPassword(),
                customDatabase.getDatabaseName(),
                getTableNameRegex(captureTableNames),
                getServerId(),
                otherOptions.isEmpty()
                        ? ""
                        : ","
                        + otherOptions.entrySet().stream()
                        .map(
                                e ->
                                        String.format(
                                                "'%s'='%s'",
                                                e.getKey(), e.getValue()))
                        .collect(Collectors.joining(",")));
    }

}
```

### What did you expect to see?

The test should pass: the job restores from the savepoint and captures the newly added table correctly.

### What did you see instead?

An exception occurs:
```java
org.apache.flink.util.FlinkRuntimeException: The assigner is not ready to offer finished split information, this should not be called
        at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.getFinishedSplitInfos(MySqlSnapshotSplitAssigner.java:379) ~[classes/:?]
        at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getFinishedSplitInfos(MySqlHybridSplitAssigner.java:139) ~[classes/:?]
        at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleLatestFinishedSplitNumberRequest(MySqlSourceEnumerator.java:324) ~[classes/:?]
        at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSourceEvent(MySqlSourceEnumerator.java:170) ~[classes/:?]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleSourceEvent(SourceCoordinator.java:590) ~[flink-runtime-1.18.0.jar:1.18.0]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:297) ~[flink-runtime-1.18.0.jar:1.18.0]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:469) ~[flink-runtime-1.18.0.jar:1.18.0]
        at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-core-1.18.0.jar:1.18.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_362]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_362]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_362]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_362]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_362]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_362]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_362]

```

### Reason
When the job is restarted with a newly added table, MySqlBinlogSplit#filterOutdatedSplitInfos filters out the previous tables' FinishedSnapshotSplitInfo. In this case, the list of FinishedSnapshotSplitInfo becomes empty.

Then, when the binlog split is added back to the split reader, com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader#configureFilter and BinlogSplitReader#shouldEmit treat the empty list of FinishedSnapshotSplitInfo as a binlog-only job:
```java
        // specific offset mode
        if (finishedSplitInfos.isEmpty()) {
            for (TableId tableId : currentBinlogSplit.getTableSchemas().keySet()) {
                tableIdBinlogPositionMap.put(tableId, currentBinlogSplit.getStartingOffset());
            }
        }
        // initial mode
        else {
            for (FinishedSnapshotSplitInfo finishedSplitInfo : finishedSplitInfos) {
                TableId tableId = finishedSplitInfo.getTableId();
                List<FinishedSnapshotSplitInfo> list =
                        splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
                list.add(finishedSplitInfo);
                splitsInfoMap.put(tableId, list);

                BinlogOffset highWatermark = finishedSplitInfo.getHighWatermark();
                BinlogOffset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
                if (maxHighWatermark == null || highWatermark.isAfter(maxHighWatermark)) {
                    tableIdBinlogPositionMap.put(tableId, highWatermark);
                }
            }
        }
```
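
To make the failure path concrete, here is a minimal, self-contained sketch of the filtering behaviour described above. It does not use the actual CDC classes: `SplitInfo` and `filterOutdated` are hypothetical stand-ins for `FinishedSnapshotSplitInfo` and `MySqlBinlogSplit#filterOutdatedSplitInfos`, and the table names are assumptions borrowed from the test case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FilterOutdatedSplitInfosSketch {

    // Hypothetical stand-in for FinishedSnapshotSplitInfo; only the table id matters here.
    static final class SplitInfo {
        final String tableId;
        SplitInfo(String tableId) { this.tableId = tableId; }
    }

    // Sketch of the filtering done on restore: keep only split infos whose table
    // is still in the new captured-table list, drop everything else.
    static List<SplitInfo> filterOutdated(List<SplitInfo> finished, Set<String> capturedTables) {
        return finished.stream()
                .filter(info -> capturedTables.contains(info.tableId))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // State restored from the savepoint: finished split infos of the old table only.
        List<SplitInfo> restored =
                new ArrayList<>(Arrays.asList(new SplitInfo("customer.address_hangzhou")));

        // The table list after the restart contains none of the previously captured tables.
        Set<String> newCapturedTables =
                new HashSet<>(Arrays.asList("customer.address_beijing"));

        List<SplitInfo> remaining = filterOutdated(restored, newCapturedTables);

        // remaining is empty, so the reader falls into the "specific offset mode" branch above
        // and treats the restart like a binlog-only job, even though the newly added table
        // still needs its snapshot phase and finished split information.
        System.out.println("remaining finished split infos: " + remaining.size()); // prints 0
    }
}
```

In other words, the empty list is ambiguous: it can mean either "this is a binlog-only job" or "all previously captured tables were removed", and the reader currently assumes the former.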



### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/3051
Created by: [loserwang1024|https://github.com/loserwang1024]
Labels: bug
Created at: Wed Jan 31 17:12:21 CST 2024
State: open



