morningman commented on code in PR #41391:
URL: https://github.com/apache/doris/pull/41391#discussion_r1778840993
##########
fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4:
##########
@@ -102,7 +103,17 @@ materializedViewStatement
| CANCEL MATERIALIZED VIEW TASK taskId=INTEGER_VALUE ON
mvName=multipartIdentifier #cancelMTMVTask
| SHOW CREATE MATERIALIZED VIEW mvName=multipartIdentifier
#showCreateMTMV
;
-
+jobScheduleStatement
+ : CREATE JOB label=multipartIdentifier ON SCHEDULE
+ (
+ (EVERY timeInterval=INTEGER_VALUE timeUnit=identifier
+ (STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))?
+ (ENDS endsTime=STRING_LITERAL)?)
+ |
+ (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP)))
+ commentSpec?
+ DO insertIntoTableStatement
#createScheduledJob
Review Comment:
Why not just using `supportedDmlStatement`?
And if don't support other dml, we can disallow them when analyzing
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java:
##########
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+/**
Review Comment:
Wrong comment format?
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.IntervalUnit;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.parser.NereidsParser;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Optional;
+
+/**
+ * Build job info and analyze the SQL statement to create a job.
+ */
+public class CreateJobInfo {
+
+ // exclude job name prefix, which is used by inner job
+ private static final String excludeJobNamePrefix = "inner_";
+
+ private final Optional<String> labelNameOptional;
+
+ private final Optional<String> onceJobStartTimestampOptional;
+
+ private final Optional<Long> intervalOptional;
+
+ private final Optional<String> intervalTimeUnitOptional;
+
+ private final Optional<String> startsTimeStampOptional;
+
+ private final Optional<String> endsTimeStampOptional;
+
+ private final Optional<Boolean> immediateStartOptional;
+
+ private final String comment;
+
+ private final String originalSql;
+
+ /**
+ * Constructor for CreateJobInfo.
+ *
+ * @param labelNameOptional Job name.
+ * @param onceJobStartTimestampOptional Start time for a one-time job.
+ * @param intervalOptional Interval for a recurring job.
+ * @param intervalTimeUnitOptional Interval time unit for a recurring
job.
+ * @param startsTimeStampOptional Start time for a recurring job.
+ * @param endsTimeStampOptional End time for a recurring job.
+ * @param immediateStartOptional Immediate start for a job.
+ * @param comment Comment for the job.
+ * @param originalSql Original SQL statement.
+ */
+ public CreateJobInfo(Optional<String> labelNameOptional, Optional<String>
onceJobStartTimestampOptional,
+ Optional<Long> intervalOptional, Optional<String>
intervalTimeUnitOptional,
+ Optional<String> startsTimeStampOptional,
Optional<String> endsTimeStampOptional,
+ Optional<Boolean> immediateStartOptional, String
comment, String originalSql) {
+ this.labelNameOptional = labelNameOptional;
+ this.onceJobStartTimestampOptional = onceJobStartTimestampOptional;
+ this.intervalOptional = intervalOptional;
+ this.intervalTimeUnitOptional = intervalTimeUnitOptional;
+ this.startsTimeStampOptional = startsTimeStampOptional;
+ this.endsTimeStampOptional = endsTimeStampOptional;
+ this.immediateStartOptional = immediateStartOptional;
+ this.comment = comment;
+ this.originalSql = originalSql;
+
+ }
+
+ /**
+ * Analyzes the provided SQL statement and builds the job information.
+ * @param ctx Connect context.
+ * @return AbstractJob instance.
+ * @throws UserException If there is an error during SQL analysis or job
creation.
+ */
+ public AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws
UserException {
+ checkAuth();
+ if (labelNameOptional.orElseThrow(() -> new
AnalysisException("labelName is null")).isEmpty()) {
+ throw new AnalysisException("Job name can not be empty");
+ }
+
+ String jobName = labelNameOptional.get();
+ checkJobName(jobName);
+ String dbName = ctx.getDatabase();
+
+ Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
+ // check its insert stmt,currently only support insert stmt
+ JobExecutionConfiguration jobExecutionConfiguration = new
JobExecutionConfiguration();
+ JobExecuteType executeType = intervalOptional.isPresent() ?
JobExecuteType.RECURRING : JobExecuteType.ONE_TIME;
+ jobExecutionConfiguration.setExecuteType(executeType);
+ TimerDefinition timerDefinition = new TimerDefinition();
+
+ if (executeType.equals(JobExecuteType.ONE_TIME)) {
+ buildOnceJob(timerDefinition, jobExecutionConfiguration);
+ } else {
+ buildRecurringJob(timerDefinition, jobExecutionConfiguration);
+ }
+ jobExecutionConfiguration.setTimerDefinition(timerDefinition);
+
+ String executeSql = parseExecuteSql(originalSql, jobName, comment);
+ return analyzeAndCreateJob(executeSql, dbName,
jobExecutionConfiguration);
+ }
+
+ /**
+ * Builds a TimerDefinition for a once-job.
+ *
+ * @param timerDefinition Timer definition to be built.
+ * @param jobExecutionConfiguration Job execution configuration.
+ * @throws AnalysisException If the job is not configured correctly.
+ */
+ private void buildOnceJob(TimerDefinition timerDefinition,
+ JobExecutionConfiguration
jobExecutionConfiguration) throws AnalysisException {
+ if (immediateStartOptional.isPresent() &&
Boolean.TRUE.equals(immediateStartOptional.get())) {
+ jobExecutionConfiguration.setImmediate(true);
+ timerDefinition.setStartTimeMs(System.currentTimeMillis());
+ return;
+ }
+
+ // Ensure start time is provided for once jobs.
+ String startTime = onceJobStartTimestampOptional.orElseThrow(()
+ -> new AnalysisException("Once time job must set start time"));
+ timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startTime));
+ }
+
+ /**
+ * Builds a TimerDefinition for a recurring job.
+ *
+ * @param timerDefinition Timer definition to be built.
+ * @param jobExecutionConfiguration Job execution configuration.
+ * @throws AnalysisException If the job is not configured correctly.
+ */
+ private void buildRecurringJob(TimerDefinition timerDefinition,
+ JobExecutionConfiguration
jobExecutionConfiguration) throws AnalysisException {
+ // Ensure interval is provided for recurring jobs.
+ long interval = intervalOptional.orElseThrow(()
+ -> new AnalysisException("Interval must be set for recurring
job"));
+ timerDefinition.setInterval(interval);
+
+ // Ensure interval time unit is provided for recurring jobs.
+ String intervalTimeUnit = intervalTimeUnitOptional.orElseThrow(()
+ -> new AnalysisException("Interval time unit must be set for
recurring job"));
+ IntervalUnit intervalUnit =
IntervalUnit.fromString(intervalTimeUnit.toUpperCase());
+ if (intervalUnit == null) {
+ throw new AnalysisException("Invalid interval time unit: " +
intervalTimeUnit);
+ }
+
+ // Check if interval unit is second and disable if not in test mode.
+ if (intervalUnit.equals(IntervalUnit.SECOND) &&
!Config.enable_job_schedule_second_for_test) {
+ throw new AnalysisException("Interval time unit can not be second
in production mode");
+ }
+
+ timerDefinition.setIntervalUnit(intervalUnit);
+
+ // Set end time if provided.
+ endsTimeStampOptional.ifPresent(s ->
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(s)));
+
+ // Set immediate start if configured.
+ if (immediateStartOptional.isPresent() &&
Boolean.TRUE.equals(immediateStartOptional.get())) {
+ jobExecutionConfiguration.setImmediate(true);
+ // Avoid immediate re-scheduling by setting start time slightly in
the past.
+ timerDefinition.setStartTimeMs(System.currentTimeMillis() - 100);
+ return;
+ }
+ // Set start time if provided.
+ startsTimeStampOptional.ifPresent(s ->
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(s)));
+ }
+
+ protected static void checkAuth() throws AnalysisException {
+ if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
+ }
+ }
+
+ /**
+ * parse execute sql from create job stmt
+ * Some stmt not implement toSql method,so we need to parse sql from
originStmt
+ */
+ private static String parseExecuteSql(String sql, String jobName, String
comment) throws AnalysisException {
+ String lowerCaseSql = sql.toLowerCase();
+ String lowerCaseJobName = jobName.toLowerCase();
+ // Find the end position of the job name in the SQL statement.
+ int jobNameEndIndex = lowerCaseSql.indexOf(lowerCaseJobName) +
lowerCaseJobName.length();
+ String subSqlStmt = lowerCaseSql.substring(jobNameEndIndex);
+ String originSubSqlStmt = sql.substring(jobNameEndIndex);
+ // If the comment is not empty, extract the SQL statement from the end
position of the comment.
+ if (org.apache.commons.lang3.StringUtils.isNotBlank(comment)) {
+
+ String lowerCaseComment = comment.toLowerCase();
+ int splitDoIndex = subSqlStmt.indexOf(lowerCaseComment) +
lowerCaseComment.length();
+ subSqlStmt = subSqlStmt.substring(splitDoIndex);
+ originSubSqlStmt = originSubSqlStmt.substring(splitDoIndex);
+ }
+ // Find the position of the "do" keyword and extract the execution SQL
statement from this position.
+ int executeSqlIndex = subSqlStmt.indexOf("do");
Review Comment:
I think when using antler, we can get the sql string directly?
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.IntervalUnit;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.parser.NereidsParser;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Optional;
+
+/**
+ * Build job info and analyze the SQL statement to create a job.
+ */
+public class CreateJobInfo {
+
+ // exclude job name prefix, which is used by inner job
+ private static final String excludeJobNamePrefix = "inner_";
+
+ private final Optional<String> labelNameOptional;
+
+ private final Optional<String> onceJobStartTimestampOptional;
+
+ private final Optional<Long> intervalOptional;
+
+ private final Optional<String> intervalTimeUnitOptional;
+
+ private final Optional<String> startsTimeStampOptional;
+
+ private final Optional<String> endsTimeStampOptional;
+
+ private final Optional<Boolean> immediateStartOptional;
+
+ private final String comment;
+
+ private final String originalSql;
+
+ /**
+ * Constructor for CreateJobInfo.
+ *
+ * @param labelNameOptional Job name.
+ * @param onceJobStartTimestampOptional Start time for a one-time job.
+ * @param intervalOptional Interval for a recurring job.
+ * @param intervalTimeUnitOptional Interval time unit for a recurring
job.
+ * @param startsTimeStampOptional Start time for a recurring job.
+ * @param endsTimeStampOptional End time for a recurring job.
+ * @param immediateStartOptional Immediate start for a job.
+ * @param comment Comment for the job.
+ * @param originalSql Original SQL statement.
+ */
+ public CreateJobInfo(Optional<String> labelNameOptional, Optional<String>
onceJobStartTimestampOptional,
+ Optional<Long> intervalOptional, Optional<String>
intervalTimeUnitOptional,
+ Optional<String> startsTimeStampOptional,
Optional<String> endsTimeStampOptional,
+ Optional<Boolean> immediateStartOptional, String
comment, String originalSql) {
+ this.labelNameOptional = labelNameOptional;
+ this.onceJobStartTimestampOptional = onceJobStartTimestampOptional;
+ this.intervalOptional = intervalOptional;
+ this.intervalTimeUnitOptional = intervalTimeUnitOptional;
+ this.startsTimeStampOptional = startsTimeStampOptional;
+ this.endsTimeStampOptional = endsTimeStampOptional;
+ this.immediateStartOptional = immediateStartOptional;
+ this.comment = comment;
+ this.originalSql = originalSql;
+
+ }
+
+ /**
+ * Analyzes the provided SQL statement and builds the job information.
+ * @param ctx Connect context.
+ * @return AbstractJob instance.
+ * @throws UserException If there is an error during SQL analysis or job
creation.
+ */
+ public AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws
UserException {
+ checkAuth();
+ if (labelNameOptional.orElseThrow(() -> new
AnalysisException("labelName is null")).isEmpty()) {
+ throw new AnalysisException("Job name can not be empty");
+ }
+
+ String jobName = labelNameOptional.get();
+ checkJobName(jobName);
+ String dbName = ctx.getDatabase();
+
+ Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
+ // check its insert stmt,currently only support insert stmt
+ JobExecutionConfiguration jobExecutionConfiguration = new
JobExecutionConfiguration();
+ JobExecuteType executeType = intervalOptional.isPresent() ?
JobExecuteType.RECURRING : JobExecuteType.ONE_TIME;
+ jobExecutionConfiguration.setExecuteType(executeType);
+ TimerDefinition timerDefinition = new TimerDefinition();
+
+ if (executeType.equals(JobExecuteType.ONE_TIME)) {
+ buildOnceJob(timerDefinition, jobExecutionConfiguration);
+ } else {
+ buildRecurringJob(timerDefinition, jobExecutionConfiguration);
+ }
+ jobExecutionConfiguration.setTimerDefinition(timerDefinition);
+
+ String executeSql = parseExecuteSql(originalSql, jobName, comment);
+ return analyzeAndCreateJob(executeSql, dbName,
jobExecutionConfiguration);
+ }
+
+ /**
+ * Builds a TimerDefinition for a once-job.
+ *
+ * @param timerDefinition Timer definition to be built.
+ * @param jobExecutionConfiguration Job execution configuration.
+ * @throws AnalysisException If the job is not configured correctly.
+ */
+ private void buildOnceJob(TimerDefinition timerDefinition,
+ JobExecutionConfiguration
jobExecutionConfiguration) throws AnalysisException {
+ if (immediateStartOptional.isPresent() &&
Boolean.TRUE.equals(immediateStartOptional.get())) {
+ jobExecutionConfiguration.setImmediate(true);
+ timerDefinition.setStartTimeMs(System.currentTimeMillis());
+ return;
+ }
+
+ // Ensure start time is provided for once jobs.
+ String startTime = onceJobStartTimestampOptional.orElseThrow(()
+ -> new AnalysisException("Once time job must set start time"));
+ timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startTime));
+ }
+
+ /**
+ * Builds a TimerDefinition for a recurring job.
+ *
+ * @param timerDefinition Timer definition to be built.
+ * @param jobExecutionConfiguration Job execution configuration.
+ * @throws AnalysisException If the job is not configured correctly.
+ */
+ private void buildRecurringJob(TimerDefinition timerDefinition,
+ JobExecutionConfiguration
jobExecutionConfiguration) throws AnalysisException {
+ // Ensure interval is provided for recurring jobs.
+ long interval = intervalOptional.orElseThrow(()
+ -> new AnalysisException("Interval must be set for recurring
job"));
+ timerDefinition.setInterval(interval);
+
+ // Ensure interval time unit is provided for recurring jobs.
+ String intervalTimeUnit = intervalTimeUnitOptional.orElseThrow(()
+ -> new AnalysisException("Interval time unit must be set for
recurring job"));
+ IntervalUnit intervalUnit =
IntervalUnit.fromString(intervalTimeUnit.toUpperCase());
+ if (intervalUnit == null) {
+ throw new AnalysisException("Invalid interval time unit: " +
intervalTimeUnit);
+ }
+
+ // Check if interval unit is second and disable if not in test mode.
+ if (intervalUnit.equals(IntervalUnit.SECOND) &&
!Config.enable_job_schedule_second_for_test) {
+ throw new AnalysisException("Interval time unit can not be second
in production mode");
+ }
+
+ timerDefinition.setIntervalUnit(intervalUnit);
+
+ // Set end time if provided.
+ endsTimeStampOptional.ifPresent(s ->
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(s)));
+
+ // Set immediate start if configured.
+ if (immediateStartOptional.isPresent() &&
Boolean.TRUE.equals(immediateStartOptional.get())) {
+ jobExecutionConfiguration.setImmediate(true);
+ // Avoid immediate re-scheduling by setting start time slightly in
the past.
+ timerDefinition.setStartTimeMs(System.currentTimeMillis() - 100);
+ return;
+ }
+ // Set start time if provided.
+ startsTimeStampOptional.ifPresent(s ->
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(s)));
+ }
+
+ protected static void checkAuth() throws AnalysisException {
+ if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
+ }
+ }
+
+ /**
+ * parse execute sql from create job stmt
+ * Some stmt not implement toSql method,so we need to parse sql from
originStmt
+ */
+ private static String parseExecuteSql(String sql, String jobName, String
comment) throws AnalysisException {
+ String lowerCaseSql = sql.toLowerCase();
+ String lowerCaseJobName = jobName.toLowerCase();
+ // Find the end position of the job name in the SQL statement.
+ int jobNameEndIndex = lowerCaseSql.indexOf(lowerCaseJobName) +
lowerCaseJobName.length();
+ String subSqlStmt = lowerCaseSql.substring(jobNameEndIndex);
+ String originSubSqlStmt = sql.substring(jobNameEndIndex);
+ // If the comment is not empty, extract the SQL statement from the end
position of the comment.
+ if (org.apache.commons.lang3.StringUtils.isNotBlank(comment)) {
+
+ String lowerCaseComment = comment.toLowerCase();
+ int splitDoIndex = subSqlStmt.indexOf(lowerCaseComment) +
lowerCaseComment.length();
+ subSqlStmt = subSqlStmt.substring(splitDoIndex);
+ originSubSqlStmt = originSubSqlStmt.substring(splitDoIndex);
+ }
+ // Find the position of the "do" keyword and extract the execution SQL
statement from this position.
+ int executeSqlIndex = subSqlStmt.indexOf("do");
+ String executeSql = originSubSqlStmt.substring(executeSqlIndex +
2).trim();
+ if (StringUtils.isBlank(executeSql)) {
+ throw new AnalysisException("execute sql has invalid format");
+ }
+ return executeSql;
+ }
+
+ /**
+ * Analyzes the provided SQL statement and creates an appropriate job
based on the parsed logical plan.
+ * Currently, only "InsertIntoTableCommand" is supported for job creation.
+ *
+ * @param sql the SQL statement to be analyzed
+ * @param currentDbName the current database name where the
SQL statement will be executed
+ * @param jobExecutionConfiguration the configuration for job execution
+ * @return an instance of AbstractJob corresponding to the SQL statement
+ * @throws UserException if there is an error during SQL analysis or job
creation
+ */
+ private AbstractJob analyzeAndCreateJob(String sql, String currentDbName,
+ JobExecutionConfiguration
jobExecutionConfiguration) throws UserException {
+ NereidsParser parser = new NereidsParser();
+ LogicalPlan logicalPlan = parser.parseSingle(sql);
+ if (logicalPlan instanceof InsertIntoTableCommand) {
+ InsertIntoTableCommand insertIntoTableCommand =
(InsertIntoTableCommand) logicalPlan;
+ try {
+ insertIntoTableCommand.initPlan(ConnectContext.get(),
ConnectContext.get().getExecutor(), false);
+ return new InsertJob(labelNameOptional.get(),
+ JobStatus.RUNNING,
+ currentDbName,
+ comment,
+ ConnectContext.get().getCurrentUserIdentity(),
+ jobExecutionConfiguration,
+ System.currentTimeMillis(),
+ sql);
+ } catch (Exception e) {
+ throw new AnalysisException(e.getMessage());
+ }
+ } else {
+ throw new AnalysisException("Not support this sql : " + sql);
Review Comment:
I think we can also print the class name of this sql too
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.IntervalUnit;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.parser.NereidsParser;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Optional;
+
+/**
+ * Build job info and analyze the SQL statement to create a job.
+ */
+public class CreateJobInfo {
+
+ // exclude job name prefix, which is used by inner job
+ private static final String excludeJobNamePrefix = "inner_";
+
+ private final Optional<String> labelNameOptional;
+
+ private final Optional<String> onceJobStartTimestampOptional;
+
+ private final Optional<Long> intervalOptional;
+
+ private final Optional<String> intervalTimeUnitOptional;
+
+ private final Optional<String> startsTimeStampOptional;
+
+ private final Optional<String> endsTimeStampOptional;
+
+ private final Optional<Boolean> immediateStartOptional;
+
+ private final String comment;
+
+ private final String originalSql;
+
+ /**
+ * Constructor for CreateJobInfo.
+ *
+ * @param labelNameOptional Job name.
+ * @param onceJobStartTimestampOptional Start time for a one-time job.
+ * @param intervalOptional Interval for a recurring job.
+ * @param intervalTimeUnitOptional Interval time unit for a recurring
job.
+ * @param startsTimeStampOptional Start time for a recurring job.
+ * @param endsTimeStampOptional End time for a recurring job.
+ * @param immediateStartOptional Immediate start for a job.
+ * @param comment Comment for the job.
+ * @param originalSql Original SQL statement.
+ */
+ public CreateJobInfo(Optional<String> labelNameOptional, Optional<String>
onceJobStartTimestampOptional,
+ Optional<Long> intervalOptional, Optional<String>
intervalTimeUnitOptional,
+ Optional<String> startsTimeStampOptional,
Optional<String> endsTimeStampOptional,
+ Optional<Boolean> immediateStartOptional, String
comment, String originalSql) {
+ this.labelNameOptional = labelNameOptional;
+ this.onceJobStartTimestampOptional = onceJobStartTimestampOptional;
+ this.intervalOptional = intervalOptional;
+ this.intervalTimeUnitOptional = intervalTimeUnitOptional;
+ this.startsTimeStampOptional = startsTimeStampOptional;
+ this.endsTimeStampOptional = endsTimeStampOptional;
+ this.immediateStartOptional = immediateStartOptional;
+ this.comment = comment;
+ this.originalSql = originalSql;
+
+ }
+
+ /**
+ * Analyzes the provided SQL statement and builds the job information.
+ * @param ctx Connect context.
+ * @return AbstractJob instance.
+ * @throws UserException If there is an error during SQL analysis or job
creation.
+ */
+ public AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws
UserException {
+ checkAuth();
+ if (labelNameOptional.orElseThrow(() -> new
AnalysisException("labelName is null")).isEmpty()) {
+ throw new AnalysisException("Job name can not be empty");
+ }
+
+ String jobName = labelNameOptional.get();
+ checkJobName(jobName);
+ String dbName = ctx.getDatabase();
+
+ Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
+ // check its insert stmt,currently only support insert stmt
+ JobExecutionConfiguration jobExecutionConfiguration = new
JobExecutionConfiguration();
+ JobExecuteType executeType = intervalOptional.isPresent() ?
JobExecuteType.RECURRING : JobExecuteType.ONE_TIME;
+ jobExecutionConfiguration.setExecuteType(executeType);
+ TimerDefinition timerDefinition = new TimerDefinition();
+
+ if (executeType.equals(JobExecuteType.ONE_TIME)) {
+ buildOnceJob(timerDefinition, jobExecutionConfiguration);
+ } else {
+ buildRecurringJob(timerDefinition, jobExecutionConfiguration);
+ }
+ jobExecutionConfiguration.setTimerDefinition(timerDefinition);
+
+ String executeSql = parseExecuteSql(originalSql, jobName, comment);
+ return analyzeAndCreateJob(executeSql, dbName,
jobExecutionConfiguration);
+ }
+
+ /**
+ * Builds a TimerDefinition for a once-job.
+ *
+ * @param timerDefinition Timer definition to be built.
+ * @param jobExecutionConfiguration Job execution configuration.
+ * @throws AnalysisException If the job is not configured correctly.
+ */
+ private void buildOnceJob(TimerDefinition timerDefinition,
+ JobExecutionConfiguration
jobExecutionConfiguration) throws AnalysisException {
+ if (immediateStartOptional.isPresent() &&
Boolean.TRUE.equals(immediateStartOptional.get())) {
+ jobExecutionConfiguration.setImmediate(true);
+ timerDefinition.setStartTimeMs(System.currentTimeMillis());
+ return;
+ }
+
+ // Ensure start time is provided for once jobs.
+ String startTime = onceJobStartTimestampOptional.orElseThrow(()
+ -> new AnalysisException("Once time job must set start time"));
+ timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startTime));
+ }
+
+ /**
+ * Builds a TimerDefinition for a recurring job.
+ *
+ * @param timerDefinition Timer definition to be built.
+ * @param jobExecutionConfiguration Job execution configuration.
+ * @throws AnalysisException If the job is not configured correctly.
+ */
+ private void buildRecurringJob(TimerDefinition timerDefinition,
+ JobExecutionConfiguration
jobExecutionConfiguration) throws AnalysisException {
+ // Ensure interval is provided for recurring jobs.
+ long interval = intervalOptional.orElseThrow(()
+ -> new AnalysisException("Interval must be set for recurring
job"));
+ timerDefinition.setInterval(interval);
+
+ // Ensure interval time unit is provided for recurring jobs.
+ String intervalTimeUnit = intervalTimeUnitOptional.orElseThrow(()
+ -> new AnalysisException("Interval time unit must be set for
recurring job"));
+ IntervalUnit intervalUnit =
IntervalUnit.fromString(intervalTimeUnit.toUpperCase());
+ if (intervalUnit == null) {
+ throw new AnalysisException("Invalid interval time unit: " +
intervalTimeUnit);
+ }
+
+ // Check if interval unit is second and disable if not in test mode.
+ if (intervalUnit.equals(IntervalUnit.SECOND) &&
!Config.enable_job_schedule_second_for_test) {
+ throw new AnalysisException("Interval time unit can not be second
in production mode");
+ }
+
+ timerDefinition.setIntervalUnit(intervalUnit);
+
+ // Set end time if provided.
+ endsTimeStampOptional.ifPresent(s ->
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(s)));
+
+ // Set immediate start if configured.
+ if (immediateStartOptional.isPresent() &&
Boolean.TRUE.equals(immediateStartOptional.get())) {
+ jobExecutionConfiguration.setImmediate(true);
+ // Avoid immediate re-scheduling by setting start time slightly in
the past.
+ timerDefinition.setStartTimeMs(System.currentTimeMillis() - 100);
+ return;
+ }
+ // Set start time if provided.
+ startsTimeStampOptional.ifPresent(s ->
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(s)));
+ }
+
+ protected static void checkAuth() throws AnalysisException {
+ if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
+ }
+ }
+
+ /**
+ * parse execute sql from create job stmt
+ * Some stmt not implement toSql method,so we need to parse sql from
originStmt
+ */
+ private static String parseExecuteSql(String sql, String jobName, String
comment) throws AnalysisException {
+ String lowerCaseSql = sql.toLowerCase();
+ String lowerCaseJobName = jobName.toLowerCase();
+ // Find the end position of the job name in the SQL statement.
+ int jobNameEndIndex = lowerCaseSql.indexOf(lowerCaseJobName) +
lowerCaseJobName.length();
+ String subSqlStmt = lowerCaseSql.substring(jobNameEndIndex);
+ String originSubSqlStmt = sql.substring(jobNameEndIndex);
+ // If the comment is not empty, extract the SQL statement from the end
position of the comment.
+ if (org.apache.commons.lang3.StringUtils.isNotBlank(comment)) {
Review Comment:
use `Strings.isNullOrEmpty()` ? in Guava
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java:
##########
@@ -123,12 +123,17 @@ public void runWithUpdateInfo(ConnectContext ctx,
StmtExecutor executor,
runInternal(ctx, executor);
}
+ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor
executor) throws Exception {
+ return initPlan(ctx, executor, true);
+ }
+
/**
* This function is used to generate the plan for Nereids.
* There are some load functions that only need to the plan, such as
stream_load.
* Therefore, this section will be presented separately.
*/
Review Comment:
Add comment for `needBeginTransaction`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]