This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c210258f5ca [Improve](Streamingjob) support only snapshot sync for
mysql and pg (#61389)
c210258f5ca is described below
commit c210258f5ca8d13cd34599fe3348cd551364086d
Author: wudi <[email protected]>
AuthorDate: Mon Mar 23 14:22:35 2026 +0800
[Improve](Streamingjob) support only snapshot sync for mysql and pg (#61389)
### What problem does this PR solve?
#### Background
StreamingJob currently supports two offset modes:
- `initial`: full snapshot + continuous incremental replication
- `latest`: incremental replication only (no snapshot)
There is currently no way to perform a one-time full sync and then stop.
This is needed for data migration scenarios where only a point-in-time
full copy is required, without ongoing replication.
#### Usage
Set `offset=snapshot` when creating a StreamingJob:
```sql
CREATE JOB mysql_db_sync
ON STREAMING
FROM MYSQL (
...
"user" = "root",
"password" = "",
"database" = "db",
"include_tables" = "user_info,student",
"offset" = "snapshot"
)
TO DATABASE target_test_db (
)
```
The job will perform a full table snapshot and automatically transition
to FINISHED once all data is synced. No binlog/WAL subscription is
established.
#### Design
The implementation centers on a hasReachedEnd() signal in
SourceOffsetProvider:
- FE: JdbcSourceOffsetProvider.hasReachedEnd() returns true when in
snapshot-only mode and all snapshot splits have been consumed
(finishedSplits non-empty, remainingSplits empty).
StreamingInsertJob.onStreamTaskSuccess() checks hasReachedEnd() before creating
the next task — if true, the job is marked FINISHED.
- BE (cdc_client): snapshot maps to StartupOptions.snapshot() for both
MySQL and PostgreSQL connectors. The chunk-split path is
reused from initial mode.
- Crash recovery: if FE crashes before persisting FINISHED, the job
resumes via PAUSED→PENDING. handlePendingState() calls
replayOffsetProviderIfNeed() then checks hasReachedEnd() — if all splits
are already finished, the job transitions directly to
FINISHED without creating any new task.
#### Testing
Added regression tests for both MySQL and PostgreSQL:
- test_streaming_mysql_job_snapshot.groovy
- test_streaming_postgres_job_snapshot.groovy
Both tests verify:
1. All existing data is synced correctly after the job finishes
2. The job status transitions to FINISHED
---
.../apache/doris/job/cdc/DataSourceConfigKeys.java | 3 +-
.../streaming/DataSourceConfigValidator.java | 3 +-
.../insert/streaming/StreamingInsertJob.java | 10 ++
.../streaming/StreamingJobSchedulerTask.java | 6 ++
.../doris/job/offset/SourceOffsetProvider.java | 9 ++
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 34 +++++-
.../source/reader/JdbcIncrementalSourceReader.java | 3 +-
.../source/reader/mysql/MySqlSourceReader.java | 7 +-
.../reader/postgres/PostgresSourceReader.java | 2 +
.../cdc/test_streaming_mysql_job_snapshot.out | 9 ++
.../cdc/test_streaming_postgres_job_snapshot.out | 9 ++
.../cdc/test_streaming_mysql_job_snapshot.groovy | 111 ++++++++++++++++++++
.../test_streaming_postgres_job_snapshot.groovy | 114 +++++++++++++++++++++
13 files changed, 309 insertions(+), 11 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index b2bda583beb..47ee5f21d27 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -27,11 +27,12 @@ public class DataSourceConfigKeys {
public static final String SCHEMA = "schema";
public static final String INCLUDE_TABLES = "include_tables";
public static final String EXCLUDE_TABLES = "exclude_tables";
- // initial,earliest,latest,{binlog,postion},\d{13}
+ // initial,earliest,latest,snapshot,{binlog,position},\d{13}
public static final String OFFSET = "offset";
public static final String OFFSET_INITIAL = "initial";
public static final String OFFSET_EARLIEST = "earliest";
public static final String OFFSET_LATEST = "latest";
+ public static final String OFFSET_SNAPSHOT = "snapshot";
public static final String SNAPSHOT_SPLIT_SIZE = "snapshot_split_size";
public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index 63efaf296cb..b75e202b1a8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -110,7 +110,8 @@ public class DataSourceConfigValidator {
if (key.equals(DataSourceConfigKeys.OFFSET)
&& !(value.equals(DataSourceConfigKeys.OFFSET_INITIAL)
- || value.equals(DataSourceConfigKeys.OFFSET_LATEST))) {
+ || value.equals(DataSourceConfigKeys.OFFSET_LATEST)
+ || value.equals(DataSourceConfigKeys.OFFSET_SNAPSHOT))) {
return false;
}
return true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index be5c70d864a..c999b9d99e3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -630,6 +630,12 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
+ if (offsetProvider.hasReachedEnd()) {
+ // offset provider has reached a natural end, mark job as
finished
+ log.info("Streaming insert job {} source data fully consumed,
marking job as FINISHED", getJobId());
+ updateJobStatus(JobStatus.FINISHED);
+ return;
+ }
AbstractStreamingTask nextTask = createStreamingTask();
this.runningStreamTask = nextTask;
log.info("Streaming insert job {} create next streaming insert
task {} after task {} success",
@@ -1262,6 +1268,10 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
}
+ public boolean hasReachedEnd() {
+ return offsetProvider != null && offsetProvider.hasReachedEnd();
+ }
+
/**
* 1. Clean offset info in ms (s3 tvf)
* 2. Clean chunk info in meta table (jdbc)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 8df18f1ee63..95ace617a7e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -66,6 +66,12 @@ public class StreamingJobSchedulerTask extends AbstractTask {
}
}
streamingInsertJob.replayOffsetProviderIfNeed();
+ if (streamingInsertJob.hasReachedEnd()) {
+ // Source already fully consumed (e.g. snapshot-only mode
recovered after FE restart).
+ // Transition directly to FINISHED without creating a new task.
+ streamingInsertJob.updateJobStatus(JobStatus.FINISHED);
+ return;
+ }
streamingInsertJob.createStreamingTask();
streamingInsertJob.setSampleStartTime(System.currentTimeMillis());
streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 892231444e3..16fb2394fe3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -110,5 +110,14 @@ public interface SourceOffsetProvider {
return null;
}
+ /**
+ * Returns true if the provider has reached a natural completion point
+ * and the job should be marked as FINISHED.
+ * Default: false (most providers run indefinitely).
+ */
+ default boolean hasReachedEnd() {
+ return false;
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index d8959086fa5..b77dd8d8bd6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -116,7 +116,8 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
nextOffset.setSplits(snapshotSplits);
return nextOffset;
} else if (currentOffset != null && currentOffset.snapshotSplit()) {
- // snapshot to binlog
+ // initial mode: snapshot to binlog
+ // snapshot-only mode must be intercepted by hasReachedEnd()
before reaching here
BinlogSplit binlogSplit = new BinlogSplit();
binlogSplit.setFinishedSplits(finishedSplits);
nextOffset.setSplits(Collections.singletonList(binlogSplit));
@@ -243,6 +244,9 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
}
if (currentOffset.snapshotSplit()) {
+ if (isSnapshotOnlyMode() && remainingSplits.isEmpty()) {
+ return false;
+ }
return true;
}
@@ -372,14 +376,21 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
List<SnapshotSplit> lastSnapshotSplits =
recalculateRemainingSplits(chunkHighWatermarkMap,
snapshotSplits);
if (this.remainingSplits.isEmpty()) {
- currentOffset = new JdbcOffset();
if (!lastSnapshotSplits.isEmpty()) {
+ currentOffset = new JdbcOffset();
currentOffset.setSplits(lastSnapshotSplits);
- } else {
- // when snapshot to binlog phase fe restarts
+ } else if (!isSnapshotOnlyMode()) {
+ // initial mode: rebuild binlog split for
snapshot-to-binlog transition
+ currentOffset = new JdbcOffset();
BinlogSplit binlogSplit = new BinlogSplit();
binlogSplit.setFinishedSplits(finishedSplits);
currentOffset.setSplits(Collections.singletonList(binlogSplit));
+ } else {
+ // snapshot-only completed: leave currentOffset as
null,
+ // hasReachedEnd() detects completion via
finishedSplits
+ log.info("Replaying offset provider for job {}:
snapshot-only mode completed,"
+ + " finishedSplits={}, skip currentOffset
restoration",
+ getJobId(), finishedSplits.size());
}
}
}
@@ -535,7 +546,20 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
if (startMode == null) {
return false;
}
- return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode);
+ return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode)
+ ||
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startMode);
+ }
+
+ private boolean isSnapshotOnlyMode() {
+ String offset = sourceProperties.get(DataSourceConfigKeys.OFFSET);
+ return DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(offset);
+ }
+
+ @Override
+ public boolean hasReachedEnd() {
+ return isSnapshotOnlyMode()
+ && CollectionUtils.isNotEmpty(finishedSplits)
+ && remainingSplits.isEmpty();
}
/**
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 5b8e343faae..77052577341 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -143,7 +143,8 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
// Check startup mode - for PostgreSQL, we use similar logic as MySQL
String startupMode =
ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
- if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode))
{
+ if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
+ ||
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
remainingSnapshotSplits =
startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(),
ftsReq.getConfig());
} else {
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 11e5007894d..15787782da9 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -163,7 +163,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
List<MySqlSnapshotSplit> remainingSnapshotSplits = new ArrayList<>();
MySqlBinlogSplit remainingBinlogSplit = null;
- if (startupMode.equals(StartupMode.INITIAL)) {
+ if (startupMode.equals(StartupMode.INITIAL) ||
startupMode.equals(StartupMode.SNAPSHOT)) {
remainingSnapshotSplits =
startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(),
ftsReq.getConfig());
} else {
@@ -789,8 +789,9 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
// setting startMode
String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode))
{
- // do not need set offset when initial
- // configFactory.startupOptions(StartupOptions.initial());
+ configFactory.startupOptions(StartupOptions.initial());
+ } else if
(DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
+ configFactory.startupOptions(StartupOptions.snapshot());
} else if
(DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) {
configFactory.startupOptions(StartupOptions.earliest());
BinlogOffset binlogOffset =
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 6a5670ad6de..d465a71c242 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -205,6 +205,8 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode))
{
configFactory.startupOptions(StartupOptions.initial());
+ } else if
(DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
+ configFactory.startupOptions(StartupOptions.snapshot());
} else if
(DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) {
configFactory.startupOptions(StartupOptions.earliest());
} else if
(DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) {
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.out
new file mode 100644
index 00000000000..ea60bd7e201
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_table1 --
+A1 1
+B1 2
+
+-- !select_snapshot_table2 --
+A2 1
+B2 2
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.out
new file mode 100644
index 00000000000..ea60bd7e201
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_table1 --
+A1 1
+B1 2
+
+-- !select_snapshot_table2 --
+A2 1
+B2 2
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.groovy
new file mode 100644
index 00000000000..72ee1f29c5f
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test snapshot-only mode (offset=snapshot):
+ * 1. Job syncs existing data via full snapshot.
+ * 2. Job transitions to FINISHED after snapshot completes (no binlog phase).
+ * 3. Data inserted after job finishes is NOT synced to Doris.
+ */
+suite("test_streaming_mysql_job_snapshot",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_snapshot_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_mysql_snapshot1"
+ def table2 = "user_info_mysql_snapshot2"
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+ sql """drop table if exists ${currentDb}.${table2} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // prepare source tables and pre-existing data in mysql
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table2}"""
+ sql """CREATE TABLE ${mysqlDb}.${table1} (
+ `name` varchar(200) NOT NULL,
+ `age` int DEFAULT NULL,
+ PRIMARY KEY (`name`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1',
1)"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1',
2)"""
+ sql """CREATE TABLE ${mysqlDb}.${table2} (
+ `name` varchar(200) NOT NULL,
+ `age` int DEFAULT NULL,
+ PRIMARY KEY (`name`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('A2',
1)"""
+ sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('B2',
2)"""
+ }
+
+ // create streaming job with offset=snapshot (snapshot-only mode)
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysqlDb}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1},${table2}",
+ "offset" = "snapshot"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // wait for job to transition to FINISHED
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def jobStatus = sql """select Status from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+ log.info("jobStatus: " + jobStatus)
+ jobStatus.size() == 1 && jobStatus.get(0).get(0) ==
'FINISHED'
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // verify snapshot data is correctly synced
+ qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name
asc """
+ qt_select_snapshot_table2 """ SELECT * FROM ${table2} order by name
asc """
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.groovy
new file mode 100644
index 00000000000..771b4934319
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.groovy
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test snapshot-only mode (offset=snapshot):
+ * 1. Job syncs existing data via full snapshot.
+ * 2. Job transitions to FINISHED after snapshot completes (no binlog phase).
+ * 3. Data inserted after job finishes is NOT synced to Doris.
+ */
+suite("test_streaming_postgres_job_snapshot",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_snapshot_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_pg_snapshot1"
+ def table2 = "user_info_pg_snapshot2"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+ sql """drop table if exists ${currentDb}.${table2} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // prepare source tables and pre-existing data in postgres
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+ "name" varchar(200),
+ "age" int2,
+ PRIMARY KEY ("name")
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age)
VALUES ('A1', 1)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age)
VALUES ('B1', 2)"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table2} (
+ "name" varchar(200),
+ "age" int2,
+ PRIMARY KEY ("name")
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (name, age)
VALUES ('A2', 1)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (name, age)
VALUES ('B2', 2)"""
+ }
+
+ // create streaming job with offset=snapshot (snapshot-only mode)
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1},${table2}",
+ "offset" = "snapshot"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // wait for job to transition to FINISHED
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def jobStatus = sql """select Status from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+ log.info("jobStatus: " + jobStatus)
+ jobStatus.size() == 1 && jobStatus.get(0).get(0) ==
'FINISHED'
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // verify snapshot data is correctly synced
+ qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name
asc """
+ qt_select_snapshot_table2 """ SELECT * FROM ${table2} order by name
asc """
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]