This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 66922d06bd0 [fix](streaming-job) fix postgres cdc multi-table publication data loss and binlog duplicate key (#64075)
66922d06bd0 is described below

commit 66922d06bd02e37edaa3add3908cb2b8ca08075c
Author: wudi <[email protected]>
AuthorDate: Fri Jun 5 14:08:19 2026 +0800

    [fix](streaming-job) fix postgres cdc multi-table publication data loss and binlog duplicate key (#64075)
    
    ## What problem does this PR solve?
    
    Several fixes and hardening for from-to (at-least-once) PostgreSQL /
    MySQL CDC streaming jobs.
    
    ### 1. Multi-table snapshot data loss (PostgreSQL)
    
    For a job with **multiple tables** and `offset=initial`, each
    snapshot/backfill split rewrote the shared per-job publication to only
    its own table (`ALTER PUBLICATION ... SET TABLE <one table>`), so during
    snapshot the publication kept flipping between single tables. Logical
    decoding evaluates publication membership at the WAL position of each
    change, so a row written to a table while the publication temporarily
    excluded it is filtered out and **permanently lost**, violating
    at-least-once. Single-table jobs and user-provided publications are not
    affected.
    
    Fix: Doris now creates and owns the publication with the full
    `include_tables` set up front in `PostgresSourceReader.initialize`, and
    publication autocreate is always DISABLED, so the publication is never
    rewritten per split.
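
The loss mechanism described above can be illustrated with a toy model. This is a minimal sketch, not the Doris or Debezium code: the class `PublicationFlipDemo`, its methods, and the table names `t1`/`t2` are hypothetical, and the WAL is reduced to a list of table names with the publication's table set recorded at each position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PublicationFlipDemo {

    /**
     * Keeps only the changes whose table belongs to the publication at the
     * position where the change was written. membership.get(i) is the
     * publication's table set at the time wal.get(i) was written.
     */
    static List<String> decode(List<String> wal, List<Set<String>> membership) {
        List<String> delivered = new ArrayList<>();
        for (int i = 0; i < wal.size(); i++) {
            if (membership.get(i).contains(wal.get(i))) {
                delivered.add(wal.get(i));
            }
        }
        return delivered;
    }

    static Set<String> setOf(String... tables) {
        return new HashSet<>(Arrays.asList(tables));
    }

    /** Old behaviour (modelled): the publication flips between single tables. */
    static int deliveredWithFlippingPublication() {
        List<String> wal = Arrays.asList("t1", "t2", "t1", "t2");
        List<Set<String>> membership =
                Arrays.asList(setOf("t1"), setOf("t1"), setOf("t2"), setOf("t2"));
        return decode(wal, membership).size();
    }

    /** Fixed behaviour (modelled): the publication always holds every table. */
    static int deliveredWithFullPublication() {
        List<String> wal = Arrays.asList("t1", "t2", "t1", "t2");
        Set<String> all = setOf("t1", "t2");
        List<Set<String>> membership = Arrays.asList(all, all, all, all);
        return decode(wal, membership).size();
    }

    public static void main(String[] args) {
        System.out.println("flipping publication delivers: " + deliveredWithFlippingPublication());
        System.out.println("full publication delivers: " + deliveredWithFullPublication());
    }
}
```

In this model the flipping publication drops the two changes written while their table was excluded, while the full table set delivers all four.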
    
    ### 2. Binlog "Duplicate key" when a same-named table exists in another schema
    
    `doReadTableColumn` filtered the columns returned by `getColumns` only
    by TABLE_NAME. Because the schema argument of `getColumns` is a LIKE
    pattern, a schema whose name matches via wildcard (e.g. `_`) and
    contains a same-named table leaks its columns, which then collide on
    column name and throw `IllegalStateException: Duplicate key` on the
    binlog reader. Fix: also compare SCHEMA_NAME.
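
The collision can be reproduced without a database. The sketch below makes several assumptions: `SchemaPatternDemo`, `likeMatches`, and the in-memory `CATALOG` are hypothetical stand-ins for the JDBC metadata call, the LIKE matcher ignores escape characters, and `Collectors.toMap` is used only as one common source of a `Duplicate key` `IllegalStateException`, not necessarily the code path in the reader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class SchemaPatternDemo {

    /** Simplified SQL LIKE: '_' matches one character, '%' matches any run; no escapes. */
    static boolean likeMatches(String pattern, String value) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            if (c == '_') {
                regex.append('.');
            } else if (c == '%') {
                regex.append(".*");
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), value);
    }

    /** Hypothetical metadata rows: {schema, table, column}. */
    static final List<String[]> CATALOG = Arrays.asList(
            new String[] {"cdc_test", "orders", "id"},
            new String[] {"cdcXtest", "orders", "id"});

    /** Stand-in for a metadata lookup whose schema and table arguments are LIKE patterns. */
    static List<String[]> getColumns(String schemaPattern, String tablePattern) {
        List<String[]> rows = new ArrayList<>();
        for (String[] row : CATALOG) {
            if (likeMatches(schemaPattern, row[0]) && likeMatches(tablePattern, row[1])) {
                rows.add(row);
            }
        }
        return rows;
    }

    /** Builds a column-name map, optionally requiring an exact schema match as well. */
    static Map<String, String> columnsByName(String schema, String table, boolean compareSchema) {
        return getColumns(schema, table).stream()
                .filter(r -> r[1].equals(table))
                .filter(r -> !compareSchema || r[0].equals(schema))
                .collect(Collectors.toMap(r -> r[2], r -> r[0] + "." + r[1]));
    }

    /** Returns true when filtering by table name alone hits a duplicate column key. */
    static boolean tableOnlyFilterFails() {
        try {
            columnsByName("cdc_test", "orders", false);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("table-only filter fails: " + tableOnlyFilterFails());
        System.out.println("schema+table filter: " + columnsByName("cdc_test", "orders", true));
    }
}
```

With the exact schema comparison enabled, only the `cdc_test.orders` column survives and the map builds without a collision.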
    
    ### 3. cdc_client stream-load writer thread leak
    
    `DorisBatchStreamLoad.close()` previously only called `shutdown()` on the
    consumer, leaving any producer parked on a full `flushQueue`. Every job
    close leaked one writer thread, eventually exhausting the pool
    (`RejectedExecutionException`). `close()` now sets the exit flag and
    `loadThreadAlive=false`, calls `clear()` on the queue to release the
    parked `put()`, and calls `shutdownNow()` to interrupt a consumer stuck
    in a stream-load HTTP call.
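
The close sequence can be sketched in isolation. Everything here is an assumption for illustration: `CloseReleasesProducerDemo` is a hypothetical class, the queue is an `ArrayBlockingQueue` of capacity 1 (the real `flushQueue` type is not shown in this message), and a long sleep stands in for a consumer stuck in an HTTP call.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseReleasesProducerDemo {
    private final BlockingQueue<String> flushQueue = new ArrayBlockingQueue<>(1);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final ExecutorService consumer = Executors.newSingleThreadExecutor();

    // Producer keeps enqueuing batches; put() parks once the queue is full.
    final Thread producer = new Thread(() -> {
        try {
            while (!closed.get()) {
                flushQueue.put("batch");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });

    void start() {
        // The consumer never drains the queue: it models a stuck stream-load call.
        consumer.submit(() -> {
            try {
                TimeUnit.DAYS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();
    }

    void close() {
        closed.set(true);       // tell the producer loop to exit
        flushQueue.clear();     // frees capacity, waking a producer parked in put()
        consumer.shutdownNow(); // interrupts the stuck consumer task
    }

    /** Runs the scenario end to end; true if the producer parked and both threads ended. */
    static boolean producerExitsAfterClose() {
        CloseReleasesProducerDemo demo = new CloseReleasesProducerDemo();
        demo.start();
        try {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (demo.producer.getState() != Thread.State.WAITING
                    && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            boolean parked = demo.producer.getState() == Thread.State.WAITING;
            demo.close();
            demo.producer.join(5000);
            boolean consumerDone = demo.consumer.awaitTermination(5, TimeUnit.SECONDS);
            return parked && !demo.producer.isAlive() && consumerDone;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("producer released on close: " + producerExitsAfterClose());
    }
}
```

Setting the flag before clearing the queue matters in this sketch: the producer's released `put()` succeeds, and the next loop check then sees the flag and exits instead of parking again.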
    
    ### Stabilize flaky cases & new regression
    
    - `test_streaming_insert_job_fetch_meta_error`: assert PAUSED status and
    error message inside the poll (the job is auto-resumable and
    oscillates), instead of a separate read afterwards.
    - `test_streaming_postgres_job_special_offset`: use a UNIQUE-key table,
    wait for the full snapshot before asserting, and explicitly assert rows
    before the ALTER LSN are skipped.
    - New
    `test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table`:
    multi-table snapshot with concurrent DML on every table, asserting no
    row of either table is lost.
---
 .../postgresql/connection/PostgresConnection.java  |   9 +-
 .../connection/PostgresReplicationConnection.java  | 932 ---------------------
 .../cdcclient/service/PipelineCoordinator.java     |   9 +-
 .../doris/cdcclient/sink/DorisBatchStreamLoad.java |   7 +-
 .../source/reader/mysql/MySqlSourceReader.java     |   7 +-
 .../reader/postgres/PostgresSourceReader.java      |  93 +-
 .../src/main/resources/application.properties      |   4 +-
 ...ob_snapshot_with_concurrent_dml_multi_table.out |  43 +
 ...snapshot_with_concurrent_dml_multi_table.groovy | 163 ++++
 ...st_streaming_postgres_job_special_offset.groovy |  11 +-
 ...st_streaming_insert_job_fetch_meta_error.groovy |   9 +-
 11 files changed, 327 insertions(+), 960 deletions(-)

diff --git a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
index 2f6ca5756dd..dbaf60f2a04 100644
--- a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
+++ b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
@@ -60,6 +60,7 @@ import java.util.regex.Pattern;
  * Copied from Flink Cdc 3.6.0
  *
  * <p>Line 820~854: modified getColumnValue method to fix FLINK-39748.
+ * <p>Line 699-703, 705-706: doReadTableColumn also matches SCHEMA_NAME (besides TABLE_NAME, FLINK-38965) to avoid Duplicate key from a decoy schema's same-named table.
  */
 public class PostgresConnection extends JdbcConnection {
 
@@ -695,8 +696,14 @@ public class PostgresConnection extends JdbcConnection {
         // - When querying 'user_sink', the pattern may also match 'userbsink' (due to '_')
         // - When querying 'user%data' (where % is literal), it may match 'user_test_data' (due to
         // '%')
+        // The schema name passed to getColumns is also a LIKE pattern, so a decoy schema
+        // (e.g. 'cdcXtest' matched by 'cdc_test' via '_') can return a same-named table whose
+        // TABLE_NAME still equals tableId.table(); compare the schema too, otherwise those
+        // columns merge with the real table's and collide on column name (Duplicate key).
+        final String resultSchemaName = columnMetadata.getString(2);
         final String resultTableName = columnMetadata.getString(3);
-        if (!tableId.table().equals(resultTableName)) {
+        if (!tableId.table().equals(resultTableName)
+                || (tableId.schema() != null && !tableId.schema().equals(resultSchemaName))) {
             return Optional.empty();
         }
 
diff --git a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
deleted file mode 100644
index eeee1e725ef..00000000000
--- a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
+++ /dev/null
@@ -1,932 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-
-package io.debezium.connector.postgresql.connection;
-
-import io.debezium.DebeziumException;
-import io.debezium.connector.postgresql.PostgresConnectorConfig;
-import io.debezium.connector.postgresql.PostgresSchema;
-import io.debezium.connector.postgresql.TypeRegistry;
-import io.debezium.connector.postgresql.spi.SlotCreationResult;
-import io.debezium.jdbc.JdbcConfiguration;
-import io.debezium.jdbc.JdbcConnection;
-import io.debezium.jdbc.JdbcConnectionException;
-import io.debezium.relational.RelationalTableFilters;
-import io.debezium.relational.TableId;
-import io.debezium.util.Clock;
-import io.debezium.util.Metronome;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.postgresql.core.BaseConnection;
-import org.postgresql.core.ServerVersion;
-import org.postgresql.replication.PGReplicationStream;
-import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
-import org.postgresql.util.PSQLException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.SQLWarning;
-import java.sql.Statement;
-import java.time.Duration;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static java.lang.Math.toIntExact;
-
-/**
- * Copied from Flink Cdc 3.5.0
- *
- * <p>Line 248~251, 258: add publish_via_partition_root for partition table (FILTERED mode).
- */
-public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection {
-
-    private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
-
-    private final String slotName;
-    private final String publicationName;
-    private final RelationalTableFilters tableFilter;
-    private final PostgresConnectorConfig.AutoCreateMode 
publicationAutocreateMode;
-    private final PostgresConnectorConfig.LogicalDecoder plugin;
-    private final boolean dropSlotOnClose;
-    private final PostgresConnectorConfig connectorConfig;
-    private final Duration statusUpdateInterval;
-    private final MessageDecoder messageDecoder;
-    private final PostgresConnection jdbcConnection;
-    private final TypeRegistry typeRegistry;
-    private final Properties streamParams;
-
-    private Lsn defaultStartingPos;
-    private SlotCreationResult slotCreationInfo;
-    private boolean hasInitedSlot;
-
-    private Lsn endingPos;
-
-    /**
-     * Creates a new replication connection with the given params.
-     *
-     * @param config the JDBC configuration for the connection; may not be null
-     * @param slotName the name of the DB slot for logical replication; may 
not be null
-     * @param publicationName the name of the DB publication for logical 
replication; may not be
-     *     null
-     * @param tableFilter the tables to watch of the DB publication for 
logical replication; may not
-     *     be null
-     * @param publicationAutocreateMode the mode for publication autocreation; 
may not be null
-     * @param plugin decoder matching the server side plug-in used for 
streaming changes; may not be
-     *     null
-     * @param dropSlotOnClose whether the replication slot should be dropped 
once the connection is
-     *     closed
-     * @param statusUpdateInterval the interval at which the replication 
connection should
-     *     periodically send status
-     * @param doSnapshot whether the connector is doing snapshot
-     * @param jdbcConnection general PostgreSQL JDBC connection
-     * @param typeRegistry registry with PostgreSQL types
-     * @param streamParams additional parameters to pass to the replication 
stream
-     * @param schema the schema; must not be null
-     *     <p>updates to the server
-     */
-    private PostgresReplicationConnection(
-            PostgresConnectorConfig config,
-            String slotName,
-            String publicationName,
-            RelationalTableFilters tableFilter,
-            PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
-            PostgresConnectorConfig.LogicalDecoder plugin,
-            boolean dropSlotOnClose,
-            boolean doSnapshot,
-            Duration statusUpdateInterval,
-            PostgresConnection jdbcConnection,
-            TypeRegistry typeRegistry,
-            Properties streamParams,
-            PostgresSchema schema) {
-        super(
-                addDefaultSettings(config.getJdbcConfig()),
-                PostgresConnection.FACTORY,
-                null,
-                null,
-                "\"",
-                "\"");
-
-        this.connectorConfig = config;
-        this.slotName = slotName;
-        this.publicationName = publicationName;
-        this.tableFilter = tableFilter;
-        this.publicationAutocreateMode = publicationAutocreateMode;
-        this.plugin = plugin;
-        this.dropSlotOnClose = dropSlotOnClose;
-        this.statusUpdateInterval = statusUpdateInterval;
-        this.messageDecoder =
-                plugin.messageDecoder(new MessageDecoderContext(config, 
schema), jdbcConnection);
-        this.jdbcConnection = jdbcConnection;
-        this.typeRegistry = typeRegistry;
-        this.streamParams = streamParams;
-        this.slotCreationInfo = null;
-        this.hasInitedSlot = false;
-    }
-
-    private static JdbcConfiguration addDefaultSettings(JdbcConfiguration 
configuration) {
-        // first copy the parent's default settings...
-        // then set some additional replication specific settings
-        return JdbcConfiguration.adapt(
-                PostgresConnection.addDefaultSettings(
-                                configuration, 
PostgresConnection.CONNECTION_STREAMING)
-                        .edit()
-                        .with("replication", "database")
-                        .with(
-                                "preferQueryMode",
-                                "simple") // replication protocol only 
supports simple query mode
-                        .build());
-    }
-
-    private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, 
InterruptedException {
-        try (PostgresConnection connection =
-                new PostgresConnection(
-                        connectorConfig.getJdbcConfig(), 
PostgresConnection.CONNECTION_SLOT_INFO)) {
-            return connection.readReplicationSlotInfo(slotName, 
plugin.getPostgresPluginName());
-        }
-    }
-
-    protected void initPublication() {
-        String createPublicationStmt;
-        String tableFilterString = null;
-        if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
-            LOGGER.info("Initializing PgOutput logical decoder publication");
-            try {
-                // Unless the autocommit is disabled the SELECT publication 
query will stay running
-                Connection conn = pgConnection();
-                conn.setAutoCommit(false);
-
-                String selectPublication =
-                        String.format(
-                                "SELECT COUNT(1) FROM pg_publication WHERE 
pubname = '%s'",
-                                publicationName);
-                try (Statement stmt = conn.createStatement();
-                        ResultSet rs = stmt.executeQuery(selectPublication)) {
-                    if (rs.next()) {
-                        Long count = rs.getLong(1);
-                        // Close eagerly as the transaction might stay running
-                        if (count == 0L) {
-                            LOGGER.info(
-                                    "Creating new publication '{}' for plugin 
'{}'",
-                                    publicationName,
-                                    plugin);
-                            switch (publicationAutocreateMode) {
-                                case DISABLED:
-                                    throw new ConnectException(
-                                            "Publication autocreation is disabled, please create one and restart the connector.");
-                                case ALL_TABLES:
-                                    createPublicationStmt =
-                                            String.format(
-                                                    "CREATE PUBLICATION %s FOR ALL TABLES;",
-                                                    publicationName);
-                                    LOGGER.info(
-                                            "Creating Publication with statement '{}'",
-                                            createPublicationStmt);
-                                    // Publication doesn't exist, create it.
-                                    stmt.execute(createPublicationStmt);
-                                    break;
-                                case FILTERED:
-                                    createOrUpdatePublicationModeFilterted(
-                                            tableFilterString, stmt, false);
-                                    break;
-                            }
-                        } else {
-                            switch (publicationAutocreateMode) {
-                                case FILTERED:
-                                    createOrUpdatePublicationModeFilterted(
-                                            tableFilterString, stmt, true);
-                                    break;
-                                default:
-                                    LOGGER.trace(
-                                            "A logical publication named '{}' 
for plugin '{}' and database '{}' is already active on the server "
-                                                    + "and will be used by the 
plugin",
-                                            publicationName,
-                                            plugin,
-                                            database());
-                            }
-                        }
-                    }
-                }
-                conn.commit();
-                conn.setAutoCommit(true);
-            } catch (SQLException e) {
-                throw new JdbcConnectionException(e);
-            }
-        }
-    }
-
-    private void createOrUpdatePublicationModeFilterted(
-            String tableFilterString, Statement stmt, boolean isUpdate) {
-        String createOrUpdatePublicationStmt;
-        try {
-            Set<TableId> tablesToCapture = determineCapturedTables();
-            tableFilterString =
-                    tablesToCapture.stream()
-                            .map(TableId::toDoubleQuotedString)
-                            .collect(Collectors.joining(", "));
-            if (tableFilterString.isEmpty()) {
-                throw new DebeziumException(
-                        String.format(
-                                "No table filters found for filtered 
publication %s",
-                                publicationName));
-            }
-            boolean supportPartitionRoot = !isUpdate
-                    && ((BaseConnection) pgConnection()).haveMinimumServerVersion(ServerVersion.v13);
-            String pubViaRootSuffix = supportPartitionRoot
-                    ? " WITH (publish_via_partition_root = true)" : "";
-            createOrUpdatePublicationStmt =
-                    isUpdate
-                            ? String.format(
-                            "ALTER PUBLICATION %s SET TABLE %s;",
-                            publicationName, tableFilterString)
-                            : String.format(
-                                    "CREATE PUBLICATION %s FOR TABLE %s%s;",
-                                    publicationName, tableFilterString, pubViaRootSuffix);
-            LOGGER.info(
-                    isUpdate
-                            ? "Updating Publication with statement '{}'"
-                            : "Creating Publication with statement '{}'",
-                    createOrUpdatePublicationStmt);
-            stmt.execute(createOrUpdatePublicationStmt);
-        } catch (Exception e) {
-            throw new ConnectException(
-                    String.format(
-                            "Unable to %s filtered publication %s for %s",
-                            isUpdate ? "update" : "create", publicationName, 
tableFilterString),
-                    e);
-        }
-    }
-
-    private Set<TableId> determineCapturedTables() throws Exception {
-        Set<TableId> allTableIds = 
jdbcConnection.getAllTableIds(connectorConfig.databaseName());
-
-        Set<TableId> capturedTables = new HashSet<>();
-
-        for (TableId tableId : allTableIds) {
-            if (tableFilter.dataCollectionFilter().isIncluded(tableId)) {
-                LOGGER.trace("Adding table {} to the list of captured tables", 
tableId);
-                capturedTables.add(tableId);
-            } else {
-                LOGGER.trace(
-                        "Ignoring table {} as it's not included in the filter 
configuration",
-                        tableId);
-            }
-        }
-
-        return capturedTables.stream()
-                .sorted()
-                .collect(Collectors.toCollection(LinkedHashSet::new));
-    }
-
-    protected void initReplicationSlot() throws SQLException, 
InterruptedException {
-        ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
-
-        boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == 
slotInfo;
-        try {
-            // there's no info for this plugin and slot so create a new slot
-            if (shouldCreateSlot) {
-                this.createReplicationSlot();
-            }
-
-            // replication connection does not support parsing of SQL 
statements so we need to
-            // create
-            // the connection without executing on connect statements - see 
JDBC opt
-            // preferQueryMode=simple
-            pgConnection();
-            final String identifySystemStatement = "IDENTIFY_SYSTEM";
-            LOGGER.debug(
-                    "running '{}' to validate replication connection", 
identifySystemStatement);
-            final Lsn xlogStart =
-                    queryAndMap(
-                            identifySystemStatement,
-                            rs -> {
-                                if (!rs.next()) {
-                                    throw new IllegalStateException(
-                                            "The DB connection is not a valid 
replication connection");
-                                }
-                                String xlogpos = rs.getString("xlogpos");
-                                LOGGER.debug("received latest xlogpos '{}'", 
xlogpos);
-                                return Lsn.valueOf(xlogpos);
-                            });
-
-            if (slotCreationInfo != null) {
-                this.defaultStartingPos = slotCreationInfo.startLsn();
-            } else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
-                // this is a new slot or we weren't able to read a valid flush 
LSN pos, so we always
-                // start from the xlog pos that was reported
-                this.defaultStartingPos = xlogStart;
-            } else {
-                Lsn latestFlushedLsn = slotInfo.latestFlushedLsn();
-                this.defaultStartingPos =
-                        latestFlushedLsn.compareTo(xlogStart) < 0 ? 
latestFlushedLsn : xlogStart;
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("found previous flushed LSN '{}'", 
latestFlushedLsn);
-                }
-            }
-            hasInitedSlot = true;
-        } catch (SQLException e) {
-            throw new JdbcConnectionException(e);
-        }
-    }
-
-    // Temporary replication slots is a new feature of PostgreSQL 10
-    private boolean useTemporarySlot() throws SQLException {
-        // Temporary replication slots cannot be used due to connection restart
-        // when finding WAL position
-        // return dropSlotOnClose && 
pgConnection().haveMinimumServerVersion(ServerVersion.v10);
-        return false;
-    }
-
-    /**
-     * creating a replication connection and starting to stream involves a few 
steps: 1. we create
-     * the connection and ensure that a. the slot exists b. the slot isn't 
currently being used 2.
-     * we query to get our potential start position in the slot (lsn) 3. we 
try and start streaming,
-     * depending on our options (such as in wal2json) this may fail, which can 
result in the
-     * connection being killed and we need to start the process over if we are 
using a temporary
-     * slot 4. actually start the streamer
-     *
-     * <p>This method takes care of all of these and this method queries for a 
default starting
-     * position If you know where you are starting from you should call {@link 
#startStreaming(Lsn,
-     * WalPositionLocator)}, this method delegates to that method
-     *
-     * @return
-     * @throws SQLException
-     * @throws InterruptedException
-     */
-    @Override
-    public ReplicationStream startStreaming(WalPositionLocator walPosition)
-            throws SQLException, InterruptedException {
-        return startStreaming(null, walPosition);
-    }
-
-    @Override
-    public ReplicationStream startStreaming(Lsn offset, WalPositionLocator 
walPosition)
-            throws SQLException, InterruptedException {
-        initConnection();
-
-        connect();
-        if (offset == null || !offset.isValid()) {
-            offset = defaultStartingPos;
-        }
-        Lsn lsn = offset;
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("starting streaming from LSN '{}'", lsn);
-        }
-
-        final int maxRetries = connectorConfig.maxRetries();
-        final Duration delay = connectorConfig.retryDelay();
-        int tryCount = 0;
-        while (true) {
-            try {
-                return createReplicationStream(lsn, walPosition);
-            } catch (Exception e) {
-                String message = "Failed to start replication stream at " + 
lsn;
-                if (++tryCount > maxRetries) {
-                    if (e.getMessage().matches(".*replication slot .* is 
active.*")) {
-                        message +=
-                                "; when setting up multiple connectors for the 
same database host, please make sure to use a distinct replication slot name 
for each.";
-                    }
-                    throw new DebeziumException(message, e);
-                } else {
-                    LOGGER.warn(
-                            message + ", waiting for {} ms and retrying, 
attempt number {} over {}",
-                            delay,
-                            tryCount,
-                            maxRetries);
-                    final Metronome metronome = Metronome.sleeper(delay, 
Clock.SYSTEM);
-                    metronome.pause();
-                }
-            }
-        }
-    }
-
-    @Override
-    public void initConnection() throws SQLException, InterruptedException {
-        // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
-        // For pgoutput specifically, the publication must be created before the slot.
-        initPublication();
-        if (!hasInitedSlot) {
-            initReplicationSlot();
-        }
-    }
-
-    @Override
-    public Optional<SlotCreationResult> createReplicationSlot() throws 
SQLException {
-        // note that some of these options are only supported in Postgres 
9.4+, additionally
-        // the options are not yet exported by the jdbc api wrapper, 
therefore, we just do
-        // this ourselves but eventually this should be moved back to the jdbc 
API
-        // see https://github.com/pgjdbc/pgjdbc/issues/1305
-
-        LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", 
slotName, plugin);
-        String tempPart = "";
-        // Exported snapshots are supported in Postgres 9.4+
-        boolean canExportSnapshot = 
pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
-        if ((dropSlotOnClose) && !canExportSnapshot) {
-            LOGGER.warn(
-                    "A slot marked as temporary or with an exported snapshot 
was created, "
-                            + "but not on a supported version of Postgres, 
ignoring!");
-        }
-        if (useTemporarySlot()) {
-            tempPart = "TEMPORARY";
-        }
-
-        // See 
https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
-        // For pgoutput specifically, the publication must be created prior to 
the slot.
-        initPublication();
-
-        try (Statement stmt = pgConnection().createStatement()) {
-            String createCommand =
-                    String.format(
-                            "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s",
-                            slotName, tempPart, 
plugin.getPostgresPluginName());
-            LOGGER.info("Creating replication slot with command {}", 
createCommand);
-            stmt.execute(createCommand);
-            // when we are in Postgres 9.4+, we can parse the slot creation 
info,
-            // otherwise, it returns nothing
-            if (canExportSnapshot) {
-                this.slotCreationInfo = parseSlotCreation(stmt.getResultSet());
-            }
-
-            return Optional.ofNullable(slotCreationInfo);
-        }
-    }
-
-    protected BaseConnection pgConnection() throws SQLException {
-        return (BaseConnection) connection(false);
-    }
-
-    private SlotCreationResult parseSlotCreation(ResultSet rs) {
-        try {
-            if (rs.next()) {
-                String slotName = rs.getString("slot_name");
-                String startPoint = rs.getString("consistent_point");
-                String snapName = rs.getString("snapshot_name");
-                String pluginName = rs.getString("output_plugin");
-
-                return new SlotCreationResult(slotName, startPoint, snapName, 
pluginName);
-            } else {
-                throw new ConnectException("No replication slot found");
-            }
-        } catch (SQLException ex) {
-            throw new ConnectException("Unable to parse 
create_replication_slot response", ex);
-        }
-    }
-
-    private ReplicationStream createReplicationStream(
-            final Lsn startLsn, WalPositionLocator walPosition)
-            throws SQLException, InterruptedException {
-        PGReplicationStream s;
-
-        try {
-            try {
-                s =
-                        startPgReplicationStream(
-                                startLsn,
-                                plugin.forceRds()
-                                        ? 
messageDecoder::optionsWithoutMetadata
-                                        : messageDecoder::optionsWithMetadata);
-                messageDecoder.setContainsMetadata(plugin.forceRds() ? false : 
true);
-            } catch (PSQLException e) {
-                LOGGER.debug(
-                        "Could not register for streaming, retrying without 
optional options", e);
-
-                // re-init the slot after a failed start of slot, as this
-                // may have closed the slot
-                if (useTemporarySlot()) {
-                    initReplicationSlot();
-                }
-
-                s =
-                        startPgReplicationStream(
-                                startLsn,
-                                plugin.forceRds()
-                                        ? 
messageDecoder::optionsWithoutMetadata
-                                        : messageDecoder::optionsWithMetadata);
-                messageDecoder.setContainsMetadata(plugin.forceRds() ? false : 
true);
-            }
-        } catch (PSQLException e) {
-            if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
-                // It is possible we are connecting to an old wal2json plug-in
-                LOGGER.warn(
-                        "Could not register for streaming with metadata in 
messages, falling back to messages without metadata");
-
-                // re-init the slot after a failed start of slot, as this
-                // may have closed the slot
-                if (useTemporarySlot()) {
-                    initReplicationSlot();
-                }
-
-                s = startPgReplicationStream(startLsn, messageDecoder::optionsWithoutMetadata);
-                messageDecoder.setContainsMetadata(false);
-            } else if (e.getMessage()
-                    .matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) {
-                LOGGER.error("Cannot rewind to last processed WAL position", e);
-                throw new ConnectException(
-                        "The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0.");
-            } else {
-                throw e;
-            }
-        }
-
-        final PGReplicationStream stream = s;
-
-        return new ReplicationStream() {
-
-            private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
-            private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
-            private ExecutorService keepAliveExecutor = null;
-            private AtomicBoolean keepAliveRunning;
-            private final Metronome metronome =
-                    Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM);
-
-            // make sure this is volatile since multiple threads may be interested in this value
-            private volatile Lsn lastReceivedLsn;
-
-            @Override
-            public void read(ReplicationMessageProcessor processor)
-                    throws SQLException, InterruptedException {
-                processWarnings(false);
-                ByteBuffer read = stream.read();
-                final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
-                LOGGER.trace(
-                        "Streaming requested from LSN {}, received LSN {}",
-                        startLsn,
-                        lastReceiveLsn);
-                if (reachEnd(lastReceivedLsn)) {
-                    lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
-                    LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
-                    processor.process(new ReplicationMessage.NoopMessage(null, null));
-                    return;
-                }
-                if (messageDecoder.shouldMessageBeSkipped(
-                        read, lastReceiveLsn, startLsn, walPosition)) {
-                    return;
-                }
-                deserializeMessages(read, processor);
-            }
-
-            @Override
-            public boolean readPending(ReplicationMessageProcessor processor)
-                    throws SQLException, InterruptedException {
-                processWarnings(false);
-                ByteBuffer read = stream.readPending();
-                final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
-                LOGGER.trace(
-                        "Streaming requested from LSN {}, received LSN {}",
-                        startLsn,
-                        lastReceiveLsn);
-
-                if (reachEnd(lastReceiveLsn)) {
-                    lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
-                    LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
-                    processor.process(new ReplicationMessage.NoopMessage(null, null));
-                    return true;
-                }
-
-                if (read == null) {
-                    return false;
-                }
-
-                if (messageDecoder.shouldMessageBeSkipped(
-                        read, lastReceiveLsn, startLsn, walPosition)) {
-                    return true;
-                }
-
-                deserializeMessages(read, processor);
-
-                return true;
-            }
-
-            private void deserializeMessages(
-                    ByteBuffer buffer, ReplicationMessageProcessor processor)
-                    throws SQLException, InterruptedException {
-                lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
-                LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
-                messageDecoder.processMessage(buffer, processor, typeRegistry);
-            }
-
-            @Override
-            public void close() throws SQLException {
-                processWarnings(true);
-                stream.close();
-            }
-
-            @Override
-            public void flushLsn(Lsn lsn) throws SQLException {
-                doFlushLsn(lsn);
-            }
-
-            private void doFlushLsn(Lsn lsn) throws SQLException {
-                stream.setFlushedLSN(lsn.asLogSequenceNumber());
-                stream.setAppliedLSN(lsn.asLogSequenceNumber());
-
-                stream.forceUpdateStatus();
-            }
-
-            @Override
-            public Lsn lastReceivedLsn() {
-                return lastReceivedLsn;
-            }
-
-            @Override
-            public void startKeepAlive(ExecutorService service) {
-                if (keepAliveExecutor == null) {
-                    keepAliveExecutor = service;
-                    keepAliveRunning = new AtomicBoolean(true);
-                    keepAliveExecutor.submit(
-                            () -> {
-                                while (keepAliveRunning.get()) {
-                                    try {
-                                        LOGGER.trace(
-                                                "Forcing status update with replication stream");
-                                        stream.forceUpdateStatus();
-                                        metronome.pause();
-                                    } catch (Exception exp) {
-                                        throw new RuntimeException(
-                                                "received unexpected exception will perform keep alive",
-                                                exp);
-                                    }
-                                }
-                            });
-                }
-            }
-
-            @Override
-            public void stopKeepAlive() {
-                if (keepAliveExecutor != null) {
-                    keepAliveRunning.set(false);
-                    keepAliveExecutor.shutdownNow();
-                    keepAliveExecutor = null;
-                }
-            }
-
-            private void processWarnings(final boolean forced) throws SQLException {
-                if (--warningCheckCounter == 0 || forced) {
-                    warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
-                    for (SQLWarning w = connection().getWarnings();
-                            w != null;
-                            w = w.getNextWarning()) {
-                        LOGGER.debug(
-                                "Server-side message: '{}', state = {}, code = {}",
-                                w.getMessage(),
-                                w.getSQLState(),
-                                w.getErrorCode());
-                    }
-                    connection().clearWarnings();
-                }
-            }
-
-            @Override
-            public Lsn startLsn() {
-                return startLsn;
-            }
-
-            private boolean reachEnd(Lsn receivedLsn) {
-                if (receivedLsn == null) {
-                    return false;
-                }
-                return endingPos != null
-                        && (!endingPos.isNonStopping())
-                        && endingPos.compareTo(receivedLsn) < 0;
-            }
-        };
-    }
-
-    public void setEndingPos(Lsn endingPos) {
-        this.endingPos = endingPos;
-    }
-
-    private PGReplicationStream startPgReplicationStream(
-            final Lsn lsn,
-            BiFunction<
-                    ChainedLogicalStreamBuilder,
-                    Function<Integer, Boolean>,
-                    ChainedLogicalStreamBuilder>
-                    configurator)
-            throws SQLException {
-        assert lsn != null;
-        ChainedLogicalStreamBuilder streamBuilder =
-                pgConnection()
-                        .getReplicationAPI()
-                        .replicationStream()
-                        .logical()
-                        .withSlotName("\"" + slotName + "\"")
-                        .withStartPosition(lsn.asLogSequenceNumber())
-                        .withSlotOptions(streamParams);
-        streamBuilder = configurator.apply(streamBuilder, this::hasMinimumVersion);
-
-        if (statusUpdateInterval != null && statusUpdateInterval.toMillis() > 0) {
-            streamBuilder.withStatusInterval(
-                    toIntExact(statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
-        }
-
-        PGReplicationStream stream = streamBuilder.start();
-
-        // TODO DBZ-508 get rid of this
-        // Needed by tests when connections are opened and closed in a fast sequence
-        try {
-            Thread.sleep(10);
-        } catch (Exception e) {
-        }
-        stream.forceUpdateStatus();
-        return stream;
-    }
-
-    private Boolean hasMinimumVersion(int version) {
-        try {
-            return pgConnection().haveMinimumServerVersion(version);
-        } catch (SQLException e) {
-            throw new DebeziumException(e);
-        }
-    }
-
-    @Override
-    public synchronized void close() {
-        close(true);
-    }
-
-    public synchronized void close(boolean dropSlot) {
-        try {
-            LOGGER.debug("Closing message decoder");
-            messageDecoder.close();
-        } catch (Throwable e) {
-            LOGGER.error("Unexpected error while closing message decoder", e);
-        }
-
-        try {
-            LOGGER.debug("Closing replication connection");
-            super.close();
-        } catch (Throwable e) {
-            LOGGER.error("Unexpected error while closing Postgres connection", e);
-        }
-        if (dropSlotOnClose && dropSlot) {
-            // we're dropping the replication slot via a regular - i.e. not a replication -
-            // connection
-            try (PostgresConnection connection =
-                    new PostgresConnection(
-                            connectorConfig.getJdbcConfig(),
-                            PostgresConnection.CONNECTION_DROP_SLOT)) {
-                connection.dropReplicationSlot(slotName);
-            } catch (Throwable e) {
-                LOGGER.error("Unexpected error while dropping replication slot", e);
-            }
-        }
-    }
-
-    @Override
-    public void reconnect() throws SQLException {
-        close(false);
-        // Don't re-execute initial commands on reconnection
-        connection(false);
-    }
-
-    protected static class ReplicationConnectionBuilder implements Builder {
-
-        private final PostgresConnectorConfig config;
-        private String slotName = DEFAULT_SLOT_NAME;
-        private String publicationName = DEFAULT_PUBLICATION_NAME;
-        private RelationalTableFilters tableFilter;
-        private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode =
-                PostgresConnectorConfig.AutoCreateMode.ALL_TABLES;
-        private PostgresConnectorConfig.LogicalDecoder plugin =
-                PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
-        private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
-        private Duration statusUpdateIntervalVal;
-        private boolean doSnapshot;
-        private TypeRegistry typeRegistry;
-        private PostgresSchema schema;
-        private Properties slotStreamParams = new Properties();
-        private PostgresConnection jdbcConnection;
-
-        protected ReplicationConnectionBuilder(PostgresConnectorConfig config) {
-            assert config != null;
-            this.config = config;
-        }
-
-        @Override
-        public ReplicationConnectionBuilder withSlot(final String slotName) {
-            assert slotName != null;
-            this.slotName = slotName;
-            return this;
-        }
-
-        @Override
-        public Builder withPublication(String publicationName) {
-            assert publicationName != null;
-            this.publicationName = publicationName;
-            return this;
-        }
-
-        @Override
-        public Builder withTableFilter(RelationalTableFilters tableFilter) {
-            assert tableFilter != null;
-            this.tableFilter = tableFilter;
-            return this;
-        }
-
-        @Override
-        public Builder withPublicationAutocreateMode(
-                PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode) {
-            assert publicationName != null;
-            this.publicationAutocreateMode = publicationAutocreateMode;
-            return this;
-        }
-
-        @Override
-        public ReplicationConnectionBuilder withPlugin(
-                final PostgresConnectorConfig.LogicalDecoder plugin) {
-            assert plugin != null;
-            this.plugin = plugin;
-            return this;
-        }
-
-        @Override
-        public ReplicationConnectionBuilder dropSlotOnClose(final boolean dropSlotOnClose) {
-            this.dropSlotOnClose = dropSlotOnClose;
-            return this;
-        }
-
-        @Override
-        public ReplicationConnectionBuilder streamParams(final String slotStreamParams) {
-            if (slotStreamParams != null && !slotStreamParams.isEmpty()) {
-                this.slotStreamParams = new Properties();
-                String[] paramsWithValues = slotStreamParams.split(";");
-                for (String paramsWithValue : paramsWithValues) {
-                    String[] paramAndValue = paramsWithValue.split("=");
-                    if (paramAndValue.length == 2) {
-                        this.slotStreamParams.setProperty(paramAndValue[0], paramAndValue[1]);
-                    } else {
-                        LOGGER.warn(
-                                "The following STREAM_PARAMS value is invalid: {}",
-                                paramsWithValue);
-                    }
-                }
-            }
-            return this;
-        }
-
-        @Override
-        public ReplicationConnectionBuilder statusUpdateInterval(
-                final Duration statusUpdateInterval) {
-            this.statusUpdateIntervalVal = statusUpdateInterval;
-            return this;
-        }
-
-        @Override
-        public Builder doSnapshot(boolean doSnapshot) {
-            this.doSnapshot = doSnapshot;
-            return this;
-        }
-
-        @Override
-        public Builder jdbcMetadataConnection(PostgresConnection jdbcConnection) {
-            this.jdbcConnection = jdbcConnection;
-            return this;
-        }
-
-        @Override
-        public ReplicationConnection build() {
-            assert plugin != null : "Decoding plugin name is not set";
-            return new PostgresReplicationConnection(
-                    config,
-                    slotName,
-                    publicationName,
-                    tableFilter,
-                    publicationAutocreateMode,
-                    plugin,
-                    dropSlotOnClose,
-                    doSnapshot,
-                    statusUpdateIntervalVal,
-                    jdbcConnection,
-                    typeRegistry,
-                    slotStreamParams,
-                    schema);
-        }
-
-        @Override
-        public Builder withTypeRegistry(TypeRegistry typeRegistry) {
-            this.typeRegistry = typeRegistry;
-            return this;
-        }
-
-        @Override
-        public Builder withSchema(PostgresSchema schema) {
-            this.schema = schema;
-            return this;
-        }
-    }
-}
\ No newline at end of file
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 9505290d55e..ebbd9c4acf1 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -64,6 +64,7 @@ import com.google.common.base.Preconditions;
 import io.debezium.data.Envelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
 
@@ -80,16 +81,16 @@ public class PipelineCoordinator {
     // taskId -> writeFailReason
     private final Map<String, String> taskErrorMaps = new ConcurrentHashMap<>();
     private final ThreadPoolExecutor executor;
-    private static final int MAX_CONCURRENT_TASKS = 10;
     private static final int QUEUE_CAPACITY = 128;
     private static final ObjectMapper objectMapper = new ObjectMapper();
     private final byte[] LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8);
 
-    public PipelineCoordinator() {
+    public PipelineCoordinator(
+            @Value("${pipeline.max-concurrent-tasks:10}") int maxConcurrentTasks) {
         this.executor =
                 new ThreadPoolExecutor(
-                        MAX_CONCURRENT_TASKS,
-                        MAX_CONCURRENT_TASKS,
+                        maxConcurrentTasks,
+                        maxConcurrentTasks,
                         60L,
                         TimeUnit.SECONDS,
                         new LinkedBlockingQueue<>(QUEUE_CAPACITY),
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index f853f931646..b1d1cd8ba03 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -270,9 +270,12 @@ public class DorisBatchStreamLoad implements Serializable {
     }
 
     public void close() {
-        // close async executor
-        this.loadExecutorService.shutdown();
+        // Wake up any blocked producer and stop the loader to avoid writer thread leak.
         this.started.set(false);
+        this.loadThreadAlive = false;
+        this.flushQueue.clear();
+        this.currentCacheBytes.set(0);
+        this.loadExecutorService.shutdownNow();
     }
 
     @VisibleForTesting
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 1bc7db23fd4..a4c19aedb9e 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -849,7 +849,12 @@ public class MySqlSourceReader extends AbstractCdcSourceReader {
                 DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
         final StatefulTaskContext statefulTaskContext =
                 new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
-        return new BinlogSplitReader(statefulTaskContext, 0);
+        int readerTag = Math.abs(config.getJobId().hashCode());
+        LOG.info(
+                "create binlog reader for job {}, thread tag = binlog-reader-{}",
+                config.getJobId(),
+                readerTag);
+        return new BinlogSplitReader(statefulTaskContext, readerTag);
     }
 
     private MySqlSourceConfig getSourceConfig(JobBaseConfig config) {
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 2e09e48957a..027a4c7af94 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -55,14 +55,17 @@ import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
 import org.apache.flink.table.types.DataType;
 
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
@@ -80,6 +83,8 @@ import io.debezium.relational.history.TableChanges;
 import io.debezium.time.Conversions;
 import lombok.Data;
 import org.postgresql.Driver;
+import org.postgresql.core.BaseConnection;
+import org.postgresql.core.ServerVersion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,6 +103,11 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
     public void initialize(String jobId, DataSource dataSource, Map<String, String> config) {
         PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId, 0);
         PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        // Doris-owned publication: pre-create it covering all include_tables (autocreate is
+        // DISABLED).
+        if (isPublicationDorisOwned(config, jobId)) {
+            createPublicationForDorisOwned(dialect, config, jobId);
+        }
         // Only create the slot when Doris owns it (name == default); user-provided slots must
         // pre-exist, validated at CREATE JOB.
         if (isSlotDorisOwned(config, jobId)) {
@@ -150,6 +160,61 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
         }
     }
 
+    /**
+     * Create/ensure the Doris-owned publication for all include_tables (idempotent, multi-BE safe).
+     */
+    private void createPublicationForDorisOwned(
+            PostgresDialect dialect, Map<String, String> config, String jobId) {
+        String pubName = resolvePublicationName(config, jobId);
+        String schema = config.get(DataSourceConfigKeys.SCHEMA);
+        String[] qualified = ConfigUtil.getTableList(schema, config);
+        if (qualified.length == 0) {
+            throw new CdcClientException("No tables to create publication " + pubName);
+        }
+        String tableList =
+                Arrays.stream(qualified)
+                        .map(
+                                q ->
+                                        new TableId(null, schema, q.substring(q.indexOf('.') + 1))
+                                                .toDoubleQuotedString())
+                        .collect(Collectors.joining(", "));
+        // Mirrors debezium PostgresReplicationConnection#initPublication: check existence, then
+        // CREATE ... FOR TABLE / ALTER ... SET TABLE (here always the full include_tables set).
+        try (PostgresConnection conn = dialect.openJdbcConnection();
+                Statement stmt = conn.connection().createStatement()) {
+            long count;
+            try (ResultSet rs =
+                    stmt.executeQuery(
+                            "SELECT COUNT(1) FROM pg_publication WHERE pubname = '"
+                                    + pubName
+                                    + "'")) {
+                rs.next();
+                count = rs.getLong(1);
+            }
+            if (count == 0) {
+                // Preserve debezium FILTERED behavior: on PG 13+ publish partitioned-root changes
+                // as the root table, matching configFactory.setIncludePartitionedTables(true).
+                String pubViaRootSuffix =
+                        ((BaseConnection) conn.connection())
+                                        .haveMinimumServerVersion(ServerVersion.v13)
+                                ? " WITH (publish_via_partition_root = true)"
+                                : "";
+                stmt.execute(
+                        "CREATE PUBLICATION "
+                                + pubName
+                                + " FOR TABLE "
+                                + tableList
+                                + pubViaRootSuffix);
+            } else {
+                stmt.execute("ALTER PUBLICATION " + pubName + " SET TABLE " + tableList);
+            }
+            LOG.info("Ensured publication {} for tables {}", pubName, tableList);
+        } catch (SQLException e) {
+            throw new CdcClientException(
+                    "Failed to create publication " + pubName + ": " + e.getMessage(), e);
+        }
+    }
+
     @Override
     protected PostgresSourceConfig getSourceConfig(JobBaseConfig config) {
         return generatePostgresConfig(config);
@@ -242,18 +307,17 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
         Properties dbzProps = ConfigUtil.getDefaultDebeziumProps();
         dbzProps.put("interval.handling.mode", "string");
 
-        // Doris-owned = FILTERED (auto-create per-table publication); otherwise DISABLED
-        // (user-provided or legacy dbz_publication already present on PG).
+        // Always DISABLED; the publication always pre-exists: Doris creates it for all
+        // include_tables
+        // in initialize(); user-provided / legacy (dbz_publication) ones are already present on PG.
+        // FILTERED would make each split SET TABLE its single table -> flip publication -> data
+        // loss.
         String publicationName = resolvePublicationName(cdcConfig, jobId);
         String slotName = resolveSlotName(cdcConfig, jobId);
-        AutoCreateMode autocreateMode =
-                isPublicationDorisOwned(cdcConfig, jobId)
-                        ? AutoCreateMode.FILTERED
-                        : AutoCreateMode.DISABLED;
         dbzProps.put(PostgresConnectorConfig.PUBLICATION_NAME.name(), publicationName);
         dbzProps.put(
                 PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name(),
-                autocreateMode.getValue());
+                AutoCreateMode.DISABLED.getValue());
 
         configFactory.debeziumProperties(dbzProps);
 
@@ -279,7 +343,8 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
         // support scan partition table
         configFactory.setIncludePartitionedTables(true);
 
-        // FE injects "true" on TVF path; from-to leaves it absent → default false.
+        // from-to: FE forces "true" (at-least-once, skip backfill); TVF: absent → false
+        // (exactly-once needs backfill).
         configFactory.skipSnapshotBackfill(
                 Boolean.parseBoolean(cdcConfig.get(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)));
 
@@ -319,6 +384,10 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
         PostgresDialect dialect = new PostgresDialect(sourceConfig);
         PostgresSourceFetchTaskContext taskContext =
                 new PostgresSourceFetchTaskContext(sourceConfig, dialect);
+        LOG.info(
+                "create snapshot reader for job {}, thread tag = debezium-snapshot-reader-{}",
+                config.getJobId(),
+                subtaskId);
         IncrementalSourceScanFetcher snapshotReader =
                 new IncrementalSourceScanFetcher(taskContext, subtaskId);
         return snapshotReader;
@@ -330,9 +399,13 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
         PostgresDialect dialect = new PostgresDialect(sourceConfig);
         PostgresSourceFetchTaskContext taskContext =
                 new PostgresSourceFetchTaskContext(sourceConfig, dialect);
-        // subTaskId maybe add jobId?
+        int readerTag = Math.abs(config.getJobId().hashCode());
+        LOG.info(
+                "create binlog reader for job {}, thread tag = debezium-reader-{}",
+                config.getJobId(),
+                readerTag);
         IncrementalSourceStreamFetcher binlogReader =
-                new IncrementalSourceStreamFetcher(taskContext, 0);
+                new IncrementalSourceStreamFetcher(taskContext, readerTag);
         return binlogReader;
     }
 
diff --git a/fs_brokers/cdc_client/src/main/resources/application.properties b/fs_brokers/cdc_client/src/main/resources/application.properties
index d22c98d434b..a8d22bed9dd 100644
--- a/fs_brokers/cdc_client/src/main/resources/application.properties
+++ b/fs_brokers/cdc_client/src/main/resources/application.properties
@@ -20,4 +20,6 @@ server.port=9096
 backend.http.port=8040
 # see doris-meta/image/VERSION
 cluster.token=cluster-token
-spring.mvc.async.request-timeout=300000
\ No newline at end of file
+spring.mvc.async.request-timeout=300000
+# Max concurrent write-record tasks (override via be.conf cdc_client_java_opts=-Dpipeline.max-concurrent-tasks=N)
+# pipeline.max-concurrent-tasks=10
\ No newline at end of file
diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table.out
new file mode 100644
index 00000000000..9da45918b71
--- /dev/null
+++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_count_t1 --
+1007
+
+-- !select_count_t2 --
+1007
+
+-- !select_updates_t1 --
+1      99
+100    99
+500    99
+999    99
+
+-- !select_updates_t2 --
+1      99
+100    99
+500    99
+999    99
+
+-- !select_inserts_t1 --
+1001   concurrent_ins  1
+1002   concurrent_ins  1
+1003   concurrent_ins  1
+1004   concurrent_ins  1
+1005   concurrent_ins  1
+1006   concurrent_ins  1
+1007   concurrent_ins  1
+1008   concurrent_ins  1
+1009   concurrent_ins  1
+1010   concurrent_ins  1
+
+-- !select_inserts_t2 --
+1001   concurrent_ins  1
+1002   concurrent_ins  1
+1003   concurrent_ins  1
+1004   concurrent_ins  1
+1005   concurrent_ins  1
+1006   concurrent_ins  1
+1007   concurrent_ins  1
+1008   concurrent_ins  1
+1009   concurrent_ins  1
+1010   concurrent_ins  1
+
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table.groovy
new file mode 100644
index 00000000000..6ecdffdea8d
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table.groovy
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Multi-table from-to snapshot + concurrent DML: with >=2 tables each snapshot split flips the
+// shared publication to its single table, so a row written to a table while the publication
+// temporarily excludes it would be filtered on stream replay and lost. The fix keeps the
+// publication full-table, so no row is lost. Asserts every row of BOTH tables is synced.
+suite("test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table",
+        "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_pg_snapshot_concurrent_dml_multi_table_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_snapshot_dml_multi_pg_t1"
+    def table2 = "streaming_snapshot_dml_multi_pg_t2"
+    def tables = [table1, table2]
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+    def totalRows = 1000
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    tables.each { sql """drop table if exists ${currentDb}.${it} force""" }
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ===== Prepare PG side: two tables, each 1000 snapshot rows =====
+        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            tables.each { t ->
+                sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${t}"""
+                sql """
+                create table ${pgDB}.${pgSchema}.${t} (
+                    id      integer PRIMARY KEY,
+                    tag     varchar(64),
+                    version integer
+                );
+                """
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${t} (id, tag, version)
+                       SELECT g, 'snap', 0 FROM generate_series(1, ${totalRows}) g"""
+            }
+        }
+
+        // snapshot_split_size=10 + snapshot_parallelism=1 -> 100 serial splits per table, slow
+        // enough that the concurrent DML overlaps snapshot while the publication keeps flipping.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1},${table2}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "10",
+                    "snapshot_parallelism" = "1"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // Wait until the first snapshot split commits (slot created, snapshot in progress) so the
+        // DML below lands inside the snapshot window and overlaps the publication flipping.
+        Awaitility.await().atMost(120, SECONDS).pollInterval(1, SECONDS).until({
+            def c = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+            c.size() == 1 && (c.get(0).get(0).toString() as long) >= 1
+        })
+
+        // Concurrent DML on BOTH tables while still snapshotting. Same DML shape on each table.
+        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            tables.each { t ->
+                for (int i = 1; i <= 10; i++) {
+                    sql """INSERT INTO ${pgDB}.${pgSchema}.${t} (id, tag, version) VALUES (${totalRows + i}, 'concurrent_ins', 1)"""
+                }
+                sql """UPDATE ${pgDB}.${pgSchema}.${t} SET version=99 WHERE id IN (1, 100, 500, 999)"""
+                sql """DELETE FROM ${pgDB}.${pgSchema}.${t} WHERE id IN (2, 200, 800)"""
+            }
+        }
+
+        // Each table: 1000 + 10 inserts - 3 deletes = 1007 rows, updates/deletes/inserts applied.
+        def expectedRows = totalRows + 10 - 3
+        try {
+            Awaitility.await().atMost(600, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        boolean allOk = true
+                        for (def t : tables) {
+                            def showTbl = sql """show tables from ${currentDb} like '${t}'"""
+                            if (showTbl.size() == 0) {
+                                allOk = false
+                                break
+                            }
+                            def cnt = sql """select count(1) from ${currentDb}.${t}"""
+                            def upd1 = sql """select version from ${currentDb}.${t} where id=1"""
+                            def upd999 = sql """select version from ${currentDb}.${t} where id=999"""
+                            def del2 = sql """select count(1) from ${currentDb}.${t} where id=2"""
+                            def del800 = sql """select count(1) from ${currentDb}.${t} where id=800"""
+                            def ins = sql """select count(1) from ${currentDb}.${t} where id=${totalRows + 10}"""
+                            def v1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+                            def v999 = upd999.size() == 0 ? null : upd999.get(0).get(0)
+                            log.info("table=${t} cnt=${cnt} v1=${v1} v999=${v999} del2=${del2} del800=${del800} ins=${ins}")
+                            boolean ok = cnt.get(0).get(0) == expectedRows &&
+                                    v1 != null && v1.toString() == '99' &&
+                                    v999 != null && v999.toString() == '99' &&
+                                    del2.get(0).get(0) == 0 &&
+                                    del800.get(0).get(0) == 0 &&
+                                    ins.get(0).get(0) == 1
+                            if (!ok) {
+                                allOk = false
+                                break
+                            }
+                        }
+                        allOk
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_select_count_t1 """select count(1) from ${currentDb}.${table1}"""
+        qt_select_count_t2 """select count(1) from ${currentDb}.${table2}"""
+        qt_select_updates_t1 """select id, version from ${currentDb}.${table1} where id in (1, 100, 500, 999) order by id"""
+        qt_select_updates_t2 """select id, version from ${currentDb}.${table2} where id in (1, 100, 500, 999) order by id"""
+        qt_select_inserts_t1 """select id, tag, version from ${currentDb}.${table1} where id > ${totalRows} order by id"""
+        qt_select_inserts_t2 """select id, tag, version from ${currentDb}.${table2} where id > ${totalRows} order by id"""
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
index 03af48213aa..c4214145151 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
@@ -95,13 +95,13 @@ suite("test_streaming_postgres_job_special_offset", "p0,external,pg,external_doc
         sql """drop table if exists ${currentDb}.${table1} force"""
 
         // ===== Test 2: CREATE with initial, then ALTER with JSON LSN offset =====
-        // Pre-create a DUPLICATE KEY table so duplicate rows from re-consuming are visible
+        // UNIQUE KEY table: dedup at-least-once re-consume; this case only verifies ALTER offset LSN filtering.
         sql """
             CREATE TABLE IF NOT EXISTS ${currentDb}.${table1} (
                 `id` int NULL,
                 `name` varchar(100) NULL
             ) ENGINE=OLAP
-            DUPLICATE KEY(`id`)
+            UNIQUE KEY(`id`)
             DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
             PROPERTIES ("replication_allocation" = "tag.location.default: 1")
         """
@@ -126,7 +126,7 @@ suite("test_streaming_postgres_job_special_offset", "p0,external,pg,external_doc
             """
         Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({
             def result = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
-            return result[0][0] >= 2
+            return result[0][0] >= 3
         })
         qt_select_after_create """ SELECT * FROM ${currentDb}.${table1} ORDER BY id """
 
@@ -169,8 +169,11 @@ suite("test_streaming_postgres_job_special_offset", "p0,external,pg,external_doc
         // After ALTER to LSN mark, only data AFTER that LSN (id 30,31) should be synced
         Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({
             def result = sql """SELECT count(*) FROM ${currentDb}.${table1} WHERE id IN (30, 31)"""
-            return result[0][0] >= 2
+            return result[0][0] == 2
         })
+        // mark LSN sits between 21 and 30; before-mark rows must never be read.
+        def beforeMark = sql """SELECT count(*) FROM ${currentDb}.${table1} WHERE id IN (20, 21)"""
+        assert beforeMark[0][0] == 0 : "rows before ALTER LSN must be skipped, found ${beforeMark[0][0]}"
         qt_select_after_alter """ SELECT * FROM ${currentDb}.${table1} ORDER BY id """
 
         // Step 3: ALTER with named mode should fail for CDC
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
index 4be3d3ac8cb..986b893299d 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
@@ -61,13 +61,16 @@ suite("test_streaming_insert_job_fetch_meta_error", "nonConcurrent") {
             );
         """
 
+        // GET_REMOTE_DATA_ERROR auto-resumes (PAUSED->PENDING->PAUSED); assert in the poll snapshot, not a separate read.
         try {
             Awaitility.await().atMost(120, SECONDS)
                     .pollInterval(2, SECONDS).until(
                     {
-                        def jobRes = sql """ select Status from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        def jobRes = sql """ select Status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
                         log.info("jobRes: " + jobRes)
                         jobRes.size() == 1 && 'PAUSED'.equals(jobRes.get(0).get(0))
+                                && jobRes.get(0).get(1) != null
+                                && jobRes.get(0).get(1).toString().contains("simulated S3 auth error")
                     }
             )
         } catch (Exception ex) {
@@ -78,10 +81,6 @@ suite("test_streaming_insert_job_fetch_meta_error", "nonConcurrent") {
             throw ex
         }
 
-        def jobStatus = sql """select Status, ErrorMsg from jobs("type"="insert") where Name='${jobName}'"""
-        assert jobStatus.get(0).get(0) == "PAUSED"
-        assert jobStatus.get(0).get(1).contains("simulated S3 auth error")
-
         sql """
             DROP JOB IF EXISTS where jobname = '${jobName}'
         """


