Hi, 3pang zhu.

This `Scan Newly Added Tables` feature requires restarting the job from a
savepoint. For now, we cannot add new tables to a running job without
restarting it.

Best,
Hang

3pang zhu <zhu3p...@gmail.com> 于2024年3月20日周三 15:22写道:

> This link describes the usage of [Scan Newly Added Tables]:
> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/#scan-newly-added-tables
> .
> I wonder whether we can use it without restarting the job. I have tried this
> patch, which uses a scheduled task in MySqlSnapshotSplitAssigner#open(); when
> tables are added more than twice, the following issue occurs:
> https://github.com/apache/flink-cdc/issues/2282
>
>
>  
> .../flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
>                  | 26 +++++++++++++++++++++++---
>  
> .../flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
>  |  5 +++--
>  2 files changed, 26 insertions(+), 5 deletions(-)
>
> diff --cc
> flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
> index 0536a262,0536a262..d52acc26
> ---
> a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
> +++
> b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
> @@@ -56,7 -56,7 +56,9 @@@ import java.util.Set
>   import java.util.concurrent.CopyOnWriteArrayList;
>   import java.util.concurrent.ExecutorService;
>   import java.util.concurrent.Executors;
> ++import java.util.concurrent.ScheduledExecutorService;
>   import java.util.concurrent.ThreadFactory;
> ++import java.util.concurrent.TimeUnit;
>   import java.util.stream.Collectors;
>
>   import static
> com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables;
> @@@ -94,6 -94,6 +96,7 @@@ public class MySqlSnapshotSplitAssigne
>       private MySqlChunkSplitter chunkSplitter;
>       private boolean isTableIdCaseSensitive;
>       private ExecutorService executor;
> ++    private ScheduledExecutorService scheduledExecutor;
>
>       @Nullable private Long checkpointIdToFinish;
>
> @@@ -179,12 -179,12 +182,24 @@@
>       @Override
>       public void open() {
>           chunkSplitter.open();
> --        discoveryCaptureTables();
> --        captureNewlyAddedTables();
> --        startAsynchronouslySplit();
> ++        if (scheduledExecutor == null) {
> ++            ThreadFactory threadFactory =
> ++                    new
> ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
> ++            this.scheduledExecutor =
> Executors.newSingleThreadScheduledExecutor(threadFactory);
> ++        }
> ++        scheduledExecutor.scheduleAtFixedRate(
> ++                () -> {
> ++                    discoveryCaptureTables();
> ++                    captureNewlyAddedTables();
> ++                    startAsynchronouslySplit();
> ++                },
> ++                0,
> ++                1,
> ++                TimeUnit.MINUTES);
>       }
>
>       private void discoveryCaptureTables() {
> ++        LOG.info("start discovery capture tables");
>           // discovery the tables lazily
>           if (needToDiscoveryTables()) {
>               long start = System.currentTimeMillis();
> @@@ -216,6 -216,6 +231,7 @@@
>       }
>
>       private void captureNewlyAddedTables() {
> ++        LOG.info("start to capture newly added tables");
>           if (sourceConfig.isScanNewlyAddedTableEnabled()) {
>               // check whether we got newly added tables
>               try (JdbcConnection jdbc = openJdbcConnection(sourceConfig))
> {
> @@@ -282,6 -282,6 +298,7 @@@
>       }
>
>       private void startAsynchronouslySplit() {
> ++        LOG.info("start asynchronously split");
>           if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
>               if (executor == null) {
>                   ThreadFactory threadFactory =
> @@@ -497,6 -497,6 +514,9 @@@
>           if (executor != null) {
>               executor.shutdown();
>           }
> ++        if (scheduledExecutor != null) {
> ++            scheduledExecutor.shutdown();
> ++        }
>       }
>

Reply via email to