This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 99c00f39d [FLINK-38840][postgres] Postgres YAML connector supports 
emitting complete Table ID (#4209)
99c00f39d is described below

commit 99c00f39d9dc4d8b7db071befd5ca1f515e621da
Author: ouyangwulin <[email protected]>
AuthorDate: Wed Jan 7 09:14:44 2026 +0800

    [FLINK-38840][postgres] Postgres YAML connector supports emitting complete 
Table ID (#4209)
    
    Added a new option `table-id.include-database` to emit Table IDs in the
    (database, schema, table) form instead of the default (schema, table).
---
 .../connectors/pipeline-connectors/postgres.md     | 12 ++++++
 .../connectors/pipeline-connectors/postgres.md     | 12 ++++++
 .../factory/PostgresDataSourceFactory.java         |  5 +++
 .../postgres/source/PostgresDataSource.java        |  8 +++-
 .../postgres/source/PostgresDataSourceOptions.java |  9 ++++
 .../postgres/source/PostgresEventDeserializer.java | 27 +++++++++++-
 .../reader/PostgresPipelineRecordEmitter.java      | 49 +++++++++++-----------
 .../postgres/utils/PostgresSchemaUtils.java        | 20 +++++++--
 .../source/PostgresPipelineITCaseTest.java         | 38 ++++++++++-------
 .../postgres/source/PostgresSourceBuilder.java     |  6 +++
 .../source/config/PostgresSourceConfig.java        | 14 ++++++-
 .../source/config/PostgresSourceConfigFactory.java | 11 ++++-
 .../source/config/PostgresSourceOptions.java       |  9 ++++
 .../fetch/PostgresSourceFetchTaskContext.java      | 26 +++++++-----
 14 files changed, 188 insertions(+), 58 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index 50206761a..561b1efac 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -270,6 +270,18 @@ pipeline:
         此为实验性选项,默认值为 false。
       </td>
     </tr>
+    <tr>
+      <td>table-id.include-database</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        是否在生成的 Table ID 中包含数据库名称。<br>
+        如果设置为 true,Table ID 的格式为 (数据库, 模式, 表)。<br>
+        如果设置为 false,Table ID 的格式为 (模式, 表)。<br>
+        默认值为 false。
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md 
b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index 03dd4e5a3..fa5ce5b10 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -262,6 +262,18 @@ pipeline:
         Experimental option, defaults to false.
       </td>
     </tr>
+    <tr>
+      <td>table-id.include-database</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to include the database name in the generated Table ID.<br>
+        If set to true, the Table ID will be in the format (database, schema, 
table).<br>
+        If set to false, the Table ID will be in the format (schema, 
table).<br>
+        Defaults to false.
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
index 918d479f6..5e6c446c5 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
@@ -80,6 +80,7 @@ import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSource
 import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES;
 import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES_EXCLUDE;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE;
 import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.USERNAME;
 import static 
org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
 import static 
org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -129,6 +130,7 @@ public class PostgresDataSourceFactory implements 
DataSourceFactory {
         int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
         boolean skipSnapshotBackfill = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         int lsnCommitCheckpointsDelay = 
config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
+        boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
 
         validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 
1);
         validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -169,6 +171,7 @@ public class PostgresDataSourceFactory implements 
DataSourceFactory {
                         .skipSnapshotBackfill(skipSnapshotBackfill)
                         .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
                         .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
+                        .includeDatabaseInTableId(tableIdIncludeDatabase)
                         .getConfigFactory();
 
         List<TableId> tableIds = 
PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -197,6 +200,7 @@ public class PostgresDataSourceFactory implements 
DataSourceFactory {
         String metadataList = config.get(METADATA_LIST);
         List<PostgreSQLReadableMetadata> readableMetadataList = 
listReadableMetadata(metadataList);
 
+        // includeDatabaseInTableId is carried by configFactory, not passed separately
         return new PostgresDataSource(configFactory, readableMetadataList);
     }
 
@@ -257,6 +261,7 @@ public class PostgresDataSourceFactory implements 
DataSourceFactory {
         options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
         options.add(METADATA_LIST);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
+        options.add(TABLE_ID_INCLUDE_DATABASE);
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
index 767fb5fc3..6084df1f3 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
@@ -65,8 +65,14 @@ public class PostgresDataSource implements DataSource {
 
     @Override
     public EventSourceProvider getEventSourceProvider() {
+        String databaseName = postgresSourceConfig.getDatabaseList().get(0);
+        boolean includeDatabaseInTableId = 
postgresSourceConfig.isIncludeDatabaseInTableId();
         DebeziumEventDeserializationSchema deserializer =
-                new PostgresEventDeserializer(DebeziumChangelogMode.ALL, 
readableMetadataList);
+                new PostgresEventDeserializer(
+                        DebeziumChangelogMode.ALL,
+                        readableMetadataList,
+                        includeDatabaseInTableId,
+                        databaseName);
 
         PostgresOffsetFactory postgresOffsetFactory = new 
PostgresOffsetFactory();
         PostgresDialect postgresDialect = new 
PostgresDialect(postgresSourceConfig);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
index ec51de8b6..7e9ac4b0c 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
@@ -264,4 +264,13 @@ public class PostgresDataSourceOptions {
                             .defaultValue(false)
                             .withDescription(
                                     "Whether to assign the unbounded chunks 
first during snapshot reading phase. This might help reduce the risk of the 
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of 
the largest unbounded chunk.  Defaults to false.");
+
+    public static final ConfigOption<Boolean> TABLE_ID_INCLUDE_DATABASE =
+            ConfigOptions.key("table-id.include-database")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to include database in the generated 
Table ID. "
+                                    + "If set to true, the Table ID will be in 
the format (database, schema, table). "
+                                    + "If set to false, the Table ID will be 
in the format (schema, table). Defaults to false.");
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
index a37c35c52..c31dbc73b 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
@@ -50,18 +50,37 @@ public class PostgresEventDeserializer extends 
DebeziumEventDeserializationSchem
 
     private static final long serialVersionUID = 1L;
     private List<PostgreSQLReadableMetadata> readableMetadataList;
+    private final boolean includeDatabaseInTableId;
+    private final String databaseName;
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
-        super(new PostgresSchemaDataTypeInference(), changelogMode);
+        this(changelogMode, new ArrayList<>(), false, null);
     }
 
     public PostgresEventDeserializer(
             DebeziumChangelogMode changelogMode,
             List<PostgreSQLReadableMetadata> readableMetadataList) {
+        this(changelogMode, readableMetadataList, false, null);
+    }
+
+    public PostgresEventDeserializer(
+            DebeziumChangelogMode changelogMode,
+            List<PostgreSQLReadableMetadata> readableMetadataList,
+            boolean includeDatabaseInTableId) {
+        this(changelogMode, readableMetadataList, includeDatabaseInTableId, 
null);
+    }
+
+    public PostgresEventDeserializer(
+            DebeziumChangelogMode changelogMode,
+            List<PostgreSQLReadableMetadata> readableMetadataList,
+            boolean includeDatabaseInTableId,
+            String databaseName) {
         super(new PostgresSchemaDataTypeInference(), changelogMode);
         this.readableMetadataList = readableMetadataList;
+        this.includeDatabaseInTableId = includeDatabaseInTableId;
+        this.databaseName = databaseName;
     }
 
     @Override
@@ -87,7 +106,11 @@ public class PostgresEventDeserializer extends 
DebeziumEventDeserializationSchem
     @Override
     protected TableId getTableId(SourceRecord record) {
         String[] parts = record.topic().split("\\.");
-        return TableId.tableId(parts[1], parts[2]);
+        if (includeDatabaseInTableId && databaseName != null) {
+            return TableId.tableId(databaseName, parts[1], parts[2]);
+        } else {
+            return TableId.tableId(parts[1], parts[2]);
+        }
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
index 02761d8f3..dd862354c 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
@@ -61,6 +61,7 @@ import static 
io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
 import static 
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
+import static 
org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
 
 /** The {@link RecordEmitter} implementation for PostgreSQL pipeline 
connector. */
 public class PostgresPipelineRecordEmitter<T> extends 
IncrementalSourceRecordEmitter<T> {
@@ -73,6 +74,7 @@ public class PostgresPipelineRecordEmitter<T> extends 
IncrementalSourceRecordEmi
     // Used when startup mode is not initial
     private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
     private boolean isBounded = false;
+    private boolean includeDatabaseInTableId = false;
 
     private final Map<TableId, CreateTableEvent> createTableEventCache;
 
@@ -88,6 +90,7 @@ public class PostgresPipelineRecordEmitter<T> extends 
IncrementalSourceRecordEmi
                 sourceConfig.isIncludeSchemaChanges(),
                 offsetFactory);
         this.sourceConfig = sourceConfig;
+        this.includeDatabaseInTableId = 
sourceConfig.isIncludeDatabaseInTableId();
         this.postgresDialect = postgresDialect;
         this.alreadySendCreateTableTables = new HashSet<>();
         this.createTableEventCache =
@@ -103,10 +106,17 @@ public class PostgresPipelineRecordEmitter<T> extends 
IncrementalSourceRecordEmi
             // TableSchemas in SnapshotSplit only contains one table.
             
createTableEventCache.putAll(generateCreateTableEvent(sourceConfig));
         } else {
-            for (TableChanges.TableChange tableChange : 
split.getTableSchemas().values()) {
+            for (Map.Entry<TableId, TableChanges.TableChange> entry :
+                    split.getTableSchemas().entrySet()) {
+                TableId tableId =
+                        entry.getKey(); // Use the TableId from the map key 
which contains full info
+                TableChanges.TableChange tableChange = entry.getValue();
                 CreateTableEvent createTableEvent =
                         new CreateTableEvent(
-                                toCdcTableId(tableChange.getId()),
+                                toCdcTableId(
+                                        tableId,
+                                        sourceConfig.getDatabaseList().get(0),
+                                        includeDatabaseInTableId),
                                 buildSchemaFromTable(tableChange.getTable()));
                 ((DebeziumEventDeserializationSchema) 
debeziumDeserializationSchema)
                         .applyChangeEvent(createTableEvent);
@@ -128,10 +138,8 @@ public class PostgresPipelineRecordEmitter<T> extends 
IncrementalSourceRecordEmi
         } else if (isLowWatermarkEvent(element) && 
splitState.isSnapshotSplitState()) {
             TableId tableId = 
splitState.asSnapshotSplitState().toSourceSplit().getTableId();
             if (!alreadySendCreateTableTables.contains(tableId)) {
-                try (PostgresConnection jdbc = 
postgresDialect.openJdbcConnection()) {
-                    sendCreateTableEvent(jdbc, tableId, (SourceOutput<Event>) 
output);
-                    alreadySendCreateTableTables.add(tableId);
-                }
+                sendCreateTableEvent(tableId, (SourceOutput<Event>) output);
+                alreadySendCreateTableTables.add(tableId);
             }
         } else {
             boolean isDataChangeRecord = isDataChangeRecord(element);
@@ -189,21 +197,8 @@ public class PostgresPipelineRecordEmitter<T> extends 
IncrementalSourceRecordEmi
         return tableBuilder.build();
     }
 
-    private void sendCreateTableEvent(
-            PostgresConnection jdbc, TableId tableId, SourceOutput<Event> 
output) {
-        Schema schema = PostgresSchemaUtils.getTableSchema(tableId, 
sourceConfig, jdbc);
-        output.collect(
-                new CreateTableEvent(
-                        org.apache.flink.cdc.common.event.TableId.tableId(
-                                tableId.schema(), tableId.table()),
-                        schema));
-    }
-
-    private org.apache.flink.cdc.common.event.TableId toCdcTableId(
-            io.debezium.relational.TableId dbzTableId) {
-        String schemaName =
-                dbzTableId.catalog() == null ? dbzTableId.schema() : 
dbzTableId.catalog();
-        return org.apache.flink.cdc.common.event.TableId.tableId(schemaName, 
dbzTableId.table());
+    private void sendCreateTableEvent(TableId tableId, SourceOutput<Event> 
output) {
+        output.collect(getCreateTableEvent(sourceConfig, tableId));
     }
 
     private CreateTableEvent getCreateTableEvent(
@@ -211,8 +206,10 @@ public class PostgresPipelineRecordEmitter<T> extends 
IncrementalSourceRecordEmi
         try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
             Schema schema = PostgresSchemaUtils.getTableSchema(tableId, 
sourceConfig, jdbc);
             return new CreateTableEvent(
-                    org.apache.flink.cdc.common.event.TableId.tableId(
-                            tableId.schema(), tableId.table()),
+                    toCdcTableId(
+                            tableId,
+                            sourceConfig.getDatabaseList().get(0),
+                            includeDatabaseInTableId),
                     schema);
         }
     }
@@ -244,8 +241,10 @@ public class PostgresPipelineRecordEmitter<T> extends 
IncrementalSourceRecordEmi
                 createTableEventCache.put(
                         tableId,
                         new CreateTableEvent(
-                                
org.apache.flink.cdc.common.event.TableId.tableId(
-                                        tableId.schema(), tableId.table()),
+                                toCdcTableId(
+                                        tableId,
+                                        
this.sourceConfig.getDatabaseList().get(0),
+                                        includeDatabaseInTableId),
                                 schema));
             }
             return createTableEventCache;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
index 0f3b05806..93a7099ac 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
@@ -208,12 +208,26 @@ public class PostgresSchemaUtils {
 
     public static io.debezium.relational.TableId toDbzTableId(TableId tableId) 
{
         return new io.debezium.relational.TableId(
-                tableId.getSchemaName(), null, tableId.getTableName());
+                tableId.getNamespace(), tableId.getSchemaName(), 
tableId.getTableName());
     }
 
     public static org.apache.flink.cdc.common.event.TableId toCdcTableId(
             io.debezium.relational.TableId dbzTableId) {
-        return org.apache.flink.cdc.common.event.TableId.tableId(
-                dbzTableId.schema(), dbzTableId.table());
+        return toCdcTableId(dbzTableId, null, false);
+    }
+
+    public static org.apache.flink.cdc.common.event.TableId toCdcTableId(
+            io.debezium.relational.TableId dbzTableId,
+            String databaseName,
+            boolean includeDatabaseInTableId) {
+        String schema = dbzTableId.schema();
+        String table = dbzTableId.table();
+        if (includeDatabaseInTableId && databaseName != null) {
+            return 
org.apache.flink.cdc.common.event.TableId.tableId(databaseName, schema, table);
+        } else if (schema != null) {
+            return org.apache.flink.cdc.common.event.TableId.tableId(schema, 
table);
+        } else {
+            return org.apache.flink.cdc.common.event.TableId.tableId(table);
+        }
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
index bd6c46e25..b2aef9eed 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
@@ -57,7 +57,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,6 +75,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static 
org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
@@ -103,6 +105,14 @@ public class PostgresPipelineITCaseTest extends 
PostgresTestBase {
         inventoryDatabase.removeSlot(slotName);
     }
 
+    static Stream<Arguments> provideParameters() {
+        return Stream.of(
+                Arguments.of(true, true),
+                Arguments.of(false, false),
+                Arguments.of(true, false),
+                Arguments.of(false, true));
+    }
+
     @Test
     public void testInitialStartupMode() throws Exception {
         inventoryDatabase.createAndInitialize();
@@ -341,9 +351,10 @@ public class PostgresPipelineITCaseTest extends 
PostgresTestBase {
         return iterator;
     }
 
-    @ParameterizedTest(name = "unboundedChunkFirst: {0}")
-    @ValueSource(booleans = {true, false})
-    public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) 
throws Exception {
+    @ParameterizedTest
+    @MethodSource("provideParameters")
+    public void testInitialStartupModeWithOpts(
+            boolean unboundedChunkFirst, boolean isTableIdIncludeDatabase) 
throws Exception {
         inventoryDatabase.createAndInitialize();
         org.apache.flink.cdc.common.configuration.Configuration 
sourceConfiguration =
                 new org.apache.flink.cdc.common.configuration.Configuration();
@@ -365,6 +376,8 @@ public class PostgresPipelineITCaseTest extends 
PostgresTestBase {
         sourceConfiguration.set(
                 
PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED,
                 unboundedChunkFirst);
+        sourceConfiguration.set(
+                PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE, 
isTableIdIncludeDatabase);
 
         Factory.Context context =
                 new FactoryHelper.DefaultContext(
@@ -384,7 +397,12 @@ public class PostgresPipelineITCaseTest extends 
PostgresTestBase {
                                 new EventTypeInfo())
                         .executeAndCollect();
 
-        TableId tableId = TableId.tableId("inventory", "products");
+        TableId tableId;
+        if (isTableIdIncludeDatabase) {
+            tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), 
"inventory", "products");
+        } else {
+            tableId = TableId.tableId("inventory", "products");
+        }
         CreateTableEvent createTableEvent = 
getProductsCreateTableEvent(tableId);
 
         // generate snapshot data
@@ -582,16 +600,6 @@ public class PostgresPipelineITCaseTest extends 
PostgresTestBase {
         return result;
     }
 
-    // Helper method to create a temporary directory for savepoint
-    private Path createTempSavepointDir() throws Exception {
-        return Files.createTempDirectory("postgres-savepoint");
-    }
-
-    // Helper method to execute the job and create a savepoint
-    private String createSavepoint(JobClient jobClient, Path savepointDir) 
throws Exception {
-        return jobClient.stopWithSavepoint(true, 
savepointDir.toAbsolutePath().toString()).get();
-    }
-
     private List<Event> getSnapshotExpected(TableId tableId) {
         RowType rowType =
                 RowType.of(
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index bbd65b5e9..0f550ca96 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -305,6 +305,12 @@ public class PostgresSourceBuilder<T> {
         return this;
     }
 
+    /** Whether to include database in the generated Table ID. */
+    public PostgresSourceBuilder<T> includeDatabaseInTableId(boolean 
includeDatabaseInTableId) {
+        
this.configFactory.setIncludeDatabaseInTableId(includeDatabaseInTableId);
+        return this;
+    }
+
     /**
      * Build the {@link PostgresIncrementalSource}.
      *
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index 302716128..4402219a6 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -39,6 +39,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
     private final int subtaskId;
     private final int lsnCommitCheckpointsDelay;
     private final boolean includePartitionedTables;
+    private final boolean includeDatabaseInTableId;
 
     public PostgresSourceConfig(
             int subtaskId,
@@ -69,7 +70,8 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
             boolean isScanNewlyAddedTableEnabled,
             int lsnCommitCheckpointsDelay,
             boolean assignUnboundedChunkFirst,
-            boolean includePartitionedTables) {
+            boolean includePartitionedTables,
+            boolean includeDatabaseInTableId) {
         super(
                 startupOptions,
                 databaseList,
@@ -100,6 +102,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
         this.subtaskId = subtaskId;
         this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
         this.includePartitionedTables = includePartitionedTables;
+        this.includeDatabaseInTableId = includeDatabaseInTableId;
     }
 
     /**
@@ -148,4 +151,13 @@ public class PostgresSourceConfig extends JdbcSourceConfig 
{
     public PostgresConnectorConfig getDbzConnectorConfig() {
         return new PostgresConnectorConfig(getDbzConfiguration());
     }
+
+    /**
+     * Returns whether to include database in the generated Table ID.
+     *
+     * @return whether to include database in the generated Table ID
+     */
+    public boolean isIncludeDatabaseInTableId() {
+        return includeDatabaseInTableId;
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 670d4f37a..847b15474 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -54,6 +54,9 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
 
     private boolean includePartitionedTables;
 
+    private boolean includeDatabaseInTableId =
+            PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue();
+
     /** Creates a new {@link PostgresSourceConfig} for the given subtask 
{@code subtaskId}. */
     @Override
     public PostgresSourceConfig create(int subtaskId) {
@@ -136,7 +139,8 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
                 scanNewlyAddedTableEnabled,
                 lsnCommitCheckpointsDelay,
                 assignUnboundedChunkFirst,
-                includePartitionedTables);
+                includePartitionedTables,
+                includeDatabaseInTableId);
     }
 
     /**
@@ -189,4 +193,9 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
     public void setIncludePartitionedTables(boolean includePartitionedTables) {
         this.includePartitionedTables = includePartitionedTables;
     }
+
+    /** Sets whether the database name is included in the generated Table ID. */
+    public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) {
+        this.includeDatabaseInTableId = includeDatabaseInTableId;
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
index f498c2645..db04ac142 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
@@ -97,4 +97,13 @@ public class PostgresSourceOptions extends JdbcSourceOptions 
{
                                     + "If enabled:\n"
                                     + "(1) PUBLICATION must be created 
beforehand with parameter publish_via_partition_root=true\n"
                                     + "(2) Table list (regex or predefined 
list) should only match the parent table name, if table list matches both 
parent and child tables, snapshot data will be read twice.");
+
+    public static final ConfigOption<Boolean> TABLE_ID_INCLUDE_DATABASE =
+            ConfigOptions.key("table-id.include-database")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to include database in the generated Table ID.\n"
+                                    + "If set to true, the Table ID will be in the format (database, schema, table).\n"
+                                    + "If set to false, the Table ID will be in the format (schema, table). Defaults to false.");
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
index b3b3e778d..49b62cb39 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
@@ -132,9 +132,8 @@ public class PostgresSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
                                     .edit()
                                     .with(
                                             "table.include.list",
-                                            ((SnapshotSplit) sourceSplitBase)
-                                                    .getTableId()
-                                                    .toString())
+                                            getTableList(
+                                                    ((SnapshotSplit) sourceSplitBase).getTableId()))
                                     .with(
                                             SLOT_NAME.name(),
                                             ((PostgresSourceConfig) 
sourceConfig)
@@ -151,13 +150,13 @@ public class PostgresSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
                 // when backfilled split, only current table schema should be scan
                 builder.with(
                         "table.include.list",
-                        sourceSplitBase
-                                .asStreamSplit()
-                                .getTableSchemas()
-                                .keySet()
-                                .iterator()
-                                .next()
-                                .toString());
+                        getTableList(
+                                sourceSplitBase
+                                        .asStreamSplit()
+                                        .getTableSchemas()
+                                        .keySet()
+                                        .iterator()
+                                        .next()));
             }
 
             dbzConfig =
@@ -385,4 +384,11 @@ public class PostgresSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
                 && !StreamSplit.STREAM_SPLIT_ID.equalsIgnoreCase(
                         sourceSplitBase.asStreamSplit().splitId());
     }
+
+    private String getTableList(TableId tableId) {
+        if (tableId.schema() == null || tableId.schema().isEmpty()) {
+            return tableId.table();
+        }
+        return tableId.schema() + "." + tableId.table();
+    }
 }


Reply via email to