This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 13f1fcde [Improve]Schema change parses ddl sql using jsqlparser 
framework (#422)
13f1fcde is described below

commit 13f1fcdeea2b7cf89e500539b8d888accf24c909
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Wed Jul 17 15:10:15 2024 +0800

    [Improve]Schema change parses ddl sql using jsqlparser framework (#422)
---
 flink-doris-connector/pom.xml                      |   6 +
 .../doris/flink/catalog/doris/DorisSystem.java     |   3 +
 .../flink/sink/schema/SQLParserSchemaManager.java  | 218 +++++++++++++++++++++
 .../flink/sink/schema/SchemaChangeHelper.java      |   5 +-
 .../doris/flink/sink/schema/SchemaChangeMode.java  |  33 ++++
 .../serializer/JsonDebeziumSchemaSerializer.java   |  43 +++-
 .../jsondebezium/JsonDebeziumChangeContext.java    |   4 +-
 .../jsondebezium/JsonDebeziumChangeUtils.java      |  33 ++++
 .../jsondebezium/JsonDebeziumSchemaChange.java     |  69 +++++++
 .../jsondebezium/JsonDebeziumSchemaChangeImpl.java |  11 +-
 .../JsonDebeziumSchemaChangeImplV2.java            | 102 +++-------
 .../jsondebezium/SQLParserSchemaChange.java        |  93 +++++++++
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |   2 +
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |   7 +
 .../sink/schema/SQLParserSchemaManagerTest.java    | 206 +++++++++++++++++++
 .../jsondebezium/TestSQLParserSchemaChange.java    | 141 +++++++++++++
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |   5 +-
 .../tools/cdc/CdcOraclelSyncDatabaseCase.java      |   2 +-
 .../tools/cdc/CdcPostgresSyncDatabaseCase.java     |   2 +-
 .../tools/cdc/CdcSqlServerSyncDatabaseCase.java    |   2 +-
 20 files changed, 895 insertions(+), 92 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index b5080fb0..052180c4 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -92,6 +92,7 @@ under the License.
         <testcontainers.version>1.17.6</testcontainers.version>
         <junit.version>4.12</junit.version>
         <hamcrest.version>1.3</hamcrest.version>
+        <jsqlparser.version>4.9</jsqlparser.version>
     </properties>
 
     <dependencies>
@@ -354,6 +355,11 @@ under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.github.jsqlparser</groupId>
+            <artifactId>jsqlparser</artifactId>
+            <version>${jsqlparser.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index ab26e308..0d33eb9f 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -285,6 +285,9 @@ public class DorisSystem implements Serializable {
     }
 
     public static String identifier(String name) {
+        // Treat the name as already quoted only when it is at least two characters long;
+        // otherwise a lone "`" would match both checks and be returned without quoting.
+        if (name.length() >= 2 && name.startsWith("`") && name.endsWith("`")) {
+            return name;
+        }
         return "`" + name + "`";
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
new file mode 100644
index 00000000..6f157cdc
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.schema;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.Alter;
+import net.sf.jsqlparser.statement.alter.AlterExpression;
+import net.sf.jsqlparser.statement.alter.AlterExpression.ColumnDataType;
+import net.sf.jsqlparser.statement.alter.AlterOperation;
+import net.sf.jsqlparser.statement.create.table.ColDataType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Converts upstream {@code ALTER TABLE} statements into Doris schema change DDL. Statements are
+ * parsed with {@link net.sf.jsqlparser.parser.CCJSqlParserUtil}.
+ */
+public class SQLParserSchemaManager implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(SQLParserSchemaManager.class);
+    private static final String DEFAULT = "DEFAULT";
+    private static final String COMMENT = "COMMENT";
+
+    /**
+     * Doris' schema change only supports ADD, DROP, and RENAME operations. This method is only used
+     * to parse the above schema change operations.
+     *
+     * <p>Statements that fail to parse, statements that are not ALTER statements, and ALTER
+     * clauses that are not handled here are logged and contribute nothing to the result.
+     *
+     * @param sourceConnector upstream connector, used to map column types of added columns
+     * @param ddl upstream DDL statement
+     * @param dorisTable Doris table the generated DDL is built for
+     * @return the generated Doris DDL statements; may be empty, never {@code null}
+     */
+    public List<String> parserAlterDDLs(
+            SourceConnector sourceConnector, String ddl, String dorisTable) {
+        List<String> ddlList = new ArrayList<>();
+        try {
+            Statement statement = CCJSqlParserUtil.parse(ddl);
+            if (statement instanceof Alter) {
+                Alter alterStatement = (Alter) statement;
+                List<AlterExpression> alterExpressions = alterStatement.getAlterExpressions();
+                for (AlterExpression alterExpression : alterExpressions) {
+                    AlterOperation operation = alterExpression.getOperation();
+                    switch (operation) {
+                        case DROP:
+                            // A DROP clause that targets an index, key or constraint carries no
+                            // column name; building a DROP COLUMN statement from it would fail.
+                            if (alterExpression.getColumnName() == null) {
+                                LOG.warn("Skip drop clause without a column name, ddl={}", ddl);
+                                break;
+                            }
+                            String dropColumnDDL =
+                                    processDropColumnOperation(alterExpression, dorisTable);
+                            ddlList.add(dropColumnDDL);
+                            break;
+                        case ADD:
+                            List<String> addColumnDDL =
+                                    processAddColumnOperation(
+                                            sourceConnector, alterExpression, dorisTable);
+                            ddlList.addAll(addColumnDDL);
+                            break;
+                        case CHANGE:
+                            String changeColumnDDL =
+                                    processChangeColumnOperation(alterExpression, dorisTable);
+                            ddlList.add(changeColumnDDL);
+                            break;
+                        case RENAME:
+                            String renameColumnDDL =
+                                    processRenameColumnOperation(alterExpression, dorisTable);
+                            ddlList.add(renameColumnDDL);
+                            break;
+                        default:
+                            LOG.warn(
+                                    "Unsupported alter ddl operations, operation={}, ddl={}",
+                                    operation.name(),
+                                    ddl);
+                    }
+                }
+            } else {
+                LOG.warn("Unsupported ddl operations, ddl={}", ddl);
+            }
+        } catch (JSQLParserException e) {
+            LOG.warn("Failed to parse DDL SQL, SQL={}", ddl, e);
+        }
+        return ddlList;
+    }
+
+    /** Builds the Doris DROP COLUMN statement for the column named by the clause. */
+    private String processDropColumnOperation(AlterExpression alterExpression, String dorisTable) {
+        String dropColumnDDL =
+                SchemaChangeHelper.buildDropColumnDDL(dorisTable, alterExpression.getColumnName());
+        LOG.info("Parsed drop column DDL SQL is: {}", dropColumnDDL);
+        return dropColumnDDL;
+    }
+
+    /**
+     * Builds one Doris ADD COLUMN statement per column definition in the clause.
+     *
+     * @return the generated statements; empty when the clause defines no columns
+     */
+    private List<String> processAddColumnOperation(
+            SourceConnector sourceConnector, AlterExpression alterExpression, String dorisTable) {
+        List<String> addColumnList = new ArrayList<>();
+        List<ColumnDataType> colDataTypeList = alterExpression.getColDataTypeList();
+        // An ADD clause that defines an index, key or constraint has no column definitions.
+        if (CollectionUtils.isEmpty(colDataTypeList)) {
+            LOG.warn("Skip add clause without column definitions, clause={}", alterExpression);
+            return addColumnList;
+        }
+        for (ColumnDataType columnDataType : colDataTypeList) {
+            String columnName = columnDataType.getColumnName();
+            ColDataType colDataType = columnDataType.getColDataType();
+            String datatype = colDataType.getDataType();
+            Integer length = null;
+            Integer scale = null;
+            List<String> argumentsStringList = colDataType.getArgumentsStringList();
+            if (CollectionUtils.isNotEmpty(argumentsStringList)) {
+                length = parseIntOrNull(argumentsStringList.get(0));
+                if (argumentsStringList.size() == 2) {
+                    scale = parseIntOrNull(argumentsStringList.get(1));
+                }
+            }
+            datatype =
+                    JsonDebeziumChangeUtils.buildDorisTypeName(
+                            sourceConnector, datatype, length, scale);
+
+            List<String> columnSpecs = columnDataType.getColumnSpecs();
+            String defaultValue = extractDefaultValue(columnSpecs);
+            String comment = extractComment(columnSpecs);
+            FieldSchema fieldSchema = new FieldSchema(columnName, datatype, defaultValue, comment);
+            String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(dorisTable, fieldSchema);
+            LOG.info("Parsed add column DDL SQL is: {}", addColumnDDL);
+            addColumnList.add(addColumnDDL);
+        }
+        return addColumnList;
+    }
+
+    /**
+     * Parses a type argument as an integer, returning {@code null} for arguments that are not
+     * numeric (for example the quoted members of an enum type) instead of failing the whole DDL.
+     */
+    private static Integer parseIntOrNull(String argument) {
+        if (argument == null) {
+            return null;
+        }
+        try {
+            return Integer.valueOf(argument.trim());
+        } catch (NumberFormatException e) {
+            LOG.debug("Ignore non-numeric type argument: {}", argument);
+            return null;
+        }
+    }
+
+    /**
+     * Builds a Doris RENAME COLUMN statement from a CHANGE clause. Only the rename part of the
+     * clause is translated; the first column definition supplies the new name.
+     */
+    private String processChangeColumnOperation(
+            AlterExpression alterExpression, String dorisTable) {
+        String columnNewName = alterExpression.getColDataTypeList().get(0).getColumnName();
+        String columnOldName = alterExpression.getColumnOldName();
+        String renameColumnDDL =
+                SchemaChangeHelper.buildRenameColumnDDL(dorisTable, columnOldName, columnNewName);
+        LOG.warn(
+                "Note: Only rename column names are supported in doris. "
+                        + "Therefore, the change syntax used here only supports the use of rename."
+                        + " Parsed change column DDL SQL is: {}",
+                renameColumnDDL);
+        return renameColumnDDL;
+    }
+
+    /** Builds a Doris RENAME COLUMN statement from a RENAME clause. */
+    private String processRenameColumnOperation(
+            AlterExpression alterExpression, String dorisTable) {
+        String columnNewName = alterExpression.getColumnName();
+        String columnOldName = alterExpression.getColumnOldName();
+        String renameColumnDDL =
+                SchemaChangeHelper.buildRenameColumnDDL(dorisTable, columnOldName, columnNewName);
+        LOG.info("Parsed rename column DDL SQL is: {}", renameColumnDDL);
+        return renameColumnDDL;
+    }
+
+    /**
+     * Returns the unquoted token that follows {@code DEFAULT} in the column specs.
+     *
+     * @param columnSpecs column spec tokens; may be {@code null} or empty
+     * @return the default value, or {@code null} when none is present
+     */
+    @VisibleForTesting
+    public String extractDefaultValue(List<String> columnSpecs) {
+        return extractAdjacentString(columnSpecs, DEFAULT);
+    }
+
+    /**
+     * Finds {@code key} (case-insensitively) in the spec tokens and returns the token right after
+     * it with surrounding quotes removed. A following token that is itself {@code DEFAULT} or
+     * {@code COMMENT} is not a value; that occurrence is logged and the search continues.
+     */
+    private String extractAdjacentString(List<String> columnSpecs, String key) {
+        if (CollectionUtils.isEmpty(columnSpecs)) {
+            return null;
+        }
+        // The last token has no successor, so it can never be a matching key.
+        int lastKeyIndex = columnSpecs.size() - 1;
+        for (int i = 0; i < lastKeyIndex; i++) {
+            if (!key.equalsIgnoreCase(columnSpecs.get(i))) {
+                continue;
+            }
+            String adjacentString = columnSpecs.get(i + 1);
+            if (!(DEFAULT.equalsIgnoreCase(adjacentString))
+                    && !(COMMENT.equalsIgnoreCase(adjacentString))) {
+                return removeQuotes(adjacentString);
+            }
+            LOG.warn(
+                    "Failed to extract adjacent string value. columnSpecs={}, key={}",
+                    String.join(",", columnSpecs),
+                    key);
+        }
+        return null;
+    }
+
+    /**
+     * Returns the unquoted token that follows {@code COMMENT} in the column specs.
+     *
+     * @param columnSpecs column spec tokens; may be {@code null} or empty
+     * @return the comment, or {@code null} when none is present
+     */
+    @VisibleForTesting
+    public String extractComment(List<String> columnSpecs) {
+        return extractAdjacentString(columnSpecs, COMMENT);
+    }
+
+    /** Strips leading and trailing single quotes first, then leading and trailing double quotes. */
+    private String removeQuotes(String content) {
+        content = removeContinuousChar(content, '\'');
+        content = removeContinuousChar(content, '\"');
+        return content;
+    }
+
+    /**
+     * Removes every consecutive occurrence of a character from both ends of a string.
+     *
+     * @param str the input string; returned unchanged when {@code null} or shorter than two
+     *     characters
+     * @param target the character to be removed
+     * @return the string without the leading and trailing run of {@code target}
+     */
+    @VisibleForTesting
+    public String removeContinuousChar(String str, char target) {
+        if (str == null || str.length() < 2) {
+            return str;
+        }
+        int start = 0;
+        int end = str.length() - 1;
+        while (start <= end && str.charAt(start) == target) {
+            start++;
+        }
+        while (end >= start && str.charAt(end) == target) {
+            end--;
+        }
+        return str.substring(start, end + 1);
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index 8d365ffc..06546877 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -17,10 +17,9 @@
 
 package org.apache.doris.flink.sink.schema;
 
-import org.apache.flink.util.StringUtils;
-
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.catalog.doris.DorisSystem;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 
@@ -179,7 +178,7 @@ public class SchemaChangeHelper {
     }
 
     private static void commentColumn(StringBuilder ddl, String comment) {
-        if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+        // commons-lang3 isNotEmpty only rejects null and "", so unlike the previous
+        // isNullOrWhitespaceOnly check a whitespace-only comment is still appended.
+        if (StringUtils.isNotEmpty(comment)) {
             ddl.append(" COMMENT 
'").append(DorisSystem.quoteComment(comment)).append("'");
         }
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeMode.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeMode.java
new file mode 100644
index 00000000..e55a4d31
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeMode.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.schema;
+
+/** Selectable schema change modes, each paired with its lower-case textual name. */
+public enum SchemaChangeMode {
+    DEBEZIUM_STRUCTURE("debezium_structure"),
+    SQL_PARSER("sql_parser");
+
+    /** Textual name of the mode. */
+    private final String name;
+
+    SchemaChangeMode(String modeName) {
+        name = modeName;
+    }
+
+    /** Returns the textual name given to this mode. */
+    public String getName() {
+        return name;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index e657864a..c1ed1de1 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -29,12 +29,14 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.schema.SchemaChangeMode;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumDataChange;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImpl;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2;
+import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.SQLParserSchemaChange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,6 +78,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     private String targetTableSuffix;
     private JsonDebeziumDataChange dataChange;
     private JsonDebeziumSchemaChange schemaChange;
+    private SchemaChangeMode schemaChangeMode;
     private final Set<String> initTableSet = new HashSet<>();
 
     public JsonDebeziumSchemaSerializer(
@@ -120,13 +123,15 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             Map<String, String> tableProperties,
             String targetDatabase,
             String targetTablePrefix,
-            String targetTableSuffix) {
+            String targetTableSuffix,
+            SchemaChangeMode schemaChangeMode) {
         this(dorisOptions, pattern, sourceTableName, newSchemaChange, 
executionOptions);
         this.tableMapping = tableMapping;
         this.tableProperties = tableProperties;
         this.targetDatabase = targetDatabase;
         this.targetTablePrefix = targetTablePrefix;
         this.targetTableSuffix = targetTableSuffix;
+        this.schemaChangeMode = schemaChangeMode;
         init();
     }
 
@@ -144,13 +149,29 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                         ignoreUpdateBefore,
                         targetTablePrefix,
                         targetTableSuffix);
-        this.schemaChange =
-                newSchemaChange
-                        ? new JsonDebeziumSchemaChangeImplV2(changeContext)
-                        : new JsonDebeziumSchemaChangeImpl(changeContext);
+        initSchemaChangeInstance(changeContext);
         this.dataChange = new JsonDebeziumDataChange(changeContext);
     }
 
+    /**
+     * Picks the schema change implementation: {@link JsonDebeziumSchemaChangeImpl} when
+     * newSchemaChange is false, {@link SQLParserSchemaChange} when the mode is SQL_PARSER, and
+     * {@link JsonDebeziumSchemaChangeImplV2} otherwise (including when no mode was set).
+     */
+    private void initSchemaChangeInstance(JsonDebeziumChangeContext changeContext) {
+        if (!newSchemaChange) {
+            LOG.info(
+                    "newSchemaChange set to false, instantiation schema change uses JsonDebeziumSchemaChangeImpl.");
+            this.schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext);
+            return;
+        }
+
+        // Identity comparison on the enum is null-safe, so no separate null check is needed.
+        if (schemaChangeMode == SchemaChangeMode.SQL_PARSER) {
+            LOG.info(
+                    "SchemaChangeMode set to SQL_PARSER, instantiation schema change uses SQLParserSchemaChange.");
+            this.schemaChange = new SQLParserSchemaChange(changeContext);
+        } else {
+            LOG.info("instantiation schema change uses JsonDebeziumSchemaChangeImplV2.");
+            this.schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext);
+        }
+    }
+
     @Override
     public DorisRecord serialize(String record) throws IOException {
         LOG.debug("received debezium json data {} :", record);
@@ -201,6 +222,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         private Pattern addDropDDLPattern;
         private String sourceTableName;
         private boolean newSchemaChange = true;
+        private SchemaChangeMode schemaChangeMode;
         private DorisExecutionOptions executionOptions;
         private Map<String, String> tableMapping;
         private Map<String, String> tableProperties;
@@ -218,6 +240,14 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             return this;
         }
 
+        /**
+         * Sets the schema change mode from its name, matched case-insensitively against the
+         * {@link SchemaChangeMode} constants. A null or empty name leaves the mode unset.
+         *
+         * @param schemaChangeMode mode name, for example {@code sql_parser}
+         * @return this builder
+         * @throws java.lang.IllegalArgumentException if the name matches no constant
+         */
+        public JsonDebeziumSchemaSerializer.Builder setSchemaChangeMode(String schemaChangeMode) {
+            if (org.apache.commons.lang3.StringUtils.isEmpty(schemaChangeMode)) {
+                return this;
+            }
+            // Upper-case with Locale.ROOT: under a Turkish default locale "i" maps to a dotted
+            // capital I, so "debezium_structure" would no longer match its constant.
+            this.schemaChangeMode =
+                    SchemaChangeMode.valueOf(schemaChangeMode.toUpperCase(java.util.Locale.ROOT));
+            return this;
+        }
+
         public JsonDebeziumSchemaSerializer.Builder setPattern(Pattern 
addDropDDLPattern) {
             this.addDropDDLPattern = addDropDDLPattern;
             return this;
@@ -273,7 +303,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                     tableProperties,
                     targetDatabase,
                     targetTablePrefix,
-                    targetTableSuffix);
+                    targetTableSuffix,
+                    schemaChangeMode);
         }
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
index a7253d2f..2a3eebe0 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -38,8 +38,8 @@ public class JsonDebeziumChangeContext implements 
Serializable {
     private final Pattern pattern;
     private final String lineDelimiter;
     private final boolean ignoreUpdateBefore;
-    private String targetTablePrefix;
-    private String targetTableSuffix;
+    private final String targetTablePrefix;
+    private final String targetTableSuffix;
 
     public JsonDebeziumChangeContext(
             DorisOptions dorisOptions,
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
index 921607df..36acecd4 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
@@ -23,10 +23,20 @@ import org.apache.flink.util.StringUtils;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
+import org.apache.doris.flink.tools.cdc.oracle.OracleType;
+import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
+import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
 
 import java.util.Map;
 
+import static org.apache.doris.flink.tools.cdc.SourceConnector.MYSQL;
+import static org.apache.doris.flink.tools.cdc.SourceConnector.ORACLE;
+import static org.apache.doris.flink.tools.cdc.SourceConnector.POSTGRES;
+import static org.apache.doris.flink.tools.cdc.SourceConnector.SQLSERVER;
+
 public class JsonDebeziumChangeUtils {
 
     public static String getDorisTableIdentifier(
@@ -62,4 +72,27 @@ public class JsonDebeziumChangeUtils {
                 ? record.get(key).asText()
                 : null;
     }
+
+    /**
+     * Maps a source column type to a Doris type name by delegating to the type converter of the
+     * given connector.
+     *
+     * @param sourceConnector connector the type comes from
+     * @param dataType source type name
+     * @param length first type argument, may be null
+     * @param scale second type argument, may be null
+     * @return the Doris type name produced by the connector's converter
+     * @throws UnsupportedOperationException for connectors other than MySQL, Oracle, Postgres and
+     *     SQL Server
+     */
+    public static String buildDorisTypeName(
+            SourceConnector sourceConnector, String dataType, Integer length, Integer scale) {
+        switch (sourceConnector) {
+            case MYSQL:
+                return MysqlType.toDorisType(dataType, length, scale);
+            case ORACLE:
+                return OracleType.toDorisType(dataType, length, scale);
+            case POSTGRES:
+                return PostgresType.toDorisType(dataType, length, scale);
+            case SQLSERVER:
+                return SqlServerType.toDorisType(dataType, length, scale);
+            default:
+                throw new UnsupportedOperationException(
+                        sourceConnector + " not support " + dataType + " schema change.");
+        }
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index ccb20469..a2164b72 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -25,11 +25,20 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.EventType;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.regex.Pattern;
 
 /**
@@ -43,6 +52,7 @@ import java.util.regex.Pattern;
  * be enabled by configuring use-new-schema-change.
  */
 public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
+    private static final Logger LOG = 
LoggerFactory.getLogger(JsonDebeziumSchemaChange.class);
     protected static String addDropDDLRegex =
             
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
     protected Pattern addDropDDLPattern;
@@ -55,6 +65,7 @@ public abstract class JsonDebeziumSchemaChange extends 
CdcSchemaChange {
     protected Map<String, String> tableMapping;
     protected SchemaChangeManager schemaChangeManager;
     protected JsonDebeziumChangeContext changeContext;
+    protected SourceConnector sourceConnector;
 
     public abstract boolean schemaChange(JsonNode recordRoot);
 
@@ -89,6 +100,12 @@ public abstract class JsonDebeziumSchemaChange extends 
CdcSchemaChange {
                 : null;
     }
 
+    /**
+     * Parse doris database and table as a tuple.
+     *
+     * @param record from flink cdc.
+     * @return Tuple(database, table)
+     */
     protected Tuple2<String, String> getDorisTableTuple(JsonNode record) {
         String identifier =
                 JsonDebeziumChangeUtils.getDorisTableIdentifier(record, 
dorisOptions, tableMapping);
@@ -120,6 +137,58 @@ public abstract class JsonDebeziumSchemaChange extends 
CdcSchemaChange {
         return record;
     }
 
+    /**
+     * Parse event type.
+     *
+     * @param record the change record to inspect
+     * @return ALTER or CREATE when the "type" of the first table change matches one of them
+     *     (ignoring case); {@code null} when there is no table change, no type, or another type
+     */
+    protected EventType extractEventType(JsonNode record) throws JsonProcessingException {
+        JsonNode tableChange = extractTableChange(record);
+        JsonNode typeNode = tableChange == null ? null : tableChange.get("type");
+        if (typeNode == null) {
+            return null;
+        }
+        String type = typeNode.asText();
+        for (EventType candidate : new EventType[] {EventType.ALTER, EventType.CREATE}) {
+            if (candidate.toString().equalsIgnoreCase(type)) {
+                return candidate;
+            }
+        }
+        LOG.warn("Not supported this event type. type={}", type);
+        return null;
+    }
+
+    /**
+     * Returns the first entry of the "tableChanges" field of the history record, or {@code null}
+     * (after logging a warning) when that field is absent.
+     */
+    protected JsonNode extractTableChange(JsonNode record) throws JsonProcessingException {
+        JsonNode tableChanges = extractHistoryRecord(record).get("tableChanges");
+        if (Objects.isNull(tableChanges)) {
+            LOG.warn("Failed to extract tableChanges. record={}", record);
+            return null;
+        }
+        return tableChanges.get(0);
+    }
+
+    /**
+     * Executes every DDL statement against the database of the target Doris table. All statements
+     * are attempted even if an earlier one reports failure.
+     *
+     * @param ddlSqlList DDL statements to run
+     * @param recordRoot source record, used only for logging when there is nothing to run
+     * @param dorisTableTuple Tuple(database, table) of the target table
+     * @param status unused; retained so existing callers keep compiling
+     * @return {@code true} only when the list is non-empty and every statement reported success
+     */
+    protected boolean executeAlterDDLs(
+            List<String> ddlSqlList,
+            JsonNode recordRoot,
+            Tuple2<String, String> dorisTableTuple,
+            boolean status)
+            throws IOException, IllegalArgumentException {
+        if (CollectionUtils.isEmpty(ddlSqlList)) {
+            LOG.info("The recordRoot cannot extract ddl sql. recordRoot={}", recordRoot);
+            return false;
+        }
+
+        // Aggregate the results so a later success cannot hide an earlier failure.
+        boolean allSucceeded = true;
+        for (String ddlSql : ddlSqlList) {
+            boolean executed = schemaChangeManager.execute(ddlSql, dorisTableTuple.f0);
+            LOG.info("schema change status:{}, ddl: {}", executed, ddlSql);
+            allSucceeded &= executed;
+        }
+        return allSucceeded;
+    }
+
+    /**
+     * Resolves the source connector from {@code source.connector} of the record the first time it
+     * is called; later calls keep the value already stored.
+     */
+    protected void extractSourceConnector(JsonNode record) {
+        if (Objects.nonNull(sourceConnector)) {
+            return;
+        }
+        String connectorName = record.get("source").get("connector").asText();
+        sourceConnector = SourceConnector.valueOf(connectorName.toUpperCase());
+    }
+
     public Map<String, String> getTableMapping() {
         return tableMapping;
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
index 614f06a7..09f0f3a6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
@@ -35,7 +35,16 @@ import java.util.Objects;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-/** Use expression to match ddl sql. */
+/**
+ * Uses regular expressions to match DDL statements.
+ *
+ * <p>Parsing DDL statements by regular-expression matching has many flaws. To solve this
+ * problem, the com.github.jsqlparser framework was introduced, which can accurately parse the
+ * schema changes in a DDL statement.
+ *
+ * @deprecated This class is no longer recommended; use {@link SQLParserSchemaChange} instead.
+ */
+@Deprecated
 public class JsonDebeziumSchemaChangeImpl extends JsonDebeziumSchemaChange {
     private static final Logger LOG = 
LoggerFactory.getLogger(JsonDebeziumSchemaChangeImpl.class);
     // alter table tbl add cloumn aca int
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 9b41e2fd..7ef975e2 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -40,10 +40,6 @@ import 
org.apache.doris.flink.sink.schema.SchemaChangeManager;
 import org.apache.doris.flink.sink.writer.EventType;
 import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
-import org.apache.doris.flink.tools.cdc.oracle.OracleType;
-import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
-import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +51,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -74,12 +71,11 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
                     Pattern.CASE_INSENSITIVE);
     // schemaChange saves table names, field, and field column information
     private Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new 
LinkedHashMap<>();
-    private SourceConnector sourceConnector;
     // create table properties
     private final Map<String, String> tableProperties;
-    private String targetDatabase;
-    private String targetTablePrefix;
-    private String targetTableSuffix;
+    private final String targetDatabase;
+    private final String targetTablePrefix;
+    private final String targetTableSuffix;
     private final Set<String> filledTables = new HashSet<>();
 
     public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext 
changeContext) {
@@ -124,6 +120,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
 
             EventType eventType = extractEventType(recordRoot);
             if (eventType == null) {
+                LOG.warn("Failed to parse eventType. recordRoot={}", 
recordRoot);
                 return false;
             }
             if (eventType.equals(EventType.CREATE)) {
@@ -137,43 +134,20 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
                     LOG.info("create table ddl status: {}", status);
                 }
             } else if (eventType.equals(EventType.ALTER)) {
-                // db,table
-                Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
-                if (tuple == null) {
+                Tuple2<String, String> dorisTableTuple = 
getDorisTableTuple(recordRoot);
+                if (dorisTableTuple == null) {
+                    LOG.warn("Failed to get doris table tuple. record={}", 
recordRoot);
                     return false;
                 }
                 List<String> ddlSqlList = extractDDLList(recordRoot);
-                if (CollectionUtils.isEmpty(ddlSqlList)) {
-                    LOG.info("ddl can not do schema change:{}", recordRoot);
-                    return false;
-                }
-                List<DDLSchema> ddlSchemas = 
SchemaChangeHelper.getDdlSchemas();
-                for (int i = 0; i < ddlSqlList.size(); i++) {
-                    DDLSchema ddlSchema = ddlSchemas.get(i);
-                    String ddlSql = ddlSqlList.get(i);
-                    boolean doSchemaChange = checkSchemaChange(tuple.f0, 
tuple.f1, ddlSchema);
-                    status = doSchemaChange && 
schemaChangeManager.execute(ddlSql, tuple.f0);
-                    LOG.info("schema change status:{}, ddl:{}", status, 
ddlSql);
-                }
-            } else {
-                LOG.info("Unsupported event type {}", eventType);
+                status = executeAlterDDLs(ddlSqlList, recordRoot, 
dorisTableTuple, status);
             }
         } catch (Exception ex) {
-            LOG.warn("schema change error :", ex);
+            LOG.warn("schema change error : ", ex);
         }
         return status;
     }
 
-    private JsonNode extractTableChange(JsonNode record) throws 
JsonProcessingException {
-        JsonNode historyRecord = extractHistoryRecord(record);
-        JsonNode tableChanges = historyRecord.get("tableChanges");
-        if (!Objects.isNull(tableChanges)) {
-            JsonNode tableChange = tableChanges.get(0);
-            return tableChange;
-        }
-        return null;
-    }
-
     /** Parse Alter Event. */
     @VisibleForTesting
     public List<String> extractDDLList(JsonNode record) throws IOException {
@@ -181,11 +155,19 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
                 JsonDebeziumChangeUtils.getDorisTableIdentifier(record, 
dorisOptions, tableMapping);
         JsonNode historyRecord = extractHistoryRecord(record);
         String ddl = extractJsonNode(historyRecord, "ddl");
+        extractSourceConnector(record);
+        return parserDebeziumStructure(dorisTable, ddl, record);
+    }
+
+    private List<String> parserDebeziumStructure(String dorisTable, String 
ddl, JsonNode record)
+            throws JsonProcessingException {
         JsonNode tableChange = extractTableChange(record);
-        EventType eventType = extractEventType(record);
-        if (Objects.isNull(tableChange)
-                || Objects.isNull(ddl)
-                || !eventType.equals(EventType.ALTER)) {
+        if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
+            LOG.warn(
+                    "tableChange or ddl is empty, cannot do schema change. 
dorisTable={}, tableChange={}, ddl={}",
+                    dorisTable,
+                    tableChange,
+                    ddl);
             return null;
         }
 
@@ -284,7 +266,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
                 return tableBucketsMap.get(tableName);
             }
             // Secondly, iterate over the map to find a corresponding regular 
expression match,
-            for (Map.Entry<String, Integer> entry : 
tableBucketsMap.entrySet()) {
+            for (Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
 
                 Pattern pattern = Pattern.compile(entry.getKey());
                 if (pattern.matcher(tableName).matches()) {
@@ -301,7 +283,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
             return primaryKeys;
         }
         if (!fields.isEmpty()) {
-            Map.Entry<String, FieldSchema> firstField = 
fields.entrySet().iterator().next();
+            Entry<String, FieldSchema> firstField = 
fields.entrySet().iterator().next();
             return Collections.singletonList(firstField.getKey());
         }
         return new ArrayList<>();
@@ -320,21 +302,6 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
         return schemaChangeManager.checkSchemaChange(database, table, param);
     }
 
-    /** Parse event type. */
-    protected EventType extractEventType(JsonNode record) throws 
JsonProcessingException {
-        JsonNode tableChange = extractTableChange(record);
-        if (tableChange == null || tableChange.get("type") == null) {
-            return null;
-        }
-        String type = tableChange.get("type").asText();
-        if (EventType.ALTER.toString().equalsIgnoreCase(type)) {
-            return EventType.ALTER;
-        } else if (EventType.CREATE.toString().equalsIgnoreCase(type)) {
-            return EventType.CREATE;
-        }
-        return null;
-    }
-
     private Map<String, Object> extractBeforeRow(JsonNode record) {
         return extractRow(record.get("before"));
     }
@@ -402,25 +369,8 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
         int length = column.get("length") == null ? 0 : 
column.get("length").asInt();
         int scale = column.get("scale") == null ? 0 : 
column.get("scale").asInt();
         String sourceTypeName = column.get("typeName").asText();
-        String dorisTypeName;
-        switch (sourceConnector) {
-            case MYSQL:
-                dorisTypeName = MysqlType.toDorisType(sourceTypeName, length, 
scale);
-                break;
-            case ORACLE:
-                dorisTypeName = OracleType.toDorisType(sourceTypeName, length, 
scale);
-                break;
-            case POSTGRES:
-                dorisTypeName = PostgresType.toDorisType(sourceTypeName, 
length, scale);
-                break;
-            case SQLSERVER:
-                dorisTypeName = SqlServerType.toDorisType(sourceTypeName, 
length, scale);
-                break;
-            default:
-                String errMsg = "Not support " + sourceTypeName + " schema 
change.";
-                throw new UnsupportedOperationException(errMsg);
-        }
-        return dorisTypeName;
+        return JsonDebeziumChangeUtils.buildDorisTypeName(
+                sourceConnector, sourceTypeName, length, scale);
     }
 
     private String handleDefaultValue(String defaultValue) {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
new file mode 100644
index 00000000..6be3f72c
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.sink.schema.SQLParserSchemaManager;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Schema change handler that obtains the ALTER statements to execute by handing the DDL text of
+ * a change record to {@link SQLParserSchemaManager}.
+ *
+ * <p>Only ALTER events are applied; CREATE events are logged and skipped.
+ */
+public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
+    private static final Logger LOG = LoggerFactory.getLogger(SQLParserSchemaChange.class);
+    private final SQLParserSchemaManager sqlParserSchemaManager;
+
+    /**
+     * Creates the handler from the shared change context.
+     *
+     * @param changeContext supplies the Doris options, the table mapping and the object mapper
+     */
+    public SQLParserSchemaChange(JsonDebeziumChangeContext changeContext) {
+        this.changeContext = changeContext;
+        this.objectMapper = changeContext.getObjectMapper();
+        this.tableMapping = changeContext.getTableMapping();
+        this.dorisOptions = changeContext.getDorisOptions();
+        // Must follow the dorisOptions assignment above, which it reads.
+        this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
+        this.sqlParserSchemaManager = new SQLParserSchemaManager();
+    }
+
+    /** This implementation needs no initialization. */
+    @Override
+    public void init(JsonNode recordRoot, String dorisTableName) {
+        // do nothing
+    }
+
+    /**
+     * Applies the schema change described by the given record.
+     *
+     * @param recordRoot the change record
+     * @return the result of executing the ALTER statements for an ALTER event; {@code false} in
+     *     every other case, including when an exception is caught (it is logged, not rethrown)
+     */
+    @Override
+    public boolean schemaChange(JsonNode recordRoot) {
+        try {
+            boolean tableFilterSet = !StringUtils.isNullOrWhitespaceOnly(sourceTableName);
+            if (tableFilterSet && !checkTable(recordRoot)) {
+                return false;
+            }
+
+            EventType eventType = extractEventType(recordRoot);
+            if (eventType == null) {
+                LOG.warn("Failed to parse eventType. recordRoot={}", recordRoot);
+                return false;
+            }
+
+            if (eventType.equals(EventType.CREATE)) {
+                // TODO support auto create table
+                LOG.warn("Not auto support create table. recordRoot={}", recordRoot);
+                return false;
+            }
+            if (!eventType.equals(EventType.ALTER)) {
+                return false;
+            }
+
+            Tuple2<String, String> targetTable = getDorisTableTuple(recordRoot);
+            if (targetTable == null) {
+                LOG.warn("Failed to get doris table tuple. record={}", recordRoot);
+                return false;
+            }
+            List<String> alterDdls = tryParserAlterDDLs(recordRoot);
+            return executeAlterDDLs(alterDdls, recordRoot, targetTable, false);
+        } catch (Exception ex) {
+            LOG.warn("schema change error : ", ex);
+            return false;
+        }
+    }
+
+    /**
+     * Extracts the DDL text from the record and has it parsed into ALTER statements for the
+     * mapped Doris table.
+     *
+     * @param record the change record
+     * @return the statements returned by {@code SQLParserSchemaManager.parserAlterDDLs}
+     * @throws IOException if the record cannot be read
+     */
+    @VisibleForTesting
+    public List<String> tryParserAlterDDLs(JsonNode record) throws IOException {
+        String dorisTableIdentifier =
+                JsonDebeziumChangeUtils.getDorisTableIdentifier(record, dorisOptions, tableMapping);
+        String ddlText = extractJsonNode(extractHistoryRecord(record), "ddl");
+        extractSourceConnector(record);
+        return sqlParserSchemaManager.parserAlterDDLs(
+                sourceConnector, ddlText, dorisTableIdentifier);
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 38b942ea..55e864ca 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -130,6 +130,7 @@ public class CdcTools {
         String excludingTables = params.get("excluding-tables");
         String multiToOneOrigin = params.get("multi-to-one-origin");
         String multiToOneTarget = params.get("multi-to-one-target");
+        String schemaChangeMode = params.get("schema-change-mode");
         boolean createTableOnly = params.has("create-table-only");
         boolean ignoreDefaultValue = params.has("ignore-default-value");
         boolean ignoreIncompatible = params.has("ignore-incompatible");
@@ -157,6 +158,7 @@ public class CdcTools {
                 .setCreateTableOnly(createTableOnly)
                 .setSingleSink(singleSink)
                 .setIgnoreIncompatible(ignoreIncompatible)
+                .setSchemaChangeMode(schemaChangeMode)
                 .create();
         databaseSync.build();
         if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 5cea70f9..9c4f2ac4 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -78,6 +78,7 @@ public abstract class DatabaseSync {
     public StreamExecutionEnvironment env;
     private boolean createTableOnly = false;
     private boolean newSchemaChange = true;
+    private String schemaChangeMode;
     protected String includingTables;
     protected String excludingTables;
     protected String multiToOneOrigin;
@@ -342,6 +343,7 @@ public abstract class DatabaseSync {
         return JsonDebeziumSchemaSerializer.builder()
                 .setDorisOptions(dorisBuilder.build())
                 .setNewSchemaChange(newSchemaChange)
+                .setSchemaChangeMode(schemaChangeMode)
                 .setExecutionOptions(executionOptions)
                 .setTableMapping(tableMapping)
                 .setTableProperties(tableConfig)
@@ -560,6 +562,11 @@ public abstract class DatabaseSync {
         return this;
     }
 
+    /**
+     * Sets the schema change mode that is handed to the serializer builder.
+     *
+     * @param schemaChangeMode mode name; surrounding whitespace is trimmed. May be {@code null}
+     *     (presumably when the {@code schema-change-mode} option was not supplied), in which
+     *     case {@code null} is stored.
+     * @return this instance, for call chaining
+     */
+    public DatabaseSync setSchemaChangeMode(String schemaChangeMode) {
+        // Null-safe: callers pass the raw option value through without checking it.
+        this.schemaChangeMode = schemaChangeMode == null ? null : schemaChangeMode.trim();
+        return this;
+    }
+
     public DatabaseSync setSingleSink(boolean singleSink) {
         this.singleSink = singleSink;
         return this;
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
new file mode 100644
index 00000000..cbe3f08a
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.schema;
+
+import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Unit tests for {@link SQLParserSchemaManager}. */
+public class SQLParserSchemaManagerTest {
+    private static final String DORIS_TABLE = "doris.tab";
+
+    private SQLParserSchemaManager schemaManager;
+
+    @Before
+    public void init() {
+        schemaManager = new SQLParserSchemaManager();
+    }
+
+    /** Asserts that every generated DDL is one of the expected DDLs. */
+    private static void assertAllExpected(List<String> expectDDLs, List<String> actualDDLs) {
+        // NOTE(review): nothing is asserted when actualDDLs is empty, so this check alone does
+        // not prove that any DDL was generated.
+        for (String actualDDL : actualDDLs) {
+            Assert.assertTrue(expectDDLs.contains(actualDDL));
+        }
+    }
+
+    /** Asserts the result of stripping {@code target} from both ends of {@code input}. */
+    private void assertStripped(String expected, String input, char target) {
+        Assert.assertEquals(expected, schemaManager.removeContinuousChar(input, target));
+    }
+
+    @Test
+    public void testParserAlterDDLs() {
+        String sourceDdl =
+                "alter table t1 drop c1, drop column c2, add c3 int default 100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT 'decimal_type_comment', add `create_time` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change column c12 c13 varchar(10)";
+        List<String> expectDDLs = new ArrayList<>();
+        Collections.addAll(
+                expectDDLs,
+                "ALTER TABLE `doris`.`tab` DROP COLUMN `c1`",
+                "ALTER TABLE `doris`.`tab` DROP COLUMN `c2`",
+                "ALTER TABLE `doris`.`tab` ADD COLUMN `c3` INT DEFAULT '100'",
+                "ALTER TABLE `doris`.`tab` ADD COLUMN `decimal_type` DECIMALV3(38,9) DEFAULT '1.123456789' COMMENT 'decimal_type_comment'",
+                "ALTER TABLE `doris`.`tab` ADD COLUMN `create_time` DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP COMMENT 'time_comment'",
+                "ALTER TABLE `doris`.`tab` RENAME COLUMN `c10` `c11`",
+                "ALTER TABLE `doris`.`tab` RENAME COLUMN `c12` `c13`");
+
+        assertAllExpected(
+                expectDDLs,
+                schemaManager.parserAlterDDLs(SourceConnector.MYSQL, sourceDdl, DORIS_TABLE));
+    }
+
+    @Test
+    public void testParserAlterDDLsAdd() {
+        String sourceDdl =
+                "ALTER TABLE employees ADD (phone_number VARCHAR2(20), address VARCHAR2(255));";
+        List<String> expectDDLs = new ArrayList<>();
+        Collections.addAll(
+                expectDDLs,
+                "ALTER TABLE `doris`.`tab` ADD COLUMN `phone_number` VARCHAR(60)",
+                "ALTER TABLE `doris`.`tab` ADD COLUMN `address` VARCHAR(765)");
+
+        assertAllExpected(
+                expectDDLs,
+                schemaManager.parserAlterDDLs(SourceConnector.ORACLE, sourceDdl, DORIS_TABLE));
+    }
+
+    @Test
+    public void testParserAlterDDLsChange() {
+        String sourceDdl =
+                String.join(
+                        "\n",
+                        "ALTER TABLE employees",
+                        "CHANGE COLUMN old_phone_number new_phone_number VARCHAR(20) NOT NULL,",
+                        "CHANGE COLUMN old_address new_address VARCHAR(255) DEFAULT 'Unknown',",
+                        "MODIFY COLUMN hire_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP;");
+        List<String> expectDDLs = new ArrayList<>();
+        Collections.addAll(
+                expectDDLs,
+                "ALTER TABLE `doris`.`tab` RENAME COLUMN `old_phone_number` `new_phone_number`",
+                "ALTER TABLE `doris`.`tab` RENAME COLUMN `old_address` `new_address`");
+
+        assertAllExpected(
+                expectDDLs,
+                schemaManager.parserAlterDDLs(SourceConnector.MYSQL, sourceDdl, DORIS_TABLE));
+    }
+
+    @Test
+    public void testExtractCommentValue() {
+        List<String> specs = Arrays.asList("default", "'100'", "COMMENT", "''");
+        Assert.assertEquals("", schemaManager.extractComment(specs));
+    }
+
+    @Test
+    public void testExtractCommentValueQuotes() {
+        List<String> specs = Arrays.asList("Default", "\"100\"", "comment", "\"comment_test\"");
+        Assert.assertEquals("comment_test", schemaManager.extractComment(specs));
+    }
+
+    @Test
+    public void testExtractCommentValueNull() {
+        List<String> specs = Arrays.asList("default", null, "CommenT", null);
+        Assert.assertNull(schemaManager.extractComment(specs));
+    }
+
+    @Test
+    public void testExtractCommentValueEmpty() {
+        List<String> specs = Arrays.asList("default", null, "comment");
+        Assert.assertNull(schemaManager.extractComment(specs));
+    }
+
+    @Test
+    public void testExtractCommentValueA() {
+        List<String> specs = Arrays.asList("comment", "test");
+        Assert.assertEquals("test", schemaManager.extractComment(specs));
+    }
+
+    @Test
+    public void testExtractDefaultValue() {
+        List<String> specs = Arrays.asList("default", "'100'", "comment", "");
+        Assert.assertEquals("100", schemaManager.extractDefaultValue(specs));
+    }
+
+    @Test
+    public void testExtractDefaultValueQuotes() {
+        List<String> specs = Arrays.asList("default", "\"100\"", "comment", "");
+        Assert.assertEquals("100", schemaManager.extractDefaultValue(specs));
+    }
+
+    @Test
+    public void testExtractDefaultValueNull() {
+        List<String> specs = Arrays.asList("Default", null, "comment", null);
+        Assert.assertNull(schemaManager.extractDefaultValue(specs));
+    }
+
+    @Test
+    public void testExtractDefaultValueEmpty() {
+        List<String> specs = Arrays.asList("DEFAULT", "comment", null);
+        Assert.assertNull(schemaManager.extractDefaultValue(specs));
+    }
+
+    @Test
+    public void testExtractDefaultValueA() {
+        List<String> specs = Arrays.asList("default", "aaa");
+        Assert.assertEquals("aaa", schemaManager.extractDefaultValue(specs));
+    }
+
+    @Test
+    public void testExtractDefaultValueNULL() {
+        List<String> specs = Collections.singletonList("default");
+        Assert.assertNull(schemaManager.extractDefaultValue(specs));
+    }
+
+    @Test
+    public void testRemoveContinuousChar() {
+        // Target characters are stripped from both ends
+        assertStripped("bc", "aaaabcaaa", 'a');
+        assertStripped("bcde", "abcdea", 'a');
+
+        // Input without the target character
+        assertStripped("abc", "abc", 'x');
+
+        // Input made up of the target character only
+        assertStripped("", "aaaa", 'a');
+        assertStripped("", "xxxxxxxx", 'x');
+
+        // Null and empty input
+        Assert.assertNull(schemaManager.removeContinuousChar(null, 'a'));
+        assertStripped("", "", 'a');
+
+        // Single-character input
+        assertStripped("b", "b", 'a');
+
+        // Quote removal
+        assertStripped("abc", "\"abc\"", '\"');
+        assertStripped("a\"bc\"d", "\"a\"bc\"d\"", '\"');
+        assertStripped("abc", "'abc'", '\'');
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
new file mode 100644
index 00000000..d31ab04a
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase {
+
+    private SQLParserSchemaChange schemaChange;
+
+    @Before // Runs before each test: calls the parent setUp, then builds the SQLParserSchemaChange under test.
+    public void setUp() {
+        super.setUp(); // presumably initialises dorisOptions, tableMapping, objectMapper, etc. used below - confirm in TestJsonDebeziumChangeBase
+        JsonDebeziumChangeContext changeContext =
+                new JsonDebeziumChangeContext( // NOTE(review): null and empty arguments are positional; check them against the constructor signature
+                        dorisOptions,
+                        tableMapping,
+                        null,
+                        null,
+                        null,
+                        objectMapper,
+                        null,
+                        lineDelimiter,
+                        ignoreUpdateBefore,
+                        "",
+                        "");
+        schemaChange = new SQLParserSchemaChange(changeContext);
+    }
+
+    @Test // Parsing one record must yield exactly the three expected statements, in order.
+    public void testExtractDDLListMultipleColumns() throws IOException {
+        String sql0 = "ALTER TABLE `test`.`t1` DROP COLUMN `c11`";
+        String sql1 = "ALTER TABLE `test`.`t1` DROP COLUMN `c3`";
+        String sql2 = "ALTER TABLE `test`.`t1` ADD COLUMN `c12` INT DEFAULT 
'100'";
+        List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2);
+
+        String record =
+                
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_se
 [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot);
+        // Size check first: an empty or short parse result must fail, not pass vacuously.
+        Assert.assertEquals(srcSqlList.size(), ddlSQLList.size());
+        for (int i = 0; i < srcSqlList.size(); i++) {
+            Assert.assertEquals(srcSqlList.get(i), ddlSQLList.get(i));
+        }
+    }
+
+    @Test // The first parsed DDL is expected to rename column c555 to c777 on test.t1.
+    public void testExtractDDLListChangeColumn() throws IOException {
+        String record = // NOTE(review): the record names table test_sink while test.t1 is expected; presumably remapped via tableMapping - confirm
+                
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945030603,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6521,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6521,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"
 [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot);
+
+        String result = "ALTER TABLE `test`.`t1` RENAME COLUMN `c555` `c777`";
+        Assert.assertEquals(result, ddlSQLList.get(0));
+    }
+
+    @Test // The first parsed DDL is expected to rename column c22 to c33 on test.t1.
+    public void testExtractDDLListRenameColumn() throws IOException { // only element 0 of the result is checked
+        String record =
+                
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_se
 [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot);
+        Assert.assertEquals("ALTER TABLE `test`.`t1` RENAME COLUMN `c22` 
`c33`", ddlSQLList.get(0));
+    }
+
+    @Test // The first parsed DDL is expected to rename column age to age1 on test.t1.
+    public void testExtractDDlListChangeName() throws IOException { // only element 0 of the result is checked
+        String columnInfo =
+                
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1710925209991,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000288\",\"pos\":81654,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"mysql-bin.000288\\\",\\\"pos\\\":81654,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\
 [...]
+        JsonNode record = objectMapper.readTree(columnInfo);
+        List<String> changeNameList = schemaChange.tryParserAlterDDLs(record);
+        Assert.assertEquals(
+                "ALTER TABLE `test`.`t1` RENAME COLUMN `age` `age1`", 
changeNameList.get(0));
+    }
+
+    @Test // The first parsed DDL is expected to rename column key to key_word on test.t1.
+    public void testExtractDDlListChangeNameWithColumn() throws IOException {
+        String columnInfo = // NOTE(review): the record's db is doris_test while test.t1 is expected; presumably remapped via tableMapping - confirm
+                
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1711088321412,\"snapshot\":\"false\",\"db\":\"doris_test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000292\",\"pos\":55695,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"mysql-bin.000292\\\",\\\"pos\\\":55695,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":nu
 [...]
+        JsonNode record = objectMapper.readTree(columnInfo);
+        List<String> changeNameList = schemaChange.tryParserAlterDDLs(record);
+        Assert.assertEquals(
+                "ALTER TABLE `test`.`t1` RENAME COLUMN `key` `key_word`", 
changeNameList.get(0));
+    }
+
+    @Test // The first parsed DDL is expected to add create_time as DATETIMEV2(6) with a CURRENT_TIMESTAMP default and a comment.
+    public void testAddDatetimeColumn() throws IOException {
+        String record = // NOTE(review): the record names table test_sink34 while test.t1 is expected; presumably remapped via tableMapping - confirm
+                
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720596740767,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":10192,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":10192,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,
 [...]
+        JsonNode recordJsonNode = objectMapper.readTree(record);
+        List<String> changeNameList = 
schemaChange.tryParserAlterDDLs(recordJsonNode);
+        Assert.assertEquals(
+                "ALTER TABLE `test`.`t1` ADD COLUMN `create_time` 
DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP COMMENT 'datatime_test'",
+                changeNameList.get(0));
+    }
+
+    @Test // The first parsed DDL is expected to drop column create_time on test.t1.
+    public void testDropColumn() throws IOException {
+        String record = // NOTE(review): the record names table test_sink34 while test.t1 is expected; presumably remapped via tableMapping - confirm
+                
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720599133910,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":12084,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":12084,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,
 [...]
+        JsonNode recordJsonNode = objectMapper.readTree(record);
+        List<String> changeNameList = 
schemaChange.tryParserAlterDDLs(recordJsonNode);
+        Assert.assertEquals(
+                "ALTER TABLE `test`.`t1` DROP COLUMN `create_time`", 
changeNameList.get(0));
+    }
+
+    @Test // The first parsed DDL is expected to rename column create_time2 to create_time on test.t1.
+    public void testChangeColumn() throws IOException {
+        String record = // NOTE(review): the record names table test_sink34 while test.t1 is expected; presumably remapped via tableMapping - confirm
+                
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720598926291,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":11804,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":11804,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,
 [...]
+        JsonNode recordJsonNode = objectMapper.readTree(record);
+        List<String> changeNameList = 
schemaChange.tryParserAlterDDLs(recordJsonNode);
+        Assert.assertEquals(
+                "ALTER TABLE `test`.`t1` RENAME COLUMN `create_time2` 
`create_time`",
+                changeNameList.get(0));
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 2410ddac..07744e37 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.tools.cdc;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import org.apache.doris.flink.sink.schema.SchemaChangeMode;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
 
 import java.util.HashMap;
@@ -71,7 +72,8 @@ public class CdcMysqlSyncDatabaseCase {
         String multiToOneOrigin = "a_.*|b_.*";
         String multiToOneTarget = "a|b";
         boolean ignoreDefaultValue = false;
-        boolean useNewSchemaChange = false;
+        boolean useNewSchemaChange = true;
+        String schemaChangeMode = 
SchemaChangeMode.DEBEZIUM_STRUCTURE.getName();
         boolean singleSink = false;
         boolean ignoreIncompatible = false;
         DatabaseSync databaseSync = new MysqlDatabaseSync();
@@ -90,6 +92,7 @@ public class CdcMysqlSyncDatabaseCase {
                 .setTableConfig(tableConfig)
                 .setCreateTableOnly(false)
                 .setNewSchemaChange(useNewSchemaChange)
+                .setSchemaChangeMode(schemaChangeMode)
                 .setSingleSink(singleSink)
                 .setIgnoreIncompatible(ignoreIncompatible)
                 .create();
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index fba5866c..35a5719a 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -76,7 +76,7 @@ public class CdcOraclelSyncDatabaseCase {
         String multiToOneOrigin = "a_.*|b_.*";
         String multiToOneTarget = "a|b";
         boolean ignoreDefaultValue = false;
-        boolean useNewSchemaChange = false;
+        boolean useNewSchemaChange = true;
         boolean ignoreIncompatible = false;
         DatabaseSync databaseSync = new OracleDatabaseSync();
         databaseSync
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index 6c933409..4d5a56f7 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -78,7 +78,7 @@ public class CdcPostgresSyncDatabaseCase {
         String multiToOneOrigin = "a_.*|b_.*";
         String multiToOneTarget = "a|b";
         boolean ignoreDefaultValue = false;
-        boolean useNewSchemaChange = false;
+        boolean useNewSchemaChange = true;
         boolean ignoreIncompatible = false;
         DatabaseSync databaseSync = new PostgresDatabaseSync();
         databaseSync
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 9fec63b6..3d6e1e99 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -76,7 +76,7 @@ public class CdcSqlServerSyncDatabaseCase {
         String multiToOneOrigin = "a_.*|b_.*";
         String multiToOneTarget = "a|b";
         boolean ignoreDefaultValue = false;
-        boolean useNewSchemaChange = false;
+        boolean useNewSchemaChange = true;
         boolean ignoreIncompatible = false;
         DatabaseSync databaseSync = new SqlServerDatabaseSync();
         databaseSync


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to