Flink CDC Issue Import created FLINK-34809:
----------------------------------------------

             Summary: [mysql] Add notifications to Slack when the Snapshot 
phase ends or Binlog stream phase begins
                 Key: FLINK-34809
                 URL: https://issues.apache.org/jira/browse/FLINK-34809
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the 
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
nothing similar.


### Motivation

On our team, we use Flink CDC to perform MySQL CDC.

Since [there is no Snapshot Only mode for MySQL 
yet|https://github.com/ververica/flink-cdc-connectors/issues/1687], we needed 
to be notified when the snapshot completes and when the binlog stream 
starts.

To accomplish this, we **implemented a notification that is sent when the 
snapshot ends and when the binlog stream starts, including the starting GTIDs.**

--- 

Here's the team's use case in more detail:
1. We set parallelism to 2 or more for large tables.
2. We send change event logs to Kafka so that we can use Debezium's JDBC Sink 
Connector, which supports [Schema 
Evolution|https://debezium.io/documentation/reference/stable/connectors/jdbc.html#jdbc-schema-evolution].
3. Sinking to Kafka is slower than the MySqlSource operator, so we give the 
sink operator more parallelism than MySqlSource.
4. In this case, the transfer is done in rebalance mode from source operator to 
sink operator, so the order for the same PK is not guaranteed when transferring 
binlogs.
5. So we restart the job from those GTIDs with parallelism equal to 1 at the 
end of the snapshot phase.

To do this, we needed (1) to be notified that the snapshot ended and the binlog 
stream started, and (2) to know from which GTIDs the binlog stream started.
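
To illustrate step 4, here is a minimal plain-Scala sketch of round-robin distribution. It only models the idea and does not use Flink; `ChangeEvent`, `RebalanceSketch`, and the sample data are hypothetical names made up for this illustration.

```scala
// Illustrative model only: plain Scala, not Flink's actual rebalance partitioner.
final case class ChangeEvent(pk: Int, version: Int)

object RebalanceSketch {
  // Assign the i-th event to subtask (i % parallelism), as a round-robin scheme would.
  def distribute(events: Seq[ChangeEvent], parallelism: Int): Map[Int, Seq[ChangeEvent]] =
    events.zipWithIndex
      .groupBy { case (_, index) => index % parallelism }
      .map { case (subtask, pairs) => subtask -> pairs.map(_._1) }

  // Subtasks that received at least one event for the given primary key.
  def subtasksFor(pk: Int, assignment: Map[Int, Seq[ChangeEvent]]): Set[Int] =
    assignment.collect { case (subtask, evs) if evs.exists(_.pk == pk) => subtask }.toSet
}
```

With a parallelism of 2, successive changes to the same PK end up on different sink subtasks, which write to Kafka independently, so their relative order is no longer guaranteed. With a parallelism of 1, every event stays on one subtask in its original order.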

---

This is a portion of our code. We **assumed that we capture only one table per 
Flink CDC job.**
```scala
  def getMySQLSourceOperator(): MySqlSource[String] = {
    MySqlSource.builder[String]()
      .hostname(mySqlConfig.host)
      .port(mySqlConfig.port)
      .serverTimeZone(mySqlConfig.timeZone)
      .databaseList(mySqlConfig.database)
      .tableList(mySqlConfig.table)
      .username(mySqlConfig.user)
      .serverId(mySqlConfig.serverIdRange)
      .password(mySqlConfig.password)
      .startupOptions(mySqlConfig.startupMode)
      .fetchSize(mySqlConfig.fetchSize)
      .splitSize(mySqlConfig.splitSize)
      .chunkKeyColumn(new ObjectPath(mySqlConfig.database, mySqlConfig.table), mySqlConfig.chunkKeyColumn)
      .connectionPoolSize(mySqlConfig.poolSize)
      .scanNewlyAddedTableEnabled(false)
      .includeSchemaChanges(false)
      .debeziumProperties(mySqlConfig.dbzProps)
      .closeIdleReaders(true)
      .notifySnapshotToBinlogSwitch("slack-hook-url") // this is the method we implemented
      .deserializer(new JsonDebeziumDeserializationSchema(true, mySqlConfig.jsonConverterProps))
      .build()
  }
```

--- 

We can't share a real screenshot of the notification, because our company 
recommends using in-house tools rather than Slack (🥲🥲🥲) and has security 
policies that prevent it.

But it looks something like the format below!
- Snapshot finished notification.
```
[SNAPSHOT FINISHED]
 Database: test_database
 Table: test_table
```
- Binlog stream start notification.
```
[BINLOG STREAM START]
 Database: test_database
 Table: test_table
 GTIDs: 
3bda59bb-2fc8-11eb-855f-fa163e2550e3:1-128377129,3c3a6a1b-c931-11ed-b0db-b4055dec129e:1-273703345,901d637c-8add-11eb-8e3f-b4055d3355a6:1-3641352422,b46b8251-5254-11ed-a648-d0946637df48:1-1069556331,db449f07-c53e-11e8-b8c2-d094663d3d1d:1-3543229125,e8d62f95-c77a-11e8-9270-d0946637df48:1-5715021533
```
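
As a rough sketch of how messages in this format could be built and posted to a Slack incoming webhook (which accepts a JSON body with a `text` field), consider the following. This is not the code from our PR: `SwitchNotification` and its methods are hypothetical names used only for illustration, and the JSON escaping covers only the characters these messages are expected to contain.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Hypothetical helper, not part of Flink CDC or of the PR.
object SwitchNotification {
  def snapshotFinished(database: String, table: String): String =
    s"[SNAPSHOT FINISHED]\n Database: $database\n Table: $table"

  def binlogStreamStart(database: String, table: String, gtids: String): String =
    s"[BINLOG STREAM START]\n Database: $database\n Table: $table\n GTIDs: $gtids"

  // Minimal escaping: quotes, backslashes, and common whitespace escapes only.
  def escapeJson(s: String): String = s.flatMap {
    case '"'  => "\\\""
    case '\\' => "\\\\"
    case '\n' => "\\n"
    case '\r' => "\\r"
    case '\t' => "\\t"
    case c    => c.toString
  }

  // Slack incoming webhooks accept a JSON object with a "text" field.
  def slackPayload(text: String): String =
    "{\"text\":\"" + escapeJson(text) + "\"}"

  // Sends the message and returns the HTTP status code (requires Java 11+).
  def post(webhookUrl: String, text: String): Int = {
    val request = HttpRequest.newBuilder(URI.create(webhookUrl))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(slackPayload(text)))
      .build()
    HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
      .statusCode()
  }
}
```

For example, `SwitchNotification.post(hookUrl, SwitchNotification.snapshotFinished("test_database", "test_table"))` would send the first message shown above, where `hookUrl` stands for your own webhook URL.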


### Solution

Our implementation is in this PR: 
https://github.com/ververica/flink-cdc-connectors/pull/2453

### Alternatives

_No response_

### Anything else?

_No response_

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2454
Created by: [SML0127|https://github.com/SML0127]
Labels: enhancement, 
Created at: Sat Sep 02 14:34:50 CST 2023
State: open




--
This message was sent by Atlassian Jira
(v8.20.10#820010)