This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 5a243f57f4c branch-4.0: [Improve](streaming job) support postgres
partition table sync #60560 (#60614)
5a243f57f4c is described below
commit 5a243f57f4c3edd948c63ff4f9ac46e900fb4a9b
Author: wudi <[email protected]>
AuthorDate: Tue Feb 24 13:30:38 2026 +0800
branch-4.0: [Improve](streaming job) support postgres partition table sync
#60560 (#60614)
### What problem does this PR solve?
Cherry-picked from https://github.com/apache/doris/pull/60560, #60624
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../insert/streaming/StreamingMultiTblTask.java | 4 +-
.../connection/PostgresReplicationConnection.java | 932 +++++++++++++++++++++
.../reader/postgres/PostgresSourceReader.java | 12 +-
.../cdc/test_streaming_postgres_job_partition.out | 10 +
.../cdc/test_streaming_postgres_job.groovy | 8 +-
.../test_streaming_postgres_job_partition.groovy | 178 ++++
.../cdc/test_streaming_postgres_job_priv.groovy | 4 +-
7 files changed, 1140 insertions(+), 8 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 0bdd6262864..b6a4e8c939b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -113,7 +113,7 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
@Override
public void run() throws JobException {
if (getIsCanceled().get()) {
- log.info("task has been canceled, task id is {}", getTaskId());
+ log.info("streaming task has been canceled, task id is {}",
getTaskId());
return;
}
sendWriteRequest();
@@ -348,7 +348,7 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
log.warn("Failed to get task timeout reason, response: {}",
response);
}
} catch (ExecutionException | InterruptedException ex) {
- log.error("Send get fail reason request failed: ", ex);
+ log.error("Send get task fail reason request failed: ", ex);
}
return "";
}
diff --git
a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
new file mode 100644
index 00000000000..ac372bbb8cb
--- /dev/null
+++
b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
@@ -0,0 +1,932 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at
http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.spi.SlotCreationResult;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcConnectionException;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.postgresql.core.BaseConnection;
+import org.postgresql.core.ServerVersion;
+import org.postgresql.replication.PGReplicationStream;
+import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+import org.postgresql.util.PSQLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.toIntExact;
+
+/**
+ * Copied from Flink Cdc 3.5.0
+ *
+ * <p>Line 192~199: add publish_via_partition_root for partition table.
+ */
+public class PostgresReplicationConnection extends JdbcConnection implements
ReplicationConnection {
+
+ private static Logger LOGGER =
LoggerFactory.getLogger(PostgresReplicationConnection.class);
+
+ private final String slotName;
+ private final String publicationName;
+ private final RelationalTableFilters tableFilter;
+ private final PostgresConnectorConfig.AutoCreateMode
publicationAutocreateMode;
+ private final PostgresConnectorConfig.LogicalDecoder plugin;
+ private final boolean dropSlotOnClose;
+ private final PostgresConnectorConfig connectorConfig;
+ private final Duration statusUpdateInterval;
+ private final MessageDecoder messageDecoder;
+ private final PostgresConnection jdbcConnection;
+ private final TypeRegistry typeRegistry;
+ private final Properties streamParams;
+
+ private Lsn defaultStartingPos;
+ private SlotCreationResult slotCreationInfo;
+ private boolean hasInitedSlot;
+
+ private Lsn endingPos;
+
+ /**
+ * Creates a new replication connection with the given params.
+ *
+ * @param config the JDBC configuration for the connection; may not be null
+ * @param slotName the name of the DB slot for logical replication; may
not be null
+ * @param publicationName the name of the DB publication for logical
replication; may not be
+ * null
+ * @param tableFilter the tables to watch of the DB publication for
logical replication; may not
+ * be null
+ * @param publicationAutocreateMode the mode for publication autocreation;
may not be null
+ * @param plugin decoder matching the server side plug-in used for
streaming changes; may not be
+ * null
+ * @param dropSlotOnClose whether the replication slot should be dropped
once the connection is
+ * closed
+ * @param statusUpdateInterval the interval at which the replication
connection should
+ * periodically send status updates to the server
+ * @param doSnapshot whether the connector is doing snapshot
+ * @param jdbcConnection general PostgreSQL JDBC connection
+ * @param typeRegistry registry with PostgreSQL types
+ * @param streamParams additional parameters to pass to the replication
stream
+ * @param schema the schema; must not be null
+ *
+ */
+ private PostgresReplicationConnection(
+ PostgresConnectorConfig config,
+ String slotName,
+ String publicationName,
+ RelationalTableFilters tableFilter,
+ PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
+ PostgresConnectorConfig.LogicalDecoder plugin,
+ boolean dropSlotOnClose,
+ boolean doSnapshot,
+ Duration statusUpdateInterval,
+ PostgresConnection jdbcConnection,
+ TypeRegistry typeRegistry,
+ Properties streamParams,
+ PostgresSchema schema) {
+ super(
+ addDefaultSettings(config.getJdbcConfig()),
+ PostgresConnection.FACTORY,
+ null,
+ null,
+ "\"",
+ "\"");
+
+ this.connectorConfig = config;
+ this.slotName = slotName;
+ this.publicationName = publicationName;
+ this.tableFilter = tableFilter;
+ this.publicationAutocreateMode = publicationAutocreateMode;
+ this.plugin = plugin;
+ this.dropSlotOnClose = dropSlotOnClose;
+ this.statusUpdateInterval = statusUpdateInterval;
+ this.messageDecoder =
+ plugin.messageDecoder(new MessageDecoderContext(config,
schema), jdbcConnection);
+ this.jdbcConnection = jdbcConnection;
+ this.typeRegistry = typeRegistry;
+ this.streamParams = streamParams;
+ this.slotCreationInfo = null;
+ this.hasInitedSlot = false;
+ }
+
+ private static JdbcConfiguration addDefaultSettings(JdbcConfiguration
configuration) {
+ // first copy the parent's default settings...
+ // then set some additional replication specific settings
+ return JdbcConfiguration.adapt(
+ PostgresConnection.addDefaultSettings(
+ configuration,
PostgresConnection.CONNECTION_STREAMING)
+ .edit()
+ .with("replication", "database")
+ .with(
+ "preferQueryMode",
+ "simple") // replication protocol only
supports simple query mode
+ .build());
+ }
+
+ private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException,
InterruptedException {
+ try (PostgresConnection connection =
+ new PostgresConnection(
+ connectorConfig.getJdbcConfig(),
PostgresConnection.CONNECTION_SLOT_INFO)) {
+ return connection.readReplicationSlotInfo(slotName,
plugin.getPostgresPluginName());
+ }
+ }
+
+ protected void initPublication() {
+ String createPublicationStmt;
+ String tableFilterString = null;
+ if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
+ LOGGER.info("Initializing PgOutput logical decoder publication");
+ try {
+ // Unless the autocommit is disabled the SELECT publication
query will stay running
+ Connection conn = pgConnection();
+ conn.setAutoCommit(false);
+
+ String selectPublication =
+ String.format(
+ "SELECT COUNT(1) FROM pg_publication WHERE
pubname = '%s'",
+ publicationName);
+ try (Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(selectPublication)) {
+ if (rs.next()) {
+ Long count = rs.getLong(1);
+ // Close eagerly as the transaction might stay running
+ if (count == 0L) {
+ LOGGER.info(
+ "Creating new publication '{}' for plugin
'{}'",
+ publicationName,
+ plugin);
+ switch (publicationAutocreateMode) {
+ case DISABLED:
+ throw new ConnectException(
+ "Publication autocreation is
disabled, please create one and restart the connector.");
+ case ALL_TABLES:
+ boolean supportPartitionRoot =
((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13);
+ createPublicationStmt =
supportPartitionRoot
+ ? String.format(
+ "CREATE PUBLICATION %s FOR
ALL TABLES WITH (publish_via_partition_root = true);",
+ publicationName)
+ : String.format(
+ "CREATE PUBLICATION %s FOR
ALL TABLES;",
+ publicationName);
+ LOGGER.info(
+ "Creating Publication with
statement '{}'",
+ createPublicationStmt);
+ // Publication doesn't exist, create it.
+ stmt.execute(createPublicationStmt);
+ break;
+ case FILTERED:
+ createOrUpdatePublicationModeFilterted(
+ tableFilterString, stmt, false);
+ break;
+ }
+ } else {
+ switch (publicationAutocreateMode) {
+ case FILTERED:
+ createOrUpdatePublicationModeFilterted(
+ tableFilterString, stmt, true);
+ break;
+ default:
+ LOGGER.trace(
+ "A logical publication named '{}'
for plugin '{}' and database '{}' is already active on the server "
+ + "and will be used by the
plugin",
+ publicationName,
+ plugin,
+ database());
+ }
+ }
+ }
+ }
+ conn.commit();
+ conn.setAutoCommit(true);
+ } catch (SQLException e) {
+ throw new JdbcConnectionException(e);
+ }
+ }
+ }
+
+ private void createOrUpdatePublicationModeFilterted(
+ String tableFilterString, Statement stmt, boolean isUpdate) {
+ String createOrUpdatePublicationStmt;
+ try {
+ Set<TableId> tablesToCapture = determineCapturedTables();
+ tableFilterString =
+ tablesToCapture.stream()
+ .map(TableId::toDoubleQuotedString)
+ .collect(Collectors.joining(", "));
+ if (tableFilterString.isEmpty()) {
+ throw new DebeziumException(
+ String.format(
+ "No table filters found for filtered
publication %s",
+ publicationName));
+ }
+ createOrUpdatePublicationStmt =
+ isUpdate
+ ? String.format(
+ "ALTER PUBLICATION %s SET TABLE %s;",
+ publicationName, tableFilterString)
+ : String.format(
+ "CREATE PUBLICATION %s FOR TABLE %s;",
+ publicationName, tableFilterString);
+ LOGGER.info(
+ isUpdate
+ ? "Updating Publication with statement '{}'"
+ : "Creating Publication with statement '{}'",
+ createOrUpdatePublicationStmt);
+ stmt.execute(createOrUpdatePublicationStmt);
+ } catch (Exception e) {
+ throw new ConnectException(
+ String.format(
+ "Unable to %s filtered publication %s for %s",
+ isUpdate ? "update" : "create", publicationName,
tableFilterString),
+ e);
+ }
+ }
+
+ private Set<TableId> determineCapturedTables() throws Exception {
+ Set<TableId> allTableIds =
jdbcConnection.getAllTableIds(connectorConfig.databaseName());
+
+ Set<TableId> capturedTables = new HashSet<>();
+
+ for (TableId tableId : allTableIds) {
+ if (tableFilter.dataCollectionFilter().isIncluded(tableId)) {
+ LOGGER.trace("Adding table {} to the list of captured tables",
tableId);
+ capturedTables.add(tableId);
+ } else {
+ LOGGER.trace(
+ "Ignoring table {} as it's not included in the filter
configuration",
+ tableId);
+ }
+ }
+
+ return capturedTables.stream()
+ .sorted()
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
+ protected void initReplicationSlot() throws SQLException,
InterruptedException {
+ ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
+
+ boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID ==
slotInfo;
+ try {
+ // there's no info for this plugin and slot so create a new slot
+ if (shouldCreateSlot) {
+ this.createReplicationSlot();
+ }
+
+ // replication connection does not support parsing of SQL
statements so we need to
+ // create
+ // the connection without executing on connect statements - see
JDBC opt
+ // preferQueryMode=simple
+ pgConnection();
+ final String identifySystemStatement = "IDENTIFY_SYSTEM";
+ LOGGER.debug(
+ "running '{}' to validate replication connection",
identifySystemStatement);
+ final Lsn xlogStart =
+ queryAndMap(
+ identifySystemStatement,
+ rs -> {
+ if (!rs.next()) {
+ throw new IllegalStateException(
+ "The DB connection is not a valid
replication connection");
+ }
+ String xlogpos = rs.getString("xlogpos");
+ LOGGER.debug("received latest xlogpos '{}'",
xlogpos);
+ return Lsn.valueOf(xlogpos);
+ });
+
+ if (slotCreationInfo != null) {
+ this.defaultStartingPos = slotCreationInfo.startLsn();
+ } else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
+ // this is a new slot or we weren't able to read a valid flush
LSN pos, so we always
+ // start from the xlog pos that was reported
+ this.defaultStartingPos = xlogStart;
+ } else {
+ Lsn latestFlushedLsn = slotInfo.latestFlushedLsn();
+ this.defaultStartingPos =
+ latestFlushedLsn.compareTo(xlogStart) < 0 ?
latestFlushedLsn : xlogStart;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("found previous flushed LSN '{}'",
latestFlushedLsn);
+ }
+ }
+ hasInitedSlot = true;
+ } catch (SQLException e) {
+ throw new JdbcConnectionException(e);
+ }
+ }
+
+ // Temporary replication slots are a new feature of PostgreSQL 10
+ private boolean useTemporarySlot() throws SQLException {
+ // Temporary replication slots cannot be used due to connection restart
+ // when finding WAL position
+ // return dropSlotOnClose &&
pgConnection().haveMinimumServerVersion(ServerVersion.v10);
+ return false;
+ }
+
+ /**
+ * creating a replication connection and starting to stream involves a few
steps: 1. we create
+ * the connection and ensure that a. the slot exists b. the slot isn't
currently being used 2.
+ * we query to get our potential start position in the slot (lsn) 3. we
try and start streaming,
+ * depending on our options (such as in wal2json) this may fail, which can
result in the
+ * connection being killed and we need to start the process over if we are
using a temporary
+ * slot 4. actually start the streamer
+ *
+ * <p>This method takes care of all of these and this method queries for a
default starting
+ * position If you know where you are starting from you should call {@link
#startStreaming(Lsn,
+ * WalPositionLocator)}, this method delegates to that method
+ *
+ * @return
+ * @throws SQLException
+ * @throws InterruptedException
+ */
+ @Override
+ public ReplicationStream startStreaming(WalPositionLocator walPosition)
+ throws SQLException, InterruptedException {
+ return startStreaming(null, walPosition);
+ }
+
+ @Override
+ public ReplicationStream startStreaming(Lsn offset, WalPositionLocator
walPosition)
+ throws SQLException, InterruptedException {
+ initConnection();
+
+ connect();
+ if (offset == null || !offset.isValid()) {
+ offset = defaultStartingPos;
+ }
+ Lsn lsn = offset;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("starting streaming from LSN '{}'", lsn);
+ }
+
+ final int maxRetries = connectorConfig.maxRetries();
+ final Duration delay = connectorConfig.retryDelay();
+ int tryCount = 0;
+ while (true) {
+ try {
+ return createReplicationStream(lsn, walPosition);
+ } catch (Exception e) {
+ String message = "Failed to start replication stream at " +
lsn;
+ if (++tryCount > maxRetries) {
+ if (e.getMessage().matches(".*replication slot .* is
active.*")) {
+ message +=
+ "; when setting up multiple connectors for the
same database host, please make sure to use a distinct replication slot name
for each.";
+ }
+ throw new DebeziumException(message, e);
+ } else {
+ LOGGER.warn(
+ message + ", waiting for {} ms and retrying,
attempt number {} over {}",
+ delay,
+ tryCount,
+ maxRetries);
+ final Metronome metronome = Metronome.sleeper(delay,
Clock.SYSTEM);
+ metronome.pause();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void initConnection() throws SQLException, InterruptedException {
+ // See
https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
+ // For pgoutput specifically, the publication must be created before
the slot.
+ initPublication();
+ if (!hasInitedSlot) {
+ initReplicationSlot();
+ }
+ }
+
+ @Override
+ public Optional<SlotCreationResult> createReplicationSlot() throws
SQLException {
+ // note that some of these options are only supported in Postgres
9.4+, additionally
+ // the options are not yet exported by the jdbc api wrapper,
therefore, we just do
+ // this ourselves but eventually this should be moved back to the jdbc
API
+ // see https://github.com/pgjdbc/pgjdbc/issues/1305
+
+ LOGGER.debug("Creating new replication slot '{}' for plugin '{}'",
slotName, plugin);
+ String tempPart = "";
+ // Exported snapshots are supported in Postgres 9.4+
+ boolean canExportSnapshot =
pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
+ if ((dropSlotOnClose) && !canExportSnapshot) {
+ LOGGER.warn(
+ "A slot marked as temporary or with an exported snapshot
was created, "
+ + "but not on a supported version of Postgres,
ignoring!");
+ }
+ if (useTemporarySlot()) {
+ tempPart = "TEMPORARY";
+ }
+
+ // See
https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
+ // For pgoutput specifically, the publication must be created prior to
the slot.
+ initPublication();
+
+ try (Statement stmt = pgConnection().createStatement()) {
+ String createCommand =
+ String.format(
+ "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s",
+ slotName, tempPart,
plugin.getPostgresPluginName());
+ LOGGER.info("Creating replication slot with command {}",
createCommand);
+ stmt.execute(createCommand);
+ // when we are in Postgres 9.4+, we can parse the slot creation
info,
+ // otherwise, it returns nothing
+ if (canExportSnapshot) {
+ this.slotCreationInfo = parseSlotCreation(stmt.getResultSet());
+ }
+
+ return Optional.ofNullable(slotCreationInfo);
+ }
+ }
+
+ protected BaseConnection pgConnection() throws SQLException {
+ return (BaseConnection) connection(false);
+ }
+
+ private SlotCreationResult parseSlotCreation(ResultSet rs) {
+ try {
+ if (rs.next()) {
+ String slotName = rs.getString("slot_name");
+ String startPoint = rs.getString("consistent_point");
+ String snapName = rs.getString("snapshot_name");
+ String pluginName = rs.getString("output_plugin");
+
+ return new SlotCreationResult(slotName, startPoint, snapName,
pluginName);
+ } else {
+ throw new ConnectException("No replication slot found");
+ }
+ } catch (SQLException ex) {
+ throw new ConnectException("Unable to parse
create_replication_slot response", ex);
+ }
+ }
+
+ private ReplicationStream createReplicationStream(
+ final Lsn startLsn, WalPositionLocator walPosition)
+ throws SQLException, InterruptedException {
+ PGReplicationStream s;
+
+ try {
+ try {
+ s =
+ startPgReplicationStream(
+ startLsn,
+ plugin.forceRds()
+ ?
messageDecoder::optionsWithoutMetadata
+ : messageDecoder::optionsWithMetadata);
+ messageDecoder.setContainsMetadata(plugin.forceRds() ? false :
true);
+ } catch (PSQLException e) {
+ LOGGER.debug(
+ "Could not register for streaming, retrying without
optional options", e);
+
+ // re-init the slot after a failed start of slot, as this
+ // may have closed the slot
+ if (useTemporarySlot()) {
+ initReplicationSlot();
+ }
+
+ s =
+ startPgReplicationStream(
+ startLsn,
+ plugin.forceRds()
+ ?
messageDecoder::optionsWithoutMetadata
+ : messageDecoder::optionsWithMetadata);
+ messageDecoder.setContainsMetadata(plugin.forceRds() ? false :
true);
+ }
+ } catch (PSQLException e) {
+ if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
+ // It is possible we are connecting to an old wal2json plug-in
+ LOGGER.warn(
+ "Could not register for streaming with metadata in
messages, falling back to messages without metadata");
+
+ // re-init the slot after a failed start of slot, as this
+ // may have closed the slot
+ if (useTemporarySlot()) {
+ initReplicationSlot();
+ }
+
+ s = startPgReplicationStream(startLsn,
messageDecoder::optionsWithoutMetadata);
+ messageDecoder.setContainsMetadata(false);
+ } else if (e.getMessage()
+ .matches("(?s)ERROR: requested WAL segment .* has already
been removed.*")) {
+ LOGGER.error("Cannot rewind to last processed WAL position",
e);
+ throw new ConnectException(
+ "The offset to start reading from has been removed
from the database write-ahead log. Create a new snapshot and consider setting
of PostgreSQL parameter wal_keep_segments = 0.");
+ } else {
+ throw e;
+ }
+ }
+
+ final PGReplicationStream stream = s;
+
+ return new ReplicationStream() {
+
+ private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
+ private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
+ private ExecutorService keepAliveExecutor = null;
+ private AtomicBoolean keepAliveRunning;
+ private final Metronome metronome =
+ Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM);
+
+ // make sure this is volatile since multiple threads may be
interested in this value
+ private volatile Lsn lastReceivedLsn;
+
+ @Override
+ public void read(ReplicationMessageProcessor processor)
+ throws SQLException, InterruptedException {
+ processWarnings(false);
+ ByteBuffer read = stream.read();
+ final Lsn lastReceiveLsn =
Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace(
+ "Streaming requested from LSN {}, received LSN {}",
+ startLsn,
+ lastReceiveLsn);
+ if (reachEnd(lastReceivedLsn)) {
+ lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace("Received message at LSN {}",
lastReceivedLsn);
+ processor.process(new ReplicationMessage.NoopMessage(null,
null));
+ return;
+ }
+ if (messageDecoder.shouldMessageBeSkipped(
+ read, lastReceiveLsn, startLsn, walPosition)) {
+ return;
+ }
+ deserializeMessages(read, processor);
+ }
+
+ @Override
+ public boolean readPending(ReplicationMessageProcessor processor)
+ throws SQLException, InterruptedException {
+ processWarnings(false);
+ ByteBuffer read = stream.readPending();
+ final Lsn lastReceiveLsn =
Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace(
+ "Streaming requested from LSN {}, received LSN {}",
+ startLsn,
+ lastReceiveLsn);
+
+ if (reachEnd(lastReceiveLsn)) {
+ lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace("Received message at LSN {}",
lastReceivedLsn);
+ processor.process(new ReplicationMessage.NoopMessage(null,
null));
+ return true;
+ }
+
+ if (read == null) {
+ return false;
+ }
+
+ if (messageDecoder.shouldMessageBeSkipped(
+ read, lastReceiveLsn, startLsn, walPosition)) {
+ return true;
+ }
+
+ deserializeMessages(read, processor);
+
+ return true;
+ }
+
+ private void deserializeMessages(
+ ByteBuffer buffer, ReplicationMessageProcessor processor)
+ throws SQLException, InterruptedException {
+ lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
+ messageDecoder.processMessage(buffer, processor, typeRegistry);
+ }
+
+ @Override
+ public void close() throws SQLException {
+ processWarnings(true);
+ stream.close();
+ }
+
+ @Override
+ public void flushLsn(Lsn lsn) throws SQLException {
+ doFlushLsn(lsn);
+ }
+
+ private void doFlushLsn(Lsn lsn) throws SQLException {
+ stream.setFlushedLSN(lsn.asLogSequenceNumber());
+ stream.setAppliedLSN(lsn.asLogSequenceNumber());
+
+ stream.forceUpdateStatus();
+ }
+
+ @Override
+ public Lsn lastReceivedLsn() {
+ return lastReceivedLsn;
+ }
+
+ @Override
+ public void startKeepAlive(ExecutorService service) {
+ if (keepAliveExecutor == null) {
+ keepAliveExecutor = service;
+ keepAliveRunning = new AtomicBoolean(true);
+ keepAliveExecutor.submit(
+ () -> {
+ while (keepAliveRunning.get()) {
+ try {
+ LOGGER.trace(
+ "Forcing status update with
replication stream");
+ stream.forceUpdateStatus();
+ metronome.pause();
+ } catch (Exception exp) {
+ throw new RuntimeException(
+ "received unexpected exception
will perform keep alive",
+ exp);
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public void stopKeepAlive() {
+ if (keepAliveExecutor != null) {
+ keepAliveRunning.set(false);
+ keepAliveExecutor.shutdownNow();
+ keepAliveExecutor = null;
+ }
+ }
+
+ private void processWarnings(final boolean forced) throws
SQLException {
+ if (--warningCheckCounter == 0 || forced) {
+ warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
+ for (SQLWarning w = connection().getWarnings();
+ w != null;
+ w = w.getNextWarning()) {
+ LOGGER.debug(
+ "Server-side message: '{}', state = {}, code =
{}",
+ w.getMessage(),
+ w.getSQLState(),
+ w.getErrorCode());
+ }
+ connection().clearWarnings();
+ }
+ }
+
+ @Override
+ public Lsn startLsn() {
+ return startLsn;
+ }
+
+ private boolean reachEnd(Lsn receivedLsn) {
+ if (receivedLsn == null) {
+ return false;
+ }
+ return endingPos != null
+ && (!endingPos.isNonStopping())
+ && endingPos.compareTo(receivedLsn) < 0;
+ }
+ };
+ }
+
+ public void setEndingPos(Lsn endingPos) {
+ this.endingPos = endingPos;
+ }
+
+ private PGReplicationStream startPgReplicationStream(
+ final Lsn lsn,
+ BiFunction<
+ ChainedLogicalStreamBuilder,
+ Function<Integer, Boolean>,
+ ChainedLogicalStreamBuilder>
+ configurator)
+ throws SQLException {
+ assert lsn != null;
+ ChainedLogicalStreamBuilder streamBuilder =
+ pgConnection()
+ .getReplicationAPI()
+ .replicationStream()
+ .logical()
+ .withSlotName("\"" + slotName + "\"")
+ .withStartPosition(lsn.asLogSequenceNumber())
+ .withSlotOptions(streamParams);
+ streamBuilder = configurator.apply(streamBuilder,
this::hasMinimumVersion);
+
+ if (statusUpdateInterval != null && statusUpdateInterval.toMillis() >
0) {
+ streamBuilder.withStatusInterval(
+ toIntExact(statusUpdateInterval.toMillis()),
TimeUnit.MILLISECONDS);
+ }
+
+ PGReplicationStream stream = streamBuilder.start();
+
+ // TODO DBZ-508 get rid of this
+ // Needed by tests when connections are opened and closed in a fast
sequence
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+ }
+ stream.forceUpdateStatus();
+ return stream;
+ }
+
+ private Boolean hasMinimumVersion(int version) {
+ try {
+ return pgConnection().haveMinimumServerVersion(version);
+ } catch (SQLException e) {
+ throw new DebeziumException(e);
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ close(true);
+ }
+
+ public synchronized void close(boolean dropSlot) {
+ try {
+ LOGGER.debug("Closing message decoder");
+ messageDecoder.close();
+ } catch (Throwable e) {
+ LOGGER.error("Unexpected error while closing message decoder", e);
+ }
+
+ try {
+ LOGGER.debug("Closing replication connection");
+ super.close();
+ } catch (Throwable e) {
+ LOGGER.error("Unexpected error while closing Postgres connection",
e);
+ }
+ if (dropSlotOnClose && dropSlot) {
+ // we're dropping the replication slot via a regular - i.e. not a
replication -
+ // connection
+ try (PostgresConnection connection =
+ new PostgresConnection(
+ connectorConfig.getJdbcConfig(),
+ PostgresConnection.CONNECTION_DROP_SLOT)) {
+ connection.dropReplicationSlot(slotName);
+ } catch (Throwable e) {
+ LOGGER.error("Unexpected error while dropping replication
slot", e);
+ }
+ }
+ }
+
+ @Override
+ public void reconnect() throws SQLException {
+ close(false);
+ // Don't re-execute initial commands on reconnection
+ connection(false);
+ }
+
+ protected static class ReplicationConnectionBuilder implements Builder {
+
+ private final PostgresConnectorConfig config;
+ private String slotName = DEFAULT_SLOT_NAME;
+ private String publicationName = DEFAULT_PUBLICATION_NAME;
+ private RelationalTableFilters tableFilter;
+ private PostgresConnectorConfig.AutoCreateMode
publicationAutocreateMode =
+ PostgresConnectorConfig.AutoCreateMode.ALL_TABLES;
+ private PostgresConnectorConfig.LogicalDecoder plugin =
+ PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
+ private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
+ private Duration statusUpdateIntervalVal;
+ private boolean doSnapshot;
+ private TypeRegistry typeRegistry;
+ private PostgresSchema schema;
+ private Properties slotStreamParams = new Properties();
+ private PostgresConnection jdbcConnection;
+
+ protected ReplicationConnectionBuilder(PostgresConnectorConfig config)
{
+ assert config != null;
+ this.config = config;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder withSlot(final String slotName) {
+ assert slotName != null;
+ this.slotName = slotName;
+ return this;
+ }
+
+ @Override
+ public Builder withPublication(String publicationName) {
+ assert publicationName != null;
+ this.publicationName = publicationName;
+ return this;
+ }
+
+ @Override
+ public Builder withTableFilter(RelationalTableFilters tableFilter) {
+ assert tableFilter != null;
+ this.tableFilter = tableFilter;
+ return this;
+ }
+
+ @Override
+ public Builder withPublicationAutocreateMode(
+ PostgresConnectorConfig.AutoCreateMode
publicationAutocreateMode) {
+ assert publicationName != null;
+ this.publicationAutocreateMode = publicationAutocreateMode;
+ return this;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder withPlugin(
+ final PostgresConnectorConfig.LogicalDecoder plugin) {
+ assert plugin != null;
+ this.plugin = plugin;
+ return this;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder dropSlotOnClose(final boolean
dropSlotOnClose) {
+ this.dropSlotOnClose = dropSlotOnClose;
+ return this;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder streamParams(final String
slotStreamParams) {
+ if (slotStreamParams != null && !slotStreamParams.isEmpty()) {
+ this.slotStreamParams = new Properties();
+ String[] paramsWithValues = slotStreamParams.split(";");
+ for (String paramsWithValue : paramsWithValues) {
+ String[] paramAndValue = paramsWithValue.split("=");
+ if (paramAndValue.length == 2) {
+ this.slotStreamParams.setProperty(paramAndValue[0],
paramAndValue[1]);
+ } else {
+ LOGGER.warn(
+ "The following STREAM_PARAMS value is invalid:
{}",
+ paramsWithValue);
+ }
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder statusUpdateInterval(
+ final Duration statusUpdateInterval) {
+ this.statusUpdateIntervalVal = statusUpdateInterval;
+ return this;
+ }
+
+ @Override
+ public Builder doSnapshot(boolean doSnapshot) {
+ this.doSnapshot = doSnapshot;
+ return this;
+ }
+
+ @Override
+ public Builder jdbcMetadataConnection(PostgresConnection
jdbcConnection) {
+ this.jdbcConnection = jdbcConnection;
+ return this;
+ }
+
+ @Override
+ public ReplicationConnection build() {
+ assert plugin != null : "Decoding plugin name is not set";
+ return new PostgresReplicationConnection(
+ config,
+ slotName,
+ publicationName,
+ tableFilter,
+ publicationAutocreateMode,
+ plugin,
+ dropSlotOnClose,
+ doSnapshot,
+ statusUpdateIntervalVal,
+ jdbcConnection,
+ typeRegistry,
+ slotStreamParams,
+ schema);
+ }
+
+ @Override
+ public Builder withTypeRegistry(TypeRegistry typeRegistry) {
+ this.typeRegistry = typeRegistry;
+ return this;
+ }
+
+ @Override
+ public Builder withSchema(PostgresSchema schema) {
+ this.schema = schema;
+ return this;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index c7effe7a967..5a9fa095941 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -158,16 +158,19 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
String hostname = props.getProperty("PGHOST");
String port = props.getProperty("PGPORT");
- String database = props.getProperty("PGDBNAME");
+ String databaseFromUrl = props.getProperty("PGDBNAME");
Preconditions.checkNotNull(hostname, "host is required");
Preconditions.checkNotNull(port, "port is required");
- Preconditions.checkNotNull(database, "database is required");
configFactory.hostname(hostname);
configFactory.port(Integer.parseInt(port));
configFactory.username(cdcConfig.get(DataSourceConfigKeys.USER));
configFactory.password(cdcConfig.get(DataSourceConfigKeys.PASSWORD));
- configFactory.database(database);
+
+ String database = cdcConfig.get(DataSourceConfigKeys.DATABASE);
+ String finalDatabase = StringUtils.isNotEmpty(database) ? database :
databaseFromUrl;
+ Preconditions.checkNotNull(finalDatabase, "database is required");
+ configFactory.database(finalDatabase);
String schema = cdcConfig.get(DataSourceConfigKeys.SCHEMA);
Preconditions.checkNotNull(schema, "schema is required");
@@ -219,6 +222,9 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
configFactory.heartbeatInterval(
Duration.ofMillis(Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS));
+ // Support scanning partitioned tables
+ configFactory.setIncludePartitionedTables(true);
+
+ // The subtask ID is used when creating the PG replication slot in the snapshot phase; the slot name is slot_name_subtaskId
return configFactory.create(subtaskId);
}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out
new file mode 100644
index 00000000000..331d1fe2040
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_orders_partition_snapshot --
+1 1001 2024-01-10
+2 1002 2024-02-05
+
+-- !select_orders_partition_binlog_all --
+2 2002 2024-02-05
+3 1003 2024-01-20
+4 1004 2024-03-15
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
index ba2c1247016..59be0f77d7a 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
@@ -135,11 +135,13 @@ suite("test_streaming_postgres_job",
"p0,external,pg,external_docker,external_do
// mock incremental into
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name,age) VALUES
('Doris',18);"""
+ def xminResult = sql """SELECT xmin, xmax , * FROM
${pgSchema}.${table1} WHERE name = 'Doris'; """
+ log.info("xminResult: " + xminResult)
sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET age = 10 WHERE
name = 'B1';"""
sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE name =
'A1';"""
}
- sleep(30000); // wait for cdc incremental data
+ sleep(60000); // wait for cdc incremental data
// check incremental data
qt_select_binlog_table1 """ SELECT * FROM ${table1} order by name asc
"""
@@ -156,9 +158,11 @@ suite("test_streaming_postgres_job",
"p0,external,pg,external_docker,external_do
// mock incremental into again
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name,age) VALUES
('Apache',40);"""
+ def xminResult1 = sql """SELECT xmin, xmax , * FROM
${pgSchema}.${table1} WHERE name = 'Apache'; """
+ log.info("xminResult1: " + xminResult1)
}
- sleep(30000); // wait for cdc incremental data
+ sleep(60000); // wait for cdc incremental data
// check incremental data
qt_select_next_binlog_table1 """ SELECT * FROM ${table1} order by name
asc """
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
new file mode 100644
index 00000000000..e68b0302d51
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_partition",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_partition_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_pg_orders"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // 1. create postgres partition table and insert snapshot data
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}"""
+
+ sql """
+ CREATE TABLE ${pgSchema}.${table1} (
+ id BIGINT,
+ user_id BIGINT,
+ order_date DATE,
+ PRIMARY KEY (id, order_date)
+ ) PARTITION BY RANGE (order_date)
+ """
+
+ // create two partitions: 2024-01, 2024-02
+ sql """CREATE TABLE ${table1}_p202401 PARTITION OF
${pgSchema}.${table1}
+ FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')"""
+ sql """CREATE TABLE ${table1}_p202402 PARTITION OF
${pgSchema}.${table1}
+ FOR VALUES FROM ('2024-02-01') TO ('2024-03-01')"""
+
+ // Create snapshot data by inserting rows into both partitions
+ sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date)
VALUES (1, 1001, DATE '2024-01-10');"""
+ sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date)
VALUES (2, 1002, DATE '2024-02-05');"""
+ }
+
+ // 2. create streaming job
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Wait until the snapshot data sync has completed
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount
from jobs("type"="insert")
+ where Name = '${jobName}'
and ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
+ jobSuccendCount.size() == 1 && '2' <=
jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ // 3. check snapshot data
+ qt_select_orders_partition_snapshot """
+ SELECT id, user_id, order_date
+ FROM ${table1}
+ ORDER BY id
+ """
+
+ // 4. mock insert, update, delete and create new partition
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ // insert
+ sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date)
+ VALUES (3, 1003, DATE '2024-01-20');"""
+
+ def xminResult = sql """SELECT xmin, xmax , * FROM
${pgSchema}.${table1} WHERE id = 3"""
+ log.info("xminResult: " + xminResult)
+
+ // update
+ sql """UPDATE ${pgSchema}.${table1}
+ SET user_id = 2002
+ WHERE id = 2 AND order_date = DATE '2024-02-05';"""
+
+ // delete
+ sql """DELETE FROM ${pgSchema}.${table1}
+ WHERE id = 1 AND order_date = DATE '2024-01-10';"""
+
+ // create new partition and insert data
+ sql """CREATE TABLE ${table1}_p202403 PARTITION OF
${pgSchema}.${table1}
+ FOR VALUES FROM ('2024-03-01') TO ('2024-04-01')"""
+
+ sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date)
+ VALUES (4, 1004, DATE '2024-03-15');"""
+
+ def xminResult1 = sql """SELECT xmin, xmax , * FROM
${pgSchema}.${table1} WHERE id = 4"""
+ log.info("xminResult1: " + xminResult1)
+ }
+
+ // wait for all incremental data
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount
from jobs("type"="insert")
+ where Name = '${jobName}'
and ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
+ jobSuccendCount.size() == 1 && '3' <=
jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ def jobInfo = sql """
+ select loadStatistic, status from jobs("type"="insert") where
Name='${jobName}'
+ """
+ log.info("jobInfo: " + jobInfo)
+ assert jobInfo.get(0).get(1) == "RUNNING"
+
+ // check binlog data
+ qt_select_orders_partition_binlog_all """
+ SELECT id, user_id, order_date
+ FROM ${table1}
+ ORDER BY id
+ """
+
+ sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
index 9c0cd6a464c..5747438b717 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
@@ -134,8 +134,10 @@ suite("test_streaming_postgres_job_priv",
"p0,external,pg,external_docker,extern
)
// mock incremental into
- connect("${newPgUser}", "${newPgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age)
VALUES ('Doris',18);"""
+ def xminResult = sql """SELECT xmin, xmax , * FROM
${pgDB}.${pgSchema}.${tableName} WHERE name = 'Doris';"""
+ log.info("xminResult: " + xminResult)
}
Awaitility.await().atMost(300, SECONDS)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]