This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new d462fae94ca branch-4.1: [fix](streamingjob) fix streaming insert job not refreshing TVF props after ALTER SQL #61451 (#61489)
d462fae94ca is described below
commit d462fae94cacc89978649acf151fe2769c60a833
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 19 14:23:28 2026 +0800
branch-4.1: [fix](streamingjob) fix streaming insert job not refreshing TVF props after ALTER SQL #61451 (#61489)
Cherry-picked from #61451
Co-authored-by: wudi <[email protected]>
---
.../insert/streaming/StreamingInsertJob.java | 3 +
.../test_streaming_insert_job_alter_aksk.groovy | 131 +++++++++++++++++++++
2 files changed, 134 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 4d9b00a7121..f2b07e02ab1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -368,6 +368,9 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
if (StringUtils.isNotEmpty(alterJobCommand.getSql())) {
setExecuteSql(alterJobCommand.getSql());
initLogicalPlan(true);
+ // refresh cached TVF props so fetchMeta and createStreamingInsertTask
+ // pick up the new credentials (e.g. aksk) from the updated SQL
+ this.originTvfProps = getCurrentTvf().getProperties().getMap();
String encryptedSql = generateEncryptedSql();
logParts.add("sql: " + encryptedSql);
}
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter_aksk.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter_aksk.groovy
new file mode 100644
index 00000000000..585d43d01ca
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter_aksk.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_alter_aksk") {
+    def tableName = "test_streaming_insert_job_alter_aksk_tbl"
+    def jobName = "test_streaming_insert_job_alter_aksk"
+
+    sql """drop table if exists `${tableName}` force"""
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // Step 1: create job with correct aksk so that plan generation succeeds
+    sql """
+        CREATE JOB ${jobName}
+        ON STREAMING DO INSERT INTO ${tableName}
+        SELECT * FROM S3
+        (
+            "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+
+    // Step 2: wait for at least one successful task to confirm the job works
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+            {
+                def r = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+                log.info("check job succeed task count: " + r)
+                r.size() == 1 && r.get(0).get(0) >= '1' // NOTE(review): compared against the string '1'; presumably the count is returned as text - confirm
+            }
+        )
+    } catch (Exception ex) {
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        log.info("show job: " + showjob)
+        throw ex
+    }
+
+    // Step 3: pause the job before altering (ALTER requires PAUSED state).
+    // Ignore errors in case the job is already paused (e.g. auto-paused after consuming all files).
+    try {
+        sql """PAUSE JOB where jobname = '${jobName}'"""
+    } catch (Exception ignored) {
+        log.info("PAUSE job got exception (may already be paused): " + ignored.getMessage())
+    }
+    Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
+        def r = sql """select status from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+        r.size() == 1 && 'PAUSED' == r.get(0).get(0)
+    })
+
+    // Step 4: alter to wrong aksk while job is PAUSED.
+    // originTvfProps must be refreshed by the fix so that fetchMeta picks up the
+    // bad credentials after resume. Without the fix, originTvfProps would still
+    // hold the old valid aksk and the job would keep running after resume.
+    sql """
+        ALTER JOB ${jobName}
+        INSERT INTO ${tableName}
+        SELECT * FROM S3
+        (
+            "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "wrong_ak_for_test",
+            "s3.secret_key" = "wrong_sk_for_test"
+        )
+    """
+
+    // Step 5: resume the job and wait for it to pause again due to fetchMeta failure
+    sql """RESUME JOB where jobname = '${jobName}'"""
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+            {
+                def r = sql """select status from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+                log.info("check job status paused after altering to wrong aksk: " + r)
+                r.size() == 1 && 'PAUSED' == r.get(0).get(0)
+            }
+        )
+    } catch (Exception ex) {
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        log.info("show job: " + showjob)
+        throw ex
+    }
+
+    // Step 6: verify the pause was caused by fetchMeta failure, not other reasons
+    def errorMsg = sql """select ErrorMsg from jobs("type"="insert") where Name='${jobName}'"""
+    log.info("error msg after altering to wrong aksk: " + errorMsg)
+    assert errorMsg.get(0).get(0).contains("Failed to fetch meta"),
+        "Expected fetchMeta failure after alter to wrong aksk, got: " + errorMsg.get(0).get(0)
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+    def cnt = sql """select count(1) from jobs("type"="insert") where Name='${jobName}'"""
+    assert cnt.get(0).get(0) == 0
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]