This is an automated email from the ASF dual-hosted git repository.

loserwang1024 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new b3acffe77 [FLINK-39582][postgres] Allow logical messages (#4387)
b3acffe77 is described below

commit b3acffe776e6078a3859624ee90566be73f7fbf6
Author: Joao Boto <[email protected]>
AuthorDate: Mon Jun 1 12:05:15 2026 +0200

    [FLINK-39582][postgres] Allow logical messages (#4387)
---
 .../reader/PostgresPipelineRecordEmitter.java      |   6 +-
 .../reader/IncrementalSourceRecordEmitter.java     |   8 +-
 .../external/IncrementalSourceStreamFetcher.java   |   2 +-
 .../source/reader/MongoDBRecordEmitter.java        |   6 +-
 .../postgres/source/PostgresSourceBuilder.java     |  37 ++++++-
 .../source/config/PostgresSourceConfig.java        |  10 +-
 .../source/config/PostgresSourceConfigFactory.java |  10 +-
 .../fetch/PostgresSourceFetchTaskContext.java      |   7 ++
 .../source/reader/PostgresSourceRecordEmitter.java |  45 +++++++-
 .../source/utils/PostgresSourceRecordUtils.java    |  68 ++++++++++++
 .../source/reader/PostgresSourceReaderTest.java    |  96 +++++++++++++++++
 .../reader/PostgresSourceRecordUtilsTest.java      | 115 +++++++++++++++++++++
 .../postgres/testutils/RecordsFormatter.java       |  55 +++++++---
 13 files changed, 436 insertions(+), 29 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
index 49b20e324..1030263e8 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
@@ -57,7 +57,6 @@ import java.util.Set;
 import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
 import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
 import static 
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
-import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
 import static 
org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
 import static 
org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.inferSchemaChangeEvent;
@@ -66,7 +65,6 @@ import static 
org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.to
 /** The {@link RecordEmitter} implementation for PostgreSQL pipeline 
connector. */
 public class PostgresPipelineRecordEmitter<T> extends 
PostgresSourceRecordEmitter<T> {
     private static final Logger LOG = 
LoggerFactory.getLogger(PostgresPipelineRecordEmitter.class);
-    private final PostgresSourceConfig sourceConfig;
     private final PostgresDialect postgresDialect;
 
     // Used when startup mode is initial
@@ -88,8 +86,8 @@ public class PostgresPipelineRecordEmitter<T> extends 
PostgresSourceRecordEmitte
                 debeziumDeserializationSchema,
                 sourceReaderMetrics,
                 sourceConfig.isIncludeSchemaChanges(),
-                offsetFactory);
-        this.sourceConfig = sourceConfig;
+                offsetFactory,
+                sourceConfig);
         this.includeDatabaseInTableId = 
sourceConfig.isIncludeDatabaseInTableId();
         this.postgresDialect = postgresDialect;
         this.alreadySendCreateTableTables = new HashSet<>();
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
index 984472803..35297ec10 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
 import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
@@ -46,7 +47,6 @@ import static 
org.apache.flink.cdc.connectors.base.source.meta.wartermark.Waterm
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getFetchTimestamp;
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getHistoryRecord;
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getMessageTimestamp;
-import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isHeartbeatEvent;
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
 
@@ -126,13 +126,17 @@ public class IncrementalSourceRecordEmitter<T>
         }
     }
 
+    protected boolean isDataChangeRecord(SourceRecord record) {
+        return SourceRecordUtils.isDataChangeRecord(record);
+    }
+
     protected TableChanges getTableChangeRecord(SourceRecord element) throws 
IOException {
         HistoryRecord historyRecord = getHistoryRecord(element);
         Array tableChanges = 
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
         return TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
     }
 
