João Boto created FLINK-39582:
---------------------------------
Summary: [cdc-connector][postgres] Support emitting pg_logical_emit_message records to deserialization
Key: FLINK-39582
URL: https://issues.apache.org/jira/browse/FLINK-39582
Project: Flink
Issue Type: New Feature
Components: Flink CDC
Reporter: João Boto
Records produced by {{pg_logical_emit_message(transactional, prefix, content)}} never reach the user's {{DebeziumDeserializationSchema}}. They appear in the WAL stream and Debezium emits them as events with {{op="m"}}, but the Flink CDC pipeline silently drops them before deserialization.
Reproduction:
{code:sql}
SELECT pg_logical_emit_message(false, 'some_prefix', 'some_message');
{code}
The expected op="m" record is never observed by the deserializer.
h3. Root cause
Two filtering layers drop the record:
# {{IncrementalSourceStreamFetcher.shouldEmit()}} classifies the logical message as a data-change record (because {{SourceRecordUtils.isDataChangeRecord}} returns true for any non-null {{op}}) and then applies table-based watermark filtering. Logical messages have no associated table: their {{TableId(null, null, null)}} does not match any finished snapshot split, so the record is filtered out.
# Even if it passed (1), {{IncrementalSourceRecordEmitter.processElement()}} has no branch for {{op="m"}}; the record would fall into the "unknown element" branch, where it is logged and counted as an error.
h3. Related
{{pg_logical_emit_message}} reference:
[https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-REPLICATION]
Debezium logical decoding messages:
[https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-message-events]