Flink CDC Issue Import created FLINK-34810:
----------------------------------------------
Summary: [mysql] Add a module to periodically check ip on DNS (for
DB Switching)
Key: FLINK-34810
URL: https://issues.apache.org/jira/browse/FLINK-34810
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Reporter: Flink CDC Issue Import
### Search before asking
- [X] I searched in the
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found
nothing similar.
### Motivation
First, we'd like to thank everyone who contributes to Flink CDC.
Our team uses Flink CDC to perform MySQL CDC.
In real-world DB deployments, DBs are organized in a master-slave structure
and DB switching occurs periodically.
When DB switching occurs, **the connected slave server is promoted to the
master server.**
---
The problem is that the Flink CDC connector keeps its connection open after
the switch, but **in the real world, there is a policy that nothing should be
attached to the master server of a DB except for some permitted processes.**
To resolve this issue, our team implemented a DnsIpChecker module. (We saw a
[similar issue and
answer|https://github.com/ververica/flink-cdc-connectors/issues/2419].)
Here's how it works:
1. Allocate one thread when `MySqlSourceReader(subtaskId=0)` is created, so
that the thread shares the lifecycle of `MySqlSourceReader`.
2. That thread periodically looks up the IP in DNS based on the FQDN.
3. If the IP changes, it sends a `SourceEvent` to `MySqlSourceEnumerator` so
that a `FlinkRuntimeException` is raised, which restarts the Flink job and
resets the DB connection.
4. Add a close function to `MySqlSourceReader`, so that the thread is also
terminated when the Flink CDC job is terminated.
This is a portion of our code. We assume that each Flink CDC job captures
only one table.
```scala
def getMySQLSourceOperator(): MySqlSource[String] = {
  MySqlSource.builder[String]()
    .hostname(mySqlConfig.host)
    .port(mySqlConfig.port)
    .serverTimeZone(mySqlConfig.timeZone)
    .databaseList(mySqlConfig.database)
    .tableList(mySqlConfig.table)
    .username(mySqlConfig.user)
    .serverId(mySqlConfig.serverIdRange)
    .password(mySqlConfig.password)
    .startupOptions(mySqlConfig.startupMode)
    .fetchSize(mySqlConfig.fetchSize)
    .splitSize(mySqlConfig.splitSize)
    .chunkKeyColumn(
      new ObjectPath(mySqlConfig.database, mySqlConfig.table),
      mySqlConfig.chunkKeyColumn)
    .connectionPoolSize(mySqlConfig.poolSize)
    .scanNewlyAddedTableEnabled(false)
    .includeSchemaChanges(false)
    .debeziumProperties(mySqlConfig.dbzProps)
    .closeIdleReaders(true)
    .restartOnDbSwitch(true) // the option we implemented
    .deserializer(
      new JsonDebeziumDeserializationSchema(true, mySqlConfig.jsonConverterProps))
    .build()
}
```
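The periodic check described in steps 1 to 4 above can be illustrated with a minimal, Flink-independent sketch. This is not the code from the PR: the class shape, the `onIpChanged` callback, and the injectable `resolve` function are assumptions made for illustration. In the real connector, the callback would be where the `SourceEvent` is sent to the enumerator.

```scala
import java.net.InetAddress
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.util.Try

// Hypothetical sketch: names and signatures are assumptions, not the PR's API.
final class DnsIpChecker(
    hostname: String,
    intervalSeconds: Long,
    // Called with (previous, current) address sets when they differ.
    onIpChanged: (Set[String], Set[String]) => Unit,
    // Injectable resolver so the checker can be exercised without real DNS.
    resolve: String => Set[String] =
      h => InetAddress.getAllByName(h).map(_.getHostAddress).toSet
) extends AutoCloseable {

  // Step 1: a single dedicated thread owned by this checker.
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  // Addresses from the last successful lookup; None before the first one.
  @volatile private var lastSeen: Option[Set[String]] = None

  // Steps 2 and 3: resolve the FQDN, compare with the previous result,
  // and notify the caller on a change. A failed lookup is skipped.
  def checkOnce(): Unit =
    Try(resolve(hostname)).foreach { current =>
      lastSeen match {
        case Some(previous) if previous != current =>
          onIpChanged(previous, current)
        case _ => ()
      }
      lastSeen = Some(current)
    }

  def start(): Unit = {
    scheduler.scheduleAtFixedRate(
      () => checkOnce(), 0L, intervalSeconds, TimeUnit.SECONDS)
    ()
  }

  // Step 4: stop the thread together with its owner.
  override def close(): Unit = {
    scheduler.shutdownNow()
    ()
  }
}
```

Note that the JVM caches successful DNS lookups (see the `networkaddress.cache.ttl` security property), so a checker based on `InetAddress` only sees a change once the cached entry expires.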
We considered resetting only the DB connection, but it was difficult to
determine a recovery point, so we chose to restart the Flink job and rely on
Flink's checkpoints.
---
As shown in the log below, we also log periodically to verify that the IP
check is running normally.
(Hostname and IP masked)
```
2023-09-02 07:10:45.003 INFO com.ververica.cdc.connectors.mysql.source.utils.DNSIpChecker [] - [264361] Current IP address for hostname test_host_name.db.server is XXX
...
2023-09-02 07:11:45.026 INFO com.ververica.cdc.connectors.mysql.source.utils.DNSIpChecker [] - [264421] Current IP address for hostname test_host_name.db.server is XXX
```
### Solution
Our implementation is in this PR:
https://github.com/ververica/flink-cdc-connectors/pull/2458
### Alternatives
_No response_
### Anything else?
_No response_
### Are you willing to submit a PR?
- [X] I'm willing to submit a PR!
---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2456
Created by: [SML0127|https://github.com/SML0127]
Labels: enhancement,
Created at: Sat Sep 02 15:17:58 CST 2023
State: open