This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b6dea0afdb7 [Feature](Job)STARTS and AT allow setting
current_timestamp (#30593)
b6dea0afdb7 is described below
commit b6dea0afdb7af818be57bc6ea44164a18844c02a
Author: Calvin Kirs <[email protected]>
AuthorDate: Fri Feb 2 14:10:41 2024 +0800
[Feature](Job)STARTS and AT allow setting current_timestamp (#30593)
---
fe/fe-core/src/main/cup/sql_parser.cup | 19 ++++++++++++++++--
.../org/apache/doris/analysis/CreateJobStmt.java | 15 +++++++++++---
.../doris/job/base/JobExecutionConfiguration.java | 4 +++-
.../suites/job_p0/test_base_insert_job.groovy | 23 ++++++++++++----------
4 files changed, 45 insertions(+), 16 deletions(-)
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index ea0bfac3d51..708986bcbce 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -834,6 +834,7 @@ nonterminal ArrayList<String> opt_common_hints;
nonterminal String optional_on_ident;
nonterminal String opt_job_starts;
nonterminal String opt_job_ends;
+nonterminal String job_at_time;
nonterminal ColocateGroupName colocate_group_name;
nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type;
@@ -2596,7 +2597,7 @@ create_job_stmt ::=
CreateJobStmt stmt = new
CreateJobStmt(jobLabel,org.apache.doris.job.base.JobExecuteType.STREAMING,atTime,null,null,null,null,comment,executeSql);
RESULT = stmt;
:} */
- | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULE KW_AT
STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql
+ | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULE job_at_time:atTime
opt_comment:comment KW_DO stmt:executeSql
{:
CreateJobStmt stmt = new
CreateJobStmt(jobLabel,org.apache.doris.job.base.JobExecuteType.ONE_TIME,atTime,null,null,null,null,comment,executeSql);
RESULT = stmt;
@@ -2610,8 +2611,22 @@ create_job_stmt ::=
{:
RESULT = startTime;
:}
+ |KW_STARTS KW_CURRENT_TIMESTAMP
+ {:
+ RESULT = CreateJobStmt.CURRENT_TIMESTAMP_STRING;
+ :}
+ ;
+
+ job_at_time ::=
+ | KW_AT STRING_LITERAL:atTime
+ {:
+ RESULT = atTime;
+ :}
+ |KW_AT KW_CURRENT_TIMESTAMP
+ {:
+ RESULT = CreateJobStmt.CURRENT_TIMESTAMP_STRING;
+ :}
;
-
opt_job_ends ::=
{:
RESULT = null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index 17e2185bf5e..d8618caae84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -82,6 +82,8 @@ public class CreateJobStmt extends DdlStmt {
private final String endsTimeStamp;
private final String comment;
+
+ public static final String CURRENT_TIMESTAMP_STRING = "current_timestamp";
private JobExecuteType executeType;
// exclude job name prefix, which is used by inner job
@@ -122,7 +124,11 @@ public class CreateJobStmt extends DdlStmt {
TimerDefinition timerDefinition = new TimerDefinition();
if (null != onceJobStartTimestamp) {
-
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp));
+ if
(onceJobStartTimestamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) {
+ jobExecutionConfiguration.setImmediate(true);
+ } else {
+
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp));
+ }
}
if (null != interval) {
timerDefinition.setInterval(interval);
@@ -139,7 +145,11 @@ public class CreateJobStmt extends DdlStmt {
timerDefinition.setIntervalUnit(intervalUnit);
}
if (null != startsTimeStamp) {
-
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
+ if (startsTimeStamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) {
+ jobExecutionConfiguration.setImmediate(true);
+ } else {
+
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
+ }
}
if (null != endsTimeStamp) {
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(endsTimeStamp));
@@ -158,7 +168,6 @@ public class CreateJobStmt extends DdlStmt {
jobExecutionConfiguration,
System.currentTimeMillis(),
executeSql);
- //job.checkJobParams();
jobInstance = job;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index 0b44073464f..553db9e966f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -137,8 +137,10 @@ public class JobExecutionConfiguration {
long jobStartTimeMs = timerDefinition.getStartTimeMs();
if (isImmediate()) {
jobStartTimeMs += intervalValue;
+ if (jobStartTimeMs > endTimeMs) {
+ return delayTimeSeconds;
+ }
}
-
return getExecutionDelaySeconds(startTimeMs, endTimeMs,
jobStartTimeMs,
intervalValue, currentTimeMs);
}
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index d9ebb832152..6ba917bd576 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -111,16 +111,10 @@ suite("test_base_insert_job") {
"replication_allocation" = "tag.location.default: 1"
);
"""
- // Enlarge this parameter to avoid other factors that cause time
verification to fail when submitting.
- def currentMs = System.currentTimeMillis() + 20000;
- def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs),
ZoneId.systemDefault());
-
- def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- def startTime = dateTime.format(formatter);
def dataCount = sql """select count(*) from ${tableName}"""
assert dataCount.get(0).get(0) == 0
sql """
- CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment 'test
for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values
('2023-07-19', sleep(10000), 1001);
+ CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment
'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values
('2023-07-19', sleep(10000), 1001);
"""
Thread.sleep(25000)
@@ -153,9 +147,18 @@ suite("test_base_insert_job") {
//assert comment
assert oncejob.get(0).get(1) == "test for test&68686781jbjbhj//ncsa"
sql """
- DROP JOB IF EXISTS where jobname = '${jobName}'
+ DROP JOB IF EXISTS where jobname = 'press'
"""
+ sql """
+ CREATE JOB press ON SCHEDULE every 10 hour starts CURRENT_TIMESTAMP
comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName}
values ('2023-07-19', 99, 99);
+ """
+ Thread.sleep(2500)
+ def recurringTableDatas = sql """ select count(1) from ${tableName} where
user_id=99 and type=99 """
+ assert recurringTableDatas.get(0).get(0) == 1
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
sql """
CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test
for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values
('2023-07-19', sleep(10000), 1001);
"""
@@ -198,7 +201,7 @@ suite("test_base_insert_job") {
// assert not support stmt
try {
sql """
- CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment
'test' DO update ${tableName} set type=2 where type=1;
+ CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment
'test' DO update ${tableName} set type=2 where type=1;
"""
} catch (Exception e) {
assert e.getMessage().contains("Not support UpdateStmt type in job")
@@ -206,7 +209,7 @@ suite("test_base_insert_job") {
// assert start time greater than current time
try {
sql """
- CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment
'test' DO insert into ${tableName} (timestamp, type, user_id) values
('2023-03-18','1','12213');
+ CREATE JOB ${jobName} ON SCHEDULE at '2023-11-13 14:18:07'
comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values
('2023-03-18','1','12213');
"""
} catch (Exception e) {
assert e.getMessage().contains("startTimeMs must be greater than
current time")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]