This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c607591ceb Support parsing Doris CREATE ROUTINE LOAD syntax (#38137)
2c607591ceb is described below

commit 2c607591cebe14246526efa25786575b96786ecd
Author: cxy <[email protected]>
AuthorDate: Sun Feb 22 18:59:57 2026 +0800

    Support parsing Doris CREATE ROUTINE LOAD syntax (#38137)
    
    * Support parsing Doris CREATE ROUTINE LOAD syntax
    
    * Support parsing Doris CREATE ROUTINE LOAD syntax
---
 .../core/database/visitor/SQLVisitorRule.java      |   2 +
 .../src/main/antlr4/imports/doris/BaseRule.g4      |   1 +
 .../src/main/antlr4/imports/doris/DMLStatement.g4  |  68 ++++
 .../src/main/antlr4/imports/doris/DorisKeyword.g4  |   4 +
 .../sql/parser/autogen/DorisStatement.g4           |   1 +
 .../statement/type/DorisDMLStatementVisitor.java   | 120 +++++++
 .../segment/dml/column/ColumnMappingSegment.java   |  53 ++++
 .../doris/dml/DorisCreateRoutineLoadStatement.java | 195 ++++++++++++
 .../asserts/statement/dml/DMLStatementAssert.java  |   2 +
 .../DorisCreateRoutineLoadStatementAssert.java     | 192 ++++++++++++
 .../doris/DorisDMLStatementAssert.java}            |  19 +-
 .../cases/parser/jaxb/RootSQLParserTestCases.java  |   4 +
 .../segment/impl/column/ExpectedColumnMapping.java |  39 +++
 .../DorisCreateRoutineLoadStatementTestCase.java   |  91 ++++++
 .../resources/case/dml/create-routine-load.xml     | 346 +++++++++++++++++++++
 .../sql/supported/dml/create-routine-load.xml      |  30 ++
 16 files changed, 1157 insertions(+), 10 deletions(-)

diff --git 
a/parser/sql/engine/core/src/main/java/org/apache/shardingsphere/sql/parser/engine/core/database/visitor/SQLVisitorRule.java
 
b/parser/sql/engine/core/src/main/java/org/apache/shardingsphere/sql/parser/engine/core/database/visitor/SQLVisitorRule.java
index cbfb46c799c..67f75be8ac2 100644
--- 
a/parser/sql/engine/core/src/main/java/org/apache/shardingsphere/sql/parser/engine/core/database/visitor/SQLVisitorRule.java
+++ 
b/parser/sql/engine/core/src/main/java/org/apache/shardingsphere/sql/parser/engine/core/database/visitor/SQLVisitorRule.java
@@ -423,6 +423,8 @@ public enum SQLVisitorRule {
     
     SHOW_ROUTINE_LOAD("ShowRoutineLoad", SQLStatementType.DAL),
     
+    CREATE_ROUTINE_LOAD("CreateRoutineLoad", SQLStatementType.DML),
+    
     SHOW_CREATE_TABLE("ShowCreateTable", SQLStatementType.DAL),
     
     SHOW_OTHER("ShowOther", SQLStatementType.DAL),
diff --git 
a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/BaseRule.g4 
b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/BaseRule.g4
index dfc371092bd..7a071cd7bd1 100644
--- a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/BaseRule.g4
+++ b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/BaseRule.g4
@@ -610,6 +610,7 @@ identifierKeywordsAmbiguous3Roles
     : EVENT
     | FILE
     | JOB
+    | KAFKA
     | NONE
     | PROCESS
     | PROXY
diff --git 
a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DMLStatement.g4 
b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DMLStatement.g4
index cd41583d94c..8520176e295 100644
--- 
a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DMLStatement.g4
+++ 
b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DMLStatement.g4
@@ -195,6 +195,74 @@ loadStatement
     : loadDataStatement | loadXmlStatement
     ;
 
+createRoutineLoad
+    : CREATE ROUTINE LOAD (owner DOT_)? jobName (ON tableName)?
+      (WITH mergeType)?
+      (loadProperty (COMMA_ loadProperty)*)?
+      jobProperties?
+      FROM dataSource
+      dataSourceProperties?
+      (COMMENT string_)?
+    ;
+
+loadProperty
+    : columnSeparatorClause
+    | columnsClause
+    | precedingFilterClause
+    | whereClause
+    | partitionNames
+    | deleteOnClause
+    | orderByClause
+    ;
+
+jobName
+    : identifier
+    ;
+
+mergeType
+    : MERGE | DELETE
+    ;
+
+columnSeparatorClause
+    : COLUMNS TERMINATED BY string_
+    ;
+
+columnsClause
+    : COLUMNS LP_ columnMapping (COMMA_ columnMapping)* RP_
+    ;
+
+columnMapping
+    : columnName (EQ_ expr)?
+    ;
+
+precedingFilterClause
+    : PRECEDING FILTER expr
+    ;
+
+deleteOnClause
+    : DELETE ON expr
+    ;
+
+jobProperties
+    : PROPERTIES LP_ jobProperty (COMMA_ jobProperty)* RP_
+    ;
+
+jobProperty
+    : (identifier | SINGLE_QUOTED_TEXT | DOUBLE_QUOTED_TEXT) EQ_? literals
+    ;
+
+dataSource
+    : KAFKA
+    ;
+
+dataSourceProperties
+    : LP_ dataSourceProperty (COMMA_ dataSourceProperty)* RP_
+    ;
+
+dataSourceProperty
+    : (identifier | SINGLE_QUOTED_TEXT | DOUBLE_QUOTED_TEXT) EQ_? literals
+    ;
+
 loadDataStatement
     : LOAD DATA
       (LOW_PRIORITY | CONCURRENT)? LOCAL? 
diff --git 
a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DorisKeyword.g4 
b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DorisKeyword.g4
index 00228a93593..85d2c6f4557 100644
--- 
a/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DorisKeyword.g4
+++ 
b/parser/sql/engine/dialect/doris/src/main/antlr4/imports/doris/DorisKeyword.g4
@@ -1259,6 +1259,10 @@ JOB
     : J O B
     ;
 
+KAFKA
+    : K A F K A
+    ;
+
 KEY
     : K E Y
     ;
diff --git 
a/parser/sql/engine/dialect/doris/src/main/antlr4/org/apache/shardingsphere/sql/parser/autogen/DorisStatement.g4
 
b/parser/sql/engine/dialect/doris/src/main/antlr4/org/apache/shardingsphere/sql/parser/autogen/DorisStatement.g4
index 9a219f6f7db..f1c9eba4214 100644
--- 
a/parser/sql/engine/dialect/doris/src/main/antlr4/org/apache/shardingsphere/sql/parser/autogen/DorisStatement.g4
+++ 
b/parser/sql/engine/dialect/doris/src/main/antlr4/org/apache/shardingsphere/sql/parser/autogen/DorisStatement.g4
@@ -86,6 +86,7 @@ execute
     | install
     | kill
     | loadStatement
+    | createRoutineLoad
     | cacheIndex
     | loadIndexInfo
     | optimizeTable
diff --git 
a/parser/sql/engine/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/engine/doris/visitor/statement/type/DorisDMLStatementVisitor.java
 
b/parser/sql/engine/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/engine/doris/visitor/statement/type/DorisDMLStatementVisitor.java
index 9b7b1320c81..53ba2071911 100644
--- 
a/parser/sql/engine/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/engine/doris/visitor/statement/type/DorisDMLStatementVisitor.java
+++ 
b/parser/sql/engine/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/engine/doris/visitor/statement/type/DorisDMLStatementVisitor.java
@@ -17,15 +17,23 @@
 
 package 
org.apache.shardingsphere.sql.parser.engine.doris.visitor.statement.type;
 
+import org.antlr.v4.runtime.tree.TerminalNode;
 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
 import org.apache.shardingsphere.sql.parser.api.ASTNode;
 import 
org.apache.shardingsphere.sql.parser.api.visitor.statement.type.DMLStatementVisitor;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CallContext;
+import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.ColumnsClauseContext;
+import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.ColumnMappingContext;
+import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.CreateRoutineLoadContext;
+import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.DataSourcePropertyContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.DoStatementContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.HandlerStatementContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.ImportStatementContext;
+import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.IdentifierContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.IndexHintContext;
+import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.JobPropertyContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.LoadDataStatementContext;
+import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.LoadPropertyContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.LoadStatementContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.LoadXmlStatementContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.WindowClauseContext;
@@ -34,17 +42,28 @@ import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.WindowI
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.WindowSpecificationContext;
 import 
org.apache.shardingsphere.sql.parser.autogen.DorisStatementParser.WindowingClauseContext;
 import 
org.apache.shardingsphere.sql.parser.engine.doris.visitor.statement.DorisStatementVisitor;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dal.PartitionSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.job.JobNameSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.property.PropertiesSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.property.PropertySegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.column.ColumnMappingSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.column.ColumnSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.expr.ExpressionSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.expr.FunctionSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.expr.complex.CommonExpressionSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.order.OrderBySegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.predicate.WhereSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.DatabaseSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.OwnerSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.WindowItemSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.WindowSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.IndexHintSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.CallStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.DoStatement;
+import org.apache.shardingsphere.sql.parser.statement.core.util.SQLUtils;
 import 
org.apache.shardingsphere.sql.parser.statement.core.value.identifier.IdentifierValue;
+import 
org.apache.shardingsphere.sql.parser.statement.doris.dml.DorisCreateRoutineLoadStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.mysql.dml.MySQLHandlerStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.mysql.dml.MySQLImportStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.mysql.dml.MySQLLoadDataStatement;
@@ -101,6 +120,107 @@ public final class DorisDMLStatementVisitor extends 
DorisStatementVisitor implem
         return new MySQLLoadXMLStatement(getDatabaseType(), 
(SimpleTableSegment) visit(ctx.tableName()));
     }
     
+    @Override
+    public ASTNode visitCreateRoutineLoad(final CreateRoutineLoadContext ctx) {
+        DorisCreateRoutineLoadStatement result = new 
DorisCreateRoutineLoadStatement(getDatabaseType());
+        if (null != ctx.jobName()) {
+            JobNameSegment jobName = new 
JobNameSegment(ctx.jobName().start.getStartIndex(), 
ctx.jobName().stop.getStopIndex(), new 
IdentifierValue(ctx.jobName().getText()));
+            if (null != ctx.owner()) {
+                OwnerSegment owner = (OwnerSegment) visit(ctx.owner());
+                jobName.setOwner(owner);
+                result.setDatabase(new 
DatabaseSegment(ctx.owner().start.getStartIndex(), 
ctx.owner().stop.getStopIndex(), new IdentifierValue(ctx.owner().getText())));
+            }
+            result.setJobName(jobName);
+        }
+        if (null != ctx.tableName()) {
+            result.setTable((SimpleTableSegment) visit(ctx.tableName()));
+        }
+        if (null != ctx.mergeType()) {
+            result.setMergeType(ctx.mergeType().getText());
+        }
+        if (null != ctx.loadProperty()) {
+            for (int i = 0; i < ctx.loadProperty().size(); i++) {
+                LoadPropertyContext loadPropCtx = ctx.loadProperty(i);
+                if (null != loadPropCtx.columnSeparatorClause()) {
+                    
result.setColumnSeparator(SQLUtils.getExactlyValue(loadPropCtx.columnSeparatorClause().string_().getText()));
+                }
+                if (null != loadPropCtx.columnsClause()) {
+                    processColumnMappings(loadPropCtx.columnsClause(), result);
+                }
+                if (null != loadPropCtx.precedingFilterClause()) {
+                    result.setPrecedingFilter((ExpressionSegment) 
visit(loadPropCtx.precedingFilterClause().expr()));
+                }
+                if (null != loadPropCtx.whereClause()) {
+                    result.setWhere((WhereSegment) 
visit(loadPropCtx.whereClause()));
+                }
+                if (null != loadPropCtx.partitionNames()) {
+                    for (IdentifierContext each : 
loadPropCtx.partitionNames().identifier()) {
+                        PartitionSegment partitionSegment = new 
PartitionSegment(each.getStart().getStartIndex(), 
each.getStop().getStopIndex(), (IdentifierValue) visit(each));
+                        result.getPartitions().add(partitionSegment);
+                    }
+                }
+                if (null != loadPropCtx.deleteOnClause()) {
+                    result.setDeleteOn((ExpressionSegment) 
visit(loadPropCtx.deleteOnClause().expr()));
+                }
+                if (null != loadPropCtx.orderByClause()) {
+                    result.setOrderBy((OrderBySegment) 
visit(loadPropCtx.orderByClause()));
+                }
+            }
+        }
+        if (null != ctx.jobProperties()) {
+            PropertiesSegment propertiesSegment = new 
PropertiesSegment(ctx.jobProperties().start.getStartIndex(), 
ctx.jobProperties().stop.getStopIndex());
+            for (int i = 0; i < ctx.jobProperties().jobProperty().size(); i++) 
{
+                JobPropertyContext propertyCtx = 
ctx.jobProperties().jobProperty(i);
+                String key = getPropertyKey(propertyCtx.identifier(), 
propertyCtx.SINGLE_QUOTED_TEXT(), propertyCtx.DOUBLE_QUOTED_TEXT());
+                String value = 
SQLUtils.getExactlyValue(propertyCtx.literals().getText());
+                PropertySegment propertySegment = new 
PropertySegment(propertyCtx.start.getStartIndex(), 
propertyCtx.stop.getStopIndex(), key, value);
+                propertiesSegment.getProperties().add(propertySegment);
+            }
+            result.setJobProperties(propertiesSegment);
+        }
+        if (null != ctx.dataSource()) {
+            result.setDataSource(ctx.dataSource().getText());
+        }
+        if (null != ctx.dataSourceProperties() && null != 
ctx.dataSourceProperties().dataSourceProperty()) {
+            PropertiesSegment propertiesSegment = new 
PropertiesSegment(ctx.dataSourceProperties().start.getStartIndex(), 
ctx.dataSourceProperties().stop.getStopIndex());
+            for (int i = 0; i < 
ctx.dataSourceProperties().dataSourceProperty().size(); i++) {
+                DataSourcePropertyContext propertyCtx = 
ctx.dataSourceProperties().dataSourceProperty(i);
+                String key = getPropertyKey(propertyCtx.identifier(), 
propertyCtx.SINGLE_QUOTED_TEXT(), propertyCtx.DOUBLE_QUOTED_TEXT());
+                String value = 
SQLUtils.getExactlyValue(propertyCtx.literals().getText());
+                PropertySegment propertySegment = new 
PropertySegment(propertyCtx.start.getStartIndex(), 
propertyCtx.stop.getStopIndex(), key, value);
+                propertiesSegment.getProperties().add(propertySegment);
+            }
+            result.setDataSourceProperties(propertiesSegment);
+        }
+        if (null != ctx.string_()) {
+            
result.setComment(SQLUtils.getExactlyValue(ctx.string_().getText()));
+        }
+        result.addParameterMarkers(getParameterMarkerSegments());
+        return result;
+    }
+    
+    private void processColumnMappings(final ColumnsClauseContext 
columnsClauseCtx, final DorisCreateRoutineLoadStatement statement) {
+        for (int i = 0; i < columnsClauseCtx.columnMapping().size(); i++) {
+            ColumnMappingContext mappingCtx = 
columnsClauseCtx.columnMapping(i);
+            ColumnSegment column = (ColumnSegment) 
visit(mappingCtx.columnName());
+            ColumnMappingSegment columnMapping = new 
ColumnMappingSegment(mappingCtx.start.getStartIndex(), 
mappingCtx.stop.getStopIndex(), column);
+            if (null != mappingCtx.expr()) {
+                columnMapping.setMappingExpression((ExpressionSegment) 
visit(mappingCtx.expr()));
+            }
+            statement.getColumnMappings().add(columnMapping);
+        }
+    }
+    
+    private String getPropertyKey(final IdentifierContext identifier, final 
TerminalNode singleQuotedText, final TerminalNode doubleQuotedText) {
+        if (null != singleQuotedText) {
+            return SQLUtils.getExactlyValue(singleQuotedText.getText());
+        }
+        if (null != doubleQuotedText) {
+            return SQLUtils.getExactlyValue(doubleQuotedText.getText());
+        }
+        return SQLUtils.getExactlyValue(identifier.getText());
+    }
+    
     @Override
     public ASTNode visitIndexHint(final IndexHintContext ctx) {
         Collection<String> indexNames = new LinkedList<>();
diff --git 
a/parser/sql/statement/core/src/main/java/org/apache/shardingsphere/sql/parser/statement/core/segment/dml/column/ColumnMappingSegment.java
 
b/parser/sql/statement/core/src/main/java/org/apache/shardingsphere/sql/parser/statement/core/segment/dml/column/ColumnMappingSegment.java
new file mode 100644
index 00000000000..f9a5c7f0b8c
--- /dev/null
+++ 
b/parser/sql/statement/core/src/main/java/org/apache/shardingsphere/sql/parser/statement/core/segment/dml/column/ColumnMappingSegment.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sql.parser.statement.core.segment.dml.column;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.shardingsphere.sql.parser.statement.core.segment.SQLSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.expr.ExpressionSegment;
+
+import java.util.Optional;
+
+/**
+ * Column mapping segment.
+ * Represents a column name with an optional mapping expression, e.g., "v3 = 
k1 * 100".
+ */
+@RequiredArgsConstructor
+@Getter
+@Setter
+public final class ColumnMappingSegment implements SQLSegment {
+    
+    private final int startIndex;
+    
+    private final int stopIndex;
+    
+    private final ColumnSegment column;
+    
+    private ExpressionSegment mappingExpression;
+    
+    /**
+     * Get mapping expression.
+     *
+     * @return mapping expression
+     */
+    public Optional<ExpressionSegment> getMappingExpression() {
+        return Optional.ofNullable(mappingExpression);
+    }
+}
diff --git 
a/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/dml/DorisCreateRoutineLoadStatement.java
 
b/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/dml/DorisCreateRoutineLoadStatement.java
new file mode 100644
index 00000000000..3b8fb7d005c
--- /dev/null
+++ 
b/parser/sql/statement/dialect/doris/src/main/java/org/apache/shardingsphere/sql/parser/statement/doris/dml/DorisCreateRoutineLoadStatement.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sql.parser.statement.doris.dml;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.job.JobNameSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.property.PropertiesSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dal.PartitionSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.column.ColumnMappingSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.expr.ExpressionSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.order.OrderBySegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.predicate.WhereSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.DatabaseSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.DMLStatement;
+
+import java.util.LinkedList;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * Create routine load statement for Doris.
+ */
+@Getter
+@Setter
+public final class DorisCreateRoutineLoadStatement extends DMLStatement {
+    
+    private DatabaseSegment database;
+    
+    private JobNameSegment jobName;
+    
+    private SimpleTableSegment table;
+    
+    private String mergeType;
+    
+    private String columnSeparator;
+    
+    private final Collection<ColumnMappingSegment> columnMappings = new 
LinkedList<>();
+    
+    private final Collection<PartitionSegment> partitions = new LinkedList<>();
+    
+    private ExpressionSegment precedingFilter;
+    
+    private WhereSegment where;
+    
+    private ExpressionSegment deleteOn;
+    
+    private OrderBySegment orderBy;
+    
+    private PropertiesSegment jobProperties;
+    
+    private String dataSource;
+    
+    private PropertiesSegment dataSourceProperties;
+    
+    private String comment;
+    
+    public DorisCreateRoutineLoadStatement(final DatabaseType databaseType) {
+        super(databaseType);
+    }
+    
+    /**
+     * Get database.
+     *
+     * @return database segment
+     */
+    public Optional<DatabaseSegment> getDatabase() {
+        return Optional.ofNullable(database);
+    }
+    
+    /**
+     * Get job name.
+     *
+     * @return job name segment
+     */
+    public Optional<JobNameSegment> getJobName() {
+        return Optional.ofNullable(jobName);
+    }
+    
+    /**
+     * Get table.
+     *
+     * @return table segment
+     */
+    public Optional<SimpleTableSegment> getTable() {
+        return Optional.ofNullable(table);
+    }
+    
+    /**
+     * Get merge type.
+     *
+     * @return merge type
+     */
+    public Optional<String> getMergeType() {
+        return Optional.ofNullable(mergeType);
+    }
+    
+    /**
+     * Get column separator.
+     *
+     * @return column separator
+     */
+    public Optional<String> getColumnSeparator() {
+        return Optional.ofNullable(columnSeparator);
+    }
+    
+    /**
+     * Get where segment.
+     *
+     * @return where segment
+     */
+    public Optional<WhereSegment> getWhere() {
+        return Optional.ofNullable(where);
+    }
+    
+    /**
+     * Get preceding filter expression.
+     *
+     * @return preceding filter expression
+     */
+    public Optional<ExpressionSegment> getPrecedingFilter() {
+        return Optional.ofNullable(precedingFilter);
+    }
+    
+    /**
+     * Get delete on expression.
+     *
+     * @return delete on expression
+     */
+    public Optional<ExpressionSegment> getDeleteOn() {
+        return Optional.ofNullable(deleteOn);
+    }
+    
+    /**
+     * Get order by segment.
+     *
+     * @return order by segment
+     */
+    public Optional<OrderBySegment> getOrderBy() {
+        return Optional.ofNullable(orderBy);
+    }
+    
+    /**
+     * Get job properties.
+     *
+     * @return job properties segment
+     */
+    public Optional<PropertiesSegment> getJobProperties() {
+        return Optional.ofNullable(jobProperties);
+    }
+    
+    /**
+     * Get data source.
+     *
+     * @return data source
+     */
+    public Optional<String> getDataSource() {
+        return Optional.ofNullable(dataSource);
+    }
+    
+    /**
+     * Get data source properties.
+     *
+     * @return data source properties segment
+     */
+    public Optional<PropertiesSegment> getDataSourceProperties() {
+        return Optional.ofNullable(dataSourceProperties);
+    }
+    
+    /**
+     * Get comment.
+     *
+     * @return comment
+     */
+    public Optional<String> getComment() {
+        return Optional.ofNullable(comment);
+    }
+}
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/DMLStatementAssert.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/DMLStatementAssert.java
index 60766999bff..235b8b9df59 100644
--- 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/DMLStatementAssert.java
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/DMLStatementAssert.java
@@ -21,6 +21,7 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.DMLStatement;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.dml.dialect.doris.DorisDMLStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.dml.dialect.mysql.MySQLDMLStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.dml.dialect.postgresql.PostgreSQLDMLStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.dml.standard.StandardDMLStatementAssert;
@@ -43,5 +44,6 @@ public final class DMLStatementAssert {
         StandardDMLStatementAssert.assertIs(assertContext, actual, expected);
         MySQLDMLStatementAssert.assertIs(assertContext, actual, expected);
         PostgreSQLDMLStatementAssert.assertIs(assertContext, actual, expected);
+        DorisDMLStatementAssert.assertIs(assertContext, actual, expected);
     }
 }
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/dialect/doris/DorisCreateRoutineLoadStatementAssert.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/dialect/doris/DorisCreateRoutineLoadStatementAssert.java
new file mode 100644
index 00000000000..6494b29dc78
--- /dev/null
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/dialect/doris/DorisCreateRoutineLoadStatementAssert.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.dml.dialect.doris;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dal.PartitionSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.property.PropertySegment;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.dml.column.ColumnMappingSegment;
+import 
org.apache.shardingsphere.sql.parser.statement.doris.dml.DorisCreateRoutineLoadStatement;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.segment.SQLSegmentAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.segment.column.ColumnAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.segment.expression.ExpressionAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.segment.orderby.OrderByClauseAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.segment.owner.OwnerAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.segment.partition.PartitionAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.segment.table.TableAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.segment.where.WhereClauseAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.column.ExpectedColumnMapping;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.doris.PropertyTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.dialect.doris.DorisCreateRoutineLoadStatementTestCase;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.Assertions;
+
+/**
+ * Create routine load statement assert for Doris.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DorisCreateRoutineLoadStatementAssert {
+    
+    /**
+     * Assert create routine load statement is correct with expected parser 
result.
+     *
+     * @param assertContext assert context
+     * @param actual actual create routine load statement
+     * @param expected expected create routine load statement test case
+     */
+    public static void assertIs(final SQLCaseAssertContext assertContext, 
final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        assertJobName(assertContext, actual, expected);
+        assertTable(assertContext, actual, expected);
+        assertMergeType(assertContext, actual, expected);
+        assertColumnSeparator(assertContext, actual, expected);
+        assertColumnMappings(assertContext, actual, expected);
+        assertPartitions(assertContext, actual, expected);
+        assertPrecedingFilter(assertContext, actual, expected);
+        assertWhere(assertContext, actual, expected);
+        assertDeleteOn(assertContext, actual, expected);
+        assertOrderBy(assertContext, actual, expected);
+        assertJobProperties(assertContext, actual, expected);
+        assertDataSource(assertContext, actual, expected);
+        assertDataSourceProperties(assertContext, actual, expected);
+        assertComment(assertContext, actual, expected);
+    }
+    
+    private static void assertJobName(final SQLCaseAssertContext 
assertContext, final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (actual.getJobName().isPresent()) {
+            MatcherAssert.assertThat(assertContext.getText("Job name does not 
match: "), actual.getJobName().get().getIdentifier().getValue(), 
CoreMatchers.is(expected.getJobName()));
+            if (null != expected.getOwner()) {
+                OwnerAssert.assertIs(assertContext, 
actual.getJobName().get().getOwner().orElse(null), expected.getOwner());
+            }
+        }
+    }
+    
+    private static void assertTable(final SQLCaseAssertContext assertContext, 
final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (null != expected.getTable()) {
+            TableAssert.assertIs(assertContext, 
actual.getTable().orElse(null), expected.getTable());
+        }
+    }
+    
+    private static void assertMergeType(final SQLCaseAssertContext 
assertContext, final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (null != expected.getMergeType()) {
+            MatcherAssert.assertThat(assertContext.getText("Merge type does 
not match: "), actual.getMergeType().orElse(null), 
CoreMatchers.is(expected.getMergeType()));
+        }
+    }
+    
+    private static void assertColumnSeparator(final SQLCaseAssertContext 
assertContext, final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (null != expected.getColumnSeparator()) {
+            MatcherAssert.assertThat(assertContext.getText("Column separator 
does not match: "), actual.getColumnSeparator().orElse(null), 
CoreMatchers.is(expected.getColumnSeparator()));
+        }
+    }
+    
+    private static void assertColumnMappings(final SQLCaseAssertContext 
assertContext, final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (!expected.getColumnMappings().isEmpty()) {
+            MatcherAssert.assertThat(assertContext.getText("Column mappings 
size does not match: "), actual.getColumnMappings().size(), 
CoreMatchers.is(expected.getColumnMappings().size()));
+            int count = 0;
+            for (ColumnMappingSegment each : actual.getColumnMappings()) {
+                assertColumnMapping(assertContext, each, 
expected.getColumnMappings().get(count));
+                count++;
+            }
+        }
+    }
+    
+    private static void assertColumnMapping(final SQLCaseAssertContext 
assertContext, final ColumnMappingSegment actual, final ExpectedColumnMapping 
expected) {
+        ColumnAssert.assertIs(assertContext, actual.getColumn(), 
expected.getColumn());
+        if (null != expected.getMappingExpression()) {
+            ExpressionAssert.assertExpression(assertContext, 
actual.getMappingExpression().orElse(null), expected.getMappingExpression());
+        }
+        SQLSegmentAssert.assertIs(assertContext, actual, expected);
+    }
+    
+    private static void assertPartitions(final SQLCaseAssertContext 
assertContext, final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (!expected.getPartitions().isEmpty()) {
+            MatcherAssert.assertThat(assertContext.getText("Partitions size 
does not match: "), actual.getPartitions().size(), 
CoreMatchers.is(expected.getPartitions().size()));
+            int count = 0;
+            for (PartitionSegment each : actual.getPartitions()) {
+                PartitionAssert.assertIs(assertContext, each, 
expected.getPartitions().get(count));
+                count++;
+            }
+        }
+    }
+    
+    private static void assertPrecedingFilter(final SQLCaseAssertContext 
assertContext, final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (null != expected.getPrecedingFilter()) {
+            ExpressionAssert.assertExpression(assertContext, 
actual.getPrecedingFilter().orElse(null), expected.getPrecedingFilter());
+        }
+    }
+    
+    private static void assertWhere(final SQLCaseAssertContext assertContext, 
final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (null != expected.getWhereClause()) {
+            WhereClauseAssert.assertIs(assertContext, 
actual.getWhere().orElse(null), expected.getWhereClause());
+        }
+    }
+    
+    private static void assertDeleteOn(final SQLCaseAssertContext 
assertContext, final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (null != expected.getDeleteOn()) {
+            ExpressionAssert.assertExpression(assertContext, 
actual.getDeleteOn().orElse(null), expected.getDeleteOn());
+        }
+    }
+    
+    private static void assertOrderBy(final SQLCaseAssertContext 
assertContext, final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (null != expected.getOrderByClause()) {
+            OrderByClauseAssert.assertIs(assertContext, 
actual.getOrderBy().orElse(null), expected.getOrderByClause());
+        }
+    }
+    
+    private static void assertJobProperties(final SQLCaseAssertContext 
assertContext, final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (actual.getJobProperties().isPresent() && null != 
expected.getJobProperties() && !expected.getJobProperties().isEmpty()) {
+            Assertions.assertNotNull(actual.getJobProperties().get(), 
assertContext.getText("Job properties should not be null"));
+            MatcherAssert.assertThat(assertContext.getText("Job properties 
size does not match: "), actual.getJobProperties().get().getProperties().size(),
+                    CoreMatchers.is(expected.getJobProperties().size()));
+            for (int i = 0; i < expected.getJobProperties().size(); i++) {
+                assertProperty(assertContext, 
actual.getJobProperties().get().getProperties().get(i), 
expected.getJobProperties().get(i));
+            }
+        }
+    }
+    
+    private static void assertDataSource(final SQLCaseAssertContext 
assertContext, final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (null != expected.getDataSource()) {
+            MatcherAssert.assertThat(assertContext.getText("Data source does 
not match: "), actual.getDataSource().orElse(null), 
CoreMatchers.is(expected.getDataSource()));
+        }
+    }
+    
+    private static void assertDataSourceProperties(final SQLCaseAssertContext 
assertContext, final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (actual.getDataSourceProperties().isPresent() && null != 
expected.getDataSourceProperties() && 
!expected.getDataSourceProperties().isEmpty()) {
+            Assertions.assertNotNull(actual.getDataSourceProperties().get(), 
assertContext.getText("Data source properties should not be null"));
+            MatcherAssert.assertThat(assertContext.getText("Data source 
properties size does not match: "), 
actual.getDataSourceProperties().get().getProperties().size(),
+                    
CoreMatchers.is(expected.getDataSourceProperties().size()));
+            for (int i = 0; i < expected.getDataSourceProperties().size(); 
i++) {
+                assertProperty(assertContext, 
actual.getDataSourceProperties().get().getProperties().get(i), 
expected.getDataSourceProperties().get(i));
+            }
+        }
+    }
+    
+    private static void assertProperty(final SQLCaseAssertContext 
assertContext, final PropertySegment actual, final PropertyTestCase expected) {
+        MatcherAssert.assertThat(assertContext.getText(String.format("Property 
key '%s' assertion error: ", expected.getKey())), actual.getKey(), 
CoreMatchers.is(expected.getKey()));
+        MatcherAssert.assertThat(assertContext.getText(String.format("Property 
value for key '%s' assertion error: ", expected.getKey())), actual.getValue(), 
CoreMatchers.is(expected.getValue()));
+        SQLSegmentAssert.assertIs(assertContext, actual, expected);
+    }
+    
+    private static void assertComment(final SQLCaseAssertContext 
assertContext, final DorisCreateRoutineLoadStatement actual, final 
DorisCreateRoutineLoadStatementTestCase expected) {
+        if (null != expected.getComment()) {
+            MatcherAssert.assertThat(assertContext.getText("Comment does not 
match: "), actual.getComment().orElse(null), 
CoreMatchers.is(expected.getComment()));
+        }
+    }
+}
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/DMLStatementAssert.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/dialect/doris/DorisDMLStatementAssert.java
similarity index 67%
copy from 
test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/DMLStatementAssert.java
copy to 
test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/dialect/doris/DorisDMLStatementAssert.java
index 60766999bff..088a0d2693a 100644
--- 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/DMLStatementAssert.java
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/dml/dialect/doris/DorisDMLStatementAssert.java
@@ -15,33 +15,32 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.dml;
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.dml.dialect.doris;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.DMLStatement;
+import 
org.apache.shardingsphere.sql.parser.statement.doris.dml.DorisCreateRoutineLoadStatement;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
-import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.dml.dialect.mysql.MySQLDMLStatementAssert;
-import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.dml.dialect.postgresql.PostgreSQLDMLStatementAssert;
-import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.dml.standard.StandardDMLStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.dialect.doris.DorisCreateRoutineLoadStatementTestCase;
 
 /**
- * DML statement assert.
+ * Doris DML statement assert.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class DMLStatementAssert {
+public final class DorisDMLStatementAssert {
     
     /**
-     * Assert DML statement is correct with expected parser result.
+     * Assert Doris DML statement is correct with expected parser result.
      *
      * @param assertContext assert context
      * @param actual actual DML statement
      * @param expected expected parser result
      */
     public static void assertIs(final SQLCaseAssertContext assertContext, 
final DMLStatement actual, final SQLParserTestCase expected) {
-        StandardDMLStatementAssert.assertIs(assertContext, actual, expected);
-        MySQLDMLStatementAssert.assertIs(assertContext, actual, expected);
-        PostgreSQLDMLStatementAssert.assertIs(assertContext, actual, expected);
+        if (actual instanceof DorisCreateRoutineLoadStatement) {
+            DorisCreateRoutineLoadStatementAssert.assertIs(assertContext, 
(DorisCreateRoutineLoadStatement) actual, 
(DorisCreateRoutineLoadStatementTestCase) expected);
+        }
     }
 }
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
index 4d4fc92f6d7..3f3db1a1b42 100644
--- 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
@@ -41,6 +41,7 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.doris.DorisShowRoutineLoadStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.doris.DorisUnsetVariableStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.standard.catalog.AlterCatalogStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.dialect.doris.DorisCreateRoutineLoadStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.tcl.HiveAbortStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.mysql.MySQLCloneStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.mysql.MySQLCreateLoadableFunctionTestCase;
@@ -619,6 +620,9 @@ public final class RootSQLParserTestCases {
     @XmlElement(name = "show-routine-load")
     private final List<DorisShowRoutineLoadStatementTestCase> 
showRoutineLoadTestCases = new LinkedList<>();
     
+    @XmlElement(name = "create-routine-load")
+    private final List<DorisCreateRoutineLoadStatementTestCase> 
createRoutineLoadTestCases = new LinkedList<>();
+    
     @XmlElement(name = "set-constraints")
     private final List<SetConstraintsStatementTestCase> 
setConstraintsTestCases = new LinkedList<>();
     
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/column/ExpectedColumnMapping.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/column/ExpectedColumnMapping.java
new file mode 100644
index 00000000000..f24f354f7ea
--- /dev/null
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/column/ExpectedColumnMapping.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.column;
+
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.AbstractExpectedDelimiterSQLSegment;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.expr.ExpectedExpression;
+
+import javax.xml.bind.annotation.XmlElement;
+
/**
 * Expected column mapping.
 */
@Getter
@Setter
public final class ExpectedColumnMapping extends AbstractExpectedDelimiterSQLSegment {
    
    // Target column of the mapping (e.g. `col` in `col = expr`).
    @XmlElement
    private ExpectedColumn column;
    
    // Optional derivation expression; null for plain column references without `= expr`.
    @XmlElement(name = "mapping-expr")
    private ExpectedExpression mappingExpression;
}
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/dml/dialect/doris/DorisCreateRoutineLoadStatementTestCase.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/dml/dialect/doris/DorisCreateRoutineLoadStatementTestCase.java
new file mode 100644
index 00000000000..c3f02c169d4
--- /dev/null
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/dml/dialect/doris/DorisCreateRoutineLoadStatementTestCase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.dialect.doris;
+
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.column.ExpectedColumnMapping;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.expr.ExpectedExpression;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.index.ExpectedPartition;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.orderby.ExpectedOrderByClause;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.table.ExpectedOwner;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.table.ExpectedSimpleTable;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.where.ExpectedWhereClause;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.dialect.doris.PropertyTestCase;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import java.util.LinkedList;
+import java.util.List;
+
/**
 * Create routine load statement test case for Doris.
 */
@XmlAccessorType(XmlAccessType.FIELD)
@Getter
@Setter
public final class DorisCreateRoutineLoadStatementTestCase extends SQLParserTestCase {
    
    // Routine load job name (the identifier after CREATE ROUTINE LOAD).
    @XmlAttribute(name = "job-name")
    private String jobName;
    
    // Optional database qualifier of the job name.
    @XmlElement
    private ExpectedOwner owner;
    
    // Target table the routine load writes into (ON <table>).
    @XmlElement
    private ExpectedSimpleTable table;
    
    // Merge type clause, e.g. APPEND / MERGE / DELETE.
    @XmlAttribute(name = "merge-type")
    private String mergeType;
    
    // COLUMNS TERMINATED BY separator literal.
    @XmlAttribute(name = "column-separator")
    private String columnSeparator;
    
    // COLUMNS (...) mapping entries, in declaration order.
    @XmlElement(name = "column-mapping")
    private final List<ExpectedColumnMapping> columnMappings = new LinkedList<>();
    
    // PARTITION (...) target partitions, in declaration order.
    @XmlElement(name = "partition")
    private final List<ExpectedPartition> partitions = new LinkedList<>();
    
    // PRECEDING FILTER expression applied before column mapping.
    @XmlElement(name = "preceding-filter")
    private ExpectedExpression precedingFilter;
    
    // WHERE filter applied after column mapping.
    @XmlElement(name = "where")
    private ExpectedWhereClause whereClause;
    
    // DELETE ON condition (only meaningful with MERGE type).
    @XmlElement(name = "delete-on")
    private ExpectedExpression deleteOn;
    
    // ORDER BY clause of the load.
    @XmlElement(name = "order-by")
    private ExpectedOrderByClause orderByClause;
    
    // PROPERTIES (...) key/value pairs of the job.
    @XmlElement(name = "job-property")
    private final List<PropertyTestCase> jobProperties = new LinkedList<>();
    
    // Data source type, e.g. KAFKA.
    @XmlAttribute(name = "data-source")
    private String dataSource;
    
    // Data-source-specific key/value pairs, e.g. kafka_broker_list.
    @XmlElement(name = "data-source-property")
    private final List<PropertyTestCase> dataSourceProperties = new LinkedList<>();
    
    // COMMENT clause text.
    @XmlAttribute
    private String comment;
}
diff --git a/test/it/parser/src/main/resources/case/dml/create-routine-load.xml 
b/test/it/parser/src/main/resources/case/dml/create-routine-load.xml
new file mode 100644
index 00000000000..65bf3f9f689
--- /dev/null
+++ b/test/it/parser/src/main/resources/case/dml/create-routine-load.xml
@@ -0,0 +1,346 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<sql-parser-test-cases>
+    <create-routine-load sql-case-id="create_routine_load_simple" 
job-name="test1">
+        <owner name="example_db" start-index="20" stop-index="29" />
+        <job-property key="desired_concurrent_number" value="3" 
start-index="49" stop-index="79" />
+        <data-source>KAFKA</data-source>
+        <data-source-property key="kafka_broker_list" value="broker1:9092" 
start-index="94" stop-index="129" />
+    </create-routine-load>
+    
+    <create-routine-load sql-case-id="create_routine_load_with_table" 
job-name="test1">
+        <owner name="example_db" start-index="20" stop-index="29" />
+        <table name="example_tbl" start-index="40" stop-index="50" />
+        <job-property key="desired_concurrent_number" value="3" 
start-index="64" stop-index="94" />
+        <data-source>KAFKA</data-source>
+        <data-source-property key="kafka_broker_list" value="broker1:9092" 
start-index="109" stop-index="144" />
+        <data-source-property key="kafka_topic" value="my_topic" 
start-index="147" stop-index="172" />
+    </create-routine-load>
+    
+    <create-routine-load sql-case-id="create_routine_load_with_columns" 
job-name="test1" column-separator=",">
+        <owner name="example_db" start-index="20" stop-index="29" />
+        <table name="example_tbl" start-index="40" stop-index="50" />
+        <column-mapping start-index="87" stop-index="88">
+            <column name="k1" start-index="87" stop-index="88" />
+        </column-mapping>
+        <column-mapping start-index="91" stop-index="92">
+            <column name="k2" start-index="91" stop-index="92" />
+        </column-mapping>
+        <column-mapping start-index="95" stop-index="96">
+            <column name="k3" start-index="95" stop-index="96" />
+        </column-mapping>
+        <job-property key="desired_concurrent_number" value="3" 
start-index="111" stop-index="141" />
+        <data-source>KAFKA</data-source>
+        <data-source-property key="kafka_broker_list" value="broker1:9092" 
start-index="156" stop-index="191" />
+        <data-source-property key="kafka_topic" value="my_topic" 
start-index="194" stop-index="219" />
+    </create-routine-load>
+    
+    <create-routine-load sql-case-id="create_routine_load_with_where" 
job-name="test1">
+        <owner name="example_db" start-index="20" stop-index="29" />
+        <table name="example_tbl" start-index="40" stop-index="50" />
+        <column-mapping start-index="60" stop-index="61">
+            <column name="k1" start-index="60" stop-index="61" />
+        </column-mapping>
+        <column-mapping start-index="64" stop-index="65">
+            <column name="k2" start-index="64" stop-index="65" />
+        </column-mapping>
+        <column-mapping start-index="68" stop-index="69">
+            <column name="k3" start-index="68" stop-index="69" />
+        </column-mapping>
+        <where start-index="73" stop-index="108">
+            <expr>
+                <binary-operation-expression start-index="79" stop-index="108">
+                    <left>
+                        <binary-operation-expression start-index="79" 
stop-index="86">
+                            <left>
+                                <column name="k1" start-index="79" 
stop-index="80" />
+                            </left>
+                            <operator>&gt;</operator>
+                            <right>
+                                <literal-expression value="100" 
start-index="84" stop-index="86" />
+                            </right>
+                        </binary-operation-expression>
+                    </left>
+                    <operator>and</operator>
+                    <right>
+                        <binary-operation-expression start-index="92" 
stop-index="108">
+                            <left>
+                                <column name="k2" start-index="92" 
stop-index="93" />
+                            </left>
+                            <operator>LIKE</operator>
+                            <right>
+                                <list-expression start-index="100" 
stop-index="108">
+                                    <items>
+                                        <literal-expression value="%doris%" 
start-index="100" stop-index="108" />
+                                    </items>
+                                </list-expression>
+                            </right>
+                        </binary-operation-expression>
+                    </right>
+                </binary-operation-expression>
+            </expr>
+        </where>
+        <job-property key="max_batch_interval" value="20" start-index="122" 
stop-index="148" />
+        <data-source>KAFKA</data-source>
+        <data-source-property key="kafka_broker_list" value="broker1:9092" 
start-index="163" stop-index="198" />
+        <data-source-property key="kafka_topic" value="my_topic" 
start-index="201" stop-index="226" />
+    </create-routine-load>
+    
+    <create-routine-load sql-case-id="create_routine_load_with_merge" 
job-name="test1" merge-type="MERGE">
+        <owner name="example_db" start-index="20" stop-index="29" />
+        <table name="example_tbl" start-index="40" stop-index="50" />
+        <column-mapping start-index="71" stop-index="72">
+            <column name="k1" start-index="71" stop-index="72" />
+        </column-mapping>
+        <column-mapping start-index="75" stop-index="76">
+            <column name="k2" start-index="75" stop-index="76" />
+        </column-mapping>
+        <column-mapping start-index="79" stop-index="80">
+            <column name="k3" start-index="79" stop-index="80" />
+        </column-mapping>
+        <column-mapping start-index="83" stop-index="84">
+            <column name="v1" start-index="83" stop-index="84" />
+        </column-mapping>
+        <column-mapping start-index="87" stop-index="88">
+            <column name="v2" start-index="87" stop-index="88" />
+        </column-mapping>
+        <column-mapping start-index="91" stop-index="92">
+            <column name="v3" start-index="91" stop-index="92" />
+        </column-mapping>
+        <where start-index="96" stop-index="131">
+            <expr>
+                <binary-operation-expression start-index="102" 
stop-index="131">
+                    <left>
+                        <binary-operation-expression start-index="102" 
stop-index="109">
+                            <left>
+                                <column name="k1" start-index="102" 
stop-index="103" />
+                            </left>
+                            <operator>&gt;</operator>
+                            <right>
+                                <literal-expression value="100" 
start-index="107" stop-index="109" />
+                            </right>
+                        </binary-operation-expression>
+                    </left>
+                    <operator>and</operator>
+                    <right>
+                        <binary-operation-expression start-index="115" 
stop-index="131">
+                            <left>
+                                <column name="k2" start-index="115" 
stop-index="116" />
+                            </left>
+                            <operator>LIKE</operator>
+                            <right>
+                                <list-expression start-index="123" 
stop-index="131">
+                                    <items>
+                                        <literal-expression value="%doris%" 
start-index="123" stop-index="131" />
+                                    </items>
+                                </list-expression>
+                            </right>
+                        </binary-operation-expression>
+                    </right>
+                </binary-operation-expression>
+            </expr>
+        </where>
+        <delete-on start-index="144" stop-index="158">
+            <binary-operation-expression start-index="144" stop-index="151">
+                <left>
+                    <column name="v3" start-index="144" stop-index="145" />
+                </left>
+                <operator>&gt;</operator>
+                <right>
+                    <literal-expression value="100" start-index="149" 
stop-index="151" />
+                </right>
+            </binary-operation-expression>
+        </delete-on>
+        <job-property key="max_batch_interval" value="20" start-index="165" 
stop-index="191" />
+        <data-source>KAFKA</data-source>
+        <data-source-property key="kafka_broker_list" value="broker1:9092" 
start-index="206" stop-index="241" />
+        <data-source-property key="kafka_topic" value="my_topic" 
start-index="244" stop-index="269" />
+        <data-source-property key="kafka_partitions" value="0,1,2,3" 
start-index="272" stop-index="301" />
+        <data-source-property key="kafka_offsets" value="101,0,0,200" 
start-index="304" stop-index="334" />
+    </create-routine-load>
+    
+    <create-routine-load sql-case-id="create_routine_load_with_order_by" 
job-name="test_job" column-separator=",">
+        <owner name="example_db" start-index="20" stop-index="29" />
+        <table name="example_tbl" start-index="43" stop-index="53" />
+        <column-mapping start-index="90" stop-index="91">
+            <column name="k1" start-index="90" stop-index="91" />
+        </column-mapping>
+        <column-mapping start-index="93" stop-index="94">
+            <column name="k2" start-index="93" stop-index="94" />
+        </column-mapping>
+        <column-mapping start-index="96" stop-index="110">
+            <column name="source_sequence" start-index="96" stop-index="110" />
+        </column-mapping>
+        <column-mapping start-index="112" stop-index="113">
+            <column name="v1" start-index="112" stop-index="113" />
+        </column-mapping>
+        <column-mapping start-index="115" stop-index="116">
+            <column name="v2" start-index="115" stop-index="116" />
+        </column-mapping>
+        <order-by>
+            <column-item name="source_sequence" start-index="129" 
stop-index="143" />
+        </order-by>
+        <job-property key="desired_concurrent_number" value="3" 
start-index="157" stop-index="187" />
+        <data-source>KAFKA</data-source>
+        <data-source-property key="kafka_broker_list" value="broker1:9092" 
start-index="202" stop-index="237" />
+        <data-source-property key="kafka_topic" value="my_topic" 
start-index="240" stop-index="265" />
+    </create-routine-load>
+    
+    <create-routine-load sql-case-id="create_routine_load_with_comment" 
job-name="test1" comment="Test routine load job">
+        <owner name="example_db" start-index="20" stop-index="29" />
+        <table name="example_tbl" start-index="40" stop-index="50" />
+        <job-property key="desired_concurrent_number" value="3" 
start-index="64" stop-index="94" />
+        <data-source>KAFKA</data-source>
+        <data-source-property key="kafka_broker_list" value="broker1:9092" 
start-index="109" stop-index="144" />
+        <data-source-property key="kafka_topic" value="my_topic" 
start-index="147" stop-index="172" />
+    </create-routine-load>
+    
+    <create-routine-load sql-case-id="create_routine_load_comprehensive" 
job-name="name" merge-type="MERGE" column-separator="," comment="test">
+        <owner name="demo_db" start-index="20" stop-index="26" />
+        <table name="demo_tbl" start-index="36" stop-index="43" />
+        <column-mapping start-index="92" stop-index="93">
+            <column name="id" start-index="92" stop-index="93" />
+        </column-mapping>
+        <column-mapping start-index="96" stop-index="99">
+            <column name="name" start-index="96" stop-index="99" />
+        </column-mapping>
+        <column-mapping start-index="102" stop-index="106">
+            <column name="score" start-index="102" stop-index="106" />
+        </column-mapping>
+        <column-mapping start-index="109" stop-index="119">
+            <column name="create_time" start-index="109" stop-index="119" />
+        </column-mapping>
+        <partition name="p202601" start-index="169" stop-index="175" />
+        <partition name="p202602" start-index="178" stop-index="184" />
+        <where start-index="123" stop-index="155">
+            <expr>
+                <binary-operation-expression start-index="129" 
stop-index="155">
+                    <left>
+                        <column name="create_time" start-index="129" 
stop-index="139" />
+                    </left>
+                    <operator>&gt;=</operator>
+                    <right>
+                        <literal-expression value="2026-01-01" 
start-index="144" stop-index="155" />
+                    </right>
+                </binary-operation-expression>
+            </expr>
+        </where>
+        <delete-on start-index="198" stop-index="208">
+            <binary-operation-expression start-index="198" stop-index="208">
+                <left>
+                    <column name="score" start-index="198" stop-index="202" />
+                </left>
+                <operator>&gt;</operator>
+                <right>
+                    <literal-expression value="100" start-index="206" 
stop-index="208" />
+                </right>
+            </binary-operation-expression>
+        </delete-on>
+        <order-by>
+            <column-item name="create_time" start-index="220" stop-index="230" 
/>
+        </order-by>
+        <job-property key="desired_concurrent_number" value="5" 
start-index="244" stop-index="276" />
+        <job-property key="max_batch_interval" value="30" start-index="279" 
stop-index="305" />
+        <job-property key="max_batch_rows" value="500000" start-index="308" 
stop-index="334" />
+        <data-source>KAFKA</data-source>
+        <data-source-property key="kafka_broker_list" 
value="broker1:9092,broker2:9092,broker3:9092" start-index="349" 
stop-index="410" />
+        <data-source-property key="kafka_topic" value="demo_topic" 
start-index="413" stop-index="440" />
+        <data-source-property key="kafka_partitions" value="0,1,2,3" 
start-index="443" stop-index="472" />
+    </create-routine-load>
+    
+    <create-routine-load 
sql-case-id="create_routine_load_with_preceding_filter" job-name="test1">
+        <owner name="example_db" start-index="20" stop-index="29" />
+        <table name="example_tbl" start-index="40" stop-index="50" />
+        <column-mapping start-index="60" stop-index="61">
+            <column name="k1" start-index="60" stop-index="61" />
+        </column-mapping>
+        <column-mapping start-index="64" stop-index="65">
+            <column name="k2" start-index="64" stop-index="65" />
+        </column-mapping>
+        <column-mapping start-index="68" stop-index="69">
+            <column name="k3" start-index="68" stop-index="69" />
+        </column-mapping>
+        <preceding-filter start-index="90" stop-index="95">
+            <binary-operation-expression start-index="90" stop-index="95">
+                <left>
+                    <column name="k1" start-index="90" stop-index="91" />
+                </left>
+                <operator>=</operator>
+                <right>
+                    <literal-expression value="1" start-index="95" 
stop-index="95" />
+                </right>
+            </binary-operation-expression>
+        </preceding-filter>
+        <job-property key="desired_concurrent_number" value="3" 
start-index="109" stop-index="139" />
+        <data-source>KAFKA</data-source>
+        <data-source-property key="kafka_broker_list" value="broker1:9092" 
start-index="154" stop-index="189" />
+        <data-source-property key="kafka_topic" value="my_topic" 
start-index="192" stop-index="217" />
+    </create-routine-load>
+
+    <create-routine-load sql-case-id="create_routine_load_with_column_mapping" 
job-name="test1" column-separator=",">
+        <owner name="example_db" start-index="20" stop-index="29" />
+        <table name="example_tbl" start-index="40" stop-index="50" />
+        <column-mapping start-index="87" stop-index="88">
+            <column name="k1" start-index="87" stop-index="88" />
+        </column-mapping>
+        <column-mapping start-index="91" stop-index="92">
+            <column name="k2" start-index="91" stop-index="92" />
+        </column-mapping>
+        <column-mapping start-index="95" stop-index="96">
+            <column name="k3" start-index="95" stop-index="96" />
+        </column-mapping>
+        <column-mapping start-index="99" stop-index="100">
+            <column name="v1" start-index="99" stop-index="100" />
+        </column-mapping>
+        <column-mapping start-index="103" stop-index="104">
+            <column name="v2" start-index="103" stop-index="104" />
+        </column-mapping>
+        <column-mapping start-index="107" stop-index="119">
+            <column name="v3" start-index="107" stop-index="108" />
+            <mapping-expr>
+                <binary-operation-expression start-index="112" 
stop-index="119">
+                    <left>
+                        <column name="k1" start-index="112" stop-index="113" />
+                    </left>
+                    <operator>*</operator>
+                    <right>
+                        <literal-expression value="100" start-index="117" 
stop-index="119" />
+                    </right>
+                </binary-operation-expression>
+            </mapping-expr>
+        </column-mapping>
+        <column-mapping start-index="122" stop-index="153">
+            <column name="dt" start-index="122" stop-index="123" />
+            <mapping-expr>
+                <function start-index="127" stop-index="153" 
function-name="from_unixtime" text="from_unixtime(ts, '%Y%m%d')">
+                    <parameter>
+                        <column name="ts" start-index="141" stop-index="142" />
+                    </parameter>
+                    <parameter>
+                        <literal-expression value="%Y%m%d" start-index="145" 
stop-index="152" />
+                    </parameter>
+                </function>
+            </mapping-expr>
+        </column-mapping>
+        <job-property key="desired_concurrent_number" value="3" 
start-index="168" stop-index="198" />
+        <data-source>KAFKA</data-source>
+        <data-source-property key="kafka_broker_list" value="broker1:9092" 
start-index="213" stop-index="248" />
+        <data-source-property key="kafka_topic" value="my_topic" 
start-index="251" stop-index="276" />
+    </create-routine-load>
+</sql-parser-test-cases>
diff --git 
a/test/it/parser/src/main/resources/sql/supported/dml/create-routine-load.xml 
b/test/it/parser/src/main/resources/sql/supported/dml/create-routine-load.xml
new file mode 100644
index 00000000000..d8c2918d295
--- /dev/null
+++ 
b/test/it/parser/src/main/resources/sql/supported/dml/create-routine-load.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<sql-cases>
+    <sql-case id="create_routine_load_simple" value="CREATE ROUTINE LOAD 
example_db.test1 PROPERTIES ('desired_concurrent_number'='3') FROM KAFKA 
('kafka_broker_list' = 'broker1:9092')" db-types="Doris" />
+    <sql-case id="create_routine_load_with_table" value="CREATE ROUTINE LOAD 
example_db.test1 ON example_tbl PROPERTIES ('desired_concurrent_number'='3') 
FROM KAFKA ('kafka_broker_list' = 'broker1:9092', 'kafka_topic' = 'my_topic')" 
db-types="Doris" />
+    <sql-case id="create_routine_load_with_columns" value="CREATE ROUTINE LOAD 
example_db.test1 ON example_tbl COLUMNS TERMINATED BY ',', COLUMNS(k1, k2, k3) 
PROPERTIES ('desired_concurrent_number'='3') FROM KAFKA ('kafka_broker_list' = 
'broker1:9092', 'kafka_topic' = 'my_topic')" db-types="Doris" />
+    <sql-case id="create_routine_load_with_where" value="CREATE ROUTINE LOAD 
example_db.test1 ON example_tbl COLUMNS(k1, k2, k3), WHERE k1 &gt; 100 and k2 
LIKE '%doris%' PROPERTIES ('max_batch_interval' = '20') FROM KAFKA 
('kafka_broker_list' = 'broker1:9092', 'kafka_topic' = 'my_topic')" 
db-types="Doris" />
+    <sql-case id="create_routine_load_with_merge" value="CREATE ROUTINE LOAD 
example_db.test1 ON example_tbl WITH MERGE COLUMNS(k1, k2, k3, v1, v2, v3), 
WHERE k1 &gt; 100 and k2 LIKE '%doris%', DELETE ON v3 &gt; 100 PROPERTIES 
('max_batch_interval' = '20') FROM KAFKA ('kafka_broker_list' = 'broker1:9092', 
'kafka_topic' = 'my_topic', 'kafka_partitions' = '0,1,2,3', 'kafka_offsets' = 
'101,0,0,200')" db-types="Doris" />
+    <sql-case id="create_routine_load_with_order_by" value="CREATE ROUTINE 
LOAD example_db.test_job ON example_tbl COLUMNS TERMINATED BY ',', 
COLUMNS(k1,k2,source_sequence,v1,v2), ORDER BY source_sequence PROPERTIES 
('desired_concurrent_number'='3') FROM KAFKA ('kafka_broker_list' = 
'broker1:9092', 'kafka_topic' = 'my_topic')" db-types="Doris" />
+    <sql-case id="create_routine_load_with_comment" value="CREATE ROUTINE LOAD 
example_db.test1 ON example_tbl PROPERTIES ('desired_concurrent_number'='3') 
FROM KAFKA ('kafka_broker_list' = 'broker1:9092', 'kafka_topic' = 'my_topic') 
COMMENT 'Test routine load job'" db-types="Doris" />
+    <sql-case id="create_routine_load_comprehensive" value="CREATE ROUTINE 
LOAD demo_db.name ON demo_tbl WITH MERGE COLUMNS TERMINATED BY ',', COLUMNS 
(id, name, score, create_time), WHERE create_time &gt;= '2026-01-01', PARTITION 
(p202601, p202602), DELETE ON score &gt; 100, ORDER BY create_time PROPERTIES 
('desired_concurrent_number' = '5', 'max_batch_interval' = '30', 
'max_batch_rows' = '500000') FROM KAFKA ('kafka_broker_list' = 
'broker1:9092,broker2:9092,broker3:9092', 'kafka_topic' [...]
+    <sql-case id="create_routine_load_with_preceding_filter" value="CREATE 
ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS(k1, k2, k3), PRECEDING 
FILTER k1 = 1 PROPERTIES ('desired_concurrent_number'='3') FROM KAFKA 
('kafka_broker_list' = 'broker1:9092', 'kafka_topic' = 'my_topic')" 
db-types="Doris" />
+    <sql-case id="create_routine_load_with_column_mapping" value="CREATE 
ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS TERMINATED BY ',', 
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100, dt = from_unixtime(ts, '%Y%m%d')) 
PROPERTIES ('desired_concurrent_number'='3') FROM KAFKA ('kafka_broker_list' = 
'broker1:9092', 'kafka_topic' = 'my_topic')" db-types="Doris" />
+</sql-cases>

Reply via email to