Flink CDC Issue Import created FLINK-34810:
----------------------------------------------

             Summary: [mysql] Add a module to periodically check ip on DNS (for 
DB Switching)
                 Key: FLINK-34810
                 URL: https://issues.apache.org/jira/browse/FLINK-34810
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the 
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
nothing similar.


### Motivation

First, we'd like to thank everyone who contributes to Flink CDC.

Our team uses Flink CDC to capture changes from MySQL.

In real-world deployments, databases are organized in a master-slave structure 
and DB switching occurs periodically.
When DB switching occurs, **the connected slave server is promoted to the 
master server.**

--- 

The problem is that the Flink CDC connector keeps its existing connection, but 
**in real-world deployments there is a policy that nothing except a few 
permitted processes may be attached to the master server of the DB.**

To resolve this issue, our team implemented a DnsIpChecker module. (We saw a 
[similar issue and 
answer|https://github.com/ververica/flink-cdc-connectors/issues/2419].)

Here's how it works:
1. Allocate one thread when `MySqlSourceReader(subtaskId=0)` is created, so 
that the thread shares the lifecycle of `MySqlSourceReader`.
2. That thread periodically checks the IP in DNS based on the FQDN.
3. If the IP changes, it sends a `SourceEvent` to `MySqlSourceEnumerator` so 
that a `FlinkRuntimeException` is raised, which restarts the Flink job and 
resets the DB connection.
4. Add a close function to `MySqlSourceReader`, so that the thread is also 
terminated when the Flink CDC job is terminated.
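
The four steps above can be sketched in plain Java roughly as follows. This is only a minimal illustration and not the code from our PR: the class name `DnsIpWatcher`, the injectable `resolver`, and the `onIpChanged` callback are hypothetical names introduced for this example. In the connector, the callback would be the place where the `SourceEvent` is sent to the enumerator.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for the DnsIpChecker idea; not the implementation from the PR. */
final class DnsIpWatcher implements AutoCloseable {
    private final String hostname;
    private final UnaryOperator<String> resolver; // hostname -> IP, injectable for tests
    private final Consumer<String> onIpChanged;   // assumed hook, e.g. "send SourceEvent"
    private final ScheduledExecutorService scheduler;
    private String lastIp;                        // guarded by the lock in checkOnce()

    DnsIpWatcher(String hostname, UnaryOperator<String> resolver, Consumer<String> onIpChanged) {
        this.hostname = hostname;
        this.resolver = resolver;
        this.onIpChanged = onIpChanged;
        // Step 1: one dedicated thread, created together with its owner.
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "dns-ip-watcher");
            t.setDaemon(true);
            return t;
        });
    }

    /** Default resolver backed by the JVM's own DNS lookup. */
    static String resolveWithJvm(String host) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            throw new IllegalStateException("Cannot resolve " + host, e);
        }
    }

    /** Step 2: run the check periodically. */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(this::checkOnce, 0, period, unit);
    }

    /** Step 3: compare the resolved IP with the last one seen and notify on change. */
    synchronized void checkOnce() {
        String currentIp;
        try {
            currentIp = resolver.apply(hostname);
        } catch (RuntimeException e) {
            return; // treat a failed lookup as transient and keep the last known IP
        }
        if (lastIp == null) {
            lastIp = currentIp; // first observation only records the baseline
        } else if (!lastIp.equals(currentIp)) {
            lastIp = currentIp;
            onIpChanged.accept(currentIp);
        }
    }

    /** Step 4: stop the thread when the owner is closed. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

A caller would construct it with `DnsIpWatcher::resolveWithJvm`, call `start(60, TimeUnit.SECONDS)`, and call `close()` from the reader's close function. Note that the JVM caches successful DNS lookups (see the `networkaddress.cache.ttl` security property), so the check interval is only meaningful if that cache TTL is shorter than the interval.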

This is a portion of our code. We assume that each Flink CDC job captures only 
one table.

```scala
  import com.ververica.cdc.connectors.mysql.source.MySqlSource
  import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
  import org.apache.flink.table.catalog.ObjectPath

  // mySqlConfig is our own configuration object (not shown here).
  def getMySQLSourceOperator(): MySqlSource[String] = {
    MySqlSource.builder[String]()
      .hostname(mySqlConfig.host)
      .port(mySqlConfig.port)
      .serverTimeZone(mySqlConfig.timeZone)
      .databaseList(mySqlConfig.database)
      .tableList(mySqlConfig.table)
      .username(mySqlConfig.user)
      .serverId(mySqlConfig.serverIdRange)
      .password(mySqlConfig.password)
      .startupOptions(mySqlConfig.startupMode)
      .fetchSize(mySqlConfig.fetchSize)
      .splitSize(mySqlConfig.splitSize)
      .chunkKeyColumn(
        new ObjectPath(mySqlConfig.database, mySqlConfig.table),
        mySqlConfig.chunkKeyColumn)
      .connectionPoolSize(mySqlConfig.poolSize)
      .scanNewlyAddedTableEnabled(false)
      .includeSchemaChanges(false)
      .debeziumProperties(mySqlConfig.dbzProps)
      .closeIdleReaders(true)
      .restartOnDbSwitch(true) // the option we implemented
      .deserializer(
        new JsonDebeziumDeserializationSchema(true, mySqlConfig.jsonConverterProps))
      .build()
  }
```

We considered resetting only the DB connection, but it was difficult to 
determine a recovery point, so we chose to restart the Flink job and rely on 
Flink's checkpoints.

---

We also write periodic log entries, like the ones below, to check that the IP 
scan is running normally. 
(Hostname and IP are masked.)
```d
2023-09-02 07:10:45.003 INFO  
com.ververica.cdc.connectors.mysql.source.utils.DNSIpChecker [] - [264361] 
Current IP address for hostname test_host_name.db.server is XXX
...
2023-09-02 07:11:45.026 INFO  
com.ververica.cdc.connectors.mysql.source.utils.DNSIpChecker [] - [264421] 
Current IP address for hostname test_host_name.db.server is XXX
```


### Solution

Our implementation is in this PR: 
https://github.com/ververica/flink-cdc-connectors/pull/2458

### Alternatives

_No response_

### Anything else?

_No response_

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2456
Created by: [SML0127|https://github.com/SML0127]
Labels: enhancement
Created at: Sat Sep 02 15:17:58 CST 2023
State: open



