this link has describe the usage for [Scan Newly Added Tables]
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/#scan-newly-added-tables
.
if we can use if without restarting job. i have try this patch, use a
schedule task in MysqlSnapshotSplitAssigner#open(), when added table more
than twice, it occur this issue
https://github.com/apache/flink-cdc/issues/2282
.../flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
| 26 +++++++++++++++++++++++---
.../flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
| 5 +++--
2 files changed, 26 insertions(+), 5 deletions(-)
diff --cc
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.jav
index 0536a262,0536a262..d52acc26
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@@ -56,7 -56,7 +56,9 @@@ import java.util.Set
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
++import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
++import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables;
@@@ -94,6 -94,6 +96,7 @@@ public class MySqlSnapshotSplitAssigne
private MySqlChunkSplitter chunkSplitter;
private boolean isTableIdCaseSensitive;
private ExecutorService executor;
++ private ScheduledExecutorService scheduledExecutor;
@Nullable private Long checkpointIdToFinish;
@@@ -179,12 -179,12 +182,24 @@@
@Override
public void open() {
chunkSplitter.open();
-- discoveryCaptureTables();
-- captureNewlyAddedTables();
-- startAsynchronouslySplit();
++ if (scheduledExecutor == null) {
++ ThreadFactory threadFactory =
++ new
ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
++ this.scheduledExecutor =
Executors.newSingleThreadScheduledExecutor(threadFactory);
++ }
++ scheduledExecutor.scheduleAtFixedRate(
++ () -> {
++ discoveryCaptureTables();
++ captureNewlyAddedTables();
++ startAsynchronouslySplit();
++ },
++ 0,
++ 1,
++ TimeUnit.MINUTES);
}
private void discoveryCaptureTables() {
++ LOG.info("start discovery capture tables");
// discovery the tables lazily
if (needToDiscoveryTables()) {
long start = System.currentTimeMillis();
@@@ -216,6 -216,6 +231,7 @@@
}
private void captureNewlyAddedTables() {
++ LOG.info("start to capture newly added tables");
if (sourceConfig.isScanNewlyAddedTableEnabled()) {
// check whether we got newly added tables
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
@@@ -282,6 -282,6 +298,7 @@@
}
private void startAsynchronouslySplit() {
++ LOG.info("start asynchronously split");
if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
if (executor == null) {
ThreadFactory threadFactory =
@@@ -497,6 -497,6 +514,9 @@@
if (executor != null) {
executor.shutdown();
}
++ if (scheduledExecutor != null) {
++ scheduledExecutor.shutdown();
++ }
}