This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 3580bf414 [FLINK-38844][pipeline-connector][postgres]Add metadata column support (#4202)
3580bf414 is described below

commit 3580bf414d1cc4744336f050f0c2a759c7800cec
Author: Lanny Boarts <[email protected]>
AuthorDate: Thu Mar 12 09:51:22 2026 +0800

    [FLINK-38844][pipeline-connector][postgres]Add metadata column support (#4202)
---
 .../connectors/pipeline-connectors/postgres.md     |  76 ++++++++++++++-
 .../connectors/pipeline-connectors/postgres.md     |  76 ++++++++++++++-
 .../source/DatabaseNameMetadataColumn.java         |  52 ++++++++++
 .../postgres/source/OpTsMetadataColumn.java        |  51 ++++++++++
 .../postgres/source/PostgresDataSource.java        |  18 ++++
 .../postgres/source/SchemaNameMetadataColumn.java  |  52 ++++++++++
 .../postgres/source/TableNameMetadataColumn.java   |  51 ++++++++++
 .../factory/PostgresDataSourceFactoryTest.java     |  33 +++++++
 .../postgres/source/PostgresFullTypesITCase.java   | 106 +++++++++++++++++++++
 9 files changed, 511 insertions(+), 4 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index 561b1efac..c3614401f 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -253,10 +253,10 @@ pipeline:
     <tr>
       <td>metadata.list</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>
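-        List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts.
+        List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts, table_name, database_name, schema_name. See <a href="#支持的元数据列">Supported Metadata Columns</a> for more details.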
       </td>
     </tr>
     <tr>
@@ -316,6 +316,78 @@ pipeline:
 Notice:
 1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name.
 
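+## 支持的元数据列 (Supported Metadata Columns)
+
+The PostgreSQL CDC connector supports reading metadata columns from source records. These metadata columns can be used in transform operations or passed to downstream sinks.
+
+**Note:** Some metadata information is also available through Transform expressions (e.g., `__namespace_name__`, `__schema_name__`, `__table_name__`). The key differences are:
+- **`op_ts`**: Only available via `metadata.list`; provides the actual operation timestamp from the database.
+- **`table_name`, `database_name`, `schema_name`**: Can be obtained via either `metadata.list` or Transform expressions. Using `metadata.list` allows you to pass these values directly to downstream sinks without writing transform rules, which is simpler for basic use cases.
+
+To enable metadata columns, configure the `metadata.list` option with a comma-separated list of metadata column names: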
+
+```yaml
+source:
+  type: postgres
+  # ... other configurations
+  metadata.list: op_ts,table_name,database_name,schema_name
+```
+
+The following metadata columns are supported:
+
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 20%">Metadata Column</th>
+        <th class="text-left" style="width: 15%">Data Type</th>
+        <th class="text-left" style="width: 65%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>op_ts</td>
+      <td>BIGINT NOT NULL</td>
+      <td>The timestamp (in milliseconds since epoch) when the change event occurred in the database. For snapshot records, this value is 0.</td>
+    </tr>
+    <tr>
+      <td>table_name</td>
+      <td>STRING NOT NULL</td>
+      <td>The name of the table that contains the changed row. Alternative: use <code>__table_name__</code> in Transform expressions.</td>
+    </tr>
+    <tr>
+      <td>database_name</td>
+      <td>STRING NOT NULL</td>
+      <td>The name of the database that contains the changed row. Alternative: use <code>__namespace_name__</code> in Transform expressions.</td>
+    </tr>
+    <tr>
+      <td>schema_name</td>
+      <td>STRING NOT NULL</td>
+      <td>The name of the schema that contains the changed row. This is specific to PostgreSQL. Alternative: use <code>__schema_name__</code> in Transform expressions.</td>
+    </tr>
+    </tbody>
+</table>
+</div>
+
+**Example Usage:**
+
+```yaml
+source:
+  type: postgres
+  hostname: localhost
+  port: 5432
+  username: postgres
+  password: postgres
+  tables: mydb.public.orders
+  slot.name: flink_slot
+  metadata.list: op_ts,table_name,schema_name
+
+transform:
+  - source-table: mydb.public.orders
+    projection: order_id, customer_id, op_ts, table_name, schema_name
+    description: Include metadata columns in output
+```
+
 ## Data Type Mapping
 
 <div class="wy-table-responsive">
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index fa5ce5b10..b660b991e 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -245,10 +245,10 @@ pipeline:
     <tr>
       <td>metadata.list</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>
-        List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts.
+        List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts, table_name, database_name, schema_name. See <a href="#supported-metadata-columns">Supported Metadata Columns</a> for more details.
       </td>
     </tr>
     <tr>
@@ -311,6 +311,78 @@ Metrics can help understand the progress of assignments, and the following are t
 Notice:
 1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name.
 
+## Supported Metadata Columns
+
+PostgreSQL CDC connector supports reading metadata columns from source records. These metadata columns can be used in transform operations or passed to downstream sinks.
+
+**Note:** Some metadata information is also available through Transform expressions (e.g., `__namespace_name__`, `__schema_name__`, `__table_name__`). The key differences are:
+- **`op_ts`**: Only available via `metadata.list` - provides the actual operation timestamp from the database.
+- **`table_name`, `database_name`, `schema_name`**: Can be obtained via either `metadata.list` or Transform expressions. Using `metadata.list` allows you to pass these values directly to downstream sinks without writing transform rules, which is simpler for basic use cases.
+
+To enable metadata columns, configure the `metadata.list` option with a comma-separated list of metadata column names:
+
+```yaml
+source:
+  type: postgres
+  # ... other configurations
+  metadata.list: op_ts,table_name,database_name,schema_name
+```
+
+The following metadata columns are supported:
+
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 20%">Metadata Column</th>
+        <th class="text-left" style="width: 15%">Data Type</th>
+        <th class="text-left" style="width: 65%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>op_ts</td>
+      <td>BIGINT NOT NULL</td>
+      <td>The timestamp (in milliseconds since epoch) when the change event occurred in the database. For snapshot records, this value is 0.</td>
+    </tr>
+    <tr>
+      <td>table_name</td>
+      <td>STRING NOT NULL</td>
+      <td>The name of the table that contains the changed row. Alternative: use <code>__table_name__</code> in Transform expressions.</td>
+    </tr>
+    <tr>
+      <td>database_name</td>
+      <td>STRING NOT NULL</td>
+      <td>The name of the database that contains the changed row. Alternative: use <code>__namespace_name__</code> in Transform expressions.</td>
+    </tr>
+    <tr>
+      <td>schema_name</td>
+      <td>STRING NOT NULL</td>
+      <td>The name of the schema that contains the changed row. This is specific to PostgreSQL. Alternative: use <code>__schema_name__</code> in Transform expressions.</td>
+    </tr>
+    </tbody>
+</table>
+</div>
+
+**Example Usage:**
+
+```yaml
+source:
+  type: postgres
+  hostname: localhost
+  port: 5432
+  username: postgres
+  password: postgres
+  tables: mydb.public.orders
+  slot.name: flink_slot
+  metadata.list: op_ts,table_name,schema_name
+
+transform:
+  - source-table: mydb.public.orders
+    projection: order_id, customer_id, op_ts, table_name, schema_name
+    description: Include metadata columns in output
+```
+
 ## Data Type Mapping
 
 <div class="wy-table-responsive">
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/DatabaseNameMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/DatabaseNameMetadataColumn.java
new file mode 100644
index 000000000..25f7141ba
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/DatabaseNameMetadataColumn.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for database_name. */
+public class DatabaseNameMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "database_name";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.STRING().notNull();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return String.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return metadata.get(getName());
+        }
+        throw new IllegalArgumentException(
+                "database_name doesn't exist in the metadata: " + metadata);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/OpTsMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/OpTsMetadataColumn.java
new file mode 100644
index 000000000..9a9217969
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/OpTsMetadataColumn.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for op_ts. */
+public class OpTsMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "op_ts";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.BIGINT().notNull();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return Long.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return Long.parseLong(metadata.get(getName()));
+        }
+        throw new IllegalArgumentException("op_ts doesn't exist in the metadata: " + metadata);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
index 6084df1f3..c3055198a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.source.DataSource;
 import org.apache.flink.cdc.common.source.EventSourceProvider;
 import org.apache.flink.cdc.common.source.FlinkSourceProvider;
 import org.apache.flink.cdc.common.source.MetadataAccessor;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
 import org.apache.flink.cdc.connectors.base.config.SourceConfig;
 import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
@@ -98,6 +99,23 @@ public class PostgresDataSource implements DataSource {
         return postgresSourceConfig;
     }
 
+    @Override
+    public SupportedMetadataColumn[] supportedMetadataColumns() {
+        return new SupportedMetadataColumn[] {
+            new OpTsMetadataColumn(),
+            new TableNameMetadataColumn(),
+            new DatabaseNameMetadataColumn(),
+            new SchemaNameMetadataColumn()
+        };
+    }
+
+    @Override
+    public boolean isParallelMetadataSource() {
+        // During incremental stage, PostgreSQL never emits schema change events on different
+        // partitions (since it has one WAL stream only.)
+        return false;
+    }
+
     /** The {@link JdbcIncrementalSource} implementation for Postgres. */
     public static class PostgresPipelineSource<T>
             extends PostgresSourceBuilder.PostgresIncrementalSource<T> {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/SchemaNameMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/SchemaNameMetadataColumn.java
new file mode 100644
index 000000000..ac61a090a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/SchemaNameMetadataColumn.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for schema_name. */
+public class SchemaNameMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "schema_name";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.STRING().notNull();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return String.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return metadata.get(getName());
+        }
+        throw new IllegalArgumentException(
+                "schema_name doesn't exist in the metadata: " + metadata);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/TableNameMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/TableNameMetadataColumn.java
new file mode 100644
index 000000000..f2ad0fb8a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/TableNameMetadataColumn.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for table_name. */
+public class TableNameMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "table_name";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.STRING().notNull();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return String.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return metadata.get(getName());
+        }
+        throw new IllegalArgumentException("table_name doesn't exist in the metadata: " + metadata);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
index 473c2d61c..338ef6642 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
@@ -21,8 +21,13 @@ import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.configuration.ConfigOption;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.factories.Factory;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
 import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
+import org.apache.flink.cdc.connectors.postgres.source.DatabaseNameMetadataColumn;
+import org.apache.flink.cdc.connectors.postgres.source.OpTsMetadataColumn;
 import org.apache.flink.cdc.connectors.postgres.source.PostgresDataSource;
+import org.apache.flink.cdc.connectors.postgres.source.SchemaNameMetadataColumn;
+import org.apache.flink.cdc.connectors.postgres.source.TableNameMetadataColumn;
 import org.apache.flink.table.api.ValidationException;
 
 import org.junit.jupiter.api.AfterEach;
@@ -305,6 +310,34 @@ public class PostgresDataSourceFactoryTest extends PostgresTestBase {
                 .hasMessageContaining("Cannot find any table by the option 'tables'");
     }
 
+    @Test
+    public void testSupportedMetadataColumns() {
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
+        options.put(
+                PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(TABLES.key(), POSTGRES_CONTAINER.getDatabaseName() + ".inventory.prod\\.*");
+        options.put(SLOT_NAME.key(), slotName);
+        Factory.Context context = new MockContext(Configuration.fromMap(options));
+
+        PostgresDataSourceFactory factory = new PostgresDataSourceFactory();
+        PostgresDataSource dataSource = (PostgresDataSource) factory.createDataSource(context);
+
+        SupportedMetadataColumn[] metadataColumns = dataSource.supportedMetadataColumns();
+        assertThat(metadataColumns).hasSize(4);
+        assertThat(metadataColumns[0]).isInstanceOf(OpTsMetadataColumn.class);
+        assertThat(metadataColumns[0].getName()).isEqualTo("op_ts");
+        assertThat(metadataColumns[1]).isInstanceOf(TableNameMetadataColumn.class);
+        assertThat(metadataColumns[1].getName()).isEqualTo("table_name");
+        assertThat(metadataColumns[2]).isInstanceOf(DatabaseNameMetadataColumn.class);
+        assertThat(metadataColumns[2].getName()).isEqualTo("database_name");
+        assertThat(metadataColumns[3]).isInstanceOf(SchemaNameMetadataColumn.class);
+        assertThat(metadataColumns[3].getName()).isEqualTo("schema_name");
+
+    }
+
     class MockContext implements Factory.Context {
 
         Configuration factoryConfiguration;
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index d397fb14f..6112741e6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -40,6 +40,7 @@ import org.apache.flink.cdc.connectors.base.options.StartupOptions;
 import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
 import org.apache.flink.cdc.connectors.postgres.factory.PostgresDataSourceFactory;
 import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.table.PostgreSQLReadableMetadata;
 import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
@@ -820,6 +821,111 @@ public class PostgresFullTypesITCase extends PostgresTestBase {
         Assertions.assertThat(recordFields(snapshotRecord, JSON_TYPES)).isEqualTo(expectedSnapshot);
     }
 
+    @Test
+    public void testAllMetadataColumns() throws Exception {
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+        List<PostgreSQLReadableMetadata> metadataList = new ArrayList<>();
+        metadataList.add(PostgreSQLReadableMetadata.OP_TS);
+        metadataList.add(PostgreSQLReadableMetadata.DATABASE_NAME);
+        metadataList.add(PostgreSQLReadableMetadata.SCHEMA_NAME);
+        metadataList.add(PostgreSQLReadableMetadata.TABLE_NAME);
+        // Note: ROW_KIND is not included because it requires RowData and cannot be read from
+        // SourceRecord
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGIS_CONTAINER.getHost())
+                                .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+                                .tableList("inventory.full_types")
+                                .startupOptions(StartupOptions.initial())
+                                .serverTimeZone("UTC");
+        configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory, metadataList)
+                                .getEventSourceProvider();
+
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+        DataChangeEvent snapshotEvent = (DataChangeEvent) snapshotResults.get(0);
+
+        // Verify all metadata columns exist in snapshot phase
+        Map<String, String> metadata = snapshotEvent.meta();
+
+        // Verify op_ts metadata
+        // According to PostgreSQLReadableMetadata.OP_TS documentation:
+        // "If the record is read from snapshot of the table instead of the change stream,
+        // the value is always 0."
+        Assertions.assertThat(metadata).containsKey("op_ts");
+        Long opTs = Long.parseLong(metadata.get("op_ts"));
+        Assertions.assertThat(opTs).isEqualTo(0L);
+
+        // Verify database_name metadata
+        Assertions.assertThat(metadata).containsKey("database_name");
+        Assertions.assertThat(metadata.get("database_name"))
+                .isEqualTo(POSTGRES_CONTAINER.getDatabaseName());
+
+        // Verify schema_name metadata
+        Assertions.assertThat(metadata).containsKey("schema_name");
+        Assertions.assertThat(metadata.get("schema_name")).isEqualTo("inventory");
+
+        // Verify table_name metadata
+        Assertions.assertThat(metadata).containsKey("table_name");
+        Assertions.assertThat(metadata.get("table_name")).isEqualTo("full_types");
+
+        // Insert a new row to test incremental phase
+        try (Connection connection =
+                        PostgresTestBase.getJdbcConnection(POSTGIS_CONTAINER, "postgres");
+                Statement statement = connection.createStatement()) {
+            // Use a simpler INSERT statement that only includes basic required columns
+            statement.execute(
+                    "INSERT INTO inventory.full_types (id, small_c, int_c, big_c, "
+                            + "real_c, double_precision, boolean_c, text_c, username, status) VALUES ("
+                            + "2, 100, 200, 300, 1.1, 2.2, true, 'test', 'testuser', 'pending')");
+        }
+
+        // Fetch the incremental event
+        List<Event> incrementalResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+        DataChangeEvent incrementalEvent = (DataChangeEvent) incrementalResults.get(0);
+
+        // Verify all metadata columns in incremental phase
+        Map<String, String> incrementalMetadata = incrementalEvent.meta();
+
+        // Verify op_ts metadata in incremental phase
+        // In incremental phase (change stream), op_ts should be a valid timestamp > 0
+        Assertions.assertThat(incrementalMetadata).containsKey("op_ts");
+        Long incrementalOpTs = Long.parseLong(incrementalMetadata.get("op_ts"));
+        Assertions.assertThat(incrementalOpTs)
+                .as("op_ts in incremental phase should be greater than 0")
+                .isGreaterThan(0L);
+
+        // Verify database_name metadata in incremental phase
+        Assertions.assertThat(incrementalMetadata).containsKey("database_name");
+        Assertions.assertThat(incrementalMetadata.get("database_name"))
+                .isEqualTo(POSTGRES_CONTAINER.getDatabaseName());
+
+        // Verify schema_name metadata in incremental phase
+        Assertions.assertThat(incrementalMetadata).containsKey("schema_name");
+        Assertions.assertThat(incrementalMetadata.get("schema_name")).isEqualTo("inventory");
+
+        // Verify table_name metadata in incremental phase
+        Assertions.assertThat(incrementalMetadata).containsKey("table_name");
+        Assertions.assertThat(incrementalMetadata.get("table_name")).isEqualTo("full_types");
+    }
+
     @Test
     public void testArrayTypes() throws Exception {
         initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
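
For readers following the patch, the `read` pattern shared by the new column classes and the documented `op_ts` convention (epoch milliseconds, 0 for snapshot records) can be sketched in plain Java. This is a minimal illustration under stated assumptions, not code from the commit: `MetadataColumnSketch`, `readOpTs`, and `changeTime` are hypothetical names, the sample timestamp is arbitrary, and the sketch deliberately has no Flink CDC dependency.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, dependency-free sketch; not part of the commit. */
public class MetadataColumnSketch {

    /** Mirrors the read() pattern in the patch: fail loudly when the key is missing. */
    public static long readOpTs(Map<String, String> metadata) {
        if (metadata.containsKey("op_ts")) {
            return Long.parseLong(metadata.get("op_ts"));
        }
        throw new IllegalArgumentException("op_ts doesn't exist in the metadata: " + metadata);
    }

    /** Treats 0 as a snapshot record with no change time; other values are epoch milliseconds. */
    public static Optional<Instant> changeTime(long opTs) {
        return opTs == 0L ? Optional.empty() : Optional.of(Instant.ofEpochMilli(opTs));
    }

    public static void main(String[] args) {
        Map<String, String> snapshotMeta = new HashMap<>();
        snapshotMeta.put("op_ts", "0");
        Map<String, String> streamMeta = new HashMap<>();
        streamMeta.put("op_ts", "1773280282000"); // arbitrary example value

        System.out.println(changeTime(readOpTs(snapshotMeta)).isPresent()); // false
        System.out.println(changeTime(readOpTs(streamMeta)).isPresent());   // true
    }
}
```

Returning an empty `Optional` for 0 is one possible design choice for downstream code; the connector itself simply passes the raw `BIGINT` value through.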
