This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 3580bf414 [FLINK-38844][pipeline-connector][postgres]Add metadata
column support (#4202)
3580bf414 is described below
commit 3580bf414d1cc4744336f050f0c2a759c7800cec
Author: Lanny Boarts <[email protected]>
AuthorDate: Thu Mar 12 09:51:22 2026 +0800
[FLINK-38844][pipeline-connector][postgres]Add metadata column support
(#4202)
---
.../connectors/pipeline-connectors/postgres.md | 76 ++++++++++++++-
.../connectors/pipeline-connectors/postgres.md | 76 ++++++++++++++-
.../source/DatabaseNameMetadataColumn.java | 52 ++++++++++
.../postgres/source/OpTsMetadataColumn.java | 51 ++++++++++
.../postgres/source/PostgresDataSource.java | 18 ++++
.../postgres/source/SchemaNameMetadataColumn.java | 52 ++++++++++
.../postgres/source/TableNameMetadataColumn.java | 51 ++++++++++
.../factory/PostgresDataSourceFactoryTest.java | 33 +++++++
.../postgres/source/PostgresFullTypesITCase.java | 106 +++++++++++++++++++++
9 files changed, 511 insertions(+), 4 deletions(-)
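
Reviewer note: the four new metadata column classes in this patch share one pattern: `read` looks the column name up in the string-valued metadata map and fails loudly when it is absent. Below is a minimal, self-contained sketch of that pattern for `op_ts`. The `MetadataColumnSketch` class and its nested `Column` interface are hypothetical stand-ins written for this note, not the real `SupportedMetadataColumn` API, and the timestamp in `main` is an arbitrary sample value.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: mirrors the read() logic of OpTsMetadataColumn without Flink CDC on the classpath. */
class MetadataColumnSketch {

    /** Hypothetical stand-in for the subset of SupportedMetadataColumn used here. */
    interface Column {
        String getName();

        Object read(Map<String, String> metadata);
    }

    /** Same shape as the op_ts column in the patch: the string value is parsed into a Long. */
    static class OpTs implements Column {
        @Override
        public String getName() {
            return "op_ts";
        }

        @Override
        public Object read(Map<String, String> metadata) {
            if (metadata.containsKey(getName())) {
                return Long.parseLong(metadata.get(getName()));
            }
            throw new IllegalArgumentException(
                    "op_ts doesn't exist in the metadata: " + metadata);
        }
    }

    public static void main(String[] args) {
        Map<String, String> snapshotMeta = new HashMap<>();
        snapshotMeta.put("op_ts", "0"); // snapshot records carry 0, per the docs in this patch

        Map<String, String> streamMeta = new HashMap<>();
        streamMeta.put("op_ts", "1773280282000"); // arbitrary sample epoch-millis value

        Column opTs = new OpTs();
        System.out.println(opTs.read(snapshotMeta));
        System.out.println(opTs.read(streamMeta));
    }
}
```

The string-typed columns (`table_name`, `database_name`, `schema_name`) follow the same lookup but return the map value unparsed.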
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index 561b1efac..c3614401f 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -253,10 +253,10 @@ pipeline:
<tr>
<td>metadata.list</td>
<td>optional</td>
- <td style="word-wrap: break-word;">false</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
- 源记录中可读取的元数据列表,将传递给下游并在转换模块中使用,各字段以逗号分隔。可用的可读元数据包括:op_ts。
+ 源记录中可读取的元数据列表,将传递给下游并在转换模块中使用,各字段以逗号分隔。可用的可读元数据包括:op_ts、table_name、database_name、schema_name。详见<a href="#支持的元数据列">支持的元数据列</a>。
</td>
</tr>
<tr>
@@ -316,6 +316,78 @@ pipeline:
注意:
1. Group 名称是 `namespace.schema.table`,这里的 `namespace` 是实际的数据库名称, `schema` 是实际的 schema 名称, `table` 是实际的表名称。
+## 支持的元数据列
+
+PostgreSQL CDC 连接器支持从源记录中读取元数据列。这些元数据列可以在转换操作中使用或传递给下游 Sink。
+
+**注意:** 部分元数据信息也可以通过 Transform 表达式获取(例如 `__namespace_name__`、`__schema_name__`、`__table_name__`)。主要区别如下:
+- **`op_ts`**:仅可通过 `metadata.list` 获取 - 提供数据库中实际的操作时间戳。
+- **`table_name`、`database_name`、`schema_name`**:可通过 `metadata.list` 或 Transform 表达式获取。使用 `metadata.list` 可以直接将这些值传递给下游 Sink,无需编写转换规则,对于基本用例更加简单。
+
+要启用元数据列,请使用逗号分隔的元数据列名称列表配置 `metadata.list` 选项:
+
+```yaml
+source:
+ type: postgres
+ # ... 其他配置
+ metadata.list: op_ts,table_name,database_name,schema_name
+```
+
+支持以下元数据列:
+
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">元数据列</th>
+ <th class="text-left" style="width: 15%">数据类型</th>
+ <th class="text-left" style="width: 65%">描述</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>op_ts</td>
+ <td>BIGINT NOT NULL</td>
+ <td>数据变更事件在数据库中发生的时间戳(自纪元以来的毫秒数)。对于快照记录,此值为 0。</td>
+ </tr>
+ <tr>
+ <td>table_name</td>
+ <td>STRING NOT NULL</td>
+ <td>包含变更行的表名称。替代方案:在 Transform 表达式中使用 <code>__table_name__</code>。</td>
+ </tr>
+ <tr>
+ <td>database_name</td>
+ <td>STRING NOT NULL</td>
+ <td>包含变更行的数据库名称。替代方案:在 Transform 表达式中使用 <code>__namespace_name__</code>。</td>
+ </tr>
+ <tr>
+ <td>schema_name</td>
+ <td>STRING NOT NULL</td>
+ <td>包含变更行的 Schema 名称。这是 PostgreSQL 特有的。替代方案:在 Transform 表达式中使用 <code>__schema_name__</code>。</td>
+ </tr>
+ </tbody>
+</table>
+</div>
+
+**使用示例:**
+
+```yaml
+source:
+ type: postgres
+ hostname: localhost
+ port: 5432
+ username: postgres
+ password: postgres
+ tables: mydb.public.orders
+ slot.name: flink_slot
+ metadata.list: op_ts,table_name,schema_name
+
+transform:
+ - source-table: mydb.public.orders
+ projection: order_id, customer_id, op_ts, table_name, schema_name
+ description: 在输出中包含元数据列
+```
+
## 数据类型映射
<div class="wy-table-responsive">
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index fa5ce5b10..b660b991e 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -245,10 +245,10 @@ pipeline:
<tr>
<td>metadata.list</td>
<td>optional</td>
- <td style="word-wrap: break-word;">false</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
- List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts.
+ List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts, table_name, database_name, schema_name. See <a href="#supported-metadata-columns">Supported Metadata Columns</a> for more details.
</td>
</tr>
<tr>
@@ -311,6 +311,78 @@ Metrics can help understand the progress of assignments, and the following are t
Notice:
1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name.
+## Supported Metadata Columns
+
+PostgreSQL CDC connector supports reading metadata columns from source records. These metadata columns can be used in transform operations or passed to downstream sinks.
+
+**Note:** Some metadata information is also available through Transform expressions (e.g., `__namespace_name__`, `__schema_name__`, `__table_name__`). The key differences are:
+- **`op_ts`**: Only available via `metadata.list` - provides the actual operation timestamp from the database.
+- **`table_name`, `database_name`, `schema_name`**: Can be obtained via either `metadata.list` or Transform expressions. Using `metadata.list` allows you to pass these values directly to downstream sinks without writing transform rules, which is simpler for basic use cases.
+
+To enable metadata columns, configure the `metadata.list` option with a comma-separated list of metadata column names:
+
+```yaml
+source:
+ type: postgres
+ # ... other configurations
+ metadata.list: op_ts,table_name,database_name,schema_name
+```
+
+The following metadata columns are supported:
+
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Metadata Column</th>
+ <th class="text-left" style="width: 15%">Data Type</th>
+ <th class="text-left" style="width: 65%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>op_ts</td>
+ <td>BIGINT NOT NULL</td>
+ <td>The timestamp (in milliseconds since epoch) when the change event occurred in the database. For snapshot records, this value is 0.</td>
+ </tr>
+ <tr>
+ <td>table_name</td>
+ <td>STRING NOT NULL</td>
+ <td>The name of the table that contains the changed row. Alternative: use <code>__table_name__</code> in Transform expressions.</td>
+ </tr>
+ <tr>
+ <td>database_name</td>
+ <td>STRING NOT NULL</td>
+ <td>The name of the database that contains the changed row. Alternative: use <code>__namespace_name__</code> in Transform expressions.</td>
+ </tr>
+ <tr>
+ <td>schema_name</td>
+ <td>STRING NOT NULL</td>
+ <td>The name of the schema that contains the changed row. This is specific to PostgreSQL. Alternative: use <code>__schema_name__</code> in Transform expressions.</td>
+ </tr>
+ </tbody>
+</table>
+</div>
+
+**Example Usage:**
+
+```yaml
+source:
+ type: postgres
+ hostname: localhost
+ port: 5432
+ username: postgres
+ password: postgres
+ tables: mydb.public.orders
+ slot.name: flink_slot
+ metadata.list: op_ts,table_name,schema_name
+
+transform:
+ - source-table: mydb.public.orders
+ projection: order_id, customer_id, op_ts, table_name, schema_name
+ description: Include metadata columns in output
+```
+
## Data Type Mapping
<div class="wy-table-responsive">
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/DatabaseNameMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/DatabaseNameMetadataColumn.java
new file mode 100644
index 000000000..25f7141ba
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/DatabaseNameMetadataColumn.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for database_name. */
+public class DatabaseNameMetadataColumn implements SupportedMetadataColumn {
+
+ @Override
+ public String getName() {
+ return "database_name";
+ }
+
+ @Override
+ public DataType getType() {
+ return DataTypes.STRING().notNull();
+ }
+
+ @Override
+ public Class<?> getJavaClass() {
+ return String.class;
+ }
+
+ @Override
+ public Object read(Map<String, String> metadata) {
+ if (metadata.containsKey(getName())) {
+ return metadata.get(getName());
+ }
+ throw new IllegalArgumentException(
+ "database_name doesn't exist in the metadata: " + metadata);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/OpTsMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/OpTsMetadataColumn.java
new file mode 100644
index 000000000..9a9217969
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/OpTsMetadataColumn.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for op_ts. */
+public class OpTsMetadataColumn implements SupportedMetadataColumn {
+
+ @Override
+ public String getName() {
+ return "op_ts";
+ }
+
+ @Override
+ public DataType getType() {
+ return DataTypes.BIGINT().notNull();
+ }
+
+ @Override
+ public Class<?> getJavaClass() {
+ return Long.class;
+ }
+
+ @Override
+ public Object read(Map<String, String> metadata) {
+ if (metadata.containsKey(getName())) {
+ return Long.parseLong(metadata.get(getName()));
+ }
+ throw new IllegalArgumentException("op_ts doesn't exist in the metadata: " + metadata);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
index 6084df1f3..c3055198a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.source.MetadataAccessor;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
@@ -98,6 +99,23 @@ public class PostgresDataSource implements DataSource {
return postgresSourceConfig;
}
+ @Override
+ public SupportedMetadataColumn[] supportedMetadataColumns() {
+ return new SupportedMetadataColumn[] {
+ new OpTsMetadataColumn(),
+ new TableNameMetadataColumn(),
+ new DatabaseNameMetadataColumn(),
+ new SchemaNameMetadataColumn()
+ };
+ }
+
+ @Override
+ public boolean isParallelMetadataSource() {
+ // During incremental stage, PostgreSQL never emits schema change events on different
+ // partitions (since it has one WAL stream only.)
+ return false;
+ }
+
/** The {@link JdbcIncrementalSource} implementation for Postgres. */
public static class PostgresPipelineSource<T>
extends PostgresSourceBuilder.PostgresIncrementalSource<T> {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/SchemaNameMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/SchemaNameMetadataColumn.java
new file mode 100644
index 000000000..ac61a090a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/SchemaNameMetadataColumn.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for schema_name. */
+public class SchemaNameMetadataColumn implements SupportedMetadataColumn {
+
+ @Override
+ public String getName() {
+ return "schema_name";
+ }
+
+ @Override
+ public DataType getType() {
+ return DataTypes.STRING().notNull();
+ }
+
+ @Override
+ public Class<?> getJavaClass() {
+ return String.class;
+ }
+
+ @Override
+ public Object read(Map<String, String> metadata) {
+ if (metadata.containsKey(getName())) {
+ return metadata.get(getName());
+ }
+ throw new IllegalArgumentException(
+ "schema_name doesn't exist in the metadata: " + metadata);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/TableNameMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/TableNameMetadataColumn.java
new file mode 100644
index 000000000..f2ad0fb8a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/TableNameMetadataColumn.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for table_name. */
+public class TableNameMetadataColumn implements SupportedMetadataColumn {
+
+ @Override
+ public String getName() {
+ return "table_name";
+ }
+
+ @Override
+ public DataType getType() {
+ return DataTypes.STRING().notNull();
+ }
+
+ @Override
+ public Class<?> getJavaClass() {
+ return String.class;
+ }
+
+ @Override
+ public Object read(Map<String, String> metadata) {
+ if (metadata.containsKey(getName())) {
+ return metadata.get(getName());
+ }
+ throw new IllegalArgumentException("table_name doesn't exist in the metadata: " + metadata);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
index 473c2d61c..338ef6642 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
@@ -21,8 +21,13 @@ import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.Factory;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
+import org.apache.flink.cdc.connectors.postgres.source.DatabaseNameMetadataColumn;
+import org.apache.flink.cdc.connectors.postgres.source.OpTsMetadataColumn;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDataSource;
+import org.apache.flink.cdc.connectors.postgres.source.SchemaNameMetadataColumn;
+import org.apache.flink.cdc.connectors.postgres.source.TableNameMetadataColumn;
import org.apache.flink.table.api.ValidationException;
import org.junit.jupiter.api.AfterEach;
@@ -305,6 +310,34 @@ public class PostgresDataSourceFactoryTest extends PostgresTestBase {
.hasMessageContaining("Cannot find any table by the option 'tables'");
}
+ @Test
+ public void testSupportedMetadataColumns() {
+ Map<String, String> options = new HashMap<>();
+ options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
+ options.put(
+ PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
+ options.put(USERNAME.key(), TEST_USER);
+ options.put(PASSWORD.key(), TEST_PASSWORD);
+ options.put(TABLES.key(), POSTGRES_CONTAINER.getDatabaseName() + ".inventory.prod\\.*");
+ options.put(SLOT_NAME.key(), slotName);
+ Factory.Context context = new MockContext(Configuration.fromMap(options));
+
+ PostgresDataSourceFactory factory = new PostgresDataSourceFactory();
+ PostgresDataSource dataSource = (PostgresDataSource) factory.createDataSource(context);
+
+ SupportedMetadataColumn[] metadataColumns = dataSource.supportedMetadataColumns();
+ assertThat(metadataColumns).hasSize(4);
+ assertThat(metadataColumns[0]).isInstanceOf(OpTsMetadataColumn.class);
+ assertThat(metadataColumns[0].getName()).isEqualTo("op_ts");
+ assertThat(metadataColumns[1]).isInstanceOf(TableNameMetadataColumn.class);
+ assertThat(metadataColumns[1].getName()).isEqualTo("table_name");
+ assertThat(metadataColumns[2]).isInstanceOf(DatabaseNameMetadataColumn.class);
+ assertThat(metadataColumns[2].getName()).isEqualTo("database_name");
+ assertThat(metadataColumns[3]).isInstanceOf(SchemaNameMetadataColumn.class);
+ assertThat(metadataColumns[3].getName()).isEqualTo("schema_name");
+
+ }
+
class MockContext implements Factory.Context {
Configuration factoryConfiguration;
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index d397fb14f..6112741e6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -40,6 +40,7 @@ import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
import org.apache.flink.cdc.connectors.postgres.factory.PostgresDataSourceFactory;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.table.PostgreSQLReadableMetadata;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
@@ -820,6 +821,111 @@ public class PostgresFullTypesITCase extends PostgresTestBase {
Assertions.assertThat(recordFields(snapshotRecord, JSON_TYPES)).isEqualTo(expectedSnapshot);
}
+ @Test
+ public void testAllMetadataColumns() throws Exception {
+ initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+ List<PostgreSQLReadableMetadata> metadataList = new ArrayList<>();
+ metadataList.add(PostgreSQLReadableMetadata.OP_TS);
+ metadataList.add(PostgreSQLReadableMetadata.DATABASE_NAME);
+ metadataList.add(PostgreSQLReadableMetadata.SCHEMA_NAME);
+ metadataList.add(PostgreSQLReadableMetadata.TABLE_NAME);
+ // Note: ROW_KIND is not included because it requires RowData and cannot be read from
+ // SourceRecord
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGIS_CONTAINER.getHost())
+ .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+ .tableList("inventory.full_types")
+ .startupOptions(StartupOptions.initial())
+ .serverTimeZone("UTC");
+ configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new PostgresDataSource(configFactory, metadataList)
+ .getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ DataChangeEvent snapshotEvent = (DataChangeEvent) snapshotResults.get(0);
+
+ // Verify all metadata columns exist in snapshot phase
+ Map<String, String> metadata = snapshotEvent.meta();
+
+ // Verify op_ts metadata
+ // According to PostgreSQLReadableMetadata.OP_TS documentation:
+ // "If the record is read from snapshot of the table instead of the change stream,
+ // the value is always 0."
+ Assertions.assertThat(metadata).containsKey("op_ts");
+ Long opTs = Long.parseLong(metadata.get("op_ts"));
+ Assertions.assertThat(opTs).isEqualTo(0L);
+
+ // Verify database_name metadata
+ Assertions.assertThat(metadata).containsKey("database_name");
+ Assertions.assertThat(metadata.get("database_name"))
+ .isEqualTo(POSTGRES_CONTAINER.getDatabaseName());
+
+ // Verify schema_name metadata
+ Assertions.assertThat(metadata).containsKey("schema_name");
+ Assertions.assertThat(metadata.get("schema_name")).isEqualTo("inventory");
+
+ // Verify table_name metadata
+ Assertions.assertThat(metadata).containsKey("table_name");
+ Assertions.assertThat(metadata.get("table_name")).isEqualTo("full_types");
+
+ // Insert a new row to test incremental phase
+ try (Connection connection =
+ PostgresTestBase.getJdbcConnection(POSTGIS_CONTAINER, "postgres");
+ Statement statement = connection.createStatement()) {
+ // Use a simpler INSERT statement that only includes basic required columns
+ statement.execute(
+ "INSERT INTO inventory.full_types (id, small_c, int_c, big_c, "
+ + "real_c, double_precision, boolean_c, text_c, username, status) VALUES ("
+ + "2, 100, 200, 300, 1.1, 2.2, true, 'test', 'testuser', 'pending')");
+ }
+
+ // Fetch the incremental event
+ List<Event> incrementalResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+ DataChangeEvent incrementalEvent = (DataChangeEvent) incrementalResults.get(0);
+
+ // Verify all metadata columns in incremental phase
+ Map<String, String> incrementalMetadata = incrementalEvent.meta();
+
+ // Verify op_ts metadata in incremental phase
+ // In incremental phase (change stream), op_ts should be a valid timestamp > 0
+ Assertions.assertThat(incrementalMetadata).containsKey("op_ts");
+ Long incrementalOpTs = Long.parseLong(incrementalMetadata.get("op_ts"));
+ Assertions.assertThat(incrementalOpTs)
+ .as("op_ts in incremental phase should be greater than 0")
+ .isGreaterThan(0L);
+
+ // Verify database_name metadata in incremental phase
+ Assertions.assertThat(incrementalMetadata).containsKey("database_name");
+ Assertions.assertThat(incrementalMetadata.get("database_name"))
+ .isEqualTo(POSTGRES_CONTAINER.getDatabaseName());
+
+ // Verify schema_name metadata in incremental phase
+ Assertions.assertThat(incrementalMetadata).containsKey("schema_name");
+ Assertions.assertThat(incrementalMetadata.get("schema_name")).isEqualTo("inventory");
+
+ // Verify table_name metadata in incremental phase
+ Assertions.assertThat(incrementalMetadata).containsKey("table_name");
+ Assertions.assertThat(incrementalMetadata.get("table_name")).isEqualTo("full_types");
+ }
+
@Test
public void testArrayTypes() throws Exception {
initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");