João Boto created FLINK-39582:
---------------------------------

             Summary: [cdc-connector][postgres] Support emitting 
pg_logical_emit_message records to deserialization
                 Key: FLINK-39582
                 URL: https://issues.apache.org/jira/browse/FLINK-39582
             Project: Flink
          Issue Type: New Feature
          Components: Flink CDC
            Reporter: João Boto


Records produced by pg_logical_emit_message(transactional, prefix, content) 
never reach the user's DebeziumDeserializationSchema. Debezium emits them with 
op="m" and they appear in the WAL stream, but the Flink CDC pipeline silently 
drops them before deserialization.

Reproduction:
{code:sql}
SELECT pg_logical_emit_message(false, 'some_prefix', 'some_message');
{code}
The expected op="m" record is never observed by the deserializer.
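
For illustration, the check a user-side deserializer would need once such records are delivered can be sketched as below. This is a minimal sketch, not Flink CDC code: a plain {{Map}} stands in for the Kafka Connect {{Struct}} held by the record, the class and method names are hypothetical, and the {{message.prefix}} / {{message.content}} field names are taken from the Debezium documentation linked under Related. It also assumes the content is UTF-8 text, which need not hold for arbitrary payloads.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Sketch only: a Map stands in for the Kafka Connect Struct of a record value. */
public class LogicalMessageSketch {

    /** True when the record value carries op="m", the marker described in this issue. */
    public static boolean isLogicalMessage(Map<String, Object> value) {
        return value != null && "m".equals(value.get("op"));
    }

    /** Returns the prefix passed to pg_logical_emit_message, or null if absent. */
    public static String prefix(Map<String, Object> value) {
        Object message = value.get("message");
        if (!(message instanceof Map)) {
            return null;
        }
        Object p = ((Map<?, ?>) message).get("prefix");
        return p == null ? null : p.toString();
    }

    /** Decodes the message content as UTF-8 text (an assumption; content is raw bytes). */
    public static String contentAsText(Map<String, Object> value) {
        Object message = value.get("message");
        if (!(message instanceof Map)) {
            return null;
        }
        Object c = ((Map<?, ?>) message).get("content");
        return c instanceof byte[] ? new String((byte[]) c, StandardCharsets.UTF_8) : null;
    }

    /** Hypothetical value shaped like the record the reproduction above should produce. */
    public static Map<String, Object> sample() {
        Map<String, Object> message = new HashMap<>();
        message.put("prefix", "some_prefix");
        message.put("content", "some_message".getBytes(StandardCharsets.UTF_8));
        Map<String, Object> value = new HashMap<>();
        value.put("op", "m");
        value.put("message", message);
        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> value = sample();
        if (isLogicalMessage(value)) {
            System.out.println(prefix(value) + ": " + contentAsText(value));
        }
    }
}
```
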
h3. Root cause

Two filtering layers drop the record:

# {{IncrementalSourceStreamFetcher.shouldEmit()}} classifies the logical message 
as a data-change record (because {{SourceRecordUtils.isDataChangeRecord}} 
returns true for any non-null {{op}}), then applies table-based watermark 
filtering. Logical messages have no associated table, so their 
{{TableId(null, null, null)}} doesn't match any finished snapshot split and 
the record is filtered out.
# Even if the record passed the first filter, 
{{IncrementalSourceRecordEmitter.processElement()}} has no branch for 
{{op="m"}}; the record would fall into the "unknown element" branch and be 
logged and counted as an error.
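
The first filtering layer can be modelled with a small stand-alone sketch. None of the types below are the real Flink CDC classes: {{Rec}}, {{shouldEmitCurrent}}, {{shouldEmitProposed}} and {{isLogicalMessage}} are hypothetical stand-ins, and the watermark comparison is reduced to a table-membership check. It only illustrates why a record with {{op="m"}} and no table is dropped, and one possible way to let it through.

```java
import java.util.Set;

/** Simplified model of the filtering described above; not the actual Flink CDC code. */
public class LogicalMessageFilterSketch {

    /** Hypothetical stand-in for a source record: only the op code and a table name. */
    public static final class Rec {
        final String op;
        final String table; // null for logical messages, which have no table

        public Rec(String op, String table) {
            this.op = op;
            this.table = table;
        }
    }

    /** Mirrors the behaviour described in the issue: any non-null op is a data change. */
    public static boolean isDataChangeRecord(Rec r) {
        return r.op != null;
    }

    /** Hypothetical helper: recognises logical decoding messages by op="m". */
    public static boolean isLogicalMessage(Rec r) {
        return "m".equals(r.op);
    }

    /** Current behaviour (simplified): data changes must belong to a finished split's table. */
    public static boolean shouldEmitCurrent(Rec r, Set<String> finishedTables) {
        if (isDataChangeRecord(r)) {
            return r.table != null && finishedTables.contains(r.table);
        }
        return true; // records without an op are not table-filtered in this model
    }

    /** One possible fix (an assumption): bypass table filtering for logical messages. */
    public static boolean shouldEmitProposed(Rec r, Set<String> finishedTables) {
        if (isLogicalMessage(r)) {
            return true;
        }
        return shouldEmitCurrent(r, finishedTables);
    }

    public static void main(String[] args) {
        Set<String> finished = Set.of("public.orders");
        Rec insert = new Rec("c", "public.orders");
        Rec message = new Rec("m", null);
        System.out.println("insert, current:   " + shouldEmitCurrent(insert, finished));
        System.out.println("message, current:  " + shouldEmitCurrent(message, finished));
        System.out.println("message, proposed: " + shouldEmitProposed(message, finished));
    }
}
```

In this model the second layer would still need its own branch for {{op="m"}}; the sketch covers only the first one.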
h3. Related

{{pg_logical_emit_message}} reference: 
[https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-REPLICATION]
Debezium logical decoding messages: 
[https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-message-events]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)