This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a47e4e8f056 Support parsing Doris CREATE STREAMING JOB and CANCEL TASK syntax (#38318)
a47e4e8f056 is described below

commit a47e4e8f05619260f64530bcd27e49c69340e6fe
Author: cxy <[email protected]>
AuthorDate: Wed Mar 4 23:11:04 2026 +0800

    Support parsing Doris CREATE STREAMING JOB and CANCEL TASK syntax (#38318)
    
    * Support parsing Doris CREATE STREAMING JOB and CANCEL TASK syntax
    
    * Support parsing Doris CREATE STREAMING JOB and CANCEL TASK syntax
---
 .../core/database/visitor/SQLVisitorRule.java      |   4 +
 .../src/main/antlr4/imports/doris/BaseRule.g4      |   3 +
 .../src/main/antlr4/imports/doris/DDLStatement.g4  |  14 +++
 .../src/main/antlr4/imports/doris/DorisKeyword.g4  |  12 ++
 .../sql/parser/autogen/DorisStatement.g4           |   2 +
 .../statement/type/DorisDDLStatementVisitor.java   |  35 ++++++
 .../doris/ddl/DorisCancelTaskStatement.java        |  39 ++++++
 .../ddl/DorisCreateStreamingJobStatement.java      | 111 ++++++++++++++++
 .../doris/DorisCancelTaskStatementAssert.java      |  48 +++++++
 .../DorisCreateStreamingJobStatementAssert.java    | 140 +++++++++++++++++++++
 .../ddl/dialect/doris/DorisDDLStatementAssert.java |   8 ++
 .../cases/parser/jaxb/RootSQLParserTestCases.java  |   8 ++
 .../doris/DorisCancelTaskStatementTestCase.java    |  41 ++++++
 .../DorisCreateStreamingJobStatementTestCase.java  |  69 ++++++++++
 .../src/main/resources/case/ddl/cancel-task.xml    |  29 +++++
 .../resources/case/ddl/create-streaming-job.xml    |  88 +++++++++++++
 .../resources/sql/supported/ddl/cancel-task.xml    |  22 ++++
 .../sql/supported/ddl/create-streaming-job.xml     |  26 ++++
 18 files changed, 699 insertions(+)

diff --git a/parser/sql/engine/core/src/main/java/org/apache/shardingsphere/sql/parser/engine/core/database/visitor/SQLVisitorRule.java b/parser/sql/engine/core/src/main/java/org/apache/shardingsphere/sql/parser/engine/core/database/visitor/SQLVisitorRule.java
index ea5de1f229b..e8867cc0887 100644
--- a/parser/sql/engine/core/src/main/java/org/apache/shardingsphere/sql/parser/engine/core/database/visitor/SQLVisitorRule.java
+++ b/parser/sql/engine/core/src/main/java/org/apache/shardingsphere/sql/parser/engine/core/database/visitor/SQLVisitorRule.java
@@ -291,6 +291,10 @@ public enum SQLVisitorRule {
     
     CREATE_JOB("CreateJob", SQLStatementType.DDL),
     
+    CREATE_STREAMING_JOB("CreateStreamingJob", SQLStatementType.DDL),
+    
+    CANCEL_TASK("CancelTask", SQLStatementType.DDL),
+    
     RESUME_JOB("ResumeJob", SQLStatementType.DDL),
     
     RESUME_SYNC_JOB("ResumeSyncJob", SQLStatementType.DDL),
diff --git a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/BaseRule.g4 b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/BaseRule.g4
index 2f23f17222e..412b7d7261a 100644
--- a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/BaseRule.g4
+++ b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/BaseRule.g4
@@ -350,6 +350,7 @@ identifierKeywordsUnambiguous
     | MULTIPOINT
     | MULTIPOLYGON
     | MUTEX
+    | MYSQL
     | Doris_ERRNO
     | NAMES
     | NAME
@@ -401,6 +402,7 @@ identifierKeywordsUnambiguous
     | POINT
     | POLYGON
     | PORT
+    | POSTGRES
     | PRECEDING
     | PRESERVE
     | PREV
@@ -495,6 +497,7 @@ identifierKeywordsUnambiguous
     | STATUS
     | STORAGE
     | STREAM
+    | STREAMING
     | STRING
     // DORIS ADDED BEGIN
     | STRRIGHT
diff --git a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DDLStatement.g4 b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DDLStatement.g4
index b48a8b52d82..fc2764dd78a 100644
--- a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DDLStatement.g4
+++ b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DDLStatement.g4
@@ -999,6 +999,16 @@ createJob
     : CREATE JOB jobName ON SCHEDULE jobScheduleExpression (COMMENT string_)? 
DO insert
     ;
 
+createStreamingJob
+    : CREATE JOB jobName ON STREAMING (PROPERTIES LP_ jobProps=properties 
RP_)? (COMMENT string_)?
+      (DO insert | FROM streamingSourceType LP_ sourceProps=properties RP_ TO 
DATABASE databaseName (PROPERTIES? LP_ targetProps=properties RP_)?)
+    ;
+
+streamingSourceType
+    : MYSQL
+    | POSTGRES
+    ;
+
 jobScheduleExpression
     : AT timestampValue
     | EVERY intervalValue (STARTS timestampValue)? (ENDS timestampValue)?
@@ -1020,6 +1030,10 @@ alterJob
     : ALTER JOB jobName propertiesClause? (DO insert)?
     ;
 
+cancelTask
+    : CANCEL TASK WHERE jobName EQ_ stringLiterals AND identifier EQ_ 
numberLiterals
+    ;
+
 resumeSyncJob
     : RESUME SYNC JOB (owner DOT_)? identifier
     ;
diff --git a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DorisKeyword.g4 b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DorisKeyword.g4
index 91e0c971f8f..bd5f3356fec 100644
--- a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DorisKeyword.g4
+++ b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DorisKeyword.g4
@@ -1689,6 +1689,10 @@ MUTEX
     : M U T E X
     ;
 
+MYSQL
+    : M Y S Q L
+    ;
+
 Doris_ERRNO
     : M Y S Q L UL_ E R R N O
     ;
@@ -2006,6 +2010,10 @@ PORT
     : P O R T
     ;
 
+POSTGRES
+    : P O S T G R E S
+    ;
+
 PRECEDES
     : P R E C E D E S
     ;
@@ -2720,6 +2728,10 @@ STREAM
     : S T R E A M
     ;
 
+STREAMING
+    : S T R E A M I N G
+    ;
+
 STRING
     : S T R I N G
     ;
diff --git a/parser/sql/engine/dialect/doris/src/main/antlr4/org/apache/shardingsphere/sql/parser/autogen/DorisStatement.g4 b/parser/sql/engine/dialect/doris/src/main/antlr4/org/apache/shardingsphere/sql/parser/autogen/DorisStatement.g4
index 2f11cd11595..da0266851b3 100644
--- a/parser/sql/engine/dialect/doris/src/main/antlr4/org/apache/shardingsphere/sql/parser/autogen/DorisStatement.g4
+++ b/parser/sql/engine/dialect/doris/src/main/antlr4/org/apache/shardingsphere/sql/parser/autogen/DorisStatement.g4
@@ -162,6 +162,8 @@ execute
     | sync
     | unsetVariable
     | createJob
+    | createStreamingJob
+    | cancelTask
     | backup
     | cancelBackup
     // TODO consider refactor following sytax to SEMI_? EOF
diff --git a/parser/sql/engine/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/engine/doris/visitor/statement/type/DorisDDLStatementVisitor.java b/parser/sql/engine/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/engine/doris/visitor/statement/type/DorisDDLStatementVisitor.java
index 16023d3dae3..01b61c36810 100644
--- a/parser/sql/engine/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/engine/doris/visitor/statement/type/DorisDDLStatementVisitor.java
+++ b/parser/sql/engine/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/engine/doris/visitor/statement/type/DorisDDLStatementVisitor.java
@@ -73,6 +73,7 @@ import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateF
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateFunctionContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateIndexContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateJobContext;
+import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateStreamingJobContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateLikeClauseContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateLogfileGroupContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateMaterializedViewContext;
@@ -135,6 +136,7 @@ import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.RenameR
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.RenameTableContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.RepeatStatementContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.ReplaceTableContext;
+import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CancelTaskContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.ResumeJobContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.ResumeSyncJobContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.RollupItemContext;
@@ -267,9 +269,11 @@ import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisAlterColoca
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisAlterStoragePolicyStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateFunctionStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateJobStatement;
+import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateStreamingJobStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateSyncJobStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisDropFunctionStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisPauseSyncJobStatement;
+import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCancelTaskStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisAlterJobStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisDropJobStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisPauseJobStatement;
@@ -520,6 +524,37 @@ public final class DorisDDLStatementVisitor extends 
DorisStatementVisitor implem
         return result;
     }
     
+    @Override
+    public ASTNode visitCreateStreamingJob(final CreateStreamingJobContext 
ctx) {
+        DorisCreateStreamingJobStatement result = new 
DorisCreateStreamingJobStatement(getDatabaseType());
+        result.setJobName(new 
JobNameSegment(ctx.jobName().start.getStartIndex(), 
ctx.jobName().stop.getStopIndex(), (IdentifierValue) 
visit(ctx.jobName().identifier())));
+        if (null != ctx.COMMENT() && null != ctx.string_()) {
+            result.setComment(new 
JobCommentSegment(ctx.COMMENT().getSymbol().getStartIndex(), 
ctx.string_().stop.getStopIndex(), 
SQLUtils.getExactlyValue(ctx.string_().getText())));
+        }
+        if (null != ctx.jobProps) {
+            
result.setJobProperties(extractPropertiesSegmentFromPropertiesContext(ctx.jobProps));
+        }
+        if (null != ctx.streamingSourceType()) {
+            
result.setSourceType(ctx.streamingSourceType().getText().toUpperCase());
+            
result.setSourceProperties(extractPropertiesSegmentFromPropertiesContext(ctx.sourceProps));
+            result.setTargetDatabase(new 
IdentifierValue(ctx.databaseName().getText()).getValue());
+            if (null != ctx.targetProps) {
+                
result.setTargetProperties(extractPropertiesSegmentFromPropertiesContext(ctx.targetProps));
+            }
+        }
+        if (null != ctx.insert()) {
+            result.setInsertStatement((InsertStatement) visit(ctx.insert()));
+        }
+        return result;
+    }
+    
+    @Override
+    public ASTNode visitCancelTask(final CancelTaskContext ctx) {
+        String jobName = 
SQLUtils.getExactlyValue(ctx.stringLiterals().getText());
+        long taskId = Long.parseLong(ctx.numberLiterals().getText());
+        return new DorisCancelTaskStatement(getDatabaseType(), jobName, 
taskId);
+    }
+    
     private JobScheduleSegment createJobScheduleSegment(final 
JobScheduleExpressionContext ctx) {
         boolean everySchedule = null == ctx.AT();
         JobScheduleSegment result = new 
JobScheduleSegment(ctx.start.getStartIndex(), ctx.stop.getStopIndex(), 
everySchedule);
diff --git a/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/ddl/DorisCancelTaskStatement.java b/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/ddl/DorisCancelTaskStatement.java
new file mode 100644
index 00000000000..79d0456dc56
--- /dev/null
+++ b/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/ddl/DorisCancelTaskStatement.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sql.parser.statement.doris.ddl;
+
+import lombok.Getter;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.DDLStatement;
+
+/**
+ * Cancel task statement for Doris.
+ */
+@Getter
+public final class DorisCancelTaskStatement extends DDLStatement {
+    
+    private final String jobName;
+    
+    private final long taskId;
+    
+    public DorisCancelTaskStatement(final DatabaseType databaseType, final 
String jobName, final long taskId) {
+        super(databaseType);
+        this.jobName = jobName;
+        this.taskId = taskId;
+    }
+}
diff --git a/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/ddl/DorisCreateStreamingJobStatement.java b/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/ddl/DorisCreateStreamingJobStatement.java
new file mode 100644
index 00000000000..52028e99cd7
--- /dev/null
+++ b/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/ddl/DorisCreateStreamingJobStatement.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sql.parser.statement.doris.ddl;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.job.JobCommentSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.job.JobNameSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.property.PropertiesSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.DDLStatement;
+import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
+
+import java.util.Optional;
+
+/**
+ * Create streaming job statement for Doris.
+ */
+@Getter
+@Setter
+public final class DorisCreateStreamingJobStatement extends DDLStatement {
+    
+    private JobNameSegment jobName;
+    
+    private PropertiesSegment jobProperties;
+    
+    private JobCommentSegment comment;
+    
+    private String sourceType;
+    
+    private PropertiesSegment sourceProperties;
+    
+    private String targetDatabase;
+    
+    private PropertiesSegment targetProperties;
+    
+    private InsertStatement insertStatement;
+    
+    public DorisCreateStreamingJobStatement(final DatabaseType databaseType) {
+        super(databaseType);
+    }
+    
+    /**
+     * Get job properties.
+     *
+     * @return job properties
+     */
+    public Optional<PropertiesSegment> getJobProperties() {
+        return Optional.ofNullable(jobProperties);
+    }
+    
+    /**
+     * Get comment.
+     *
+     * @return comment
+     */
+    public Optional<JobCommentSegment> getComment() {
+        return Optional.ofNullable(comment);
+    }
+    
+    /**
+     * Get source type.
+     *
+     * @return source type
+     */
+    public Optional<String> getSourceType() {
+        return Optional.ofNullable(sourceType);
+    }
+    
+    /**
+     * Get source properties.
+     *
+     * @return source properties
+     */
+    public Optional<PropertiesSegment> getSourceProperties() {
+        return Optional.ofNullable(sourceProperties);
+    }
+    
+    /**
+     * Get target database.
+     *
+     * @return target database
+     */
+    public Optional<String> getTargetDatabase() {
+        return Optional.ofNullable(targetDatabase);
+    }
+    
+    /**
+     * Get target properties.
+     *
+     * @return target properties
+     */
+    public Optional<PropertiesSegment> getTargetProperties() {
+        return Optional.ofNullable(targetProperties);
+    }
+}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisCancelTaskStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisCancelTaskStatementAssert.java
new file mode 100644
index 00000000000..3180e788705
--- /dev/null
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisCancelTaskStatementAssert.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ddl.dialect.doris;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCancelTaskStatement;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCancelTaskStatementTestCase;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Cancel task statement assert for Doris.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DorisCancelTaskStatementAssert {
+    
+    /**
+     * Assert cancel task statement is correct with expected parser result.
+     *
+     * @param assertContext assert context
+     * @param actual actual cancel task statement
+     * @param expected expected cancel task statement test case
+     */
+    public static void assertIs(final SQLCaseAssertContext assertContext, 
final DorisCancelTaskStatement actual, final DorisCancelTaskStatementTestCase 
expected) {
+        if (null != expected.getJobName()) {
+            assertThat(assertContext.getText("Job name does not match: "), 
actual.getJobName(), is(expected.getJobName()));
+        }
+        assertThat(assertContext.getText("Task ID does not match: "), 
actual.getTaskId(), is(expected.getTaskId()));
+    }
+}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisCreateStreamingJobStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisCreateStreamingJobStatementAssert.java
new file mode 100644
index 00000000000..c6b1919f973
--- /dev/null
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisCreateStreamingJobStatementAssert.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ddl.dialect.doris;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.property.PropertySegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateStreamingJobStatement;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.segment.SQLSegmentAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.segment.table.TableAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.doris.PropertyTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateStreamingJobStatementTestCase;
+
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Create streaming job statement assert for Doris.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DorisCreateStreamingJobStatementAssert {
+    
+    /**
+     * Assert create streaming job statement is correct with expected parser 
result.
+     *
+     * @param assertContext assert context
+     * @param actual actual create streaming job statement
+     * @param expected expected create streaming job statement test case
+     */
+    public static void assertIs(final SQLCaseAssertContext assertContext, 
final DorisCreateStreamingJobStatement actual, final 
DorisCreateStreamingJobStatementTestCase expected) {
+        assertJobName(assertContext, actual, expected);
+        assertComment(assertContext, actual, expected);
+        assertSourceType(assertContext, actual, expected);
+        assertTargetDatabase(assertContext, actual, expected);
+        assertSourceProperties(assertContext, actual, expected);
+        assertTargetProperties(assertContext, actual, expected);
+        assertJobProperties(assertContext, actual, expected);
+        assertInsertStatement(assertContext, actual, expected);
+    }
+    
+    private static void assertJobName(final SQLCaseAssertContext 
assertContext, final DorisCreateStreamingJobStatement actual, final 
DorisCreateStreamingJobStatementTestCase expected) {
+        assertNotNull(actual.getJobName(), assertContext.getText("Job name 
should exist."));
+        if (null != expected.getJobName()) {
+            assertThat(assertContext.getText("Job name does not match: "), 
actual.getJobName().getIdentifier().getValue(), 
is(expected.getJobName().getName()));
+            SQLSegmentAssert.assertIs(assertContext, actual.getJobName(), 
expected.getJobName());
+        }
+    }
+    
+    private static void assertComment(final SQLCaseAssertContext 
assertContext, final DorisCreateStreamingJobStatement actual, final 
DorisCreateStreamingJobStatementTestCase expected) {
+        if (null != expected.getComment()) {
+            assertTrue(actual.getComment().isPresent(), 
assertContext.getText("Comment should exist."));
+            assertThat(assertContext.getText("Comment value does not match: 
"), actual.getComment().get().getValue(), is(expected.getComment().getValue()));
+            SQLSegmentAssert.assertIs(assertContext, 
actual.getComment().get(), expected.getComment());
+        }
+    }
+    
+    private static void assertSourceType(final SQLCaseAssertContext 
assertContext, final DorisCreateStreamingJobStatement actual, final 
DorisCreateStreamingJobStatementTestCase expected) {
+        if (null != expected.getSourceType()) {
+            assertTrue(actual.getSourceType().isPresent(), 
assertContext.getText("Source type should exist."));
+            assertThat(assertContext.getText("Source type does not match: "), 
actual.getSourceType().get(), is(expected.getSourceType()));
+        }
+    }
+    
+    private static void assertTargetDatabase(final SQLCaseAssertContext 
assertContext, final DorisCreateStreamingJobStatement actual, final 
DorisCreateStreamingJobStatementTestCase expected) {
+        if (null != expected.getTargetDatabase()) {
+            assertTrue(actual.getTargetDatabase().isPresent(), 
assertContext.getText("Target database should exist."));
+            assertThat(assertContext.getText("Target database does not match: 
"), actual.getTargetDatabase().get(), is(expected.getTargetDatabase()));
+        }
+    }
+    
+    private static void assertSourceProperties(final SQLCaseAssertContext 
assertContext, final DorisCreateStreamingJobStatement actual, final 
DorisCreateStreamingJobStatementTestCase expected) {
+        if (!expected.getSourceProperties().isEmpty()) {
+            assertTrue(actual.getSourceProperties().isPresent(), 
assertContext.getText("Source properties should exist."));
+            assertProperties(assertContext, 
actual.getSourceProperties().get().getProperties(), 
expected.getSourceProperties(), "Source");
+        }
+    }
+    
+    private static void assertTargetProperties(final SQLCaseAssertContext 
assertContext, final DorisCreateStreamingJobStatement actual, final 
DorisCreateStreamingJobStatementTestCase expected) {
+        if (!expected.getTargetProperties().isEmpty()) {
+            assertTrue(actual.getTargetProperties().isPresent(), 
assertContext.getText("Target properties should exist."));
+            assertProperties(assertContext, 
actual.getTargetProperties().get().getProperties(), 
expected.getTargetProperties(), "Target");
+        }
+    }
+    
+    private static void assertJobProperties(final SQLCaseAssertContext 
assertContext, final DorisCreateStreamingJobStatement actual, final 
DorisCreateStreamingJobStatementTestCase expected) {
+        if (!expected.getJobProperties().isEmpty()) {
+            assertTrue(actual.getJobProperties().isPresent(), 
assertContext.getText("Job properties should exist."));
+            assertProperties(assertContext, 
actual.getJobProperties().get().getProperties(), expected.getJobProperties(), 
"Job");
+        }
+    }
+    
+    private static void assertInsertStatement(final SQLCaseAssertContext 
assertContext, final DorisCreateStreamingJobStatement actual, final 
DorisCreateStreamingJobStatementTestCase expected) {
+        if (null != expected.getInsertTable()) {
+            assertNotNull(actual.getInsertStatement(), 
assertContext.getText("Insert statement should exist."));
+            assertTrue(actual.getInsertStatement().getTable().isPresent(), 
assertContext.getText("Insert target table should exist."));
+            TableAssert.assertIs(assertContext, 
actual.getInsertStatement().getTable().get(), expected.getInsertTable());
+        }
+        if (null != expected.getInsertSelectTable()) {
+            assertNotNull(actual.getInsertStatement(), 
assertContext.getText("Insert statement should exist."));
+            
assertTrue(actual.getInsertStatement().getInsertSelect().isPresent(), 
assertContext.getText("Insert select should exist."));
+            SimpleTableSegment fromTable = (SimpleTableSegment) 
actual.getInsertStatement().getInsertSelect().get().getSelect().getFrom().orElse(null);
+            assertNotNull(fromTable, assertContext.getText("Insert select from 
table should exist."));
+            TableAssert.assertIs(assertContext, fromTable, 
expected.getInsertSelectTable());
+        }
+    }
+    
+    private static void assertProperties(final SQLCaseAssertContext 
assertContext, final List<PropertySegment> actual, final List<PropertyTestCase> 
expected, final String type) {
+        assertThat(assertContext.getText(type + " properties size does not 
match: "), actual.size(), is(expected.size()));
+        for (int i = 0; i < expected.size(); i++) {
+            assertProperty(assertContext, actual.get(i), expected.get(i));
+        }
+    }
+    
+    private static void assertProperty(final SQLCaseAssertContext 
assertContext, final PropertySegment actual, final PropertyTestCase expected) {
+        assertThat(assertContext.getText(String.format("Property key '%s' 
assertion error: ", expected.getKey())), actual.getKey(), 
is(expected.getKey()));
+        assertThat(assertContext.getText(String.format("Property value for key 
'%s' assertion error: ", expected.getKey())), actual.getValue(), 
is(expected.getValue()));
+        SQLSegmentAssert.assertIs(assertContext, actual, expected);
+    }
+}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisDDLStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisDDLStatementAssert.java
index 44fc4cce063..a5994765726 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisDDLStatementAssert.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ddl/dialect/doris/DorisDDLStatementAssert.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisAlterColoca
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisAlterStoragePolicyStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateFunctionStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisDropFunctionStatement;
+import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCancelTaskStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisAlterJobStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisDropJobStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisPauseJobStatement;
@@ -32,6 +33,7 @@ import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisResumeSyncJ
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisPauseSyncJobStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisStopSyncJobStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateJobStatement;
+import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateStreamingJobStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.doris.ddl.DorisCreateSyncJobStatement;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ddl.dialect.doris.type.DorisAlterColocateGroupStatementAssert;
@@ -41,6 +43,7 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisAlterStoragePolicyStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateJobStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisDropFunctionStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCancelTaskStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisAlterJobStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisDropJobStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisPauseJobStatementTestCase;
@@ -48,6 +51,7 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisResumeSyncJobStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisPauseSyncJobStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisStopSyncJobStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateStreamingJobStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateSyncJobStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.standard.function.CreateFunctionStatementTestCase;
 
@@ -91,6 +95,10 @@ public final class DorisDDLStatementAssert {
             DorisDropJobStatementAssert.assertIs(assertContext, 
(DorisDropJobStatement) actual, (DorisDropJobStatementTestCase) expected);
         } else if (actual instanceof DorisAlterJobStatement) {
             DorisAlterJobStatementAssert.assertIs(assertContext, 
(DorisAlterJobStatement) actual, (DorisAlterJobStatementTestCase) expected);
+        } else if (actual instanceof DorisCreateStreamingJobStatement) {
+            DorisCreateStreamingJobStatementAssert.assertIs(assertContext, 
(DorisCreateStreamingJobStatement) actual, 
(DorisCreateStreamingJobStatementTestCase) expected);
+        } else if (actual instanceof DorisCancelTaskStatement) {
+            DorisCancelTaskStatementAssert.assertIs(assertContext, 
(DorisCancelTaskStatement) actual, (DorisCancelTaskStatementTestCase) expected);
         }
     }
 }
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
index 88e13c5c753..9dc40c235af 100644
--- 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
@@ -38,6 +38,8 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisPauseSyncJobStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisStopSyncJobStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateJobStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCancelTaskStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateStreamingJobStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris.DorisCreateSyncJobStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.doris.DorisAlterSqlBlockRuleStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.doris.DorisDropSqlBlockRuleStatementTestCase;
@@ -585,6 +587,9 @@ public final class RootSQLParserTestCases {
     @XmlElement(name = "create-job")
     private final List<DorisCreateJobStatementTestCase> createJobTestCases = 
new LinkedList<>();
     
+    @XmlElement(name = "create-streaming-job")
+    private final List<DorisCreateStreamingJobStatementTestCase> 
createStreamingJobTestCases = new LinkedList<>();
+    
     @XmlElement(name = "alter-catalog")
     private final List<AlterCatalogStatementTestCase> alterCatalogTestCases = 
new LinkedList<>();
     
@@ -1299,6 +1304,9 @@ public final class RootSQLParserTestCases {
     @XmlElement(name = "cancel-backup")
     private final List<DorisCancelBackupStatementTestCase> 
cancelBackupTestCases = new LinkedList<>();
     
+    @XmlElement(name = "cancel-task")
+    private final List<DorisCancelTaskStatementTestCase> cancelTaskTestCases = 
new LinkedList<>();
+    
     @XmlElement(name = "drop-shadow-algorithm")
     private final List<DropShadowAlgorithmStatementTestCase> 
dropShadowAlgorithmTestCases = new LinkedList<>();
     
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ddl/dialect/doris/DorisCancelTaskStatementTestCase.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ddl/dialect/doris/DorisCancelTaskStatementTestCase.java
new file mode 100644
index 00000000000..61c202b0a0d
--- /dev/null
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ddl/dialect/doris/DorisCancelTaskStatementTestCase.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris;
+
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * Cancel task statement test case for Doris.
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@Getter
+@Setter
+public final class DorisCancelTaskStatementTestCase extends SQLParserTestCase {
+    
+    @XmlElement(name = "job-name")
+    private String jobName;
+    
+    @XmlElement(name = "task-id")
+    private long taskId;
+}
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ddl/dialect/doris/DorisCreateStreamingJobStatementTestCase.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ddl/dialect/doris/DorisCreateStreamingJobStatementTestCase.java
new file mode 100644
index 00000000000..396fa3e26c0
--- /dev/null
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ddl/dialect/doris/DorisCreateStreamingJobStatementTestCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.dialect.doris;
+
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.job.ExpectedJobComment;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.job.ExpectedJobName;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.table.ExpectedSimpleTable;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.doris.PropertyTestCase;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Create streaming job statement test case for Doris.
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@Getter
+@Setter
+public final class DorisCreateStreamingJobStatementTestCase extends 
SQLParserTestCase {
+    
+    @XmlElement(name = "job-name")
+    private ExpectedJobName jobName;
+    
+    @XmlElement(name = "job-comment")
+    private ExpectedJobComment comment;
+    
+    @XmlAttribute(name = "source-type")
+    private String sourceType;
+    
+    @XmlAttribute(name = "target-database")
+    private String targetDatabase;
+    
+    @XmlElement(name = "source-property")
+    private final List<PropertyTestCase> sourceProperties = new LinkedList<>();
+    
+    @XmlElement(name = "target-property")
+    private final List<PropertyTestCase> targetProperties = new LinkedList<>();
+    
+    @XmlElement(name = "job-property")
+    private final List<PropertyTestCase> jobProperties = new LinkedList<>();
+    
+    @XmlElement(name = "insert-table")
+    private ExpectedSimpleTable insertTable;
+    
+    @XmlElement(name = "insert-select-table")
+    private ExpectedSimpleTable insertSelectTable;
+}
diff --git a/test/it/parser/src/main/resources/case/ddl/cancel-task.xml 
b/test/it/parser/src/main/resources/case/ddl/cancel-task.xml
new file mode 100644
index 00000000000..7b1252871c8
--- /dev/null
+++ b/test/it/parser/src/main/resources/case/ddl/cancel-task.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<sql-parser-test-cases>
+    <cancel-task sql-case-id="cancel_task_with_job_name_and_task_id">
+        <job-name>example</job-name>
+        <task-id>378912</task-id>
+    </cancel-task>
+
+    <cancel-task sql-case-id="cancel_task_with_double_quotes">
+        <job-name>my_job</job-name>
+        <task-id>100</task-id>
+    </cancel-task>
+</sql-parser-test-cases>
diff --git 
a/test/it/parser/src/main/resources/case/ddl/create-streaming-job.xml 
b/test/it/parser/src/main/resources/case/ddl/create-streaming-job.xml
new file mode 100644
index 00000000000..1e053751ba6
--- /dev/null
+++ b/test/it/parser/src/main/resources/case/ddl/create-streaming-job.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<sql-parser-test-cases>
+    <create-streaming-job sql-case-id="create_streaming_job_from_postgres" 
source-type="POSTGRES" target-database="target_test_db">
+        <job-name name="test_postgres_job" start-index="11" stop-index="27" />
+        <source-property key="jdbc_url" 
value="jdbc:postgresql://127.0.0.1:5432/postgres" start-index="57" 
stop-index="112" />
+        <source-property key="driver_url" value="postgresql-42.5.0.jar" 
start-index="115" stop-index="152" />
+        <source-property key="driver_class" value="org.postgresql.Driver" 
start-index="155" stop-index="194" />
+        <source-property key="user" value="postgres" start-index="197" 
stop-index="215" />
+        <source-property key="password" value="postgres" start-index="218" 
stop-index="240" />
+        <source-property key="database" value="postgres" start-index="243" 
stop-index="265" />
+        <source-property key="schema" value="public" start-index="268" 
stop-index="286" />
+        <source-property key="include_tables" value="test_tbls" 
start-index="289" stop-index="318" />
+        <source-property key="offset" value="latest" start-index="321" 
stop-index="339" />
+        <target-property key="table.create.properties.replication_num" 
value="1" start-index="370" stop-index="416" />
+    </create-streaming-job>
+
+    <create-streaming-job sql-case-id="create_streaming_job_from_mysql" 
source-type="MYSQL" target-database="target_test_db">
+        <job-name name="multi_table_sync" start-index="11" stop-index="26" />
+        <source-property key="jdbc_url" value="jdbc:mysql://127.0.0.1:3306" 
start-index="53" stop-index="94" />
+        <source-property key="driver_url" value="mysql-connector-j-8.0.31.jar" 
start-index="97" stop-index="141" />
+        <source-property key="driver_class" value="com.mysql.cj.jdbc.Driver" 
start-index="144" stop-index="186" />
+        <source-property key="user" value="root" start-index="189" 
stop-index="203" />
+        <source-property key="password" value="123456" start-index="206" 
stop-index="226" />
+        <source-property key="database" value="test" start-index="229" 
stop-index="247" />
+        <source-property key="include_tables" value="user_info,order_info" 
start-index="250" stop-index="290" />
+        <source-property key="offset" value="initial" start-index="293" 
stop-index="312" />
+        <target-property key="table.create.properties.replication_num" 
value="1" start-index="343" stop-index="389" />
+    </create-streaming-job>
+
+    <create-streaming-job sql-case-id="create_streaming_job_do_insert">
+        <job-name name="my_job" start-index="11" stop-index="16" />
+        <insert-table name="tbl1" start-index="46" stop-index="53">
+            <owner name="db1" start-index="46" stop-index="48" />
+        </insert-table>
+        <insert-select-table name="tbl2" start-index="69" stop-index="76">
+            <owner name="db2" start-index="69" stop-index="71" />
+        </insert-select-table>
+    </create-streaming-job>
+
+    <create-streaming-job sql-case-id="create_streaming_job_with_comment" 
source-type="MYSQL" target-database="target_db">
+        <job-name name="my_job" start-index="11" stop-index="16" />
+        <job-comment start-index="31" stop-index="53" value="streaming job" />
+        <source-property key="jdbc_url" value="jdbc:mysql://127.0.0.1:3306" 
start-index="67" stop-index="108" />
+        <source-property key="user" value="root" start-index="111" 
stop-index="125" />
+        <source-property key="password" value="123456" start-index="128" 
stop-index="148" />
+        <source-property key="database" value="test" start-index="151" 
stop-index="169" />
+        <source-property key="include_tables" value="t1" start-index="172" 
stop-index="194" />
+        <source-property key="offset" value="initial" start-index="197" 
stop-index="216" />
+    </create-streaming-job>
+
+    <create-streaming-job 
sql-case-id="create_streaming_job_with_job_properties" source-type="MYSQL" 
target-database="target_db">
+        <job-name name="my_job" start-index="11" stop-index="16" />
+        <job-property key="max_interval" value="10s" start-index="43" 
stop-index="64" />
+        <source-property key="jdbc_url" value="jdbc:mysql://127.0.0.1:3306" 
start-index="79" stop-index="120" />
+        <source-property key="user" value="root" start-index="123" 
stop-index="137" />
+        <source-property key="password" value="123456" start-index="140" 
stop-index="160" />
+        <source-property key="database" value="test" start-index="163" 
stop-index="181" />
+        <source-property key="include_tables" value="t1" start-index="184" 
stop-index="206" />
+        <source-property key="offset" value="initial" start-index="209" 
stop-index="228" />
+    </create-streaming-job>
+
+    <create-streaming-job 
sql-case-id="create_streaming_job_with_quoted_target_db" source-type="MYSQL" 
target-database="target_db">
+        <job-name name="my_job" start-index="11" stop-index="16" />
+        <source-property key="jdbc_url" value="jdbc:mysql://127.0.0.1:3306" 
start-index="43" stop-index="84" />
+        <source-property key="user" value="root" start-index="87" 
stop-index="101" />
+        <source-property key="password" value="123456" start-index="104" 
stop-index="124" />
+        <source-property key="database" value="test" start-index="127" 
stop-index="145" />
+        <source-property key="include_tables" value="t1" start-index="148" 
stop-index="170" />
+        <source-property key="offset" value="initial" start-index="173" 
stop-index="192" />
+    </create-streaming-job>
+</sql-parser-test-cases>
diff --git 
a/test/it/parser/src/main/resources/sql/supported/ddl/cancel-task.xml 
b/test/it/parser/src/main/resources/sql/supported/ddl/cancel-task.xml
new file mode 100644
index 00000000000..f58ac870595
--- /dev/null
+++ b/test/it/parser/src/main/resources/sql/supported/ddl/cancel-task.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<sql-cases>
+    <sql-case id="cancel_task_with_job_name_and_task_id" value="CANCEL TASK 
WHERE jobName = 'example' AND taskId = 378912" db-types="Doris" />
+    <sql-case id="cancel_task_with_double_quotes" value="CANCEL TASK WHERE 
jobName = &quot;my_job&quot; AND taskId = 100" db-types="Doris" />
+</sql-cases>
diff --git 
a/test/it/parser/src/main/resources/sql/supported/ddl/create-streaming-job.xml 
b/test/it/parser/src/main/resources/sql/supported/ddl/create-streaming-job.xml
new file mode 100644
index 00000000000..fb69f2d88e6
--- /dev/null
+++ 
b/test/it/parser/src/main/resources/sql/supported/ddl/create-streaming-job.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<sql-cases>
+    <sql-case id="create_streaming_job_from_postgres" value="CREATE JOB 
test_postgres_job ON STREAMING FROM POSTGRES (&quot;jdbc_url&quot; = 
&quot;jdbc:postgresql://127.0.0.1:5432/postgres&quot;, &quot;driver_url&quot; = 
&quot;postgresql-42.5.0.jar&quot;, &quot;driver_class&quot; = 
&quot;org.postgresql.Driver&quot;, &quot;user&quot; = &quot;postgres&quot;, 
&quot;password&quot; = &quot;postgres&quot;, &quot;database&quot; = 
&quot;postgres&quot;, &quot;schema&quot; = &quot;public&quot;, &q [...]
+    <sql-case id="create_streaming_job_from_mysql" value="CREATE JOB 
multi_table_sync ON STREAMING FROM MYSQL (&quot;jdbc_url&quot; = 
&quot;jdbc:mysql://127.0.0.1:3306&quot;, &quot;driver_url&quot; = 
&quot;mysql-connector-j-8.0.31.jar&quot;, &quot;driver_class&quot; = 
&quot;com.mysql.cj.jdbc.Driver&quot;, &quot;user&quot; = &quot;root&quot;, 
&quot;password&quot; = &quot;123456&quot;, &quot;database&quot; = 
&quot;test&quot;, &quot;include_tables&quot; = 
&quot;user_info,order_info&quot;, & [...]
+    <sql-case id="create_streaming_job_do_insert" value="CREATE JOB my_job ON 
STREAMING DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2" db-types="Doris" />
+    <sql-case id="create_streaming_job_with_comment" value="CREATE JOB my_job 
ON STREAMING COMMENT 'streaming job' FROM MYSQL (&quot;jdbc_url&quot; = 
&quot;jdbc:mysql://127.0.0.1:3306&quot;, &quot;user&quot; = &quot;root&quot;, 
&quot;password&quot; = &quot;123456&quot;, &quot;database&quot; = 
&quot;test&quot;, &quot;include_tables&quot; = &quot;t1&quot;, 
&quot;offset&quot; = &quot;initial&quot;) TO DATABASE target_db" 
db-types="Doris" />
+    <sql-case id="create_streaming_job_with_job_properties" value="CREATE JOB 
my_job ON STREAMING PROPERTIES (&quot;max_interval&quot; = &quot;10s&quot;) 
FROM MYSQL (&quot;jdbc_url&quot; = &quot;jdbc:mysql://127.0.0.1:3306&quot;, 
&quot;user&quot; = &quot;root&quot;, &quot;password&quot; = &quot;123456&quot;, 
&quot;database&quot; = &quot;test&quot;, &quot;include_tables&quot; = 
&quot;t1&quot;, &quot;offset&quot; = &quot;initial&quot;) TO DATABASE 
target_db" db-types="Doris" />
+    <sql-case id="create_streaming_job_with_quoted_target_db" value="CREATE 
JOB my_job ON STREAMING FROM MYSQL (&quot;jdbc_url&quot; = 
&quot;jdbc:mysql://127.0.0.1:3306&quot;, &quot;user&quot; = &quot;root&quot;, 
&quot;password&quot; = &quot;123456&quot;, &quot;database&quot; = 
&quot;test&quot;, &quot;include_tables&quot; = &quot;t1&quot;, 
&quot;offset&quot; = &quot;initial&quot;) TO DATABASE `target_db`" 
db-types="Doris" />
+</sql-cases>

Reply via email to