This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a47e4e8f056 Support parsing Doris CREATE STREAMING JOB and CANCEL TASK syntax (#38318)
a47e4e8f056 is described below
commit a47e4e8f05619260f64530bcd27e49c69340e6fe
Author: cxy <[email protected]>
AuthorDate: Wed Mar 4 23:11:04 2026 +0800
Support parsing Doris CREATE STREAMING JOB and CANCEL TASK syntax (#38318)
* Support parsing Doris CREATE STREAMING JOB and CANCEL TASK syntax
* Support parsing Doris CREATE STREAMING JOB and CANCEL TASK syntax
---
.../core/database/visitor/SQLVisitorRule.java | 4 +
.../src/main/antlr4/imports/doris/BaseRule.g4 | 3 +
.../src/main/antlr4/imports/doris/DDLStatement.g4 | 14 +++
.../src/main/antlr4/imports/doris/DorisKeyword.g4 | 12 ++
.../sql/parser/autogen/DorisStatement.g4 | 2 +
.../statement/type/DorisDDLStatementVisitor.java | 35 ++++++
.../doris/ddl/DorisCancelTaskStatement.java | 39 ++++++
.../ddl/DorisCreateStreamingJobStatement.java | 111 ++++++++++++++++
.../doris/DorisCancelTaskStatementAssert.java | 48 +++++++
.../DorisCreateStreamingJobStatementAssert.java | 140 +++++++++++++++++++++
.../ddl/dialect/doris/DorisDDLStatementAssert.java | 8 ++
.../cases/parser/jaxb/RootSQLParserTestCases.java | 8 ++
.../doris/DorisCancelTaskStatementTestCase.java | 41 ++++++
.../DorisCreateStreamingJobStatementTestCase.java | 69 ++++++++++
.../src/main/resources/case/ddl/cancel-task.xml | 29 +++++
.../resources/case/ddl/create-streaming-job.xml | 88 +++++++++++++
.../resources/sql/supported/ddl/cancel-task.xml | 22 ++++
.../sql/supported/ddl/create-streaming-job.xml | 26 ++++
18 files changed, 699 insertions(+)
diff --git
a/parser/sql/engine/core/src/main/java/org/apache/shardingsphere/sql/parser/engine/core/database/visitor/SQLVisitorRule.java
b/parser/sql/engine/core/src/main/java/org/apache/shardingsphere/sql/parser/engine/core/database/visitor/SQLVisitorRule.java
index ea5de1f229b..e8867cc0887 100644
---
a/parser/sql/engine/core/src/main/java/org/apache/shardingsphere/sql/parser/engine/core/database/visitor/SQLVisitorRule.java
+++
b/parser/sql/engine/core/src/main/java/org/apache/shardingsphere/sql/parser/engine/core/database/visitor/SQLVisitorRule.java
@@ -291,6 +291,10 @@ public enum SQLVisitorRule {
CREATE_JOB("CreateJob", SQLStatementType.DDL),
+ CREATE_STREAMING_JOB("CreateStreamingJob", SQLStatementType.DDL),
+
+ CANCEL_TASK("CancelTask", SQLStatementType.DDL),
+
RESUME_JOB("ResumeJob", SQLStatementType.DDL),
RESUME_SYNC_JOB("ResumeSyncJob", SQLStatementType.DDL),
diff --git
a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/BaseRule.g4
b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/BaseRule.g4
index 2f23f17222e..412b7d7261a 100644
--- a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/BaseRule.g4
+++ b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/BaseRule.g4
@@ -350,6 +350,7 @@ identifierKeywordsUnambiguous
| MULTIPOINT
| MULTIPOLYGON
| MUTEX
+ | MYSQL
| Doris_ERRNO
| NAMES
| NAME
@@ -401,6 +402,7 @@ identifierKeywordsUnambiguous
| POINT
| POLYGON
| PORT
+ | POSTGRES
| PRECEDING
| PRESERVE
| PREV
@@ -495,6 +497,7 @@ identifierKeywordsUnambiguous
| STATUS
| STORAGE
| STREAM
+ | STREAMING
| STRING
// DORIS ADDED BEGIN
| STRRIGHT
diff --git
a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DDLStatement.g4
b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DDLStatement.g4
index b48a8b52d82..fc2764dd78a 100644
---
a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DDLStatement.g4
+++
b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DDLStatement.g4
@@ -999,6 +999,16 @@ createJob
: CREATE JOB jobName ON SCHEDULE jobScheduleExpression (COMMENT string_)?
DO insert
;
+createStreamingJob
+ : CREATE JOB jobName ON STREAMING (PROPERTIES LP_ jobProps=properties
RP_)? (COMMENT string_)?
+ (DO insert | FROM streamingSourceType LP_ sourceProps=properties RP_ TO
DATABASE databaseName (PROPERTIES? LP_ targetProps=properties RP_)?)
+ ;
+
+streamingSourceType
+ : MYSQL
+ | POSTGRES
+ ;
+
jobScheduleExpression
: AT timestampValue
| EVERY intervalValue (STARTS timestampValue)? (ENDS timestampValue)?
@@ -1020,6 +1030,10 @@ alterJob
: ALTER JOB jobName propertiesClause? (DO insert)?
;
+cancelTask
+ : CANCEL TASK WHERE jobName EQ_ stringLiterals AND identifier EQ_
numberLiterals
+ ;
+
resumeSyncJob
: RESUME SYNC JOB (owner DOT_)? identifier
;
diff --git
a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DorisKeyword.g4
b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DorisKeyword.g4
index 91e0c971f8f..bd5f3356fec 100644
---
a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DorisKeyword.g4
+++
b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DorisKeyword.g4
@@ -1689,6 +1689,10 @@ MUTEX
: M U T E X
;
+MYSQL
+ : M Y S Q L
+ ;
+
Doris_ERRNO
: M Y S Q L UL_ E R R N O
;
@@ -2006,6 +2010,10 @@ PORT
: P O R T
;
+POSTGRES
+ : P O S T G R E S
+ ;
+
PRECEDES
: P R E C E D E S
;
@@ -2720,6 +2728,10 @@ STREAM
: S T R E A M
;
+STREAMING
+ : S T R E A M I N G
+ ;
+
STRING
: S T R I N G
;
diff --git
a/parser/sql/engine/dialect/doris/src/main/antlr4/org/apache/shardingsphere/sql/parser/autogen/DorisStatement.g4
b/parser/sql/engine/dialect/doris/src/main/antlr4/org/apache/shardingsphere/sql/parser/autogen/DorisStatement.g4
index 2f11cd11595..da0266851b3 100644
---
a/parser/sql/engine/dialect/doris/src/main/antlr4/org/apache/shardingsphere/sql/parser/autogen/DorisStatement.g4
+++
b/parser/sql/engine/dialect/doris/src/main/antlr4/org/apache/shardingsphere/sql/parser/autogen/DorisStatement.g4
@@ -162,6 +162,8 @@ execute
| sync
| unsetVariable
| createJob
+ | createStreamingJob
+ | cancelTask
| backup
| cancelBackup
// TODO consider refactor following sytax to SEMI_? EOF
diff --git
a/parser/sql/engine/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/engine/doris/visitor/statement/type/DorisDDLStatementVisitor.java
b/parser/sql/engine/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/engine/doris/visitor/statement/type/DorisDDLStatementVisitor.java
index 16023d3dae3..01b61c36810 100644
---
a/parser/sql/engine/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/engine/doris/visitor/statement/type/DorisDDLStatementVisitor.java
+++
b/parser/sql/engine/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/engine/doris/visitor/statement/type/DorisDDLStatementVisitor.java
@@ -73,6 +73,7 @@ import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateF
import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateFunctionContext;
import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateIndexContext;
import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateJobContext;
+import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateStreamingJobContext;
import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateLikeClauseContext;
import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateLogfileGroupContext;
import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateMaterializedViewContext;
@@ -135,6 +136,7 @@ import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.RenameR
import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.RenameTableContext;
import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.RepeatStatementContext;
import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.ReplaceTableContext;
+import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CancelTaskContext;
import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.ResumeJobContext;
import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.ResumeSyncJobContext;
import
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.RollupItemContext;
@@ -267,9 +269,11 @@ import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisAlterColoca
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisAlterStoragePolicyStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateFunctionStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateJobStatement;
+import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateStreamingJobStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateSyncJobStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisDropFunctionStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisPauseSyncJobStatement;
+import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCancelTaskStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisAlterJobStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisDropJobStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisPauseJobStatement;
@@ -520,6 +524,37 @@ public final class DorisDDLStatementVisitor extends
DorisStatementVisitor implem
return result;
}
+ @Override
+ public ASTNode visitCreateStreamingJob(final CreateStreamingJobContext
ctx) {
+ DorisCreateStreamingJobStatement result = new
DorisCreateStreamingJobStatement(getDatabaseType());
+ result.setJobName(new
JobNameSegment(ctx.jobName().start.getStartIndex(),
ctx.jobName().stop.getStopIndex(), (IdentifierValue)
visit(ctx.jobName().identifier())));
+ if (null != ctx.COMMENT() && null != ctx.string_()) {
+ result.setComment(new
JobCommentSegment(ctx.COMMENT().getSymbol().getStartIndex(),
ctx.string_().stop.getStopIndex(),
SQLUtils.getExactlyValue(ctx.string_().getText())));
+ }
+ if (null != ctx.jobProps) {
+
result.setJobProperties(extractPropertiesSegmentFromPropertiesContext(ctx.jobProps));
+ }
+ if (null != ctx.streamingSourceType()) {
+
result.setSourceType(ctx.streamingSourceType().getText().toUpperCase());
+
result.setSourceProperties(extractPropertiesSegmentFromPropertiesContext(ctx.sourceProps));
+ result.setTargetDatabase(new
IdentifierValue(ctx.databaseName().getText()).getValue());
+ if (null != ctx.targetProps) {
+
result.setTargetProperties(extractPropertiesSegmentFromPropertiesContext(ctx.targetProps));
+ }
+ }
+ if (null != ctx.insert()) {
+ result.setInsertStatement((InsertStatement) visit(ctx.insert()));
+ }
+ return result;
+ }
+
+ @Override
+ public ASTNode visitCancelTask(final CancelTaskContext ctx) {
+ String jobName =
SQLUtils.getExactlyValue(ctx.stringLiterals().getText());
+ long taskId = Long.parseLong(ctx.numberLiterals().getText());
+ return new DorisCancelTaskStatement(getDatabaseType(), jobName,
taskId);
+ }
+
private JobScheduleSegment createJobScheduleSegment(final
JobScheduleExpressionContext ctx) {
boolean everySchedule = null == ctx.AT();
JobScheduleSegment result = new
JobScheduleSegment(ctx.start.getStartIndex(), ctx.stop.getStopIndex(),
everySchedule);
diff --git
a/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/ddl/DorisCancelTaskStatement.java
b/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/ddl/DorisCancelTaskStatement.java
new file mode 100644
index 00000000000..79d0456dc56
--- /dev/null
+++
b/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/ddl/DorisCancelTaskStatement.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sql.parser.statement.doris.ddl;
+
+import lombok.Getter;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.DDLStatement;
+
+/**
+ * Cancel task statement for Doris.
+ */
+@Getter
+public final class DorisCancelTaskStatement extends DDLStatement {
+
+ private final String jobName;
+
+ private final long taskId;
+
+ public DorisCancelTaskStatement(final DatabaseType databaseType, final
String jobName, final long taskId) {
+ super(databaseType);
+ this.jobName = jobName;
+ this.taskId = taskId;
+ }
+}
diff --git
a/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/ddl/DorisCreateStreamingJobStatement.java
b/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/ddl/DorisCreateStreamingJobStatement.java
new file mode 100644
index 00000000000..52028e99cd7
--- /dev/null
+++
b/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/ddl/DorisCreateStreamingJobStatement.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sql.parser.statement.doris.ddl;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.job.JobCommentSegment;
+import
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.job.JobNameSegment;
+import
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.property.PropertiesSegment;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.DDLStatement;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
+
+import java.util.Optional;
+
+/**
+ * Create streaming job statement for Doris.
+ */
+@Getter
+@Setter
+public final class DorisCreateStreamingJobStatement extends DDLStatement {
+
+ private JobNameSegment jobName;
+
+ private PropertiesSegment jobProperties;
+
+ private JobCommentSegment comment;
+
+ private String sourceType;
+
+ private PropertiesSegment sourceProperties;
+
+ private String targetDatabase;
+
+ private PropertiesSegment targetProperties;
+
+ private InsertStatement insertStatement;
+
+ public DorisCreateStreamingJobStatement(final DatabaseType databaseType) {
+ super(databaseType);
+ }
+
+ /**
+ * Get job properties.
+ *
+ * @return job properties
+ */
+ public Optional<PropertiesSegment> getJobProperties() {
+ return Optional.ofNullable(jobProperties);
+ }
+
+ /**
+ * Get comment.
+ *
+ * @return comment
+ */
+ public Optional<JobCommentSegment> getComment() {
+ return Optional.ofNullable(comment);
+ }
+
+ /**
+ * Get source type.
+ *
+ * @return source type
+ */
+ public Optional<String> getSourceType() {
+ return Optional.ofNullable(sourceType);
+ }
+
+ /**
+ * Get source properties.
+ *
+ * @return source properties
+ */
+ public Optional<PropertiesSegment> getSourceProperties() {
+ return Optional.ofNullable(sourceProperties);
+ }
+
+ /**
+ * Get target database.
+ *
+ * @return target database
+ */
+ public Optional<String> getTargetDatabase() {
+ return Optional.ofNullable(targetDatabase);
+ }
+
+ /**
+ * Get target properties.
+ *
+ * @return target properties
+ */
+ public Optional<PropertiesSegment> getTargetProperties() {
+ return Optional.ofNullable(targetProperties);
+ }
+}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisCancelTaskStatementAssert.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisCancelTaskStatementAssert.java
new file mode 100644
index 00000000000..3180e788705
--- /dev/null
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisCancelTaskStatementAssert.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ddl.dialect.doris;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCancelTaskStatement;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCancelTaskStatementTestCase;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Cancel task statement assert for Doris.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DorisCancelTaskStatementAssert {
+
+ /**
+ * Assert cancel task statement is correct with expected parser result.
+ *
+ * @param assertContext assert context
+ * @param actual actual cancel task statement
+ * @param expected expected cancel task statement test case
+ */
+ public static void assertIs(final SQLCaseAssertContext assertContext,
final DorisCancelTaskStatement actual, final DorisCancelTaskStatementTestCase
expected) {
+ if (null != expected.getJobName()) {
+ assertThat(assertContext.getText("Job name does not match: "),
actual.getJobName(), is(expected.getJobName()));
+ }
+ assertThat(assertContext.getText("Task ID does not match: "),
actual.getTaskId(), is(expected.getTaskId()));
+ }
+}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisCreateStreamingJobStatementAssert.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisCreateStreamingJobStatementAssert.java
new file mode 100644
index 00000000000..c6b1919f973
--- /dev/null
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisCreateStreamingJobStatementAssert.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ddl.dialect.doris;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.property.PropertySegment;
+import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
+import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateStreamingJobStatement;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.segment.SQLSegmentAssert;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.segment.table.TableAssert;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.doris.PropertyTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateStreamingJobStatementTestCase;
+
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Create streaming job statement assert for Doris.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DorisCreateStreamingJobStatementAssert {
+
+ /**
+ * Assert create streaming job statement is correct with expected parser
result.
+ *
+ * @param assertContext assert context
+ * @param actual actual create streaming job statement
+ * @param expected expected create streaming job statement test case
+ */
+ public static void assertIs(final SQLCaseAssertContext assertContext,
final DorisCreateStreamingJobStatement actual, final
DorisCreateStreamingJobStatementTestCase expected) {
+ assertJobName(assertContext, actual, expected);
+ assertComment(assertContext, actual, expected);
+ assertSourceType(assertContext, actual, expected);
+ assertTargetDatabase(assertContext, actual, expected);
+ assertSourceProperties(assertContext, actual, expected);
+ assertTargetProperties(assertContext, actual, expected);
+ assertJobProperties(assertContext, actual, expected);
+ assertInsertStatement(assertContext, actual, expected);
+ }
+
+ private static void assertJobName(final SQLCaseAssertContext
assertContext, final DorisCreateStreamingJobStatement actual, final
DorisCreateStreamingJobStatementTestCase expected) {
+ assertNotNull(actual.getJobName(), assertContext.getText("Job name
should exist."));
+ if (null != expected.getJobName()) {
+ assertThat(assertContext.getText("Job name does not match: "),
actual.getJobName().getIdentifier().getValue(),
is(expected.getJobName().getName()));
+ SQLSegmentAssert.assertIs(assertContext, actual.getJobName(),
expected.getJobName());
+ }
+ }
+
+ private static void assertComment(final SQLCaseAssertContext
assertContext, final DorisCreateStreamingJobStatement actual, final
DorisCreateStreamingJobStatementTestCase expected) {
+ if (null != expected.getComment()) {
+ assertTrue(actual.getComment().isPresent(),
assertContext.getText("Comment should exist."));
+ assertThat(assertContext.getText("Comment value does not match:
"), actual.getComment().get().getValue(), is(expected.getComment().getValue()));
+ SQLSegmentAssert.assertIs(assertContext,
actual.getComment().get(), expected.getComment());
+ }
+ }
+
+ private static void assertSourceType(final SQLCaseAssertContext
assertContext, final DorisCreateStreamingJobStatement actual, final
DorisCreateStreamingJobStatementTestCase expected) {
+ if (null != expected.getSourceType()) {
+ assertTrue(actual.getSourceType().isPresent(),
assertContext.getText("Source type should exist."));
+ assertThat(assertContext.getText("Source type does not match: "),
actual.getSourceType().get(), is(expected.getSourceType()));
+ }
+ }
+
+ private static void assertTargetDatabase(final SQLCaseAssertContext
assertContext, final DorisCreateStreamingJobStatement actual, final
DorisCreateStreamingJobStatementTestCase expected) {
+ if (null != expected.getTargetDatabase()) {
+ assertTrue(actual.getTargetDatabase().isPresent(),
assertContext.getText("Target database should exist."));
+ assertThat(assertContext.getText("Target database does not match:
"), actual.getTargetDatabase().get(), is(expected.getTargetDatabase()));
+ }
+ }
+
+ private static void assertSourceProperties(final SQLCaseAssertContext
assertContext, final DorisCreateStreamingJobStatement actual, final
DorisCreateStreamingJobStatementTestCase expected) {
+ if (!expected.getSourceProperties().isEmpty()) {
+ assertTrue(actual.getSourceProperties().isPresent(),
assertContext.getText("Source properties should exist."));
+ assertProperties(assertContext,
actual.getSourceProperties().get().getProperties(),
expected.getSourceProperties(), "Source");
+ }
+ }
+
+ private static void assertTargetProperties(final SQLCaseAssertContext
assertContext, final DorisCreateStreamingJobStatement actual, final
DorisCreateStreamingJobStatementTestCase expected) {
+ if (!expected.getTargetProperties().isEmpty()) {
+ assertTrue(actual.getTargetProperties().isPresent(),
assertContext.getText("Target properties should exist."));
+ assertProperties(assertContext,
actual.getTargetProperties().get().getProperties(),
expected.getTargetProperties(), "Target");
+ }
+ }
+
+ private static void assertJobProperties(final SQLCaseAssertContext
assertContext, final DorisCreateStreamingJobStatement actual, final
DorisCreateStreamingJobStatementTestCase expected) {
+ if (!expected.getJobProperties().isEmpty()) {
+ assertTrue(actual.getJobProperties().isPresent(),
assertContext.getText("Job properties should exist."));
+ assertProperties(assertContext,
actual.getJobProperties().get().getProperties(), expected.getJobProperties(),
"Job");
+ }
+ }
+
+ private static void assertInsertStatement(final SQLCaseAssertContext
assertContext, final DorisCreateStreamingJobStatement actual, final
DorisCreateStreamingJobStatementTestCase expected) {
+ if (null != expected.getInsertTable()) {
+ assertNotNull(actual.getInsertStatement(),
assertContext.getText("Insert statement should exist."));
+ assertTrue(actual.getInsertStatement().getTable().isPresent(),
assertContext.getText("Insert target table should exist."));
+ TableAssert.assertIs(assertContext,
actual.getInsertStatement().getTable().get(), expected.getInsertTable());
+ }
+ if (null != expected.getInsertSelectTable()) {
+ assertNotNull(actual.getInsertStatement(),
assertContext.getText("Insert statement should exist."));
+
assertTrue(actual.getInsertStatement().getInsertSelect().isPresent(),
assertContext.getText("Insert select should exist."));
+ SimpleTableSegment fromTable = (SimpleTableSegment)
actual.getInsertStatement().getInsertSelect().get().getSelect().getFrom().orElse(null);
+ assertNotNull(fromTable, assertContext.getText("Insert select from
table should exist."));
+ TableAssert.assertIs(assertContext, fromTable,
expected.getInsertSelectTable());
+ }
+ }
+
+ private static void assertProperties(final SQLCaseAssertContext
assertContext, final List<PropertySegment> actual, final List<PropertyTestCase>
expected, final String type) {
+ assertThat(assertContext.getText(type + " properties size does not
match: "), actual.size(), is(expected.size()));
+ for (int i = 0; i < expected.size(); i++) {
+ assertProperty(assertContext, actual.get(i), expected.get(i));
+ }
+ }
+
+ private static void assertProperty(final SQLCaseAssertContext
assertContext, final PropertySegment actual, final PropertyTestCase expected) {
+ assertThat(assertContext.getText(String.format("Property key '%s'
assertion error: ", expected.getKey())), actual.getKey(),
is(expected.getKey()));
+ assertThat(assertContext.getText(String.format("Property value for key
'%s' assertion error: ", expected.getKey())), actual.getValue(),
is(expected.getValue()));
+ SQLSegmentAssert.assertIs(assertContext, actual, expected);
+ }
+}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisDDLStatementAssert.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisDDLStatementAssert.java
index 44fc4cce063..a5994765726 100644
---
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisDDLStatementAssert.java
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisDDLStatementAssert.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisAlterColoca
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisAlterStoragePolicyStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateFunctionStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisDropFunctionStatement;
+import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCancelTaskStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisAlterJobStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisDropJobStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisPauseJobStatement;
@@ -32,6 +33,7 @@ import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisResumeSyncJ
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisPauseSyncJobStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisStopSyncJobStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateJobStatement;
+import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateStreamingJobStatement;
import
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateSyncJobStatement;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ddl.dialect.doris.type.DorisAlterColocateGroupStatementAssert;
@@ -41,6 +43,7 @@ import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisAlterStoragePolicyStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateJobStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisDropFunctionStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCancelTaskStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisAlterJobStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisDropJobStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisPauseJobStatementTestCase;
@@ -48,6 +51,7 @@ import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisResumeSyncJobStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisPauseSyncJobStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisStopSyncJobStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateStreamingJobStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateSyncJobStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.standard.function.CreateFunctionStatementTestCase;
@@ -91,6 +95,10 @@ public final class DorisDDLStatementAssert {
DorisDropJobStatementAssert.assertIs(assertContext,
(DorisDropJobStatement) actual, (DorisDropJobStatementTestCase) expected);
} else if (actual instanceof DorisAlterJobStatement) {
DorisAlterJobStatementAssert.assertIs(assertContext,
(DorisAlterJobStatement) actual, (DorisAlterJobStatementTestCase) expected);
+ } else if (actual instanceof DorisCreateStreamingJobStatement) {
+ DorisCreateStreamingJobStatementAssert.assertIs(assertContext,
(DorisCreateStreamingJobStatement) actual,
(DorisCreateStreamingJobStatementTestCase) expected);
+ } else if (actual instanceof DorisCancelTaskStatement) {
+ DorisCancelTaskStatementAssert.assertIs(assertContext,
(DorisCancelTaskStatement) actual, (DorisCancelTaskStatementTestCase) expected);
}
}
}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
index 88e13c5c753..9dc40c235af 100644
---
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
@@ -38,6 +38,8 @@ import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisPauseSyncJobStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisStopSyncJobStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateJobStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCancelTaskStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateStreamingJobStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateSyncJobStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.doris.DorisAlterSqlBlockRuleStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.doris.DorisDropSqlBlockRuleStatementTestCase;
@@ -585,6 +587,9 @@ public final class RootSQLParserTestCases {
@XmlElement(name = "create-job")
private final List<DorisCreateJobStatementTestCase> createJobTestCases =
new LinkedList<>();
+ @XmlElement(name = "create-streaming-job")
+ private final List<DorisCreateStreamingJobStatementTestCase>
createStreamingJobTestCases = new LinkedList<>();
+
@XmlElement(name = "alter-catalog")
private final List<AlterCatalogStatementTestCase> alterCatalogTestCases =
new LinkedList<>();
@@ -1299,6 +1304,9 @@ public final class RootSQLParserTestCases {
@XmlElement(name = "cancel-backup")
private final List<DorisCancelBackupStatementTestCase>
cancelBackupTestCases = new LinkedList<>();
+ @XmlElement(name = "cancel-task")
+ private final List<DorisCancelTaskStatementTestCase> cancelTaskTestCases =
new LinkedList<>();
+
@XmlElement(name = "drop-shadow-algorithm")
private final List<DropShadowAlgorithmStatementTestCase>
dropShadowAlgorithmTestCases = new LinkedList<>();
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ddl/dialect/doris/DorisCancelTaskStatementTestCase.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ddl/dialect/doris/DorisCancelTaskStatementTestCase.java
new file mode 100644
index 00000000000..61c202b0a0d
--- /dev/null
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ddl/dialect/doris/DorisCancelTaskStatementTestCase.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris;
+
+import lombok.Getter;
+import lombok.Setter;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * Test case for the Doris CANCEL TASK statement, holding the job name and task ID bound from the job-name and task-id XML elements.
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@Getter
+@Setter
+public final class DorisCancelTaskStatementTestCase extends SQLParserTestCase {
+
+ @XmlElement(name = "job-name")
+ private String jobName;
+
+ @XmlElement(name = "task-id")
+ private long taskId;
+}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ddl/dialect/doris/DorisCreateStreamingJobStatementTestCase.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ddl/dialect/doris/DorisCreateStreamingJobStatementTestCase.java
new file mode 100644
index 00000000000..396fa3e26c0
--- /dev/null
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ddl/dialect/doris/DorisCreateStreamingJobStatementTestCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris;
+
+import lombok.Getter;
+import lombok.Setter;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.job.ExpectedJobComment;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.job.ExpectedJobName;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.table.ExpectedSimpleTable;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.doris.PropertyTestCase;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Test case for the Doris CREATE JOB ... ON STREAMING statement: XML-bound job name, comment, source type, target database, source/target/job properties and insert tables.
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@Getter
+@Setter
+public final class DorisCreateStreamingJobStatementTestCase extends
SQLParserTestCase {
+
+ @XmlElement(name = "job-name")
+ private ExpectedJobName jobName;
+
+ @XmlElement(name = "job-comment")
+ private ExpectedJobComment comment;
+
+ @XmlAttribute(name = "source-type")
+ private String sourceType;
+
+ @XmlAttribute(name = "target-database")
+ private String targetDatabase;
+
+ @XmlElement(name = "source-property")
+ private final List<PropertyTestCase> sourceProperties = new LinkedList<>();
+
+ @XmlElement(name = "target-property")
+ private final List<PropertyTestCase> targetProperties = new LinkedList<>();
+
+ @XmlElement(name = "job-property")
+ private final List<PropertyTestCase> jobProperties = new LinkedList<>();
+
+ @XmlElement(name = "insert-table")
+ private ExpectedSimpleTable insertTable;
+
+ @XmlElement(name = "insert-select-table")
+ private ExpectedSimpleTable insertSelectTable;
+}
diff --git a/test/it/parser/src/main/resources/case/ddl/cancel-task.xml
b/test/it/parser/src/main/resources/case/ddl/cancel-task.xml
new file mode 100644
index 00000000000..7b1252871c8
--- /dev/null
+++ b/test/it/parser/src/main/resources/case/ddl/cancel-task.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<sql-parser-test-cases>
+ <cancel-task sql-case-id="cancel_task_with_job_name_and_task_id">
+ <job-name>example</job-name>
+ <task-id>378912</task-id>
+ </cancel-task>
+
+ <cancel-task sql-case-id="cancel_task_with_double_quotes">
+ <job-name>my_job</job-name>
+ <task-id>100</task-id>
+ </cancel-task>
+</sql-parser-test-cases>
diff --git
a/test/it/parser/src/main/resources/case/ddl/create-streaming-job.xml
b/test/it/parser/src/main/resources/case/ddl/create-streaming-job.xml
new file mode 100644
index 00000000000..1e053751ba6
--- /dev/null
+++ b/test/it/parser/src/main/resources/case/ddl/create-streaming-job.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<sql-parser-test-cases>
+ <create-streaming-job sql-case-id="create_streaming_job_from_postgres"
source-type="POSTGRES" target-database="target_test_db">
+ <job-name name="test_postgres_job" start-index="11" stop-index="27" />
+ <source-property key="jdbc_url"
value="jdbc:postgresql://127.0.0.1:5432/postgres" start-index="57"
stop-index="112" />
+ <source-property key="driver_url" value="postgresql-42.5.0.jar"
start-index="115" stop-index="152" />
+ <source-property key="driver_class" value="org.postgresql.Driver"
start-index="155" stop-index="194" />
+ <source-property key="user" value="postgres" start-index="197"
stop-index="215" />
+ <source-property key="password" value="postgres" start-index="218"
stop-index="240" />
+ <source-property key="database" value="postgres" start-index="243"
stop-index="265" />
+ <source-property key="schema" value="public" start-index="268"
stop-index="286" />
+ <source-property key="include_tables" value="test_tbls"
start-index="289" stop-index="318" />
+ <source-property key="offset" value="latest" start-index="321"
stop-index="339" />
+ <target-property key="table.create.properties.replication_num"
value="1" start-index="370" stop-index="416" />
+ </create-streaming-job>
+
+ <create-streaming-job sql-case-id="create_streaming_job_from_mysql"
source-type="MYSQL" target-database="target_test_db">
+ <job-name name="multi_table_sync" start-index="11" stop-index="26" />
+ <source-property key="jdbc_url" value="jdbc:mysql://127.0.0.1:3306"
start-index="53" stop-index="94" />
+ <source-property key="driver_url" value="mysql-connector-j-8.0.31.jar"
start-index="97" stop-index="141" />
+ <source-property key="driver_class" value="com.mysql.cj.jdbc.Driver"
start-index="144" stop-index="186" />
+ <source-property key="user" value="root" start-index="189"
stop-index="203" />
+ <source-property key="password" value="123456" start-index="206"
stop-index="226" />
+ <source-property key="database" value="test" start-index="229"
stop-index="247" />
+ <source-property key="include_tables" value="user_info,order_info"
start-index="250" stop-index="290" />
+ <source-property key="offset" value="initial" start-index="293"
stop-index="312" />
+ <target-property key="table.create.properties.replication_num"
value="1" start-index="343" stop-index="389" />
+ </create-streaming-job>
+
+ <create-streaming-job sql-case-id="create_streaming_job_do_insert">
+ <job-name name="my_job" start-index="11" stop-index="16" />
+ <insert-table name="tbl1" start-index="46" stop-index="53">
+ <owner name="db1" start-index="46" stop-index="48" />
+ </insert-table>
+ <insert-select-table name="tbl2" start-index="69" stop-index="76">
+ <owner name="db2" start-index="69" stop-index="71" />
+ </insert-select-table>
+ </create-streaming-job>
+
+ <create-streaming-job sql-case-id="create_streaming_job_with_comment"
source-type="MYSQL" target-database="target_db">
+ <job-name name="my_job" start-index="11" stop-index="16" />
+ <job-comment start-index="31" stop-index="53" value="streaming job" />
+ <source-property key="jdbc_url" value="jdbc:mysql://127.0.0.1:3306"
start-index="67" stop-index="108" />
+ <source-property key="user" value="root" start-index="111"
stop-index="125" />
+ <source-property key="password" value="123456" start-index="128"
stop-index="148" />
+ <source-property key="database" value="test" start-index="151"
stop-index="169" />
+ <source-property key="include_tables" value="t1" start-index="172"
stop-index="194" />
+ <source-property key="offset" value="initial" start-index="197"
stop-index="216" />
+ </create-streaming-job>
+
+ <create-streaming-job
sql-case-id="create_streaming_job_with_job_properties" source-type="MYSQL"
target-database="target_db">
+ <job-name name="my_job" start-index="11" stop-index="16" />
+ <job-property key="max_interval" value="10s" start-index="43"
stop-index="64" />
+ <source-property key="jdbc_url" value="jdbc:mysql://127.0.0.1:3306"
start-index="79" stop-index="120" />
+ <source-property key="user" value="root" start-index="123"
stop-index="137" />
+ <source-property key="password" value="123456" start-index="140"
stop-index="160" />
+ <source-property key="database" value="test" start-index="163"
stop-index="181" />
+ <source-property key="include_tables" value="t1" start-index="184"
stop-index="206" />
+ <source-property key="offset" value="initial" start-index="209"
stop-index="228" />
+ </create-streaming-job>
+
+ <create-streaming-job
sql-case-id="create_streaming_job_with_quoted_target_db" source-type="MYSQL"
target-database="target_db">
+ <job-name name="my_job" start-index="11" stop-index="16" />
+ <source-property key="jdbc_url" value="jdbc:mysql://127.0.0.1:3306"
start-index="43" stop-index="84" />
+ <source-property key="user" value="root" start-index="87"
stop-index="101" />
+ <source-property key="password" value="123456" start-index="104"
stop-index="124" />
+ <source-property key="database" value="test" start-index="127"
stop-index="145" />
+ <source-property key="include_tables" value="t1" start-index="148"
stop-index="170" />
+ <source-property key="offset" value="initial" start-index="173"
stop-index="192" />
+ </create-streaming-job>
+</sql-parser-test-cases>
diff --git
a/test/it/parser/src/main/resources/sql/supported/ddl/cancel-task.xml
b/test/it/parser/src/main/resources/sql/supported/ddl/cancel-task.xml
new file mode 100644
index 00000000000..f58ac870595
--- /dev/null
+++ b/test/it/parser/src/main/resources/sql/supported/ddl/cancel-task.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<sql-cases>
+ <sql-case id="cancel_task_with_job_name_and_task_id" value="CANCEL TASK
WHERE jobName = 'example' AND taskId = 378912" db-types="Doris" />
+ <sql-case id="cancel_task_with_double_quotes" value="CANCEL TASK WHERE
jobName = "my_job" AND taskId = 100" db-types="Doris" />
+</sql-cases>
diff --git
a/test/it/parser/src/main/resources/sql/supported/ddl/create-streaming-job.xml
b/test/it/parser/src/main/resources/sql/supported/ddl/create-streaming-job.xml
new file mode 100644
index 00000000000..fb69f2d88e6
--- /dev/null
+++
b/test/it/parser/src/main/resources/sql/supported/ddl/create-streaming-job.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<sql-cases>
+ <sql-case id="create_streaming_job_from_postgres" value="CREATE JOB
test_postgres_job ON STREAMING FROM POSTGRES ("jdbc_url" =
"jdbc:postgresql://127.0.0.1:5432/postgres", "driver_url" =
"postgresql-42.5.0.jar", "driver_class" =
"org.postgresql.Driver", "user" = "postgres",
"password" = "postgres", "database" =
"postgres", "schema" = "public", &q [...]
+ <sql-case id="create_streaming_job_from_mysql" value="CREATE JOB
multi_table_sync ON STREAMING FROM MYSQL ("jdbc_url" =
"jdbc:mysql://127.0.0.1:3306", "driver_url" =
"mysql-connector-j-8.0.31.jar", "driver_class" =
"com.mysql.cj.jdbc.Driver", "user" = "root",
"password" = "123456", "database" =
"test", "include_tables" =
"user_info,order_info", & [...]
+ <sql-case id="create_streaming_job_do_insert" value="CREATE JOB my_job ON
STREAMING DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2" db-types="Doris" />
+ <sql-case id="create_streaming_job_with_comment" value="CREATE JOB my_job
ON STREAMING COMMENT 'streaming job' FROM MYSQL ("jdbc_url" =
"jdbc:mysql://127.0.0.1:3306", "user" = "root",
"password" = "123456", "database" =
"test", "include_tables" = "t1",
"offset" = "initial") TO DATABASE target_db"
db-types="Doris" />
+ <sql-case id="create_streaming_job_with_job_properties" value="CREATE JOB
my_job ON STREAMING PROPERTIES ("max_interval" = "10s")
FROM MYSQL ("jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
"user" = "root", "password" = "123456",
"database" = "test", "include_tables" =
"t1", "offset" = "initial") TO DATABASE
target_db" db-types="Doris" />
+ <sql-case id="create_streaming_job_with_quoted_target_db" value="CREATE
JOB my_job ON STREAMING FROM MYSQL ("jdbc_url" =
"jdbc:mysql://127.0.0.1:3306", "user" = "root",
"password" = "123456", "database" =
"test", "include_tables" = "t1",
"offset" = "initial") TO DATABASE `target_db`"
db-types="Doris" />
+</sql-cases>