[
https://issues.apache.org/jira/browse/FLINK-39582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39582:
-----------------------------------
Labels: pull-request-available (was: )
> [cdc-connector][postgres] Support emitting pg_logical_emit_message records to
> deserialization
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-39582
> URL: https://issues.apache.org/jira/browse/FLINK-39582
> Project: Flink
> Issue Type: New Feature
> Components: Flink CDC
> Reporter: João Boto
> Priority: Major
> Labels: pull-request-available
>
> Records produced by pg_logical_emit_message(transactional, prefix, content)
> never reach the user's DebeziumDeserializationSchema. Debezium emits them
> with op="m" and they appear in the WAL stream, but the Flink CDC pipeline
> silently drops them before deserialization.
> Reproduction:
> {code:sql}
> SELECT pg_logical_emit_message(false, 'some_prefix', 'some_message');
> {code}
> The expected op="m" record is never observed by the deserializer.
> h3. Root cause
> Two filtering layers drop the record:
> {{IncrementalSourceStreamFetcher.shouldEmit()}} classifies the logical
> message as a data-change record (because
> {{SourceRecordUtils.isDataChangeRecord}} returns true for any non-null
> {{{}op{}}}), then applies table-based watermark filtering. Logical messages
> have no associated table — {{TableId(null, null, null)}} doesn't match any
> finished snapshot split — so the record is filtered out.
> Even if it passed (1), {{IncrementalSourceRecordEmitter.processElement()}}
> has no branch for {{{}op="m"{}}}; the record would fall into the "unknown
> element" branch and be logged + counted as an error.
> h3. Related
> {{pg_logical_emit_message}} reference:
> [https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-REPLICATION]
> Debezium logical decoding messages:
> [https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-message-events]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)