[ 
https://issues.apache.org/jira/browse/FLINK-39582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39582:
-----------------------------------
    Labels: pull-request-available  (was: )

> [cdc-connector][postgres] Support emitting pg_logical_emit_message records to 
> deserialization
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39582
>                 URL: https://issues.apache.org/jira/browse/FLINK-39582
>             Project: Flink
>          Issue Type: New Feature
>          Components: Flink CDC
>            Reporter: João Boto
>            Priority: Major
>              Labels: pull-request-available
>
> Records produced by pg_logical_emit_message(transactional, prefix, content) 
> never reach the user's DebeziumDeserializationSchema. Debezium emits them 
> with op="m" and they appear in the WAL stream, but the Flink CDC pipeline 
> silently drops them before deserialization.
> Reproduction:
> {code:sql}
> SELECT pg_logical_emit_message(false, 'some_prefix', 'some_message');
> {code}
> The expected op="m" record is never observed by the deserializer.
> h3. Root cause
> Two filtering layers drop the record:
> {{IncrementalSourceStreamFetcher.shouldEmit()}} classifies the logical 
> message as a data-change record (because 
> {{SourceRecordUtils.isDataChangeRecord}} returns true for any non-null 
> {{{}op{}}}), then applies table-based watermark filtering. Logical messages 
> have no associated table — {{TableId(null, null, null)}} doesn't match any 
> finished snapshot split — so the record is filtered out.
> Even if it passed (1), {{IncrementalSourceRecordEmitter.processElement()}} 
> has no branch for {{{}op="m"{}}}; the record would fall into the "unknown 
> element" branch and be logged + counted as an error.
> h3. Related
> {{pg_logical_emit_message}} reference: 
> [https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-REPLICATION]
> Debezium logical decoding messages: 
> [https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-message-events]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to