Hongshun Wang created FLINK-35387:
-------------------------------------

             Summary: PG CDC source support heart beat
                 Key: FLINK-35387
                 URL: https://issues.apache.org/jira/browse/FLINK-35387
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.1.0
            Reporter: Hongshun Wang
             Fix For: cdc-3.2.0


Although the PG CDC documentation [1] lists heartbeat.interval.ms, the option does not actually work. 
The reason is explained below.

The Debezium docs say: For the connector to detect and process events from a 
heartbeat table, you must add the table to the PostgreSQL publication specified 
by the 
[publication.name|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-publication-name]
 property. If this publication predates your Debezium deployment, the connector 
uses the publications as defined. If the publication is not already configured 
to automatically replicate changes {{FOR ALL TABLES}} in the database, you must 
explicitly add the heartbeat table to the publication[2].

Thus, if you want to use the heartbeat in CDC:

1. Add the heartbeat table to the publication: ALTER PUBLICATION _<publicationName>_ ADD TABLE _<heartbeatTableName>_;

2. Set heartbeatInterval.

3. Set debezium.[{{heartbeat.action.query}}|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query] [3].
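
The three steps above can be sketched as follows. This is only a minimal illustration, not the connector's own code: the publication name dbz_publication, the heartbeat table public.flink_cdc_heartbeat with its id and ts columns, the upsert used as the action query, and the "30000ms" value format are all assumptions made for the example. The option keys heartbeat.interval.ms and debezium.heartbeat.action.query are the ones named in this issue.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HeartbeatSetupSketch {

    // Step 1: statement to run once against PostgreSQL so that the
    // publication also replicates the heartbeat table.
    static String alterPublicationSql(String publicationName, String heartbeatTable) {
        return "ALTER PUBLICATION " + publicationName + " ADD TABLE " + heartbeatTable + ";";
    }

    // Steps 2 and 3: source options carrying the heartbeat interval and the
    // action query. The "<millis>ms" value format is an assumption.
    static Map<String, String> heartbeatOptions(long intervalMillis, String actionQuery) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("heartbeat.interval.ms", intervalMillis + "ms");
        options.put("debezium.heartbeat.action.query", actionQuery);
        return options;
    }

    public static void main(String[] args) {
        // Hypothetical names, chosen only for this example.
        String publication = "dbz_publication";
        String table = "public.flink_cdc_heartbeat";

        System.out.println(alterPublicationSql(publication, table));

        // Hypothetical action query: upsert a single row so that every
        // heartbeat produces a change event on the heartbeat table.
        String actionQuery =
                "INSERT INTO " + table + " (id, ts) VALUES (1, now()) "
                        + "ON CONFLICT (id) DO UPDATE SET ts = EXCLUDED.ts";

        heartbeatOptions(30_000L, actionQuery)
                .forEach((key, value) -> System.out.println("'" + key + "' = '" + value + "'"));
    }
}
```

The printed key/value pairs are meant to be copied into the source options. How they reach the source (SQL WITH clause or builder) depends on the API in use and is not shown here.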

 

However, when I use it in CDC, an exception occurs:
{code:java}
Caused by: java.lang.NullPointerException
at io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:127)
at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:94){code}
!https://alidocs.dingtalk.com/core/api/resources/img/5eecdaf48460cde5292b7c63c883d1620bbf7d3875a3a5b158e70b814913bc360a414d3de9277d871abf3af1cbd75249eddaaa1b37c2b2f5421a918fb1a2f0f3853c0ce41721e620699d98626fa2281948c58faa63edf8ebfc653b69905bac42?tmpCode=9193555a-7bf3-4335-9427-b59c1dfe1931!

 

It seems CDC does not add a HeartbeatConnectionProvider when it configures the 
PostgresEventDispatcher:
{code:java}
//org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure
this.postgresDispatcher =
                new PostgresEventDispatcher<>(
                        dbzConfig,
                        topicSelector,
                        schema,
                        queue,
                        dbzConfig.getTableFilters().dataCollectionFilter(),
                        DataChangeEvent::new,
                        metadataProvider,
                        schemaNameAdjuster); {code}
In Debezium, PostgresConnectorTask does this when it starts:
{code:java}
//io.debezium.connector.postgresql.PostgresConnectorTask#start
  final PostgresEventDispatcher<TableId> dispatcher = new PostgresEventDispatcher<>(
                    connectorConfig,
                    topicNamingStrategy,
                    schema,
                    queue,
                    connectorConfig.getTableFilters().dataCollectionFilter(),
                    DataChangeEvent::new,
                    PostgresChangeRecordEmitter::updateSchema,
                    metadataProvider,
                    connectorConfig.createHeartbeat(
                            topicNamingStrategy,
                            schemaNameAdjuster,
                            () -> new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL),
                            exception -> {
                                String sqlErrorId = exception.getSQLState();
                                switch (sqlErrorId) {
                                    case "57P01":
                                        // Postgres error admin_shutdown, see https://www.postgresql.org/docs/12/errcodes-appendix.html
                                        throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
                                    case "57P03":
                                        // Postgres error cannot_connect_now, see https://www.postgresql.org/docs/12/errcodes-appendix.html
                                        throw new RetriableException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
                                    default:
                                        break;
                                }
                            }),
                    schemaNameAdjuster,
                    signalProcessor); {code}
This issue will therefore add the same heartbeat setup to the PostgresEventDispatcher that CDC creates.
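
The SQLSTATE handling inside the Debezium snippet above can be tried in isolation with the sketch below. It is a stand-alone illustration under assumptions, not the planned patch: FatalHeartbeatException and RetriableHeartbeatException are hypothetical stand-ins for DebeziumException and RetriableException so that the example compiles without Debezium or Kafka Connect on the classpath. The null check on the SQLSTATE is also an addition that the original snippet does not have.

```java
import java.sql.SQLException;

class HeartbeatErrorHandlingSketch {

    // Hypothetical stand-in for io.debezium.DebeziumException.
    static class FatalHeartbeatException extends RuntimeException {
        FatalHeartbeatException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for the retriable exception used by Debezium.
    static class RetriableHeartbeatException extends RuntimeException {
        RetriableHeartbeatException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Mirrors the switch in the Debezium snippet: 57P01 is fatal,
    // 57P03 is retriable, every other SQLSTATE is ignored.
    static void handle(SQLException exception) {
        String sqlErrorId = exception.getSQLState();
        if (sqlErrorId == null) {
            // Added guard: switching on a null String would throw an NPE.
            return;
        }
        String message = "Could not execute heartbeat action query (Error: " + sqlErrorId + ")";
        switch (sqlErrorId) {
            case "57P01": // admin_shutdown
                throw new FatalHeartbeatException(message, exception);
            case "57P03": // cannot_connect_now
                throw new RetriableHeartbeatException(message, exception);
            default:
                break;
        }
    }

    public static void main(String[] args) {
        try {
            handle(new SQLException("the database system is starting up", "57P03"));
        } catch (RetriableHeartbeatException e) {
            System.out.println("retriable: " + e.getMessage());
        }
        // An unrelated SQLSTATE (42P01, undefined_table) passes through silently.
        handle(new SQLException("relation does not exist", "42P01"));
    }
}
```

Separating the fatal and retriable cases matters because a server that is still starting up (57P03) is expected to recover, whereas an administrator shutdown (57P01) is treated as a hard failure.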

 

[1] https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/

[2] https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-interval-ms

[3] https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query



