Copilot commented on code in PR #60560: URL: https://github.com/apache/doris/pull/60560#discussion_r2771936633
########## fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java: ########## @@ -0,0 +1,932 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.postgresql.connection; + +import io.debezium.DebeziumException; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresSchema; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.connector.postgresql.spi.SlotCreationResult; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.jdbc.JdbcConnectionException; +import io.debezium.relational.RelationalTableFilters; +import io.debezium.relational.TableId; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; +import org.apache.kafka.connect.errors.ConnectException; +import org.postgresql.core.BaseConnection; +import org.postgresql.core.ServerVersion; +import org.postgresql.replication.PGReplicationStream; +import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; +import org.postgresql.util.PSQLException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.Statement; +import java.time.Duration; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.lang.Math.toIntExact; + +/** + * Copied from Flink Cdc 3.5.0 + * + * <p>Line 192~199: add publish_via_partition_root for partition table. + */ +public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection { + + private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class); + + private final String slotName; + private final String publicationName; + private final RelationalTableFilters tableFilter; + private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode; + private final PostgresConnectorConfig.LogicalDecoder plugin; + private final boolean dropSlotOnClose; + private final PostgresConnectorConfig connectorConfig; + private final Duration statusUpdateInterval; + private final MessageDecoder messageDecoder; + private final PostgresConnection jdbcConnection; + private final TypeRegistry typeRegistry; + private final Properties streamParams; + + private Lsn defaultStartingPos; + private SlotCreationResult slotCreationInfo; + private boolean hasInitedSlot; + + private Lsn endingPos; + + /** + * Creates a new replication connection with the given params. 
+ * + * @param config the JDBC configuration for the connection; may not be null + * @param slotName the name of the DB slot for logical replication; may not be null + * @param publicationName the name of the DB publication for logical replication; may not be + * null + * @param tableFilter the tables to watch of the DB publication for logical replication; may not + * be null + * @param publicationAutocreateMode the mode for publication autocreation; may not be null + * @param plugin decoder matching the server side plug-in used for streaming changes; may not be + * null + * @param dropSlotOnClose whether the replication slot should be dropped once the connection is + * closed + * @param statusUpdateInterval the interval at which the replication connection should + * periodically send status + * @param doSnapshot whether the connector is doing snapshot + * @param jdbcConnection general PostgreSQL JDBC connection + * @param typeRegistry registry with PostgreSQL types + * @param streamParams additional parameters to pass to the replication stream + * @param schema the schema; must not be null + * <p>updates to the server + */ + private PostgresReplicationConnection( + PostgresConnectorConfig config, + String slotName, + String publicationName, + RelationalTableFilters tableFilter, + PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode, + PostgresConnectorConfig.LogicalDecoder plugin, + boolean dropSlotOnClose, + boolean doSnapshot, + Duration statusUpdateInterval, + PostgresConnection jdbcConnection, + TypeRegistry typeRegistry, + Properties streamParams, + PostgresSchema schema) { + super( + addDefaultSettings(config.getJdbcConfig()), + PostgresConnection.FACTORY, + null, + null, + "\"", + "\""); + + this.connectorConfig = config; + this.slotName = slotName; + this.publicationName = publicationName; + this.tableFilter = tableFilter; + this.publicationAutocreateMode = publicationAutocreateMode; + this.plugin = plugin; + this.dropSlotOnClose = dropSlotOnClose; + this.statusUpdateInterval = statusUpdateInterval; + this.messageDecoder = + plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection); + this.jdbcConnection = jdbcConnection; + this.typeRegistry = typeRegistry; + this.streamParams = streamParams; + this.slotCreationInfo = null; + this.hasInitedSlot = false; + } + + private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) { + // first copy the parent's default settings... 
+ // then set some additional replication specific settings + return JdbcConfiguration.adapt( + PostgresConnection.addDefaultSettings( + configuration, PostgresConnection.CONNECTION_STREAMING) + .edit() + .with("replication", "database") + .with( + "preferQueryMode", + "simple") // replication protocol only supports simple query mode + .build()); + } + + private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException { + try (PostgresConnection connection = + new PostgresConnection( + connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_SLOT_INFO)) { + return connection.readReplicationSlotInfo(slotName, plugin.getPostgresPluginName()); + } + } + + protected void initPublication() { + String createPublicationStmt; + String tableFilterString = null; + if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) { + LOGGER.info("Initializing PgOutput logical decoder publication"); + try { + // Unless the autocommit is disabled the SELECT publication query will stay running + Connection conn = pgConnection(); + conn.setAutoCommit(false); + + String selectPublication = + String.format( + "SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", + publicationName); + try (Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(selectPublication)) { + if (rs.next()) { + Long count = rs.getLong(1); + // Close eagerly as the transaction might stay running + if (count == 0L) { + LOGGER.info( + "Creating new publication '{}' for plugin '{}'", + publicationName, + plugin); + switch (publicationAutocreateMode) { + case DISABLED: + throw new ConnectException( + "Publication autocreation is disabled, please create one and restart the connector."); + case ALL_TABLES: + boolean supportPartitionRoot = ((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13); + createPublicationStmt = supportPartitionRoot + ? String.format( + "CREATE PUBLICATION %s FOR ALL TABLES WITH (publish_via_partition_root = true);", + publicationName) + : String.format( + "CREATE PUBLICATION %s FOR ALL TABLES;", + publicationName); + LOGGER.info( + "Creating Publication with statement '{}'", + createPublicationStmt); + // Publication doesn't exist, create it. + stmt.execute(createPublicationStmt); + break; + case FILTERED: + createOrUpdatePublicationModeFilterted( + tableFilterString, stmt, false); + break; + } + } else { + switch (publicationAutocreateMode) { + case FILTERED: + createOrUpdatePublicationModeFilterted( Review Comment: The method name contains a spelling error: "Filterred" should be "Filtered". This typo appears in the method call createOrUpdatePublicationModeFilterted and should be corrected to createOrUpdatePublicationModeFiltered for better code readability and maintainability. ```suggestion createOrUpdatePublicationModeFiltered( tableFilterString, stmt, false); break; } } else { switch (publicationAutocreateMode) { case FILTERED: createOrUpdatePublicationModeFiltered( ``` ########## fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java: ########## @@ -0,0 +1,932 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.postgresql.connection; + +import io.debezium.DebeziumException; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresSchema; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.connector.postgresql.spi.SlotCreationResult; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.jdbc.JdbcConnectionException; +import io.debezium.relational.RelationalTableFilters; +import io.debezium.relational.TableId; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; +import org.apache.kafka.connect.errors.ConnectException; +import org.postgresql.core.BaseConnection; +import org.postgresql.core.ServerVersion; +import org.postgresql.replication.PGReplicationStream; +import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; +import org.postgresql.util.PSQLException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.Statement; +import java.time.Duration; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.lang.Math.toIntExact; + +/** + * Copied from Flink Cdc 3.5.0 + * + * <p>Line 192~199: add publish_via_partition_root for partition table. + */ +public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection { + + private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class); + + private final String slotName; + private final String publicationName; + private final RelationalTableFilters tableFilter; + private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode; + private final PostgresConnectorConfig.LogicalDecoder plugin; + private final boolean dropSlotOnClose; + private final PostgresConnectorConfig connectorConfig; + private final Duration statusUpdateInterval; + private final MessageDecoder messageDecoder; + private final PostgresConnection jdbcConnection; + private final TypeRegistry typeRegistry; + private final Properties streamParams; + + private Lsn defaultStartingPos; + private SlotCreationResult slotCreationInfo; + private boolean hasInitedSlot; + + private Lsn endingPos; + + /** + * Creates a new replication connection with the given params. 
+ * + * @param config the JDBC configuration for the connection; may not be null + * @param slotName the name of the DB slot for logical replication; may not be null + * @param publicationName the name of the DB publication for logical replication; may not be + * null + * @param tableFilter the tables to watch of the DB publication for logical replication; may not + * be null + * @param publicationAutocreateMode the mode for publication autocreation; may not be null + * @param plugin decoder matching the server side plug-in used for streaming changes; may not be + * null + * @param dropSlotOnClose whether the replication slot should be dropped once the connection is + * closed + * @param statusUpdateInterval the interval at which the replication connection should + * periodically send status + * @param doSnapshot whether the connector is doing snapshot + * @param jdbcConnection general PostgreSQL JDBC connection + * @param typeRegistry registry with PostgreSQL types + * @param streamParams additional parameters to pass to the replication stream + * @param schema the schema; must not be null + * <p>updates to the server + */ + private PostgresReplicationConnection( + PostgresConnectorConfig config, + String slotName, + String publicationName, + RelationalTableFilters tableFilter, + PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode, + PostgresConnectorConfig.LogicalDecoder plugin, + boolean dropSlotOnClose, + boolean doSnapshot, + Duration statusUpdateInterval, + PostgresConnection jdbcConnection, + TypeRegistry typeRegistry, + Properties streamParams, + PostgresSchema schema) { + super( + addDefaultSettings(config.getJdbcConfig()), + PostgresConnection.FACTORY, + null, + null, + "\"", + "\""); + + this.connectorConfig = config; + this.slotName = slotName; + this.publicationName = publicationName; + this.tableFilter = tableFilter; + this.publicationAutocreateMode = publicationAutocreateMode; + this.plugin = plugin; + this.dropSlotOnClose = dropSlotOnClose; + this.statusUpdateInterval = statusUpdateInterval; + this.messageDecoder = + plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection); + this.jdbcConnection = jdbcConnection; + this.typeRegistry = typeRegistry; + this.streamParams = streamParams; + this.slotCreationInfo = null; + this.hasInitedSlot = false; + } + + private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) { + // first copy the parent's default settings... 
+ // then set some additional replication specific settings + return JdbcConfiguration.adapt( + PostgresConnection.addDefaultSettings( + configuration, PostgresConnection.CONNECTION_STREAMING) + .edit() + .with("replication", "database") + .with( + "preferQueryMode", + "simple") // replication protocol only supports simple query mode + .build()); + } + + private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException { + try (PostgresConnection connection = + new PostgresConnection( + connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_SLOT_INFO)) { + return connection.readReplicationSlotInfo(slotName, plugin.getPostgresPluginName()); + } + } + + protected void initPublication() { + String createPublicationStmt; + String tableFilterString = null; + if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) { + LOGGER.info("Initializing PgOutput logical decoder publication"); + try { + // Unless the autocommit is disabled the SELECT publication query will stay running + Connection conn = pgConnection(); + conn.setAutoCommit(false); + + String selectPublication = + String.format( + "SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", + publicationName); + try (Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(selectPublication)) { + if (rs.next()) { + Long count = rs.getLong(1); + // Close eagerly as the transaction might stay running + if (count == 0L) { + LOGGER.info( + "Creating new publication '{}' for plugin '{}'", + publicationName, + plugin); + switch (publicationAutocreateMode) { + case DISABLED: + throw new ConnectException( + "Publication autocreation is disabled, please create one and restart the connector."); + case ALL_TABLES: + boolean supportPartitionRoot = ((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13); + createPublicationStmt = supportPartitionRoot + ? String.format( + "CREATE PUBLICATION %s FOR ALL TABLES WITH (publish_via_partition_root = true);", + publicationName) + : String.format( + "CREATE PUBLICATION %s FOR ALL TABLES;", + publicationName); + LOGGER.info( + "Creating Publication with statement '{}'", + createPublicationStmt); + // Publication doesn't exist, create it. + stmt.execute(createPublicationStmt); + break; + case FILTERED: + createOrUpdatePublicationModeFilterted( + tableFilterString, stmt, false); + break; + } + } else { + switch (publicationAutocreateMode) { + case FILTERED: + createOrUpdatePublicationModeFilterted( + tableFilterString, stmt, true); + break; + default: + LOGGER.trace( + "A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server " + + "and will be used by the plugin", + publicationName, + plugin, + database()); + } + } + } + } + conn.commit(); + conn.setAutoCommit(true); + } catch (SQLException e) { + throw new JdbcConnectionException(e); + } + } + } + + private void createOrUpdatePublicationModeFilterted( + String tableFilterString, Statement stmt, boolean isUpdate) { + String createOrUpdatePublicationStmt; + try { + Set<TableId> tablesToCapture = determineCapturedTables(); + tableFilterString = + tablesToCapture.stream() + .map(TableId::toDoubleQuotedString) + .collect(Collectors.joining(", ")); + if (tableFilterString.isEmpty()) { + throw new DebeziumException( + String.format( + "No table filters found for filtered publication %s", + publicationName)); + } + createOrUpdatePublicationStmt = + isUpdate + ? 
String.format( + "ALTER PUBLICATION %s SET TABLE %s;", + publicationName, tableFilterString) + : String.format( + "CREATE PUBLICATION %s FOR TABLE %s;", + publicationName, tableFilterString); Review Comment: When creating publications in FILTERED mode, the publish_via_partition_root option is not being applied. This means partition tables won't be properly synchronized when using table filters. The FILTERED mode should also check for PostgreSQL 13+ and include publish_via_partition_root option in the CREATE PUBLICATION statement, similar to how it's done for ALL_TABLES mode on lines 192-199. This could cause data synchronization issues with partition tables when using filtered publications. ```suggestion BaseConnection baseConnection = connection().unwrap(BaseConnection.class); boolean isPg13OrLater = baseConnection != null && baseConnection.haveMinimumServerVersion(ServerVersion.v13); String publicationOptions = !isUpdate && isPg13OrLater ? " WITH (publish_via_partition_root = true)" : ""; createOrUpdatePublicationStmt = isUpdate ? String.format( "ALTER PUBLICATION %s SET TABLE %s;", publicationName, tableFilterString) : String.format( "CREATE PUBLICATION %s FOR TABLE %s%s;", publicationName, tableFilterString, publicationOptions); ``` ########## fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java: ########## @@ -0,0 +1,932 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.postgresql.connection; + +import io.debezium.DebeziumException; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresSchema; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.connector.postgresql.spi.SlotCreationResult; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.jdbc.JdbcConnectionException; +import io.debezium.relational.RelationalTableFilters; +import io.debezium.relational.TableId; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; +import org.apache.kafka.connect.errors.ConnectException; +import org.postgresql.core.BaseConnection; +import org.postgresql.core.ServerVersion; +import org.postgresql.replication.PGReplicationStream; +import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; +import org.postgresql.util.PSQLException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.Statement; +import java.time.Duration; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.lang.Math.toIntExact; + +/** + * Copied from Flink Cdc 3.5.0 + * + * <p>Line 192~199: add publish_via_partition_root for partition table. 
+ */ +public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection { + + private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class); + + private final String slotName; + private final String publicationName; + private final RelationalTableFilters tableFilter; + private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode; + private final PostgresConnectorConfig.LogicalDecoder plugin; + private final boolean dropSlotOnClose; + private final PostgresConnectorConfig connectorConfig; + private final Duration statusUpdateInterval; + private final MessageDecoder messageDecoder; + private final PostgresConnection jdbcConnection; + private final TypeRegistry typeRegistry; + private final Properties streamParams; + + private Lsn defaultStartingPos; + private SlotCreationResult slotCreationInfo; + private boolean hasInitedSlot; + + private Lsn endingPos; + + /** + * Creates a new replication connection with the given params. + * + * @param config the JDBC configuration for the connection; may not be null + * @param slotName the name of the DB slot for logical replication; may not be null + * @param publicationName the name of the DB publication for logical replication; may not be + * null + * @param tableFilter the tables to watch of the DB publication for logical replication; may not + * be null + * @param publicationAutocreateMode the mode for publication autocreation; may not be null + * @param plugin decoder matching the server side plug-in used for streaming changes; may not be + * null + * @param dropSlotOnClose whether the replication slot should be dropped once the connection is + * closed + * @param statusUpdateInterval the interval at which the replication connection should + * periodically send status + * @param doSnapshot whether the connector is doing snapshot + * @param jdbcConnection general PostgreSQL JDBC connection + * @param typeRegistry registry with PostgreSQL types + * @param streamParams additional parameters to pass to the replication stream + * @param schema the schema; must not be null + * <p>updates to the server + */ + private PostgresReplicationConnection( + PostgresConnectorConfig config, + String slotName, + String publicationName, + RelationalTableFilters tableFilter, + PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode, + PostgresConnectorConfig.LogicalDecoder plugin, + boolean dropSlotOnClose, + boolean doSnapshot, + Duration statusUpdateInterval, + PostgresConnection jdbcConnection, + TypeRegistry typeRegistry, + Properties streamParams, + PostgresSchema schema) { + super( + addDefaultSettings(config.getJdbcConfig()), + PostgresConnection.FACTORY, + null, + null, + "\"", + "\""); + + this.connectorConfig = config; + this.slotName = slotName; + this.publicationName = publicationName; + this.tableFilter = tableFilter; + this.publicationAutocreateMode = publicationAutocreateMode; + this.plugin = plugin; + this.dropSlotOnClose = dropSlotOnClose; + this.statusUpdateInterval = statusUpdateInterval; + this.messageDecoder = + plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection); + this.jdbcConnection = jdbcConnection; + this.typeRegistry = typeRegistry; + this.streamParams = streamParams; + this.slotCreationInfo = null; + this.hasInitedSlot = false; + } + + private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) { + // first copy the parent's default settings... 
+ // then set some additional replication specific settings + return JdbcConfiguration.adapt( + PostgresConnection.addDefaultSettings( + configuration, PostgresConnection.CONNECTION_STREAMING) + .edit() + .with("replication", "database") + .with( + "preferQueryMode", + "simple") // replication protocol only supports simple query mode + .build()); + } + + private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException { + try (PostgresConnection connection = + new PostgresConnection( + connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_SLOT_INFO)) { + return connection.readReplicationSlotInfo(slotName, plugin.getPostgresPluginName()); + } + } + + protected void initPublication() { + String createPublicationStmt; + String tableFilterString = null; + if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) { + LOGGER.info("Initializing PgOutput logical decoder publication"); + try { + // Unless the autocommit is disabled the SELECT publication query will stay running + Connection conn = pgConnection(); + conn.setAutoCommit(false); + + String selectPublication = + String.format( + "SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", + publicationName); + try (Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(selectPublication)) { + if (rs.next()) { + Long count = rs.getLong(1); + // Close eagerly as the transaction might stay running + if (count == 0L) { + LOGGER.info( + "Creating new publication '{}' for plugin '{}'", + publicationName, + plugin); + switch (publicationAutocreateMode) { + case DISABLED: + throw new ConnectException( + "Publication autocreation is disabled, please create one and restart the connector."); + case ALL_TABLES: + boolean supportPartitionRoot = ((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13); + createPublicationStmt = supportPartitionRoot + ? String.format( + "CREATE PUBLICATION %s FOR ALL TABLES WITH (publish_via_partition_root = true);", + publicationName) + : String.format( + "CREATE PUBLICATION %s FOR ALL TABLES;", + publicationName); + LOGGER.info( + "Creating Publication with statement '{}'", + createPublicationStmt); + // Publication doesn't exist, create it. + stmt.execute(createPublicationStmt); + break; + case FILTERED: + createOrUpdatePublicationModeFilterted( + tableFilterString, stmt, false); + break; + } + } else { + switch (publicationAutocreateMode) { + case FILTERED: + createOrUpdatePublicationModeFilterted( Review Comment: The method name contains a spelling error: "Filterred" should be "Filtered". This typo appears in the method call createOrUpdatePublicationModeFilterted and should be corrected to createOrUpdatePublicationModeFiltered for better code readability and maintainability. ```suggestion createOrUpdatePublicationModeFiltered( tableFilterString, stmt, false); break; } } else { switch (publicationAutocreateMode) { case FILTERED: createOrUpdatePublicationModeFiltered( ``` ########## fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java: ########## @@ -0,0 +1,932 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.postgresql.connection; + +import io.debezium.DebeziumException; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresSchema; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.connector.postgresql.spi.SlotCreationResult; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.jdbc.JdbcConnectionException; +import io.debezium.relational.RelationalTableFilters; +import io.debezium.relational.TableId; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; +import org.apache.kafka.connect.errors.ConnectException; +import org.postgresql.core.BaseConnection; +import org.postgresql.core.ServerVersion; +import org.postgresql.replication.PGReplicationStream; +import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; +import org.postgresql.util.PSQLException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.Statement; +import java.time.Duration; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.lang.Math.toIntExact; + +/** + * Copied from Flink Cdc 3.5.0 + * + * <p>Line 192~199: add publish_via_partition_root for partition table. + */ +public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection { + + private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class); + + private final String slotName; + private final String publicationName; + private final RelationalTableFilters tableFilter; + private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode; + private final PostgresConnectorConfig.LogicalDecoder plugin; + private final boolean dropSlotOnClose; + private final PostgresConnectorConfig connectorConfig; + private final Duration statusUpdateInterval; + private final MessageDecoder messageDecoder; + private final PostgresConnection jdbcConnection; + private final TypeRegistry typeRegistry; + private final Properties streamParams; + + private Lsn defaultStartingPos; + private SlotCreationResult slotCreationInfo; + private boolean hasInitedSlot; + + private Lsn endingPos; + + /** + * Creates a new replication connection with the given params. 
+ * + * @param config the JDBC configuration for the connection; may not be null + * @param slotName the name of the DB slot for logical replication; may not be null + * @param publicationName the name of the DB publication for logical replication; may not be + * null + * @param tableFilter the tables to watch of the DB publication for logical replication; may not + * be null + * @param publicationAutocreateMode the mode for publication autocreation; may not be null + * @param plugin decoder matching the server side plug-in used for streaming changes; may not be + * null + * @param dropSlotOnClose whether the replication slot should be dropped once the connection is + * closed + * @param statusUpdateInterval the interval at which the replication connection should + * periodically send status + * @param doSnapshot whether the connector is doing snapshot + * @param jdbcConnection general PostgreSQL JDBC connection + * @param typeRegistry registry with PostgreSQL types + * @param streamParams additional parameters to pass to the replication stream + * @param schema the schema; must not be null + * <p>updates to the server + */ + private PostgresReplicationConnection( + PostgresConnectorConfig config, + String slotName, + String publicationName, + RelationalTableFilters tableFilter, + PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode, + PostgresConnectorConfig.LogicalDecoder plugin, + boolean dropSlotOnClose, + boolean doSnapshot, + Duration statusUpdateInterval, + PostgresConnection jdbcConnection, + TypeRegistry typeRegistry, + Properties streamParams, + PostgresSchema schema) { + super( + addDefaultSettings(config.getJdbcConfig()), + PostgresConnection.FACTORY, + null, + null, + "\"", + "\""); + + this.connectorConfig = config; + this.slotName = slotName; + this.publicationName = publicationName; + this.tableFilter = tableFilter; + this.publicationAutocreateMode = publicationAutocreateMode; + this.plugin = plugin; + this.dropSlotOnClose = dropSlotOnClose; + this.statusUpdateInterval = statusUpdateInterval; + this.messageDecoder = + plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection); + this.jdbcConnection = jdbcConnection; + this.typeRegistry = typeRegistry; + this.streamParams = streamParams; + this.slotCreationInfo = null; + this.hasInitedSlot = false; + } + + private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) { + // first copy the parent's default settings... 
+ // then set some additional replication specific settings + return JdbcConfiguration.adapt( + PostgresConnection.addDefaultSettings( + configuration, PostgresConnection.CONNECTION_STREAMING) + .edit() + .with("replication", "database") + .with( + "preferQueryMode", + "simple") // replication protocol only supports simple query mode + .build()); + } + + private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException { + try (PostgresConnection connection = + new PostgresConnection( + connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_SLOT_INFO)) { + return connection.readReplicationSlotInfo(slotName, plugin.getPostgresPluginName()); + } + } + + protected void initPublication() { + String createPublicationStmt; + String tableFilterString = null; + if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) { + LOGGER.info("Initializing PgOutput logical decoder publication"); + try { + // Unless the autocommit is disabled the SELECT publication query will stay running + Connection conn = pgConnection(); + conn.setAutoCommit(false); + + String selectPublication = + String.format( + "SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", + publicationName); + try (Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(selectPublication)) { + if (rs.next()) { + Long count = rs.getLong(1); + // Close eagerly as the transaction might stay running + if (count == 0L) { + LOGGER.info( + "Creating new publication '{}' for plugin '{}'", + publicationName, + plugin); + switch (publicationAutocreateMode) { + case DISABLED: + throw new ConnectException( + "Publication autocreation is disabled, please create one and restart the connector."); + case ALL_TABLES: + boolean supportPartitionRoot = ((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13); + createPublicationStmt = supportPartitionRoot + ? String.format( + "CREATE PUBLICATION %s FOR ALL TABLES WITH (publish_via_partition_root = true);", + publicationName) + : String.format( + "CREATE PUBLICATION %s FOR ALL TABLES;", + publicationName); + LOGGER.info( + "Creating Publication with statement '{}'", + createPublicationStmt); + // Publication doesn't exist, create it. + stmt.execute(createPublicationStmt); + break; + case FILTERED: + createOrUpdatePublicationModeFilterted( + tableFilterString, stmt, false); + break; + } + } else { + switch (publicationAutocreateMode) { + case FILTERED: + createOrUpdatePublicationModeFilterted( + tableFilterString, stmt, true); + break; + default: + LOGGER.trace( + "A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server " + + "and will be used by the plugin", + publicationName, + plugin, + database()); + } + } + } + } + conn.commit(); + conn.setAutoCommit(true); + } catch (SQLException e) { + throw new JdbcConnectionException(e); + } + } + } + + private void createOrUpdatePublicationModeFilterted( Review Comment: The method name contains a spelling error: "Filterred" should be "Filtered". This typo appears in the method name createOrUpdatePublicationModeFilterted and should be corrected to createOrUpdatePublicationModeFiltered for better code readability and maintainability. ```suggestion private void createOrUpdatePublicationModeFiltered( ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
