Flink CDC Issue Import created FLINK-34809: ----------------------------------------------
Summary: [mysql] Add notifications to Slack when the Snapshot phase ends or Binlog stream phase begins
Key: FLINK-34809
URL: https://issues.apache.org/jira/browse/FLINK-34809
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Reporter: Flink CDC Issue Import

### Search before asking

- [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.

### Motivation

On our team, we use Flink CDC to capture changes from MySQL. Since [there is no Snapshot Only mode for MySql yet|https://github.com/ververica/flink-cdc-connectors/issues/1687], we needed to be notified when a snapshot completes and when the binlog stream starts. To accomplish this, we **implemented a notification that is sent when a snapshot ends and when a binlog stream starts, along with its GTIDs.**

---

Here is our team's use case in more detail:

1. We set parallelism to 2 or more for large tables.
2. We send the change event log to Kafka so that we can use Debezium's JDBC Sink Connector, which supports [Schema Evolution|https://debezium.io/documentation/reference/stable/connectors/jdbc.html#jdbc-schema-evolution].
3. Sinking to Kafka is slower than the MySqlSource operator, so we give the sink operator more parallelism than MySqlSource.
4. In this case, records are transferred from the source operator to the sink operator in rebalance mode, so the order of events for the same PK is not guaranteed when transferring binlogs.
5. So, at the end of the snapshot phase, we restart the job from the GTIDs with parallelism equal to 1.

To do this, we needed (1) to be notified that the snapshot ended and the binlog stream started, and (2) to know from which GTIDs the binlog stream started.

---

This is a portion of our code.
We **assumed that we only capture 1 table per Flink CDC job.**

```scala
def getMySQLSourceOperator(): MySqlSource[String] = {
  MySqlSource.builder[String]()
    .hostname(mySqlConfig.host)
    .port(mySqlConfig.port)
    .serverTimeZone(mySqlConfig.timeZone)
    .databaseList(mySqlConfig.database)
    .tableList(mySqlConfig.table)
    .username(mySqlConfig.user)
    .serverId(mySqlConfig.serverIdRange)
    .password(mySqlConfig.password)
    .startupOptions(mySqlConfig.startupMode)
    .fetchSize(mySqlConfig.fetchSize)
    .splitSize(mySqlConfig.splitSize)
    .chunkKeyColumn(new ObjectPath(mySqlConfig.database, mySqlConfig.table), mySqlConfig.chunkKeyColumn)
    .connectionPoolSize(mySqlConfig.poolSize)
    .scanNewlyAddedTableEnabled(false)
    .includeSchemaChanges(false)
    .debeziumProperties(mySqlConfig.dbzProps)
    .closeIdleReaders(true)
    .notifySnapshotToBinlogSwitch("slack-hook-url") // this is what we implemented
    .deserializer(new JsonDebeziumDeserializationSchema(true, mySqlConfig.jsonConverterProps))
    .build()
}
```

---

We can't share a real picture of the notification, because our company recommends using in-house tools rather than Slack (🥲🥲🥲) and has some security policies. But it looks something like the format below!

- Snapshot finished notification.
```
[SNAPSHOT FINISHED]
Database: test_database
Table: test_table
```
- Binlog stream start notification.
```
[BINLOG STREAM START]
Database: test_database
Table: test_table
GTIDs: 3bda59bb-2fc8-11eb-855f-fa163e2550e3:1-128377129,3c3a6a1b-c931-11ed-b0db-b4055dec129e:1-273703345,901d637c-8add-11eb-8e3f-b4055d3355a6:1-3641352422,b46b8251-5254-11ed-a648-d0946637df48:1-1069556331,db449f07-c53e-11e8-b8c2-d094663d3d1d:1-3543229125,e8d62f95-c77a-11e8-9270-d0946637df48:1-5715021533
```

### Solution

Our implementation and PR are here: https://github.com/ververica/flink-cdc-connectors/pull/2453

### Alternatives

_No response_

### Anything else?

_No response_

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!
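
For illustration only, the two notification formats described in this issue could be produced by a small helper like the one below. This is a minimal sketch, not the code from the PR: the object name `SnapshotNotificationFormat` and all of its methods are hypothetical, and it assumes each field goes on its own line and that the message is sent as the `text` field of a Slack incoming-webhook JSON body.

```scala
// Hypothetical helper: the names below are illustrative and not taken from the PR.
object SnapshotNotificationFormat {

  /** Message for the end of the snapshot phase (one field per line, by assumption). */
  def snapshotFinished(database: String, table: String): String =
    Seq("[SNAPSHOT FINISHED]", s"Database: $database", s"Table: $table").mkString("\n")

  /** Message for the start of the binlog stream; gtidSet is the comma-separated GTID set. */
  def binlogStreamStart(database: String, table: String, gtidSet: String): String =
    Seq(
      "[BINLOG STREAM START]",
      s"Database: $database",
      s"Table: $table",
      s"GTIDs: $gtidSet"
    ).mkString("\n")

  /** Wraps a message as a {"text": "..."} JSON body, escaping only backslashes, quotes and newlines. */
  def toSlackPayload(text: String): String = {
    val escaped = text
      .replace("\\", "\\\\") // escape backslashes first so later escapes are not doubled
      .replace("\"", "\\\"")
      .replace("\n", "\\n")
    s"""{"text":"$escaped"}"""
  }
}
```

A caller would pass `SnapshotNotificationFormat.toSlackPayload(SnapshotNotificationFormat.snapshotFinished("test_database", "test_table"))` as the body of an HTTP POST to the configured hook URL. The escaping here is deliberately minimal, so a real implementation would more likely use a JSON library.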
---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2454
Created by: [SML0127|https://github.com/SML0127]
Labels: enhancement
Created at: Sat Sep 02 14:34:50 CST 2023
State: open

--
This message was sent by Atlassian Jira
(v8.20.10#820010)