This is an automated email from the ASF dual-hosted git repository.
loserwang1024 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new b3acffe77 [FLINK-39582][postgres] Allow logical messages (#4387)
b3acffe77 is described below
commit b3acffe776e6078a3859624ee90566be73f7fbf6
Author: Joao Boto <[email protected]>
AuthorDate: Mon Jun 1 12:05:15 2026 +0200
[FLINK-39582][postgres] Allow logical messages (#4387)
---
.../reader/PostgresPipelineRecordEmitter.java | 6 +-
.../reader/IncrementalSourceRecordEmitter.java | 8 +-
.../external/IncrementalSourceStreamFetcher.java | 2 +-
.../source/reader/MongoDBRecordEmitter.java | 6 +-
.../postgres/source/PostgresSourceBuilder.java | 37 ++++++-
.../source/config/PostgresSourceConfig.java | 10 +-
.../source/config/PostgresSourceConfigFactory.java | 10 +-
.../fetch/PostgresSourceFetchTaskContext.java | 7 ++
.../source/reader/PostgresSourceRecordEmitter.java | 45 +++++++-
.../source/utils/PostgresSourceRecordUtils.java | 68 ++++++++++++
.../source/reader/PostgresSourceReaderTest.java | 96 +++++++++++++++++
.../reader/PostgresSourceRecordUtilsTest.java | 115 +++++++++++++++++++++
.../postgres/testutils/RecordsFormatter.java | 55 +++++++---
13 files changed, 436 insertions(+), 29 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
index 49b20e324..1030263e8 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
@@ -57,7 +57,6 @@ import java.util.Set;
import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
-import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
import static
org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
import static
org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.inferSchemaChangeEvent;
@@ -66,7 +65,6 @@ import static
org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.to
/** The {@link RecordEmitter} implementation for PostgreSQL pipeline
connector. */
public class PostgresPipelineRecordEmitter<T> extends
PostgresSourceRecordEmitter<T> {
private static final Logger LOG =
LoggerFactory.getLogger(PostgresPipelineRecordEmitter.class);
- private final PostgresSourceConfig sourceConfig;
private final PostgresDialect postgresDialect;
// Used when startup mode is initial
@@ -88,8 +86,8 @@ public class PostgresPipelineRecordEmitter<T> extends
PostgresSourceRecordEmitte
debeziumDeserializationSchema,
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges(),
- offsetFactory);
- this.sourceConfig = sourceConfig;
+ offsetFactory,
+ sourceConfig);
this.includeDatabaseInTableId =
sourceConfig.isIncludeDatabaseInTableId();
this.postgresDialect = postgresDialect;
this.alreadySendCreateTableTables = new HashSet<>();
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
index 984472803..35297ec10 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -24,6 +24,7 @@ import
org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
@@ -46,7 +47,6 @@ import static
org.apache.flink.cdc.connectors.base.source.meta.wartermark.Waterm
import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getFetchTimestamp;
import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getHistoryRecord;
import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getMessageTimestamp;
-import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isHeartbeatEvent;
import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
@@ -126,13 +126,17 @@ public class IncrementalSourceRecordEmitter<T>
}
}
+ /**
+ * Returns whether the given record is a data change record. The default implementation
+ * delegates to {@link SourceRecordUtils#isDataChangeRecord(SourceRecord)}; the method is
+ * protected so that connector-specific emitters can override the check.
+ *
+ * @param record the source record to inspect
+ * @return the result of {@link SourceRecordUtils#isDataChangeRecord(SourceRecord)}
+ */
+ protected boolean isDataChangeRecord(SourceRecord record) {
+ return SourceRecordUtils.isDataChangeRecord(record);
+ }
+
protected TableChanges getTableChangeRecord(SourceRecord element) throws
IOException {
HistoryRecord historyRecord = getHistoryRecord(element);
Array tableChanges =
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
return TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
}
- private void updateStreamSplitState(SourceSplitState splitState,
SourceRecord element) {
+ protected void updateStreamSplitState(SourceSplitState splitState,
SourceRecord element) {
if (splitState.isStreamSplitState()) {
Offset position = getOffsetPosition(element);
splitState.asStreamSplitState().setStartingOffset(position);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 72c62d600..9ddb41576 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -185,7 +185,7 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
* only the change event belong to [1024, 2048) and offset is after
highWatermark1 should send.
* </pre>
*/
- private boolean shouldEmit(SourceRecord sourceRecord) {
+ protected boolean shouldEmit(SourceRecord sourceRecord) {
if (taskContext.isDataChangeRecord(sourceRecord)) {
TableId tableId = taskContext.getTableId(sourceRecord);
Offset position = taskContext.getStreamOffset(sourceRecord);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.java
index deabf6469..bfdf02391 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.java
@@ -40,7 +40,6 @@ import static
org.apache.flink.cdc.connectors.base.source.meta.wartermark.Waterm
import static
org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.getFetchTimestamp;
import static
org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.getMessageTimestamp;
import static
org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.getResumeToken;
-import static
org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.isDataChangeRecord;
import static
org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.isHeartbeatEvent;
/**
@@ -122,4 +121,9 @@ public final class MongoDBRecordEmitter<T> extends
IncrementalSourceRecordEmitte
}
}
}
+
+ @Override
+ protected boolean isDataChangeRecord(SourceRecord record) {
+ return MongoRecordUtils.isDataChangeRecord(record);
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index 53bb066b5..b8c9a902c 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -315,6 +315,40 @@ public class PostgresSourceBuilder<T> {
return this;
}
+ /**
+ * Enable emitting Postgres logical decoding messages (records produced by
{@code
+ * pg_logical_emit_message}) to the deserializer.
+ *
+ * <p>Unlike normal table changes, logical messages are <b>not bound to
any table</b> and are
+ * <b>not filtered by PUBLICATION</b>. They are written as standalone
{@code LOGICAL MESSAGE}
+ * records in the WAL and streamed to all replication slots created with
the {@code pgoutput}
+ * plugin. Both transactional ({@code pg_logical_emit_message(true, ...)})
and non-transactional
+ * ({@code pg_logical_emit_message(false, ...)}) messages are delivered.
+ *
+ * <p>Once enabled, this connector performs <b>client-side prefix
filtering</b>: only messages
+ * whose {@code prefix} starts with one of the given {@code prefixes} are
forwarded to the
+ * deserializer; messages with non-matching prefixes are silently dropped.
+ *
+ * <p>Requirements:
+ *
+ * <ol>
+ * <li>PostgreSQL 14+ with {@code wal_level=logical} configured in
{@code postgresql.conf}
+ * (the {@code pgoutput} plugin started supporting the {@code
messages} streaming option
+ * since PG 14).
+ * <li>The decoding plugin must be {@code pgoutput} (configured via
{@link
+ * #decodingPluginName}).
+ * <li>{@code prefixes} must be non-null and non-empty; otherwise no
logical message will be
+ * emitted.
+ * </ol>
+ *
+ * @param prefixes the list of message prefixes to be included; messages
whose prefix starts with
+ * any of these will be emitted, all others are dropped
+ */
+ public PostgresSourceBuilder<T> includeLogicalMessages(List<String>
prefixes) {
+ this.configFactory.includeLogicalMessages(prefixes);
+ return this;
+ }
+
/** Whether to infer schema change event on relation message. */
public PostgresSourceBuilder<T> includeSchemaChanges(boolean
includeSchemaChanges) {
this.configFactory.includeSchemaChanges(includeSchemaChanges);
@@ -462,7 +496,8 @@ public class PostgresSourceBuilder<T> {
deserializationSchema,
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges(),
- offsetFactory);
+ offsetFactory,
+ (PostgresSourceConfig) sourceConfig);
}
public static <T> PostgresSourceBuilder<T> builder() {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index 4ad4b3e7e..8891c976d 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -40,6 +40,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
private final int lsnCommitCheckpointsDelay;
private final boolean includePartitionedTables;
private final boolean includeDatabaseInTableId;
+ private final List<String> logicalMessagePrefixes;
public PostgresSourceConfig(
int subtaskId,
@@ -71,7 +72,8 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
int lsnCommitCheckpointsDelay,
boolean assignUnboundedChunkFirst,
boolean includePartitionedTables,
- boolean includeDatabaseInTableId) {
+ boolean includeDatabaseInTableId,
+ List<String> logicalMessagePrefixes) {
super(
startupOptions,
databaseList,
@@ -103,6 +105,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
this.includePartitionedTables = includePartitionedTables;
this.includeDatabaseInTableId = includeDatabaseInTableId;
+ this.logicalMessagePrefixes = logicalMessagePrefixes;
}
/**
@@ -156,4 +159,9 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
public boolean isIncludeDatabaseInTableId() {
return includeDatabaseInTableId;
}
+
+ /** Returns the prefixes for Postgres logical decoding messages. */
+ public List<String> getLogicalMessagePrefixes() {
+ return logicalMessagePrefixes;
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 847b15474..9731cf1cb 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -57,6 +57,8 @@ public class PostgresSourceConfigFactory extends
JdbcSourceConfigFactory {
private boolean includeDatabaseInTableId =
PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue();
+ private List<String> logicalMessagePrefixes;
+
/** Creates a new {@link PostgresSourceConfig} for the given subtask
{@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
@@ -140,7 +142,8 @@ public class PostgresSourceConfigFactory extends
JdbcSourceConfigFactory {
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst,
includePartitionedTables,
- includeDatabaseInTableId);
+ includeDatabaseInTableId,
+ logicalMessagePrefixes);
}
/**
@@ -198,4 +201,9 @@ public class PostgresSourceConfigFactory extends
JdbcSourceConfigFactory {
public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) {
this.includeDatabaseInTableId = includeDatabaseInTableId;
}
+
+ /** Set the prefixes for Postgres logical decoding messages. */
+ public void includeLogicalMessages(List<String> logicalMessagePrefixes) {
+ this.logicalMessagePrefixes = logicalMessagePrefixes;
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
index 3a275ff6b..ff94ecfcc 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
@@ -32,6 +32,7 @@ import
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetUtil
import
org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
import
org.apache.flink.cdc.connectors.postgres.source.schema.RelationAwarePostgresSchema;
import org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils;
+import
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresSourceRecordUtils;
import org.apache.flink.table.types.logical.RowType;
import io.debezium.DebeziumException;
@@ -334,6 +335,12 @@ public class PostgresSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
return new TableId(null, schemaName, tableName);
}
+ @Override
+ public boolean isDataChangeRecord(SourceRecord record) {
+ // A logical message (whose op is 'm') is not a data change record, so use the
+ // Postgres-specific check, which excludes logical messages.
+ return PostgresSourceRecordUtils.isDataChangeRecord(record);
+ }
+
@Override
public Offset getStreamOffset(SourceRecord sourceRecord) {
return PostgresOffset.of(sourceRecord);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
index 7689eccf7..4c4146c23 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
@@ -17,10 +17,14 @@
package org.apache.flink.cdc.connectors.postgres.source.reader;
+import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import
org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
+import
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresSourceRecordUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.relational.Table;
@@ -28,19 +32,44 @@ import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.source.SourceRecord;
import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
/** Record emitter that recognizes {@link PostgresSchemaRecord} as schema
change events. */
public class PostgresSourceRecordEmitter<T> extends
IncrementalSourceRecordEmitter<T> {
+ protected final PostgresSourceConfig sourceConfig;
+
public PostgresSourceRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
SourceReaderMetrics sourceReaderMetrics,
boolean includeSchemaChanges,
- OffsetFactory offsetFactory) {
+ OffsetFactory offsetFactory,
+ PostgresSourceConfig sourceConfig) {
super(
debeziumDeserializationSchema,
sourceReaderMetrics,
includeSchemaChanges,
offsetFactory);
+ this.sourceConfig = sourceConfig;
+ }
+
+ /**
+ * Processes one record. A logical message accepted by {@code isIncludeEmitLogicalMessage}
+ * is handled here directly: the stream split state is updated from the record and the
+ * record is passed to {@code emitElement}, without going through the parent's processing.
+ * Every other record is delegated to {@code super.processElement}.
+ */
+ @Override
+ protected void processElement(
+ SourceRecord element, SourceOutput<T> output, SourceSplitState
splitState)
+ throws Exception {
+ if (isIncludeEmitLogicalMessage(element)) {
+ // Update the split state before emitting the message.
+ updateStreamSplitState(splitState, element);
+ emitElement(element, output);
+ return;
+ }
+ super.processElement(element, output, splitState);
+ }
+
+ @Override
+ protected boolean isDataChangeRecord(SourceRecord record) {
+ // A logical message (whose op is 'm') is not a data change record, so use the
+ // Postgres-specific check, which excludes logical messages.
+ return PostgresSourceRecordUtils.isDataChangeRecord(record);
+ }
@Override
@@ -53,4 +82,18 @@ public class PostgresSourceRecordEmitter<T> extends
IncrementalSourceRecordEmitt
return super.getTableChangeRecord(element);
}
}
+
+    /**
+     * Decides whether a record is a logical message that should be forwarded to the
+     * deserializer: logical-message prefixes must be configured, the record must be a logical
+     * message, and its prefix must start with at least one configured prefix.
+     */
+    private boolean isIncludeEmitLogicalMessage(SourceRecord record) {
+        final List<String> configuredPrefixes = sourceConfig.getLogicalMessagePrefixes();
+        if (configuredPrefixes == null || configuredPrefixes.isEmpty()) {
+            return false;
+        }
+        if (!PostgresSourceRecordUtils.isLogicalMessage(record)) {
+            return false;
+        }
+
+        final String messagePrefix = PostgresSourceRecordUtils.getLogicalMessagePrefix(record);
+        checkState(
+                messagePrefix != null, "message_prefix can not be null for logical_message");
+
+        for (String configuredPrefix : configuredPrefixes) {
+            if (messagePrefix.startsWith(configuredPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresSourceRecordUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresSourceRecordUtils.java
new file mode 100644
index 000000000..99bec4d6d
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresSourceRecordUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.utils;
+
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** A utility class for processing {@link SourceRecord}s from a Postgres source. */
+public final class PostgresSourceRecordUtils {
+
+    /** Value of the envelope {@code op} field that marks a logical decoding message. */
+    private static final String LOGICAL_MESSAGE_OPERATION = "m";
+
+    /** Name of the record-key field that holds the prefix of a logical message. */
+    private static final String PREFIX_FIELD = "prefix";
+
+    private PostgresSourceRecordUtils() {}
+
+    /**
+     * Checks whether a {@link SourceRecord} is a logical message, i.e. its value is a {@link
+     * Struct} that has an {@code op} field equal to {@code "m"}.
+     *
+     * @param record the source record
+     * @return true if the record is a logical message, false otherwise
+     */
+    public static boolean isLogicalMessage(SourceRecord record) {
+        if (!(record.value() instanceof Struct)) {
+            return false;
+        }
+        Struct value = (Struct) record.value();
+        return value.schema().field(Envelope.FieldName.OPERATION) != null
+                && LOGICAL_MESSAGE_OPERATION.equals(
+                        value.getString(Envelope.FieldName.OPERATION));
+    }
+
+    /**
+     * Checks whether a {@link SourceRecord} is a data change record: its value is a {@link
+     * Struct} with a non-null {@code op} field and the record is not a logical message.
+     *
+     * @param record the source record
+     * @return true if the record is a data change record; false otherwise, including when the
+     *     value is null or not a {@link Struct}
+     */
+    public static boolean isDataChangeRecord(SourceRecord record) {
+        Schema valueSchema = record.valueSchema();
+        // instanceof also rejects a null value and avoids an unchecked cast on other types.
+        if (valueSchema == null || !(record.value() instanceof Struct)) {
+            return false;
+        }
+        Struct value = (Struct) record.value();
+        return valueSchema.field(Envelope.FieldName.OPERATION) != null
+                && value.getString(Envelope.FieldName.OPERATION) != null
+                && !isLogicalMessage(record);
+    }
+
+    /**
+     * Returns the prefix of a logical message, read from the {@code prefix} field of the record
+     * key.
+     *
+     * @param record the source record
+     * @return the prefix of the logical message, or null if the record value or the record key
+     *     is not a {@link Struct}
+     */
+    public static String getLogicalMessagePrefix(SourceRecord record) {
+        // The prefix is read from the key, so the key itself must be verified to be a Struct
+        // before it is cast; checking only the value would let a null or non-Struct key fail.
+        if (!(record.value() instanceof Struct) || !(record.key() instanceof Struct)) {
+            return null;
+        }
+        Struct key = (Struct) record.key();
+        return key.getString(PREFIX_FIELD);
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
index 77b845074..81d33aff5 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
@@ -90,6 +90,75 @@ public class PostgresSourceReaderTest extends
PostgresTestBase {
customDatabase.removeSlot(slotName);
}
+ @Test
+ public void testReceiveLogicalMessagesWithPrefix() throws Exception {
+ final DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD("Id", DataTypes.BIGINT()),
+ DataTypes.FIELD("Name", DataTypes.STRING()),
+ DataTypes.FIELD("address", DataTypes.STRING()),
+ DataTypes.FIELD("phone_number", DataTypes.STRING()));
+
+ // Discover table schemas and create a stream split
+ PostgresSourceConfigFactory configFactory = createConfigFactory();
+
configFactory.includeLogicalMessages(Collections.singletonList("test_prefix"));
+ PostgresSourceConfig sourceConfig = configFactory.create(0);
+ try (PostgresDialect dialect = new PostgresDialect(sourceConfig);
+ PostgresSourceReader reader =
+ createStreamReaderWithLogicalMessage(
+ Collections.singletonList("test_prefix"))) {
+ reader.start();
+ Map<TableId, TableChanges.TableChange> tableSchemas =
+ dialect.discoverDataCollectionSchemas(sourceConfig);
+
+ PostgresOffsetFactory offsetFactory = new PostgresOffsetFactory();
+ StreamSplit streamSplit =
+ new StreamSplit(
+ StreamSplit.STREAM_SPLIT_ID,
+ offsetFactory.createInitialOffset(),
+ offsetFactory.createNoStoppingOffset(),
+ Collections.emptyList(),
+ tableSchemas,
+ 0);
+ reader.addSplits(Collections.singletonList(streamSplit));
+
+ // Wait for the reader to start consuming
+ Thread.sleep(1000L);
+
+ try (Connection conn =
+ getJdbcConnection(
+ POSTGRES_CONTAINER,
customDatabase.getDatabaseName());
+ Statement stmt = conn.createStatement()) {
+ // Emit logical message -> insert data -> emit another logical
message
+ stmt.execute("SELECT pg_logical_emit_message(false,
'test_prefix', 'message1')");
+ stmt.execute(
+ "INSERT INTO customer.\"Customers\" VALUES (3001,
'between_msg', 'Beijing', '111')");
+ stmt.execute("SELECT pg_logical_emit_message(false,
'other_prefix', 'message2')");
+ stmt.execute(
+ "INSERT INTO customer.\"Customers\" VALUES (3002,
'after_other', 'Shanghai', '222')");
+ stmt.execute("SELECT pg_logical_emit_message(false,
'test_prefix', 'message3')");
+ }
+
+ // Wait for the logical message events to be processed
+ Thread.sleep(2000L);
+
+ // Poll records so the emitter processes all events
+ List<String> results = consumeStreamRecords(reader, dataType, 4);
+ // Verify ordering and filtering:
+ // - message1 (test_prefix) is captured
+ // - insert between_msg is captured
+ // - message2 (other_prefix) is filtered out
+ // - insert after_other is captured
+ // - message3 (test_prefix) is captured
+ assertThat(results)
+ .containsExactly(
+ "M[test_prefix, message1]",
+ "+I[3001, between_msg, Beijing, 111]",
+ "+I[3002, after_other, Shanghai, 222]",
+ "M[test_prefix, message3]");
+ }
+ }
+
@Test
void testNotifyCheckpointWindowSizeOne() throws Exception {
final PostgresSourceReader reader = createReader(1);
@@ -308,6 +377,18 @@ public class PostgresSourceReaderTest extends
PostgresTestBase {
return formatter.format(output.getResults());
}
+ private List<String> consumeStreamRecords(
+ PostgresSourceReader sourceReader, DataType recordType, int size)
throws Exception {
+ // Keep polling while the reader reports MORE_AVAILABLE or fewer than `size` records
+ // have been collected. NOTE: there is no timeout, so this loops indefinitely if the
+ // expected number of records never arrives.
+ final SimpleReaderOutput output = new SimpleReaderOutput();
+ InputStatus status = MORE_AVAILABLE;
+ while (MORE_AVAILABLE == status || output.getResults().size() < size) {
+ status = sourceReader.pollNext(output);
+ }
+ // Format everything that was collected using the given record type.
+ final RecordsFormatter formatter = new RecordsFormatter(recordType);
+ return formatter.format(output.getResults());
+ }
+
private PostgresSourceReader createReader(final int
lsnCommitCheckpointsDelay)
throws Exception {
return createReader(lsnCommitCheckpointsDelay, false);
@@ -338,6 +419,20 @@ public class PostgresSourceReaderTest extends
PostgresTestBase {
return source.createReader(new TestingReaderContext());
}
+ private PostgresSourceReader
createStreamReaderWithLogicalMessage(List<String> prefixes)
+ throws Exception {
+ final PostgresOffsetFactory offsetFactory = new
PostgresOffsetFactory();
+ final PostgresSourceConfigFactory configFactory =
createConfigFactory();
+ configFactory.startupOptions(StartupOptions.latest());
+ configFactory.setLsnCommitCheckpointsDelay(1);
+ configFactory.includeLogicalMessages(prefixes);
+ PostgresDialect dialect = new PostgresDialect(configFactory.create(0));
+ final PostgresSourceBuilder.PostgresIncrementalSource<?> source =
+ new PostgresSourceBuilder.PostgresIncrementalSource<>(
+ configFactory, new ForwardDeserializeSchema(),
offsetFactory, dialect);
+ return source.createReader(new TestingReaderContext());
+ }
+
private PostgresSourceConfigFactory createConfigFactory() {
final PostgresSourceConfigFactory configFactory = new
PostgresSourceConfigFactory();
configFactory.hostname(customDatabase.getHost());
@@ -346,6 +441,7 @@ public class PostgresSourceReaderTest extends
PostgresTestBase {
configFactory.tableList(SCHEMA_NAME + ".Customers");
configFactory.username(customDatabase.getUsername());
configFactory.password(customDatabase.getPassword());
+ configFactory.slotName(slotName);
configFactory.decodingPluginName("pgoutput");
return configFactory;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordUtilsTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordUtilsTest.java
new file mode 100644
index 000000000..dcc5919cf
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordUtilsTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.reader;
+
+import
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresSourceRecordUtils;
+
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Verifies how {@link PostgresSourceRecordUtils#isLogicalMessage} classifies source records. */
+class PostgresSourceRecordUtilsTest {
+
+    /** Value schema whose only field is the optional envelope operation. */
+    private static final Schema ENVELOPE_WITH_OP =
+            SchemaBuilder.struct()
+                    .field(Envelope.FieldName.OPERATION, Schema.OPTIONAL_STRING_SCHEMA)
+                    .build();
+
+    /** Value schema that has no envelope operation field at all. */
+    private static final Schema ENVELOPE_WITHOUT_OP =
+            SchemaBuilder.struct().field("source", Schema.OPTIONAL_STRING_SCHEMA).build();
+
+    @Test
+    void logicalMessageRecordIsDetected() {
+        assertThat(PostgresSourceRecordUtils.isLogicalMessage(recordWithOp("m"))).isTrue();
+    }
+
+    @Test
+    void dataChangeRecordsAreNotLogicalMessages() {
+        final String[] otherOps = {"c", "u", "d", "r", "t"};
+        for (int i = 0; i < otherOps.length; i++) {
+            final String op = otherOps[i];
+            assertThat(PostgresSourceRecordUtils.isLogicalMessage(recordWithOp(op)))
+                    .as("op=%s should not be detected as logical message", op)
+                    .isFalse();
+        }
+    }
+
+    @Test
+    void recordWithNullValueIsNotLogicalMessage() {
+        assertNotLogicalMessage(newRecord(ENVELOPE_WITH_OP, null));
+    }
+
+    @Test
+    void recordWithoutOperationFieldIsNotLogicalMessage() {
+        final Struct withoutOp = new Struct(ENVELOPE_WITHOUT_OP).put("source", "anything");
+        assertNotLogicalMessage(newRecord(ENVELOPE_WITHOUT_OP, withoutOp));
+    }
+
+    @Test
+    void recordWithNullOperationValueIsNotLogicalMessage() {
+        // The operation field exists in the schema but is deliberately left unset.
+        final Struct unsetOp = new Struct(ENVELOPE_WITH_OP);
+        assertNotLogicalMessage(newRecord(ENVELOPE_WITH_OP, unsetOp));
+    }
+
+    @Test
+    void recordWithNonStructValueIsNotLogicalMessage() {
+        assertNotLogicalMessage(newRecord(Schema.STRING_SCHEMA, "not-a-struct"));
+    }
+
+    /** Asserts that the record is not classified as a logical message. */
+    private static void assertNotLogicalMessage(SourceRecord record) {
+        assertThat(PostgresSourceRecordUtils.isLogicalMessage(record)).isFalse();
+    }
+
+    /** Builds a record whose value carries the given envelope operation code. */
+    private static SourceRecord recordWithOp(String op) {
+        final Struct envelope = new Struct(ENVELOPE_WITH_OP);
+        envelope.put(Envelope.FieldName.OPERATION, op);
+        return newRecord(ENVELOPE_WITH_OP, envelope);
+    }
+
+    /** Builds a record on topic {@code "topic"} with empty partition and offset maps. */
+    private static SourceRecord newRecord(Schema valueSchema, Object value) {
+        return new SourceRecord(
+                Collections.emptyMap(), Collections.emptyMap(), "topic", valueSchema, value);
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/RecordsFormatter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/RecordsFormatter.java
index 7c1d9c001..e09ccb8d2 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/RecordsFormatter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/RecordsFormatter.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.postgres.testutils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
+import
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresSourceRecordUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.table.data.RowData;
@@ -26,15 +27,15 @@ import
org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
/** Formatter that formats the {@link
org.apache.kafka.connect.source.SourceRecord} to String. */
public class RecordsFormatter {
@@ -66,22 +67,42 @@ public class RecordsFormatter {
rowRowConverter.open(Thread.currentThread().getContextClassLoader());
}
+ /**
+  * Converts the records to strings in the order they were received. Logical messages and
+  * data change records are rendered; records of any other kind are skipped.
+  *
+  * @param records the source records to render
+  * @return one string per logical message and per row deserialized from a data change record
+  */
public List<String> format(List<SourceRecord> records) {
- records.stream()
- // Keep DataChangeEvent only
- .filter(SourceRecordUtils::isDataChangeRecord)
- .forEach(
- r -> {
- try {
- deserializationSchema.deserialize(r,
collector);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- return collector.list.stream()
- .map(rowRowConverter::toExternal)
- .map(Row::toString)
- .collect(Collectors.toList());
+     final List<String> formatted = new ArrayList<>();
+     for (SourceRecord record : records) {
+         if (PostgresSourceRecordUtils.isLogicalMessage(record)) {
+             formatted.add(formatLogicalMessage(record));
+             continue;
+         }
+         if (!SourceRecordUtils.isDataChangeRecord(record)) {
+             // Neither a logical message nor a data change record: nothing to render.
+             continue;
+         }
+         try {
+             // The collector is shared, so drop rows left over from the previous record.
+             collector.list.clear();
+             deserializationSchema.deserialize(record, collector);
+             for (RowData row : collector.list) {
+                 final String rendered = rowRowConverter.toExternal(row).toString();
+                 formatted.add(rendered);
+             }
+         } catch (Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+     return formatted;
+ }
+
+ /**
+  * Renders a logical message record as {@code M[<prefix>, <content>]}.
+  *
+  * <p>Binary content is decoded as UTF-8 so the output does not depend on the platform
+  * charset. Kafka Connect allows a {@code BYTES} field to hold either a {@link ByteBuffer}
+  * or a {@code byte[]}, so both are handled; any other content type falls back to
+  * {@link String#valueOf(Object)}.
+  *
+  * @param record a record whose value is a {@link Struct} containing a {@code message}
+  *     struct with {@code prefix} and {@code content} fields
+  * @return the formatted message
+  */
+ private String formatLogicalMessage(SourceRecord record) {
+     Struct message = ((Struct) record.value()).getStruct("message");
+     String prefix = message.getString("prefix");
+     Object contentObj = message.get("content");
+     String content;
+     if (contentObj instanceof ByteBuffer) {
+         // Decode a duplicate's position..limit window. Unlike array(), this honours the
+         // buffer's offsets, works for read-only and direct buffers, and leaves the
+         // record's own buffer position untouched.
+         content =
+                 java.nio.charset.StandardCharsets.UTF_8
+                         .decode(((ByteBuffer) contentObj).duplicate())
+                         .toString();
+     } else if (contentObj instanceof byte[]) {
+         // Without this branch a raw array would be printed as its identity hash.
+         content = new String((byte[]) contentObj, java.nio.charset.StandardCharsets.UTF_8);
+     } else {
+         content = String.valueOf(contentObj);
+     }
+
+     return "M[" + prefix + ", " + content + "]";
}
private static class SimpleCollector implements Collector<RowData> {