This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 99c00f39d [FLINK-38840][postgres] Postgres YAML connector supports emitting complete Table ID (#4209)
99c00f39d is described below
commit 99c00f39d9dc4d8b7db071befd5ca1f515e621da
Author: ouyangwulin <[email protected]>
AuthorDate: Wed Jan 7 09:14:44 2026 +0800
[FLINK-38840][postgres] Postgres YAML connector supports emitting complete Table ID (#4209)

Added a new option `table-id.include-database` to emit (db, schema, table) style table IDs.
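For context, a minimal sketch (not part of this commit) of how the new flag surfaces through the DataStream builder API extended below; all connection values are placeholders:

    // Hypothetical usage of the builder option added in this commit;
    // host, credentials, and slot name are illustrative only.
    PostgresSourceBuilder.PostgresIncrementalSource<String> source =
            PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                    .hostname("localhost")
                    .port(5432)
                    .database("postgres_db")
                    .schemaList("inventory")
                    .tableList("inventory.products")
                    .username("postgres")
                    .password("postgres")
                    .slotName("flink_slot")
                    .decodingPluginName("pgoutput")
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .includeDatabaseInTableId(true) // emit (db, schema, table) style ids
                    .build();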
---
.../connectors/pipeline-connectors/postgres.md | 12 ++++++
.../connectors/pipeline-connectors/postgres.md | 12 ++++++
.../factory/PostgresDataSourceFactory.java | 5 +++
.../postgres/source/PostgresDataSource.java | 8 +++-
.../postgres/source/PostgresDataSourceOptions.java | 9 ++++
.../postgres/source/PostgresEventDeserializer.java | 27 +++++++++++-
.../reader/PostgresPipelineRecordEmitter.java | 49 +++++++++++-----------
.../postgres/utils/PostgresSchemaUtils.java | 20 +++++++--
.../source/PostgresPipelineITCaseTest.java | 38 ++++++++++-------
.../postgres/source/PostgresSourceBuilder.java | 6 +++
.../source/config/PostgresSourceConfig.java | 14 ++++++-
.../source/config/PostgresSourceConfigFactory.java | 11 ++++-
.../source/config/PostgresSourceOptions.java | 9 ++++
.../fetch/PostgresSourceFetchTaskContext.java | 26 +++++++-----
14 files changed, 188 insertions(+), 58 deletions(-)
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index 50206761a..561b1efac 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -270,6 +270,18 @@ pipeline:
This is an experimental option; defaults to false.
</td>
</tr>
+ <tr>
+ <td>table-id.include-database</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>
+ Whether to include the database name in the generated Table ID.<br>
+ If set to true, the Table ID will be in the format (database, schema, table).<br>
+ If set to false, the Table ID will be in the format (schema, table).<br>
+ Defaults to false.
+ </td>
+ </tr>
</tbody>
</table>
</div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index 03dd4e5a3..fa5ce5b10 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -262,6 +262,18 @@ pipeline:
Experimental option, defaults to false.
</td>
</tr>
+ <tr>
+ <td>table-id.include-database</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>
+ Whether to include the database name in the generated Table ID.<br>
+ If set to true, the Table ID will be in the format (database, schema, table).<br>
+ If set to false, the Table ID will be in the format (schema, table).<br>
+ Defaults to false.
+ </td>
+ </tr>
</tbody>
</table>
</div>
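To make the two formats concrete, a small illustration (database, schema, and table names are made up) using the org.apache.flink.cdc.common.event.TableId factory methods that appear later in this diff:

    // table-id.include-database = true  -> three-part Table ID
    TableId withDatabase = TableId.tableId("postgres_db", "inventory", "products");
    // table-id.include-database = false -> two-part Table ID (the default)
    TableId withoutDatabase = TableId.tableId("inventory", "products");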
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
index 918d479f6..5e6c446c5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
@@ -80,6 +80,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSource
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES_EXCLUDE;
+import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -129,6 +130,7 @@ public class PostgresDataSourceFactory implements DataSourceFactory {
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
+ boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -169,6 +171,7 @@ public class PostgresDataSourceFactory implements DataSourceFactory {
.skipSnapshotBackfill(skipSnapshotBackfill)
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
+ .includeDatabaseInTableId(tableIdIncludeDatabase)
.getConfigFactory();
List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -197,6 +200,7 @@ public class PostgresDataSourceFactory implements DataSourceFactory {
String metadataList = config.get(METADATA_LIST);
List<PostgreSQLReadableMetadata> readableMetadataList = listReadableMetadata(metadataList);
+ // Create a PostgresDataSource; the includeDatabaseInTableId flag is carried by configFactory
return new PostgresDataSource(configFactory, readableMetadataList);
}
@@ -257,6 +261,7 @@ public class PostgresDataSourceFactory implements DataSourceFactory {
options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
options.add(METADATA_LIST);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
+ options.add(TABLE_ID_INCLUDE_DATABASE);
return options;
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
index 767fb5fc3..6084df1f3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
@@ -65,8 +65,14 @@ public class PostgresDataSource implements DataSource {
@Override
public EventSourceProvider getEventSourceProvider() {
+ String databaseName = postgresSourceConfig.getDatabaseList().get(0);
+ boolean includeDatabaseInTableId = postgresSourceConfig.isIncludeDatabaseInTableId();
DebeziumEventDeserializationSchema deserializer =
- new PostgresEventDeserializer(DebeziumChangelogMode.ALL, readableMetadataList);
+ new PostgresEventDeserializer(
+ DebeziumChangelogMode.ALL,
+ readableMetadataList,
+ includeDatabaseInTableId,
+ databaseName);
PostgresOffsetFactory postgresOffsetFactory = new PostgresOffsetFactory();
PostgresDialect postgresDialect = new PostgresDialect(postgresSourceConfig);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
index ec51de8b6..7e9ac4b0c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
@@ -264,4 +264,13 @@ public class PostgresDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to assign the unbounded chunks
first during snapshot reading phase. This might help reduce the risk of the
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of
the largest unbounded chunk. Defaults to false.");
+
+ public static final ConfigOption<Boolean> TABLE_ID_INCLUDE_DATABASE =
+ ConfigOptions.key("table-id.include-database")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to include database in the generated
Table ID. "
+ + "If set to true, the Table ID will be in
the format (database, schema, table). "
+ + "If set to false, the Table ID will be
in the format (schema, table). Defaults to false.");
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
index a37c35c52..c31dbc73b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
@@ -50,18 +50,37 @@ public class PostgresEventDeserializer extends DebeziumEventDeserializationSchem
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
+ private final boolean includeDatabaseInTableId;
+ private final String databaseName;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
- super(new PostgresSchemaDataTypeInference(), changelogMode);
+ this(changelogMode, new ArrayList<>(), false, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList) {
+ this(changelogMode, readableMetadataList, false, null);
+ }
+
+ public PostgresEventDeserializer(
+ DebeziumChangelogMode changelogMode,
+ List<PostgreSQLReadableMetadata> readableMetadataList,
+ boolean includeDatabaseInTableId) {
+ this(changelogMode, readableMetadataList, includeDatabaseInTableId, null);
+ }
+
+ public PostgresEventDeserializer(
+ DebeziumChangelogMode changelogMode,
+ List<PostgreSQLReadableMetadata> readableMetadataList,
+ boolean includeDatabaseInTableId,
+ String databaseName) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this.readableMetadataList = readableMetadataList;
+ this.includeDatabaseInTableId = includeDatabaseInTableId;
+ this.databaseName = databaseName;
}
@Override
@@ -87,7 +106,11 @@ public class PostgresEventDeserializer extends DebeziumEventDeserializationSchem
@Override
protected TableId getTableId(SourceRecord record) {
String[] parts = record.topic().split("\\.");
- return TableId.tableId(parts[1], parts[2]);
+ if (includeDatabaseInTableId && databaseName != null) {
+ return TableId.tableId(databaseName, parts[1], parts[2]);
+ } else {
+ return TableId.tableId(parts[1], parts[2]);
+ }
}
@Override
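As a quick sketch of the branch above, assuming Debezium's usual <topic.prefix>.<schema>.<table> topic naming for Postgres (topic, database, and table values here are illustrative):

    // Hypothetical values: topic "postgres_cdc.inventory.products", databaseName "postgres_db"
    String[] parts = "postgres_cdc.inventory.products".split("\\.");
    // parts[1] = "inventory" (schema), parts[2] = "products" (table)
    TableId threePart = TableId.tableId("postgres_db", parts[1], parts[2]); // flag enabled
    TableId twoPart = TableId.tableId(parts[1], parts[2]); // flag disabled (default)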
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
index 02761d8f3..dd862354c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
@@ -61,6 +61,7 @@ import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
+import static org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
/** The {@link RecordEmitter} implementation for PostgreSQL pipeline connector. */
public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
@@ -73,6 +74,7 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
// Used when startup mode is not initial
private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
private boolean isBounded = false;
+ private boolean includeDatabaseInTableId = false;
private final Map<TableId, CreateTableEvent> createTableEventCache;
@@ -88,6 +90,7 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
sourceConfig.isIncludeSchemaChanges(),
offsetFactory);
this.sourceConfig = sourceConfig;
+ this.includeDatabaseInTableId = sourceConfig.isIncludeDatabaseInTableId();
this.postgresDialect = postgresDialect;
this.alreadySendCreateTableTables = new HashSet<>();
this.createTableEventCache =
@@ -103,10 +106,17 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
// TableSchemas in SnapshotSplit only contains one table.
createTableEventCache.putAll(generateCreateTableEvent(sourceConfig));
} else {
- for (TableChanges.TableChange tableChange : split.getTableSchemas().values()) {
+ for (Map.Entry<TableId, TableChanges.TableChange> entry :
+ split.getTableSchemas().entrySet()) {
+ TableId tableId =
+ entry.getKey(); // Use the TableId from the map key which contains full info
+ TableChanges.TableChange tableChange = entry.getValue();
CreateTableEvent createTableEvent =
new CreateTableEvent(
- toCdcTableId(tableChange.getId()),
+ toCdcTableId(
+ tableId,
+ sourceConfig.getDatabaseList().get(0),
+ includeDatabaseInTableId),
buildSchemaFromTable(tableChange.getTable()));
((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
.applyChangeEvent(createTableEvent);
@@ -128,10 +138,8 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
} else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
if (!alreadySendCreateTableTables.contains(tableId)) {
- try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
- sendCreateTableEvent(jdbc, tableId, (SourceOutput<Event>) output);
- alreadySendCreateTableTables.add(tableId);
- }
+ sendCreateTableEvent(tableId, (SourceOutput<Event>) output);
+ alreadySendCreateTableTables.add(tableId);
}
} else {
boolean isDataChangeRecord = isDataChangeRecord(element);
@@ -189,21 +197,8 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
return tableBuilder.build();
}
- private void sendCreateTableEvent(
- PostgresConnection jdbc, TableId tableId, SourceOutput<Event> output) {
- Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
- output.collect(
- new CreateTableEvent(
- org.apache.flink.cdc.common.event.TableId.tableId(
- tableId.schema(), tableId.table()),
- schema));
- }
-
- private org.apache.flink.cdc.common.event.TableId toCdcTableId(
- io.debezium.relational.TableId dbzTableId) {
- String schemaName =
- dbzTableId.catalog() == null ? dbzTableId.schema() : dbzTableId.catalog();
- return org.apache.flink.cdc.common.event.TableId.tableId(schemaName, dbzTableId.table());
+ private void sendCreateTableEvent(TableId tableId, SourceOutput<Event> output) {
+ output.collect(getCreateTableEvent(sourceConfig, tableId));
}
private CreateTableEvent getCreateTableEvent(
@@ -211,8 +206,10 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
return new CreateTableEvent(
- org.apache.flink.cdc.common.event.TableId.tableId(
- tableId.schema(), tableId.table()),
+ toCdcTableId(
+ tableId,
+ sourceConfig.getDatabaseList().get(0),
+ includeDatabaseInTableId),
schema);
}
}
@@ -244,8 +241,10 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
createTableEventCache.put(
tableId,
new CreateTableEvent(
- org.apache.flink.cdc.common.event.TableId.tableId(
- tableId.schema(), tableId.table()),
+ toCdcTableId(
+ tableId,
+ this.sourceConfig.getDatabaseList().get(0),
+ includeDatabaseInTableId),
schema));
}
return createTableEventCache;
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
index 0f3b05806..93a7099ac 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
@@ -208,12 +208,26 @@ public class PostgresSchemaUtils {
public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
return new io.debezium.relational.TableId(
- tableId.getSchemaName(), null, tableId.getTableName());
+ tableId.getNamespace(), tableId.getSchemaName(), tableId.getTableName());
}
public static org.apache.flink.cdc.common.event.TableId toCdcTableId(
io.debezium.relational.TableId dbzTableId) {
- return org.apache.flink.cdc.common.event.TableId.tableId(
- dbzTableId.schema(), dbzTableId.table());
+ return toCdcTableId(dbzTableId, null, false);
+ }
+
+ public static org.apache.flink.cdc.common.event.TableId toCdcTableId(
+ io.debezium.relational.TableId dbzTableId,
+ String databaseName,
+ boolean includeDatabaseInTableId) {
+ String schema = dbzTableId.schema();
+ String table = dbzTableId.table();
+ if (includeDatabaseInTableId && databaseName != null) {
+ return org.apache.flink.cdc.common.event.TableId.tableId(databaseName, schema, table);
+ } else if (schema != null) {
+ return org.apache.flink.cdc.common.event.TableId.tableId(schema, table);
+ } else {
+ return org.apache.flink.cdc.common.event.TableId.tableId(table);
+ }
}
}
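For clarity, the three branches of the new toCdcTableId overload under assumed inputs (io.debezium.relational.TableId takes catalog, schema, table; all names below are made up):

    io.debezium.relational.TableId dbzId =
            new io.debezium.relational.TableId("postgres_db", "inventory", "products");
    toCdcTableId(dbzId, "postgres_db", true);  // -> (postgres_db, inventory, products)
    toCdcTableId(dbzId, "postgres_db", false); // -> (inventory, products)
    toCdcTableId(dbzId, null, true);           // -> (inventory, products); no database name to include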
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
index bd6c46e25..b2aef9eed 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
@@ -57,7 +57,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +75,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
@@ -103,6 +105,14 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
inventoryDatabase.removeSlot(slotName);
}
+ static Stream<Arguments> provideParameters() {
+ return Stream.of(
+ Arguments.of(true, true),
+ Arguments.of(false, false),
+ Arguments.of(true, false),
+ Arguments.of(false, true));
+ }
+
@Test
public void testInitialStartupMode() throws Exception {
inventoryDatabase.createAndInitialize();
@@ -341,9 +351,10 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
return iterator;
}
- @ParameterizedTest(name = "unboundedChunkFirst: {0}")
- @ValueSource(booleans = {true, false})
- public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
+ @ParameterizedTest
+ @MethodSource("provideParameters")
+ public void testInitialStartupModeWithOpts(
+ boolean unboundedChunkFirst, boolean isTableIdIncludeDatabase) throws Exception {
inventoryDatabase.createAndInitialize();
org.apache.flink.cdc.common.configuration.Configuration sourceConfiguration =
new org.apache.flink.cdc.common.configuration.Configuration();
@@ -365,6 +376,8 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
sourceConfiguration.set(
PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED,
unboundedChunkFirst);
+ sourceConfiguration.set(
+ PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE, isTableIdIncludeDatabase);
Factory.Context context =
new FactoryHelper.DefaultContext(
@@ -384,7 +397,12 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
new EventTypeInfo())
.executeAndCollect();
- TableId tableId = TableId.tableId("inventory", "products");
+ TableId tableId;
+ if (isTableIdIncludeDatabase) {
+ tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "inventory", "products");
+ } else {
+ tableId = TableId.tableId("inventory", "products");
+ }
CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId);
// generate snapshot data
@@ -582,16 +600,6 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
return result;
}
- // Helper method to create a temporary directory for savepoint
- private Path createTempSavepointDir() throws Exception {
- return Files.createTempDirectory("postgres-savepoint");
- }
-
- // Helper method to execute the job and create a savepoint
- private String createSavepoint(JobClient jobClient, Path savepointDir) throws Exception {
- return jobClient.stopWithSavepoint(true, savepointDir.toAbsolutePath().toString()).get();
- }
-
private List<Event> getSnapshotExpected(TableId tableId) {
RowType rowType =
RowType.of(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index bbd65b5e9..0f550ca96 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -305,6 +305,12 @@ public class PostgresSourceBuilder<T> {
return this;
}
+ /** Whether to include the database name in the generated Table ID. */
+ public PostgresSourceBuilder<T> includeDatabaseInTableId(boolean includeDatabaseInTableId) {
+ this.configFactory.setIncludeDatabaseInTableId(includeDatabaseInTableId);
+ return this;
+ }
+
/**
* Build the {@link PostgresIncrementalSource}.
*
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index 302716128..4402219a6 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -39,6 +39,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
private final int subtaskId;
private final int lsnCommitCheckpointsDelay;
private final boolean includePartitionedTables;
+ private final boolean includeDatabaseInTableId;
public PostgresSourceConfig(
int subtaskId,
@@ -69,7 +70,8 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
boolean isScanNewlyAddedTableEnabled,
int lsnCommitCheckpointsDelay,
boolean assignUnboundedChunkFirst,
- boolean includePartitionedTables) {
+ boolean includePartitionedTables,
+ boolean includeDatabaseInTableId) {
super(
startupOptions,
databaseList,
@@ -100,6 +102,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
this.subtaskId = subtaskId;
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
this.includePartitionedTables = includePartitionedTables;
+ this.includeDatabaseInTableId = includeDatabaseInTableId;
}
/**
@@ -148,4 +151,13 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
public PostgresConnectorConfig getDbzConnectorConfig() {
return new PostgresConnectorConfig(getDbzConfiguration());
}
+
+ /**
+ * Returns whether the database name is included in the generated Table ID.
+ *
+ * @return whether the database name is included in the generated Table ID
+ */
+ public boolean isIncludeDatabaseInTableId() {
+ return includeDatabaseInTableId;
+ }
}
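To trace the plumbing end to end, a minimal sketch (connection setup elided; values are placeholders) using only the methods added in this commit:

    PostgresSourceConfigFactory factory = new PostgresSourceConfigFactory();
    // ... hostname/database/schema/table configuration elided ...
    factory.setIncludeDatabaseInTableId(true);
    PostgresSourceConfig config = factory.create(0); // config for subtask 0
    boolean flag = config.isIncludeDatabaseInTableId(); // true; read by the emitter and deserializer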
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 670d4f37a..847b15474 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -54,6 +54,9 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
private boolean includePartitionedTables;
+ private boolean includeDatabaseInTableId =
+ PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue();
+
/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
@@ -136,7 +139,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
scanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst,
- includePartitionedTables);
+ includePartitionedTables,
+ includeDatabaseInTableId);
}
/**
@@ -189,4 +193,9 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
public void setIncludePartitionedTables(boolean includePartitionedTables) {
this.includePartitionedTables = includePartitionedTables;
}
+
+ /** Set whether to include the database name in the generated Table ID. */
+ public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) {
+ this.includeDatabaseInTableId = includeDatabaseInTableId;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
index f498c2645..db04ac142 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
@@ -97,4 +97,13 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
+ "If enabled:\n"
+ "(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true\n"
+ "(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice.");
+
+ public static final ConfigOption<Boolean> TABLE_ID_INCLUDE_DATABASE =
+ ConfigOptions.key("table-id.include-database")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to include database in the generated
Table ID.\n"
+ + "If set to true, the Table ID will be in
the format (database, schema, table).\n"
+ + "If set to false, the Table ID will be
in the format (schema, table). Defaults to false.");
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
index b3b3e778d..49b62cb39 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
@@ -132,9 +132,8 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
.edit()
.with(
"table.include.list",
- ((SnapshotSplit) sourceSplitBase)
- .getTableId()
- .toString())
+ getTableList(
+ ((SnapshotSplit) sourceSplitBase).getTableId()))
.with(
SLOT_NAME.name(),
((PostgresSourceConfig) sourceConfig)
@@ -151,13 +150,13 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
// when handling a backfill split, only the current table schema should be scanned
builder.with(
"table.include.list",
- sourceSplitBase
- .asStreamSplit()
- .getTableSchemas()
- .keySet()
- .iterator()
- .next()
- .toString());
+ getTableList(
+ sourceSplitBase
+ .asStreamSplit()
+ .getTableSchemas()
+ .keySet()
+ .iterator()
+ .next()));
}
dbzConfig =
@@ -385,4 +384,11 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
&& !StreamSplit.STREAM_SPLIT_ID.equalsIgnoreCase(
sourceSplitBase.asStreamSplit().splitId());
}
+
+ private String getTableList(TableId tableId) {
+ if (tableId.schema() == null || tableId.schema().isEmpty()) {
+ return tableId.table();
+ }
+ return tableId.schema() + "." + tableId.table();
+ }
}
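Finally, a short illustration of the new getTableList helper (inputs are made-up io.debezium.relational.TableId values): it deliberately drops the catalog part, since Debezium's table.include.list matches Postgres tables as schema.table:

    getTableList(new TableId(null, "inventory", "products")); // -> "inventory.products"
    getTableList(new TableId(null, null, "products"));        // -> "products"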