This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new de90051162d [Fix](Job) Replaying logs should not modify the original
information of the job (#40474)
de90051162d is described below
commit de90051162de7004cf171bbf4d21bd95ff9f3540
Author: Calvin Kirs <[email protected]>
AuthorDate: Thu Sep 12 09:59:30 2024 +0800
[Fix](Job) Replaying logs should not modify the original information of the
job (#40474)
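## Proposed changes
Previously, `InsertJob.onReplayCreate()` overwrote the job's execution
configuration while replaying the edit log:
```
JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
jobConfig.setExecuteType(JobExecuteType.INSTANT);
setJobConfig(jobConfig);
```
- Replaying logs must not modify the job's original information (such as its
  execute type and schedule); the overwrite above is removed.
- Use the new Nereids parser/planner to check whether the job's SQL statement
  is legal.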
---
.../org/apache/doris/analysis/CreateJobStmt.java | 40 +++++++----------
.../doris/job/extensions/insert/InsertJob.java | 19 +-------
.../data/job_p0/job_meta/job_query_test.out | 7 +++
.../suites/job_p0/job_meta/job_query_test.groovy | 28 ++++++++++++
regression-test/suites/job_p0/job_meta/load.groovy | 50 ++++++++++++++++++++++
.../suites/job_p0/test_base_insert_job.groovy | 2 +-
6 files changed, 103 insertions(+), 43 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index b9d42b249b2..0fff1e09749 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -32,15 +32,15 @@ import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.parser.NereidsParser;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
-import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import java.util.HashSet;
-
/**
* syntax:
* CREATE
@@ -91,12 +91,6 @@ public class CreateJobStmt extends DdlStmt implements
NotFallbackInParser {
// exclude job name prefix, which is used by inner job
private static final String excludeJobNamePrefix = "inner_";
- private static final ImmutableSet<Class<? extends DdlStmt>>
supportStmtSuperClass
- = new ImmutableSet.Builder<Class<? extends
DdlStmt>>().add(InsertStmt.class)
- .build();
-
- private static final HashSet<String> supportStmtClassNamesCache = new
HashSet<>(16);
-
public CreateJobStmt(LabelName labelName, JobExecuteType executeType,
String onceJobStartTimestamp,
Long interval, String intervalTimeUnit,
String startsTimeStamp, String endsTimeStamp, String
comment, StatementBase doStmt) {
@@ -118,7 +112,6 @@ public class CreateJobStmt extends DdlStmt implements
NotFallbackInParser {
labelName.analyze(analyzer);
String dbName = labelName.getDbName();
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
- analyzerSqlStmt();
// check its insert stmt,currently only support insert stmt
//todo when support other stmt,need to check stmt type and generate
jobInstance
JobExecutionConfiguration jobExecutionConfiguration = new
JobExecutionConfiguration();
@@ -164,6 +157,7 @@ public class CreateJobStmt extends DdlStmt implements
NotFallbackInParser {
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt, jobName, comment);
+ analyzerSqlStmt(executeSql);
// create job use label name as its job name
InsertJob job = new InsertJob(jobName,
JobStatus.RUNNING,
@@ -191,22 +185,20 @@ public class CreateJobStmt extends DdlStmt implements
NotFallbackInParser {
}
}
- private void checkStmtSupport() throws AnalysisException {
- if
(supportStmtClassNamesCache.contains(doStmt.getClass().getSimpleName())) {
- return;
- }
- for (Class<? extends DdlStmt> clazz : supportStmtSuperClass) {
- if (clazz.isAssignableFrom(doStmt.getClass())) {
-
supportStmtClassNamesCache.add(doStmt.getClass().getSimpleName());
- return;
+ private void analyzerSqlStmt(String sql) throws UserException {
+ NereidsParser parser = new NereidsParser();
+ LogicalPlan logicalPlan = parser.parseSingle(sql);
+ if (logicalPlan instanceof InsertIntoTableCommand) {
+ InsertIntoTableCommand insertIntoTableCommand =
(InsertIntoTableCommand) logicalPlan;
+ try {
+ insertIntoTableCommand.initPlan(ConnectContext.get(),
ConnectContext.get().getExecutor());
+ } catch (Exception e) {
+ throw new AnalysisException(e.getMessage());
}
- }
- throw new AnalysisException("Not support " +
doStmt.getClass().getSimpleName() + " type in job");
- }
- private void analyzerSqlStmt() throws UserException {
- checkStmtSupport();
- doStmt.analyze(analyzer);
+ } else {
+ throw new AnalysisException("Not support this sql : " + sql);
+ }
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 47d52c170b2..43f43ba8699 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -31,8 +31,6 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.io.Text;
-import org.apache.doris.common.util.LogBuilder;
-import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.base.AbstractJob;
@@ -647,23 +645,8 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
@Override
public void onReplayCreate() throws JobException {
- JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
- jobConfig.setExecuteType(JobExecuteType.INSTANT);
- setJobConfig(jobConfig);
onRegister();
- checkJobParams();
- log.info(new LogBuilder(LogKey.LOAD_JOB, getJobId()).add("msg",
"replay create load job").build());
- }
-
- @Override
- public void onReplayEnd(AbstractJob<?, Map<Object, Object>> replayJob)
throws JobException {
- if (!(replayJob instanceof InsertJob)) {
- return;
- }
- InsertJob insertJob = (InsertJob) replayJob;
- unprotectReadEndOperation(insertJob);
- log.info(new LogBuilder(LogKey.LOAD_JOB,
- insertJob.getJobId()).add("operation", insertJob).add("msg",
"replay end load job").build());
+ super.onReplayCreate();
}
public int getProgress() {
diff --git a/regression-test/data/job_p0/job_meta/job_query_test.out
b/regression-test/data/job_p0/job_meta/job_query_test.out
new file mode 100644
index 00000000000..1a2bfe0f9cd
--- /dev/null
+++ b/regression-test/data/job_p0/job_meta/job_query_test.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select1 --
+JOB_ONETIME ONE_TIME AT 2052-03-18 00:00:00 insert into
t_test_BASE_inSert_job (timestamp, type, user_id) values
('2023-03-18','1','12213');
+
+-- !select2 --
+JOB_RECURRING RECURRING EVERY 1 HOUR STARTS 2052-03-18 00:00:00 insert
into t_test_BASE_inSert_job (timestamp, type, user_id) values
('2023-03-18','1','12213');
+
diff --git a/regression-test/suites/job_p0/job_meta/job_query_test.groovy
b/regression-test/suites/job_p0/job_meta/job_query_test.groovy
new file mode 100644
index 00000000000..3505a8108dd
--- /dev/null
+++ b/regression-test/suites/job_p0/job_meta/job_query_test.groovy
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// Verifies that the two insert jobs created by the 'load' suite
+// (JOB_ONETIME and JOB_RECURRING) still report their original execute type,
+// recurring strategy and SQL. Expected rows live in
+// data/job_p0/job_meta/job_query_test.out.
+// NOTE(review): the 'restart_fe' group suggests this suite runs after an FE
+// restart, so these rows would come from job metadata rebuilt by edit-log
+// replay -- confirm against the regression framework's group handling.
+suite('job_query_test', 'p0,restart_fe') {
+ // Must match the job names created in load.groovy.
+ def oneTimeJobName = "JOB_ONETIME"
+ def recurringJobName = "JOB_RECURRING"
+ // One-time job: expected to still show ExecuteType ONE_TIME with its "AT ..." schedule.
+ qt_select1 """
+ select name, ExecuteType,RecurringStrategy,ExecuteSql from jobs("type"
= "insert") where name = '${oneTimeJobName}'
+ """
+ // Recurring job: expected to still show ExecuteType RECURRING with its "EVERY ... STARTS ..." schedule.
+ qt_select2 """
+ select name, ExecuteType,RecurringStrategy,ExecuteSql from jobs("type"
= "insert") where name = '${recurringJobName}'
+ """
+
+
+}
\ No newline at end of file
diff --git a/regression-test/suites/job_p0/job_meta/load.groovy
b/regression-test/suites/job_p0/job_meta/load.groovy
new file mode 100644
index 00000000000..bf7b8a12128
--- /dev/null
+++ b/regression-test/suites/job_p0/job_meta/load.groovy
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Setup suite for the job_meta tests: creates the target table and two insert
+// jobs (one recurring, one one-time) whose metadata is later checked by
+// job_query_test.groovy.
+// NOTE(review): the 'restart_fe' group presumably places this before an FE
+// restart so the jobs are read back from replayed metadata -- confirm.
+suite('load', 'p0,restart_fe') {
+ // Names shared with job_query_test.groovy; keep them in sync.
+ def tableName = "t_test_BASE_inSert_job"
+ def oneTimeJobName = "JOB_ONETIME"
+ def recurringJobName = "JOB_RECURRING"
+ // Remove leftovers from earlier runs so the suite can be re-executed.
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${oneTimeJobName}'
+ """
+ sql """
+ DROP JOB IF EXISTS where jobname = '${recurringJobName}'
+ """
+ // Target table for the jobs' INSERT statements (single replica, one bucket).
+ sql """
+ CREATE TABLE IF NOT EXISTS `${tableName}`
+ (
+ `timestamp` DATE NOT NULL COMMENT "['0000-01-01', '9999-12-31']",
+ `type` TINYINT NOT NULL COMMENT "[-128, 127]",
+ `user_id` BIGINT COMMENT "[-9223372036854775808,
9223372036854775807]"
+ )
+ DUPLICATE KEY(`timestamp`, `type`)
+ DISTRIBUTED BY HASH(`type`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ // Both jobs start in 2052, presumably so neither fires during the test run;
+ // only their stored metadata is of interest.
+ sql """
+ CREATE JOB ${recurringJobName} ON SCHEDULE every 1 HOUR STARTS
'2052-03-18 00:00:00' comment 'test' DO insert into ${tableName} (timestamp,
type, user_id) values ('2023-03-18','1','12213');
+ """
+
+ sql """
+ CREATE JOB ${oneTimeJobName} ON SCHEDULE AT '2052-03-18 00:00:00'
comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values
('2023-03-18','1','12213');
+ """
+
+}
\ No newline at end of file
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index e67e65bf345..be744427d88 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -216,7 +216,7 @@ suite("test_base_insert_job") {
CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment
'test' DO update ${tableName} set type=2 where type=1;
"""
} catch (Exception e) {
- assert e.getMessage().contains("Not support UpdateStmt type in job")
+ assert e.getMessage().contains("Not support this sql")
}
// assert start time greater than current time
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]