-    private void updateStreamSplitState(SourceSplitState splitState, 
SourceRecord element) {
+    protected void updateStreamSplitState(SourceSplitState splitState, 
SourceRecord element) {
         if (splitState.isStreamSplitState()) {
             Offset position = getOffsetPosition(element);
             splitState.asStreamSplitState().setStartingOffset(position);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 72c62d600..9ddb41576 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -185,7 +185,7 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
      *  only the change event belong to [1024, 2048) and offset is after 
highWatermark1 should send.
      * </pre>
      */
-    private boolean shouldEmit(SourceRecord sourceRecord) {
+    protected boolean shouldEmit(SourceRecord sourceRecord) {
         if (taskContext.isDataChangeRecord(sourceRecord)) {
             TableId tableId = taskContext.getTableId(sourceRecord);
             Offset position = taskContext.getStreamOffset(sourceRecord);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.java
index deabf6469..bfdf02391 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.java
@@ -40,7 +40,6 @@ import static 
org.apache.flink.cdc.connectors.base.source.meta.wartermark.Waterm
 import static 
org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.getFetchTimestamp;
 import static 
org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.getMessageTimestamp;
 import static 
org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.getResumeToken;
-import static 
org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.isDataChangeRecord;
 import static 
org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.isHeartbeatEvent;
 
 /**
@@ -122,4 +121,9 @@ public final class MongoDBRecordEmitter<T> extends 
IncrementalSourceRecordEmitte
             }
         }
     }
+
+    @Override
+    protected boolean isDataChangeRecord(SourceRecord record) {
+        return MongoRecordUtils.isDataChangeRecord(record);
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index 53bb066b5..b8c9a902c 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -315,6 +315,40 @@ public class PostgresSourceBuilder<T> {
         return this;
     }
 
+    /**
+     * Enable emitting Postgres logical decoding messages (records produced by 
{@code
+     * pg_logical_emit_message}) to the deserializer.
+     *
+     * <p>Unlike normal table changes, logical messages are <b>not bound to 
any table</b> and are
+     * <b>not filtered by PUBLICATION</b>. They are written as standalone 
{@code LOGICAL MESSAGE}
+     * records in the WAL and streamed to all replication slots created with 
the {@code pgoutput}
+     * plugin. Both transactional ({@code pg_logical_emit_message(true, ...)}) 
and non-transactional
+     * ({@code pg_logical_emit_message(false, ...)}) messages are delivered.
+     *
+     * <p>Once enabled, this connector performs <b>client-side prefix 
filtering</b>: only messages
+     * whose {@code prefix} starts with one of the given {@code prefixes} are 
forwarded to the
+     * deserializer; messages with non-matching prefixes are silently dropped.
+     *
+     * <p>Requirements:
+     *
+     * <ol>
+     *   <li>PostgreSQL 14+ with {@code wal_level=logical} configured in 
{@code postgresql.conf}
+     *       (the {@code pgoutput} plugin started supporting the {@code 
messages} streaming option
+     *       since PG 14).
+     *   <li>The decoding plugin must be {@code pgoutput} (configured via 
{@link
+     *       #decodingPluginName}).
+     *   <li>{@code prefixes} must be non-null and non-empty; otherwise no 
logical message will be
+     *       emitted.
+     * </ol>
+     *
+     * @param prefixes the list of message prefixes to be included; messages 
whose prefix matches
+     *     any of these will be emitted, all others are dropped
+     */
+    public PostgresSourceBuilder<T> includeLogicalMessages(List<String> 
prefixes) {
+        this.configFactory.includeLogicalMessages(prefixes);
+        return this;
+    }
+
     /** Whether to infer schema change event on relation message. */
     public PostgresSourceBuilder<T> includeSchemaChanges(boolean 
includeSchemaChanges) {
         this.configFactory.includeSchemaChanges(includeSchemaChanges);
@@ -462,7 +496,8 @@ public class PostgresSourceBuilder<T> {
                     deserializationSchema,
                     sourceReaderMetrics,
                     sourceConfig.isIncludeSchemaChanges(),
-                    offsetFactory);
+                    offsetFactory,
+                    (PostgresSourceConfig) sourceConfig);
         }
 
         public static <T> PostgresSourceBuilder<T> builder() {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index 4ad4b3e7e..8891c976d 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -40,6 +40,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
     private final int lsnCommitCheckpointsDelay;
     private final boolean includePartitionedTables;
     private final boolean includeDatabaseInTableId;
+    private final List<String> logicalMessagePrefixes;
 
     public PostgresSourceConfig(
             int subtaskId,
@@ -71,7 +72,8 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
             int lsnCommitCheckpointsDelay,
             boolean assignUnboundedChunkFirst,
             boolean includePartitionedTables,
-            boolean includeDatabaseInTableId) {
+            boolean includeDatabaseInTableId,
+            List<String> logicalMessagePrefixes) {
         super(
                 startupOptions,
                 databaseList,
@@ -103,6 +105,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
         this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
         this.includePartitionedTables = includePartitionedTables;
         this.includeDatabaseInTableId = includeDatabaseInTableId;
+        this.logicalMessagePrefixes = logicalMessagePrefixes;
     }
 
     /**
@@ -156,4 +159,9 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
     public boolean isIncludeDatabaseInTableId() {
         return includeDatabaseInTableId;
     }
+
+    /** Returns the prefixes for Postgres logical decoding messages. */
+    public List<String> getLogicalMessagePrefixes() {
+        return logicalMessagePrefixes;
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 847b15474..9731cf1cb 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -57,6 +57,8 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
     private boolean includeDatabaseInTableId =
             PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue();
 
+    private List<String> logicalMessagePrefixes;
+
     /** Creates a new {@link PostgresSourceConfig} for the given subtask 
{@code subtaskId}. */
     @Override
     public PostgresSourceConfig create(int subtaskId) {
@@ -140,7 +142,8 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
                 lsnCommitCheckpointsDelay,
                 assignUnboundedChunkFirst,
                 includePartitionedTables,
-                includeDatabaseInTableId);
+                includeDatabaseInTableId,
+                logicalMessagePrefixes);
     }
 
     /**
@@ -198,4 +201,9 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
     public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) {
         this.includeDatabaseInTableId = includeDatabaseInTableId;
     }
+
+    /** Set the prefixes for Postgres logical decoding messages. */
+    public void includeLogicalMessages(List<String> logicalMessagePrefixes) {
+        this.logicalMessagePrefixes = logicalMessagePrefixes;
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
index 3a275ff6b..ff94ecfcc 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetUtil
 import 
org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
 import 
org.apache.flink.cdc.connectors.postgres.source.schema.RelationAwarePostgresSchema;
 import org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresSourceRecordUtils;
 import org.apache.flink.table.types.logical.RowType;
 
 import io.debezium.DebeziumException;
@@ -334,6 +335,12 @@ public class PostgresSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
         return new TableId(null, schemaName, tableName);
     }
 
+    @Override
+    public boolean isDataChangeRecord(SourceRecord record) {
+        // Postgres-specific check: logical messages (op 'm') are not data change records.
+        return PostgresSourceRecordUtils.isDataChangeRecord(record);
+    }
+
     @Override
     public Offset getStreamOffset(SourceRecord sourceRecord) {
         return PostgresOffset.of(sourceRecord);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
index 7689eccf7..4c4146c23 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
@@ -17,10 +17,14 @@
 
 package org.apache.flink.cdc.connectors.postgres.source.reader;
 
+import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
 import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
 import 
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
 import 
org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresSourceRecordUtils;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
 
 import io.debezium.relational.Table;
@@ -28,19 +32,44 @@ import io.debezium.relational.history.TableChanges;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
 
 /** Record emitter that recognizes {@link PostgresSchemaRecord} as schema 
change events. */
 public class PostgresSourceRecordEmitter<T> extends 
IncrementalSourceRecordEmitter<T> {
+    protected final PostgresSourceConfig sourceConfig;
+
     public PostgresSourceRecordEmitter(
             DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
             SourceReaderMetrics sourceReaderMetrics,
             boolean includeSchemaChanges,
-            OffsetFactory offsetFactory) {
+            OffsetFactory offsetFactory,
+            PostgresSourceConfig sourceConfig) {
         super(
                 debeziumDeserializationSchema,
                 sourceReaderMetrics,
                 includeSchemaChanges,
                 offsetFactory);
+        this.sourceConfig = sourceConfig;
+    }
+
+    @Override
+    protected void processElement(
+            SourceRecord element, SourceOutput<T> output, SourceSplitState 
splitState)
+            throws Exception {
+        if (isIncludeEmitLogicalMessage(element)) {
+            updateStreamSplitState(splitState, element);
+            emitElement(element, output);
+            return;
+        }
+        super.processElement(element, output, splitState);
+    }
+
+    @Override
+    protected boolean isDataChangeRecord(SourceRecord record) {
+        // Postgres-specific check: logical messages (op 'm') are not data change records.
+        return PostgresSourceRecordUtils.isDataChangeRecord(record);
     }
 
     @Override
@@ -53,4 +82,18 @@ public class PostgresSourceRecordEmitter<T> extends 
IncrementalSourceRecordEmitt
             return super.getTableChangeRecord(element);
         }
     }
+
+    private boolean isIncludeEmitLogicalMessage(SourceRecord record) {
+        List<String> prefixes = sourceConfig.getLogicalMessagePrefixes();
+        if (prefixes == null
+                || prefixes.isEmpty()
+                || !PostgresSourceRecordUtils.isLogicalMessage(record)) {
+            return false;
+        }
+
+        String prefix = 
PostgresSourceRecordUtils.getLogicalMessagePrefix(record);
+        checkState(prefix != null, "message_prefix can not be null for 
logical_message");
+
+        return prefixes.stream().anyMatch(prefix::startsWith);
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresSourceRecordUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresSourceRecordUtils.java
new file mode 100644
index 000000000..99bec4d6d
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresSourceRecordUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.utils;
+
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** A utility class for processing {@link SourceRecord}s from a Postgres 
source. */
+public final class PostgresSourceRecordUtils {
+
+    private PostgresSourceRecordUtils() {}
+
+    /**
+     * Checks whether a {@link SourceRecord} is a logical message.
+     *
+     * @param record the source record
+     * @return true if the record is a logical message, false otherwise
+     */
+    public static boolean isLogicalMessage(SourceRecord record) {
+        if (record.value() instanceof Struct) {
+            Struct struct = (Struct) record.value();
+            return struct.schema().field(Envelope.FieldName.OPERATION) != null
+                    && 
"m".equals(struct.getString(Envelope.FieldName.OPERATION));
+        }
+        return false;
+    }
+
+    public static boolean isDataChangeRecord(SourceRecord record) {
+        Schema valueSchema = record.valueSchema();
+        Struct value = (Struct) record.value();
+        return value != null
+                && valueSchema != null
+                && valueSchema.field(Envelope.FieldName.OPERATION) != null
+                && value.getString(Envelope.FieldName.OPERATION) != null
+                && !isLogicalMessage(record);
+    }
+
+    /**
+     * Returns the prefix of a logical message, read from the {@code prefix} field of its key.
+     *
+     * @param record the source record
+     * @return the prefix, or {@code null} when the record key is not a {@link Struct}
+     */
+    public static String getLogicalMessagePrefix(SourceRecord record) {
+        if (record.key() instanceof Struct) {
+            Struct struct = (Struct) record.key();
+            return struct.getString("prefix");
+        }
+        return null;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
index 77b845074..81d33aff5 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
@@ -90,6 +90,75 @@ public class PostgresSourceReaderTest extends 
PostgresTestBase {
         customDatabase.removeSlot(slotName);
     }
 
+    @Test
+    public void testReceiveLogicalMessagesWithPrefix() throws Exception {
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("Id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("Name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+
+        // Discover table schemas and create a stream split
+        PostgresSourceConfigFactory configFactory = createConfigFactory();
+        
configFactory.includeLogicalMessages(Collections.singletonList("test_prefix"));
+        PostgresSourceConfig sourceConfig = configFactory.create(0);
+        try (PostgresDialect dialect = new PostgresDialect(sourceConfig);
+                PostgresSourceReader reader =
+                        createStreamReaderWithLogicalMessage(
+                                Collections.singletonList("test_prefix"))) {
+            reader.start();
+            Map<TableId, TableChanges.TableChange> tableSchemas =
+                    dialect.discoverDataCollectionSchemas(sourceConfig);
+
+            PostgresOffsetFactory offsetFactory = new PostgresOffsetFactory();
+            StreamSplit streamSplit =
+                    new StreamSplit(
+                            StreamSplit.STREAM_SPLIT_ID,
+                            offsetFactory.createInitialOffset(),
+                            offsetFactory.createNoStoppingOffset(),
+                            Collections.emptyList(),
+                            tableSchemas,
+                            0);
+            reader.addSplits(Collections.singletonList(streamSplit));
+
+            // Wait for the reader to start consuming
+            Thread.sleep(1000L);
+
+            try (Connection conn =
+                            getJdbcConnection(
+                                    POSTGRES_CONTAINER, 
customDatabase.getDatabaseName());
+                    Statement stmt = conn.createStatement()) {
+                // Emit logical message -> insert data -> emit another logical 
message
+                stmt.execute("SELECT pg_logical_emit_message(false, 
'test_prefix', 'message1')");
+                stmt.execute(
+                        "INSERT INTO customer.\"Customers\" VALUES (3001, 
'between_msg', 'Beijing', '111')");
+                stmt.execute("SELECT pg_logical_emit_message(false, 
'other_prefix', 'message2')");
+                stmt.execute(
+                        "INSERT INTO customer.\"Customers\" VALUES (3002, 
'after_other', 'Shanghai', '222')");
+                stmt.execute("SELECT pg_logical_emit_message(false, 
'test_prefix', 'message3')");
+            }
+
+            // Wait for the logical message events to be processed
+            Thread.sleep(2000L);
+
+            // Poll records so the emitter processes all events
+            List<String> results = consumeStreamRecords(reader, dataType, 4);
+            // Verify ordering and filtering:
+            // - message1 (test_prefix) is captured
+            // - insert between_msg is captured
+            // - message2 (other_prefix) is filtered out
+            // - insert after_other is captured
+            // - message3 (test_prefix) is captured
+            assertThat(results)
+                    .containsExactly(
+                            "M[test_prefix, message1]",
+                            "+I[3001, between_msg, Beijing, 111]",
+                            "+I[3002, after_other, Shanghai, 222]",
+                            "M[test_prefix, message3]");
+        }
+    }
+
     @Test
     void testNotifyCheckpointWindowSizeOne() throws Exception {
         final PostgresSourceReader reader = createReader(1);
@@ -308,6 +377,18 @@ public class PostgresSourceReaderTest extends 
PostgresTestBase {
         return formatter.format(output.getResults());
     }
 
+    /**
+     * Polls {@code sourceReader} until at least {@code size} records have been collected and the
+     * reader no longer reports {@code MORE_AVAILABLE}, then formats everything collected.
+     *
+     * @param sourceReader reader to poll
+     * @param recordType row type handed to the {@link RecordsFormatter}
+     * @param size minimum number of collected records to wait for
+     * @return the formatted records as produced by {@link RecordsFormatter#format}
+     * @throws Exception if polling fails or the expected records do not arrive before the deadline
+     */
+    private List<String> consumeStreamRecords(
+            PostgresSourceReader sourceReader, DataType recordType, int size) throws Exception {
+        final SimpleReaderOutput output = new SimpleReaderOutput();
+        // Bound the wait: without a deadline, a record that never arrives (for example one that
+        // was filtered out unexpectedly) would keep this loop spinning and hang the build.
+        final long deadlineNanos = System.nanoTime() + 120_000_000_000L;
+        InputStatus status = MORE_AVAILABLE;
+        while (MORE_AVAILABLE == status || output.getResults().size() < size) {
+            if (System.nanoTime() - deadlineNanos > 0) {
+                throw new java.util.concurrent.TimeoutException(
+                        "Timed out waiting for "
+                                + size
+                                + " records, received "
+                                + output.getResults().size());
+            }
+            status = sourceReader.pollNext(output);
+        }
+        final RecordsFormatter formatter = new RecordsFormatter(recordType);
+        return formatter.format(output.getResults());
+    }
+
     private PostgresSourceReader createReader(final int 
lsnCommitCheckpointsDelay)
             throws Exception {
         return createReader(lsnCommitCheckpointsDelay, false);
@@ -338,6 +419,20 @@ public class PostgresSourceReaderTest extends 
PostgresTestBase {
         return source.createReader(new TestingReaderContext());
     }
 
+    /**
+     * Creates a reader from the shared test configuration with startup options set to {@code
+     * latest()}, an LSN commit checkpoints delay of 1, and logical messages included for the given
+     * prefixes.
+     */
+    private PostgresSourceReader createStreamReaderWithLogicalMessage(List<String> prefixes)
+            throws Exception {
+        final PostgresOffsetFactory offsets = new PostgresOffsetFactory();
+        final PostgresSourceConfigFactory factory = createConfigFactory();
+        factory.startupOptions(StartupOptions.latest());
+        factory.setLsnCommitCheckpointsDelay(1);
+        factory.includeLogicalMessages(prefixes);
+        final PostgresDialect postgresDialect = new PostgresDialect(factory.create(0));
+        final PostgresSourceBuilder.PostgresIncrementalSource<?> incrementalSource =
+                new PostgresSourceBuilder.PostgresIncrementalSource<>(
+                        factory, new ForwardDeserializeSchema(), offsets, postgresDialect);
+        return incrementalSource.createReader(new TestingReaderContext());
+    }
+
     private PostgresSourceConfigFactory createConfigFactory() {
         final PostgresSourceConfigFactory configFactory = new 
PostgresSourceConfigFactory();
         configFactory.hostname(customDatabase.getHost());
@@ -346,6 +441,7 @@ public class PostgresSourceReaderTest extends 
PostgresTestBase {
         configFactory.tableList(SCHEMA_NAME + ".Customers");
         configFactory.username(customDatabase.getUsername());
         configFactory.password(customDatabase.getPassword());
+        configFactory.slotName(slotName);
         configFactory.decodingPluginName("pgoutput");
         return configFactory;
     }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordUtilsTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordUtilsTest.java
new file mode 100644
index 000000000..dcc5919cf
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordUtilsTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.reader;
+
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresSourceRecordUtils;
+
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the logical-message detection in {@link PostgresSourceRecordUtils}. */
+class PostgresSourceRecordUtilsTest {
+
+    /** Value schema that carries the envelope operation field. */
+    private static final Schema ENVELOPE_WITH_OP =
+            SchemaBuilder.struct()
+                    .field(Envelope.FieldName.OPERATION, Schema.OPTIONAL_STRING_SCHEMA)
+                    .build();
+
+    /** Value schema that has no operation field at all. */
+    private static final Schema ENVELOPE_WITHOUT_OP =
+            SchemaBuilder.struct().field("source", Schema.OPTIONAL_STRING_SCHEMA).build();
+
+    @Test
+    void logicalMessageRecordIsDetected() {
+        assertThat(PostgresSourceRecordUtils.isLogicalMessage(recordWithOp("m"))).isTrue();
+    }
+
+    @Test
+    void dataChangeRecordsAreNotLogicalMessages() {
+        final String[] dataChangeOps = {"c", "u", "d", "r", "t"};
+        for (String op : dataChangeOps) {
+            assertThat(PostgresSourceRecordUtils.isLogicalMessage(recordWithOp(op)))
+                    .as("op=%s should not be detected as logical message", op)
+                    .isFalse();
+        }
+    }
+
+    @Test
+    void recordWithNullValueIsNotLogicalMessage() {
+        SourceRecord nullValued = recordOf(ENVELOPE_WITH_OP, null);
+        assertThat(PostgresSourceRecordUtils.isLogicalMessage(nullValued)).isFalse();
+    }
+
+    @Test
+    void recordWithoutOperationFieldIsNotLogicalMessage() {
+        Struct withoutOp = new Struct(ENVELOPE_WITHOUT_OP).put("source", "anything");
+        SourceRecord record = recordOf(ENVELOPE_WITHOUT_OP, withoutOp);
+        assertThat(PostgresSourceRecordUtils.isLogicalMessage(record)).isFalse();
+    }
+
+    @Test
+    void recordWithNullOperationValueIsNotLogicalMessage() {
+        // The operation field exists in the schema but is never populated.
+        SourceRecord record = recordOf(ENVELOPE_WITH_OP, new Struct(ENVELOPE_WITH_OP));
+        assertThat(PostgresSourceRecordUtils.isLogicalMessage(record)).isFalse();
+    }
+
+    @Test
+    void recordWithNonStructValueIsNotLogicalMessage() {
+        SourceRecord record = recordOf(Schema.STRING_SCHEMA, "not-a-struct");
+        assertThat(PostgresSourceRecordUtils.isLogicalMessage(record)).isFalse();
+    }
+
+    /** Builds a record whose value struct has the operation field set to {@code op}. */
+    private static SourceRecord recordWithOp(String op) {
+        return recordOf(
+                ENVELOPE_WITH_OP,
+                new Struct(ENVELOPE_WITH_OP).put(Envelope.FieldName.OPERATION, op));
+    }
+
+    /** Builds a record on topic {@code "topic"} with empty partition and offset maps. */
+    private static SourceRecord recordOf(Schema valueSchema, Object value) {
+        return new SourceRecord(
+                Collections.emptyMap(), Collections.emptyMap(), "topic", valueSchema, value);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/RecordsFormatter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/RecordsFormatter.java
index 7c1d9c001..e09ccb8d2 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/RecordsFormatter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/RecordsFormatter.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.postgres.testutils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresSourceRecordUtils;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
 import org.apache.flink.table.data.RowData;
@@ -26,15 +27,15 @@ import 
org.apache.flink.table.data.conversion.RowRowConverter;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
+import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /** Formatter that formats the {@link 
org.apache.kafka.connect.source.SourceRecord} to String. */
 public class RecordsFormatter {
@@ -66,22 +67,42 @@ public class RecordsFormatter {
         rowRowConverter.open(Thread.currentThread().getContextClassLoader());
     }
 
+    /**
+     * Formats records preserving the original order of both data change 
records and logical
+     * messages.
+     *
+     * <p>Logical messages are rendered through {@code formatLogicalMessage}; data change records
+     * are deserialized into rows and each row is rendered with {@code toString()}. Records that
+     * are neither are left out of the result.
+     *
+     * @param records the source records to format, in emission order
+     * @return one string per logical message or deserialized row, in input order
+     * @throws RuntimeException wrapping any exception raised while deserializing a record
+     */
     public List<String> format(List<SourceRecord> records) {
-        records.stream()
-                // Keep DataChangeEvent only
-                .filter(SourceRecordUtils::isDataChangeRecord)
-                .forEach(
-                        r -> {
-                            try {
-                                deserializationSchema.deserialize(r, 
collector);
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
-        return collector.list.stream()
-                .map(rowRowConverter::toExternal)
-                .map(Row::toString)
-                .collect(Collectors.toList());
+        List<String> result = new ArrayList<>();
+        for (SourceRecord r : records) {
+            if (PostgresSourceRecordUtils.isLogicalMessage(r)) {
+                result.add(formatLogicalMessage(r));
+            } else if (SourceRecordUtils.isDataChangeRecord(r)) {
+                try {
+                    collector.list.clear(); // empty the shared buffer so only this record's rows follow
+                    deserializationSchema.deserialize(r, collector);
+                    for (RowData rowData : collector.list) {
+                        
result.add(rowRowConverter.toExternal(rowData).toString());
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            } // any other kind of record is skipped
+        }
+        return result;
+    }
+
+    /**
+     * Renders a logical message record as {@code M[prefix, content]}.
+     *
+     * <p>The prefix and content are read from the {@code message} struct of the record value.
+     * Binary content ({@link ByteBuffer} or {@code byte[]}) is decoded as UTF-8; any other content
+     * is rendered with {@link String#valueOf(Object)}.
+     *
+     * @param record a record whose value is a struct containing a {@code message} struct
+     * @return the formatted message
+     */
+    private String formatLogicalMessage(SourceRecord record) {
+        Struct message = ((Struct) record.value()).getStruct("message");
+        String prefix = message.getString("prefix");
+        Object contentObj = message.get("content");
+        String content;
+        if (contentObj instanceof ByteBuffer) {
+            // Read only the readable window through a duplicate: array() ignores
+            // position/limit/arrayOffset and throws for read-only or direct buffers.
+            ByteBuffer buffer = ((ByteBuffer) contentObj).duplicate();
+            byte[] bytes = new byte[buffer.remaining()];
+            buffer.get(bytes);
+            // Explicit charset keeps the result independent of the JVM default.
+            // NOTE(review): assumes the payload is UTF-8 text - confirm for non-ASCII messages.
+            content = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
+        } else if (contentObj instanceof byte[]) {
+            // A raw array would otherwise be printed as its identity string ("[B@...").
+            content = new String((byte[]) contentObj, java.nio.charset.StandardCharsets.UTF_8);
+        } else {
+            content = String.valueOf(contentObj);
+        }
+
+        return "M[" + prefix + ", " + content + "]";
+    }
 
     private static class SimpleCollector implements Collector<RowData> {


Reply via email to