This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 415f37a  [Improve]The change basis of table schema is changed to parse data column field (#17)
415f37a is described below

commit 415f37a7aa2f6bf27c969c8363499edd0399cfd0
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Thu May 16 14:05:32 2024 +0800

    [Improve]The change basis of table schema is changed to parse data column field (#17)
---
 .../connector/jdbc/util}/DateTimeUtils.java        |   6 +-
 .../debezium/connector/jdbc/util/SchemaUtils.java  |  58 +++++
 .../doris/kafka/connector/cfg/DorisOptions.java    |  17 +-
 .../connector/cfg/DorisSinkConnectorConfig.java    |   5 +-
 .../connector/converter/RecordDescriptor.java      |  67 ++---
 .../kafka/connector/converter/RecordService.java   | 148 ++++++++++-
 .../schema/SchemaChangeManager.java                |  84 ++++--
 .../SchemaEvolutionMode.java}                      |  24 +-
 .../connector/converter/type/AbstractDateType.java |  11 +-
 .../connector/converter/type/AbstractTimeType.java |  28 +-
 .../converter/type/AbstractTimestampType.java      |  23 +-
 .../connector/converter/type/AbstractType.java     |  23 ++
 .../doris/kafka/connector/converter/type/Type.java |   3 +
 .../converter/type/connect/ConnectBooleanType.java |   8 +
 .../converter/type/connect/ConnectBytesType.java   |   7 +
 .../converter/type/connect/ConnectDateType.java    |   2 +-
 .../converter/type/connect/ConnectDecimalType.java |  13 +
 .../converter/type/connect/ConnectFloat32Type.java |   8 +
 .../converter/type/connect/ConnectFloat64Type.java |   8 +
 .../converter/type/connect/ConnectInt16Type.java   |   8 +
 .../converter/type/connect/ConnectInt32Type.java   |   8 +
 .../converter/type/connect/ConnectInt64Type.java   |   8 +
 .../converter/type/connect/ConnectInt8Type.java    |   8 +
 .../connect/ConnectMapToConnectStringType.java     |   6 +
 .../converter/type/connect/ConnectStringType.java  |  19 ++
 .../converter/type/connect/ConnectTimeType.java    |   2 +-
 .../type/connect/ConnectTimestampType.java         |   2 +-
 .../converter/type/debezium/DateType.java          |   2 +-
 .../converter/type/debezium/MicroTimeType.java     |   2 +-
 .../type/debezium/MicroTimestampType.java          |   2 +-
 .../converter/type/debezium/NanoTimeType.java      |   2 +-
 .../converter/type/debezium/NanoTimestampType.java |   2 +-
 .../converter/type/debezium/TimeType.java          |   2 +-
 .../type/debezium/VariableScaleDecimalType.java    |  12 +
 .../type/doris}/DorisType.java                     |   5 +-
 .../type/doris/DorisTypeProperties.java}           |   8 +-
 .../kafka/connector/dialect/mysql/MysqlType.java   | 213 ---------------
 .../connector/service/DorisDefaultSinkService.java |  26 +-
 .../kafka/connector/utils/ConfigCheckUtils.java    |  29 ---
 .../writer/schema/DebeziumSchemaChange.java        | 289 ---------------------
 .../writer/schema/SchemaChangeHelper.java          | 159 ------------
 .../connector/converter/TestRecordService.java     |  99 ++++++-
 .../connector/writer/TestDebeziumSchemaChange.java | 133 ----------
 .../kafka/connector/writer/TestRecordBuffer.java   |  11 +-
 44 files changed, 640 insertions(+), 960 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java b/src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java
similarity index 95%
rename from src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
rename to src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java
index 09ee9d0..941254d 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
+++ b/src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java
@@ -15,8 +15,12 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
+ *
+ * Copied from
+ * https://github.com/debezium/debezium-connector-jdbc/blob/main/src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java
  */
-package org.apache.doris.kafka.connector.converter.utils;
+
+package io.debezium.connector.jdbc.util;
 
 import io.debezium.time.Conversions;
 import java.sql.Timestamp;
diff --git a/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java b/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java
new file mode 100644
index 0000000..178507c
--- /dev/null
+++ b/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * Copied from
+ * https://github.com/debezium/debezium-connector-jdbc/blob/main/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java
+ */
+
+package io.debezium.connector.jdbc.util;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.connect.data.Schema;
+
+public class SchemaUtils {
+    private static final String SCHEMA_PARAMETER_COLUMN_TYPE = "__debezium.source.column.type";
+    private static final String SCHEMA_PARAMETER_COLUMN_LENGTH = "__debezium.source.column.length";
+    private static final String SCHEMA_PARAMETER_COLUMN_PRECISION =
+            "__debezium.source.column.scale";
+    private static final String SCHEMA_PARAMETER_COLUMN_NAME = "__debezium.source.column.name";
+
+    public static Optional<String> getSourceColumnType(Schema schema) {
+        return getSchemaParameter(schema, SCHEMA_PARAMETER_COLUMN_TYPE);
+    }
+
+    public static Optional<String> getSourceColumnLength(Schema schema) {
+        return getSchemaParameter(schema, SCHEMA_PARAMETER_COLUMN_LENGTH);
+    }
+
+    public static Optional<String> getSourceColumnPrecision(Schema schema) {
+        return getSchemaParameter(schema, SCHEMA_PARAMETER_COLUMN_PRECISION);
+    }
+
+    public static Optional<String> getSourceColumnName(Schema schema) {
+        return getSchemaParameter(schema, SCHEMA_PARAMETER_COLUMN_NAME);
+    }
+
+    public static Optional<String> getSchemaParameter(Schema schema, String parameterName) {
+        if (!Objects.isNull(schema.parameters())) {
+            return Optional.ofNullable(schema.parameters().get(parameterName));
+        }
+        return Optional.empty();
+    }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index e9f1297..4596f69 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import org.apache.doris.kafka.connector.converter.ConverterMode;
+import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
 import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
 import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
@@ -44,7 +45,6 @@ public class DorisOptions {
     private final String password;
     private final String database;
     private final Map<String, String> topicMap;
-    private final String schemaTopic;
     private final int fileSize;
     private final int recordNum;
     private long flushTime;
@@ -62,6 +62,7 @@ public class DorisOptions {
     private LoadModel loadModel;
     private DeliveryGuarantee deliveryGuarantee;
     private ConverterMode converterMode;
+    private SchemaEvolutionMode schemaEvolutionMode;
 
     public DorisOptions(Map<String, String> config) {
         this.name = config.get(DorisSinkConnectorConfig.NAME);
@@ -91,6 +92,11 @@ public class DorisOptions {
                         config.getOrDefault(
                                 DorisSinkConnectorConfig.CONVERT_MODE,
                                 DorisSinkConnectorConfig.CONVERT_MODE_DEFAULT));
+        this.schemaEvolutionMode =
+                SchemaEvolutionMode.of(
+                        config.getOrDefault(
+                                DorisSinkConnectorConfig.SCHEMA_EVOLUTION,
+                                DorisSinkConnectorConfig.SCHEMA_EVOLUTION_DEFAULT));
 
         this.fileSize = Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_SIZE_BYTES));
         this.recordNum =
@@ -105,7 +111,6 @@ public class DorisOptions {
             this.flushTime = DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN;
         }
         this.topicMap = getTopicToTableMap(config);
-        this.schemaTopic = config.get(DorisSinkConnectorConfig.SCHEMA_TOPIC);
 
         enableCustomJMX = DorisSinkConnectorConfig.JMX_OPT_DEFAULT;
         if (config.containsKey(DorisSinkConnectorConfig.JMX_OPT)) {
@@ -281,6 +286,10 @@ public class DorisOptions {
         return this.converterMode;
     }
 
+    public SchemaEvolutionMode getSchemaEvolutionMode() {
+        return this.schemaEvolutionMode;
+    }
+
     public boolean isAutoRedirect() {
         return autoRedirect;
     }
@@ -293,10 +302,6 @@ public class DorisOptions {
         return enableDelete;
     }
 
-    public String getSchemaTopic() {
-        return schemaTopic;
-    }
-
     /**
      * parse topic to table map
      *
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 94ea08e..5c33da4 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -23,6 +23,7 @@ import java.time.Duration;
 import java.util.Map;
 import org.apache.doris.kafka.connector.DorisSinkConnector;
 import org.apache.doris.kafka.connector.converter.ConverterMode;
+import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
 import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
 import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
@@ -78,9 +79,11 @@ public class DorisSinkConnectorConfig {
     public static final String DELIVERY_GUARANTEE_DEFAULT = DeliveryGuarantee.AT_LEAST_ONCE.name();
     public static final String CONVERT_MODE = "converter.mode";
     public static final String CONVERT_MODE_DEFAULT = ConverterMode.NORMAL.getName();
-    public static final String SCHEMA_TOPIC = "schema.topic";
+
     // Prefix for Doris StreamLoad specific properties.
     public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
+    public static final String SCHEMA_EVOLUTION = "schema.evolution";
+    public static final String SCHEMA_EVOLUTION_DEFAULT = SchemaEvolutionMode.NONE.getName();
 
     // metrics
     public static final String JMX_OPT = "jmx";
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
index 11d097f..c4f7243 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
@@ -23,7 +23,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import org.apache.doris.kafka.connector.dialect.mysql.MysqlType;
+import org.apache.doris.kafka.connector.converter.type.Type;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
@@ -119,27 +119,34 @@ public class RecordDescriptor {
     public static class FieldDescriptor {
         private final Schema schema;
         private final String name;
+        private final Map<String, Type> typeRegistry;
+        private final Type type;
+        private final String typeName;
         private final String schemaTypeName;
         private final String schemaName;
         private String comment;
         private String defaultValue;
 
-        public FieldDescriptor(
-                Schema schema, String name, String schemaTypeName, String schemaName) {
+        public FieldDescriptor(Schema schema, String name, Map<String, Type> typeRegistry) {
             this.schema = schema;
             this.name = name;
-            this.schemaTypeName = schemaTypeName;
-            this.schemaName = schemaName;
+            this.typeRegistry = typeRegistry;
+            this.schemaName = schema.name();
+            this.schemaTypeName = schema.type().name();
+            this.type =
+                    Objects.nonNull(schema.name())
+                            ? typeRegistry.get(schema.name())
+                            : typeRegistry.get(schema.type().name());
+            this.typeName = type.getTypeName(schema);
         }
 
         public FieldDescriptor(
                 Schema schema,
                 String name,
-                String schemaTypeName,
-                String schemaName,
+                Map<String, Type> typeRegistry,
                 String comment,
                 String defaultValue) {
-            this(schema, name, schemaTypeName, schemaName);
+            this(schema, name, typeRegistry);
             this.comment = comment;
             this.defaultValue = defaultValue;
         }
@@ -148,6 +155,14 @@ public class RecordDescriptor {
             return name;
         }
 
+        public Type getType() {
+            return type;
+        }
+
+        public String getTypeName() {
+            return typeName;
+        }
+
         public String getSchemaName() {
             return schemaName;
         }
@@ -172,7 +187,7 @@ public class RecordDescriptor {
     public static class Builder {
 
         private SinkRecord sinkRecord;
-        private Struct tableChange;
+        private Map<String, Type> typeRegistry;
 
         // Internal build state
         private final List<String> keyFieldNames = new ArrayList<>();
@@ -184,8 +199,8 @@ public class RecordDescriptor {
             return this;
         }
 
-        public Builder withTableChange(Struct tableChange) {
-            this.tableChange = tableChange;
+        public Builder withTypeRegistry(Map<String, Type> typeRegistry) {
+            this.typeRegistry = typeRegistry;
             return this;
         }
 
@@ -193,11 +208,7 @@ public class RecordDescriptor {
             Objects.requireNonNull(sinkRecord, "The sink record must be provided.");
 
             final boolean flattened = !isTombstone(sinkRecord) && isFlattened(sinkRecord);
-            if (Objects.nonNull(tableChange)) {
-                readTableChangeData(tableChange);
-            } else {
-                readSinkRecordNonKeyData(sinkRecord, flattened);
-            }
+            readSinkRecordNonKeyData(sinkRecord, flattened);
 
             return new RecordDescriptor(
                     sinkRecord,
@@ -208,27 +219,6 @@ public class RecordDescriptor {
                     flattened);
         }
 
-        private void readTableChangeData(final Struct tableChange) {
-            Struct tableChangeTable = tableChange.getStruct("table");
-            List<Object> tableChangeColumns = tableChangeTable.getArray("columns");
-            for (Object column : tableChangeColumns) {
-                Struct columnStruct = (Struct) column;
-                Schema schema = columnStruct.schema();
-                String name = columnStruct.getString("name");
-                String typeName = columnStruct.getString("typeName");
-                Integer length = columnStruct.getInt32("length");
-                Integer scale = columnStruct.getInt32("scale");
-                String dorisType = MysqlType.toDorisType(typeName, length, scale);
-                String comment = columnStruct.getString("comment");
-                String defaultValue = columnStruct.getString("defaultValueExpression");
-                nonKeyFieldNames.add(name);
-                allFields.put(
-                        name,
-                        new FieldDescriptor(
-                                schema, name, dorisType, schema.name(), comment, defaultValue));
-            }
-        }
-
         private boolean isFlattened(SinkRecord record) {
             return record.valueSchema().name() == null
                     || !record.valueSchema().name().contains("Envelope");
@@ -266,8 +256,7 @@ public class RecordDescriptor {
         }
 
         private void applyNonKeyField(String name, Schema schema) {
-            FieldDescriptor fieldDescriptor =
-                    new FieldDescriptor(schema, name, schema.type().name(), schema.name());
+            FieldDescriptor fieldDescriptor = new FieldDescriptor(schema, name, typeRegistry);
             nonKeyFieldNames.add(fieldDescriptor.getName());
             allFields.put(fieldDescriptor.getName(), fieldDescriptor);
         }
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
index 1390761..8bc2480 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
@@ -21,17 +21,32 @@ package org.apache.doris.kafka.connector.converter;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.debezium.util.Strings;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.StringJoiner;
+import java.util.stream.Collectors;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.schema.SchemaChangeManager;
+import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
 import org.apache.doris.kafka.connector.converter.type.Type;
 import org.apache.doris.kafka.connector.exception.DataFormatException;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.exception.SchemaChangeException;
+import org.apache.doris.kafka.connector.model.ColumnDescriptor;
+import org.apache.doris.kafka.connector.model.TableDescriptor;
+import org.apache.doris.kafka.connector.model.doris.Schema;
+import org.apache.doris.kafka.connector.service.DorisSystemService;
+import org.apache.doris.kafka.connector.service.RestService;
 import org.apache.doris.kafka.connector.writer.LoadConstants;
 import org.apache.doris.kafka.connector.writer.RecordBuffer;
 import org.apache.kafka.connect.data.Struct;
@@ -43,10 +58,14 @@ import org.slf4j.LoggerFactory;
 
 public class RecordService {
     private static final Logger LOG = LoggerFactory.getLogger(RecordService.class);
+    public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";
     private static final ObjectMapper MAPPER = new ObjectMapper();
     private final JsonConverter converter;
+    private DorisSystemService dorisSystemService;
+    private SchemaChangeManager schemaChangeManager;
     private DorisOptions dorisOptions;
     private RecordTypeRegister recordTypeRegister;
+    private Set<RecordDescriptor.FieldDescriptor> missingFields;
 
     public RecordService() {
         this.converter = new JsonConverter();
@@ -59,6 +78,8 @@ public class RecordService {
         this();
         this.dorisOptions = dorisOptions;
         this.recordTypeRegister = new RecordTypeRegister(dorisOptions);
+        this.dorisSystemService = new DorisSystemService(dorisOptions);
+        this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
     }
 
     /**
@@ -68,10 +89,14 @@ public class RecordService {
     public String processStructRecord(SinkRecord record) {
         String processedRecord;
         if (ConverterMode.DEBEZIUM_INGESTION == dorisOptions.getConverterMode()) {
+            validate(record);
             RecordDescriptor recordDescriptor = buildRecordDescriptor(record);
             if (recordDescriptor.isTombstone()) {
                 return null;
             }
+            String tableName = dorisOptions.getTopicMapTable(recordDescriptor.getTopicName());
+            checkAndApplyTableChangesIfNeeded(tableName, recordDescriptor);
+
             List<String> nonKeyFieldNames = recordDescriptor.getNonKeyFieldNames();
             if (recordDescriptor.isDelete()) {
                 processedRecord =
@@ -96,6 +121,101 @@ public class RecordService {
         return processedRecord;
     }
 
+    private void validate(SinkRecord record) {
+        if (isSchemaChange(record)) {
+            LOG.warn(
+                    "Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic.");
+            throw new DorisException(
+                    "Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic.");
+        }
+    }
+
+    private static boolean isSchemaChange(SinkRecord record) {
+        return record.valueSchema() != null
+                && !Strings.isNullOrEmpty(record.valueSchema().name())
+                && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
+    }
+
+    private void checkAndApplyTableChangesIfNeeded(
+            String tableName, RecordDescriptor recordDescriptor) {
+        if (!hasTable(tableName)) {
+            // TODO Table does not exist, lets attempt to create it.
+        } else {
+            // Table exists, lets attempt to alter it if necessary.
+            alterTableIfNeeded(tableName, recordDescriptor);
+        }
+    }
+
+    private boolean hasTable(String tableName) {
+        return dorisSystemService.tableExists(dorisOptions.getDatabase(), tableName);
+    }
+
+    private void alterTableIfNeeded(String tableName, RecordDescriptor record) {
+        // Resolve table metadata from the database
+        final TableDescriptor table = obtainTableSchema(tableName);
+
+        missingFields = resolveMissingFields(record, table);
+        if (missingFields.isEmpty()) {
+            // There are no missing fields, simply return
+            // TODO should we check column type changes or default value changes?
+            return;
+        }
+
+        LOG.info(
+                "Find some miss columns in {} table, try to alter add this columns={}.",
+                tableName,
+                missingFields.stream()
+                        .map(RecordDescriptor.FieldDescriptor::getName)
+                        .collect(Collectors.toList()));
+        if (SchemaEvolutionMode.NONE.equals(dorisOptions.getSchemaEvolutionMode())) {
+            LOG.warn(
+                    "Table '{}' cannot be altered because schema evolution is disabled.",
+                    tableName);
+            throw new SchemaChangeException(
+                    "Cannot alter table " + table + " because schema evolution is disabled");
+        }
+        for (RecordDescriptor.FieldDescriptor missingField : missingFields) {
+            schemaChangeManager.addColumnDDL(tableName, missingField);
+        }
+    }
+
+    private Set<RecordDescriptor.FieldDescriptor> resolveMissingFields(
+            RecordDescriptor record, TableDescriptor table) {
+        Set<RecordDescriptor.FieldDescriptor> missingFields = new HashSet<>();
+        for (Map.Entry<String, RecordDescriptor.FieldDescriptor> entry :
+                record.getFields().entrySet()) {
+            String filedName = entry.getKey();
+            if (!table.hasColumn(filedName)) {
+                missingFields.add(entry.getValue());
+            }
+        }
+        return missingFields;
+    }
+
+    private TableDescriptor obtainTableSchema(String tableName) {
+        // TODO when the table structure is obtained from doris for first time, it should be
+        // obtained in the cache later.
+        Schema schema =
+                RestService.getSchema(dorisOptions, dorisOptions.getDatabase(), tableName, LOG);
+        List<ColumnDescriptor> columnDescriptors = new ArrayList<>();
+        schema.getProperties()
+                .forEach(
+                        column -> {
+                            ColumnDescriptor columnDescriptor =
+                                    ColumnDescriptor.builder()
+                                            .columnName(column.getName())
+                                            .typeName(column.getType())
+                                            .comment(column.getComment())
+                                            .build();
+                            columnDescriptors.add(columnDescriptor);
+                        });
+        return TableDescriptor.builder()
+                .tableName(tableName)
+                .type(schema.getKeysType())
+                .columns(columnDescriptors)
+                .build();
+    }
+
     /** process list record from kafka [{"name":"doris1"},{"name":"doris2"}] */
     public String processListRecord(SinkRecord record) {
         try {
@@ -130,19 +250,13 @@ public class RecordService {
             RecordDescriptor record, Struct source, List<String> fields, boolean isDelete) {
         Map<String, Object> filedMapping = new LinkedHashMap<>();
         String filedResult = null;
-        final Map<String, Type> typeRegistry = recordTypeRegister.getTypeRegistry();
         for (String fieldName : fields) {
             final RecordDescriptor.FieldDescriptor field = record.getFields().get(fieldName);
-            String fieldSchemaName = field.getSchemaName();
-            String fieldSchemaTypeName = field.getSchemaTypeName();
+            Type type = field.getType();
             Object value =
                     field.getSchema().isOptional()
                             ? source.getWithoutDefault(fieldName)
                             : source.get(fieldName);
-            Type type =
-                    Objects.nonNull(fieldSchemaName)
-                            ? typeRegistry.get(fieldSchemaName)
-                            : typeRegistry.get(fieldSchemaTypeName);
             Object convertValue = type.getValue(value);
             if (Objects.nonNull(convertValue) && !type.isNumber()) {
                 filedMapping.put(fieldName, convertValue.toString());
@@ -186,10 +300,28 @@ public class RecordService {
     private RecordDescriptor buildRecordDescriptor(SinkRecord record) {
         RecordDescriptor recordDescriptor;
         try {
-            recordDescriptor = RecordDescriptor.builder().withSinkRecord(record).build();
+            recordDescriptor =
+                    RecordDescriptor.builder()
+                            .withSinkRecord(record)
+                            .withTypeRegistry(recordTypeRegister.getTypeRegistry())
+                            .build();
         } catch (Exception e) {
             throw new ConnectException("Failed to process a sink record", e);
         }
         return recordDescriptor;
     }
+
+    public void setSchemaChangeManager(SchemaChangeManager schemaChangeManager) {
+        this.schemaChangeManager = schemaChangeManager;
+    }
+
+    @VisibleForTesting
+    public void setDorisSystemService(DorisSystemService dorisSystemService) {
+        this.dorisSystemService = dorisSystemService;
+    }
+
+    @VisibleForTesting
+    public Set<RecordDescriptor.FieldDescriptor> getMissingFields() {
+        return missingFields;
+    }
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java
similarity index 69%
rename from src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java
rename to src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java
index 1ee9c1e..376edf9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.doris.kafka.connector.writer.schema;
+package org.apache.doris.kafka.connector.converter.schema;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
@@ -28,8 +28,8 @@ import java.util.Map;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.RecordDescriptor;
 import org.apache.doris.kafka.connector.exception.SchemaChangeException;
-import org.apache.doris.kafka.connector.utils.HttpGetWithEntity;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
@@ -44,8 +44,7 @@ import org.slf4j.LoggerFactory;
 public class SchemaChangeManager implements Serializable {
     private static final long serialVersionUID = 1L;
     private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
-    private static final String CHECK_SCHEMA_CHANGE_API =
-            "http://%s/api/enable_light_schema_change/%s/%s";
+    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
     private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
     private final ObjectMapper objectMapper = new ObjectMapper();
     private final DorisOptions dorisOptions;
@@ -54,29 +53,6 @@ public class SchemaChangeManager implements Serializable {
         this.dorisOptions = dorisOptions;
     }
 
-    public static Map<String, Object> buildRequestParam(boolean dropColumn, String columnName) {
-        Map<String, Object> params = new HashMap<>();
-        params.put("isDropColumn", dropColumn);
-        params.put("columnName", columnName);
-        return params;
-    }
-
-    /** check ddl can do light schema change. */
-    public boolean checkSchemaChange(String database, String table, Map<String, Object> params)
-            throws IllegalArgumentException, IOException {
-        if (params.isEmpty()) {
-            return false;
-        }
-        String requestUrl =
-                String.format(CHECK_SCHEMA_CHANGE_API, dorisOptions.getHttpUrl(), database, table);
-        HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
-        httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
-        httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params)));
-        String responseEntity = "";
-        Map<String, Object> responseMap = handleResponse(httpGet, responseEntity);
-        return handleSchemaChange(responseMap, responseEntity);
-    }
-
     private boolean handleSchemaChange(Map<String, Object> responseMap, String responseEntity) {
         String code = responseMap.getOrDefault("code", "-1").toString();
         if (code.equals("0")) {
@@ -86,6 +62,60 @@ public class SchemaChangeManager implements Serializable {
         }
     }
 
+    public void addColumnDDL(String tableName, RecordDescriptor.FieldDescriptor field) {
+        try {
+            String addColumnDDL = buildAddColumnDDL(dorisOptions.getDatabase(), tableName, field);
+            boolean status = execute(addColumnDDL, dorisOptions.getDatabase());
+            LOG.info(
+                    "Add missing column for {} table, ddl={}, status={}",
+                    tableName,
+                    addColumnDDL,
+                    status);
+        } catch (Exception e) {
+            LOG.warn("Failed to add column for {}, cause by: ", tableName, e);
+            throw new SchemaChangeException(
+                    "Failed to add column for " + tableName + ", cause by:", e);
+        }
+    }
+
+    public static String buildAddColumnDDL(
+            String database, String tableName, RecordDescriptor.FieldDescriptor field) {
+        String name = field.getName();
+        String typeName = field.getTypeName();
+        String comment = field.getComment();
+        String defaultValue = field.getDefaultValue();
+
+        String addDDL =
+                String.format(
+                        ADD_DDL,
+                        identifier(database) + "." + identifier(tableName),
+                        identifier(name),
+                        typeName);
+        if (defaultValue != null) {
+            addDDL = addDDL + " DEFAULT " + quoteDefaultValue(defaultValue);
+        }
+        if (StringUtils.isNotEmpty(comment)) {
+            addDDL = addDDL + " COMMENT '" + quoteComment(comment) + "'";
+        }
+        return addDDL;
+    }
+
+    private static String quoteComment(String comment) {
+        return comment.replaceAll("'", "\\\\'");
+    }
+
+    private static String identifier(String name) {
+        return "`" + name + "`";
+    }
+
+    private static String quoteDefaultValue(String defaultValue) {
+        // DEFAULT current_timestamp not need quote
+        if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+            return defaultValue;
+        }
+        return "'" + defaultValue + "'";
+    }
+
     /** execute sql in doris. */
     public boolean execute(String ddl, String database)
             throws IOException, IllegalArgumentException {
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java
similarity index 67%
copy from src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
copy to src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java
index 55c82cf..d9b6a9b 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java
@@ -16,19 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.doris.kafka.connector.converter.type.connect;
 
-public class ConnectInt8Type extends AbstractConnectSchemaType {
+package org.apache.doris.kafka.connector.converter.schema;
 
-    public static final ConnectInt8Type INSTANCE = new ConnectInt8Type();
+public enum SchemaEvolutionMode {
+    NONE("none"),
 
-    @Override
-    public String[] getRegistrationKeys() {
-        return new String[] {"INT8"};
+    BASIC("basic");
+
+    private final String name;
+
+    SchemaEvolutionMode(String name) {
+        this.name = name;
+    }
+
+    public static SchemaEvolutionMode of(String name) {
+        return SchemaEvolutionMode.valueOf(name.toUpperCase());
     }
 
-    @Override
-    public boolean isNumber() {
-        return true;
+    public String getName() {
+        return name;
     }
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
index de32cd6..ab25931 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
@@ -18,5 +18,14 @@
  */
 package org.apache.doris.kafka.connector.converter.type;
 
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
 /** An abstract base class for all temporal date implementations of {@link Type}. */
-public abstract class AbstractDateType extends AbstractTemporalType {}
+public abstract class AbstractDateType extends AbstractTemporalType {
+
+    @Override
+    public String getTypeName(Schema schema) {
+        return DorisType.DATE;
+    }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
index 533f1e1..79e0105 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
@@ -18,5 +18,31 @@
  */
 package org.apache.doris.kafka.connector.converter.type;
 
+import java.util.Optional;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisTypeProperties;
+import org.apache.kafka.connect.data.Schema;
+
 /** An abstract temporal implementation of {@link Type} for {@code TIME} based columns. */
-public abstract class AbstractTimeType extends AbstractTemporalType {}
+public abstract class AbstractTimeType extends AbstractTemporalType {
+
+    @Override
+    public String getTypeName(Schema schema) {
+        // NOTE:
+        // The MySQL connector does not use the __debezium.source.column.scale parameter to pass
+        // the time column's precision but instead uses the __debezium.source.column.length key
+        // which differs from all other connector implementations.
+        //
+        final int precision = getTimePrecision(schema);
+        return String.format(
+                "%s(%s)",
+                DorisType.DATETIME,
+                Math.min(precision, DorisTypeProperties.MAX_SUPPORTED_DATE_TIME_PRECISION));
+    }
+
+    protected int getTimePrecision(Schema schema) {
+        final String length = getSourceColumnLength(schema).orElse("0");
+        final Optional<String> scale = getSourceColumnPrecision(schema);
+        return scale.map(Integer::parseInt).orElseGet(() -> Integer.parseInt(length));
+    }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
index 3d50376..0b8d45d 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
@@ -18,5 +18,26 @@
  */
 package org.apache.doris.kafka.connector.converter.type;
 
+import java.util.Optional;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisTypeProperties;
+import org.apache.kafka.connect.data.Schema;
+
 /** An abstract temporal implementation of {@link Type} for {@code TIMESTAMP} based columns. */
-public abstract class AbstractTimestampType extends AbstractTemporalType {}
+public abstract class AbstractTimestampType extends AbstractTemporalType {
+
+    @Override
+    public String getTypeName(Schema schema) {
+        final int precision = getTimePrecision(schema);
+        return String.format(
+                "%s(%s)",
+                DorisType.DATETIME,
+                Math.min(precision, DorisTypeProperties.MAX_SUPPORTED_DATE_TIME_PRECISION));
+    }
+
+    protected int getTimePrecision(Schema schema) {
+        final String length = getSourceColumnLength(schema).orElse("0");
+        final Optional<String> scale = getSourceColumnPrecision(schema);
+        return scale.map(Integer::parseInt).orElseGet(() -> Integer.parseInt(length));
+    }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
index d915a89..650e792 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
@@ -18,7 +18,11 @@
  */
 package org.apache.doris.kafka.connector.converter.type;
 
+import io.debezium.connector.jdbc.util.SchemaUtils;
+import java.util.Objects;
+import java.util.Optional;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.kafka.connect.data.Schema;
 
 /** An abstract implementation of {@link Type}, which all types should extend. */
 public abstract class AbstractType implements Type {
@@ -40,4 +44,23 @@ public abstract class AbstractType implements Type {
     public String toString() {
         return getClass().getSimpleName();
     }
+
+    protected Optional<String> getSchemaParameter(Schema schema, String parameterName) {
+        if (!Objects.isNull(schema.parameters())) {
+            return Optional.ofNullable(schema.parameters().get(parameterName));
+        }
+        return Optional.empty();
+    }
+
+    protected Optional<String> getSourceColumnType(Schema schema) {
+        return SchemaUtils.getSourceColumnType(schema);
+    }
+
+    protected Optional<String> getSourceColumnLength(Schema schema) {
+        return SchemaUtils.getSourceColumnLength(schema);
+    }
+
+    protected Optional<String> getSourceColumnPrecision(Schema schema) {
+        return SchemaUtils.getSourceColumnPrecision(schema);
+    }
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
index c284f0e..698e838 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
@@ -19,6 +19,7 @@
 package org.apache.doris.kafka.connector.converter.type;
 
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.kafka.connect.data.Schema;
 
 /**
 * A type indicates the type of each column of kafka record, including various column types of
@@ -42,5 +43,7 @@ public interface Type {
     /** Get the actual converted value based on the column type. */
     Object getValue(Object sourceValue);
 
+    String getTypeName(Schema schema);
+
     boolean isNumber();
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
index 18c5af3..dfac2d5 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
@@ -18,6 +18,9 @@
  */
 package org.apache.doris.kafka.connector.converter.type.connect;
 
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
 public class ConnectBooleanType extends AbstractConnectSchemaType {
 
     public static final ConnectBooleanType INSTANCE = new ConnectBooleanType();
@@ -26,4 +29,9 @@ public class ConnectBooleanType extends AbstractConnectSchemaType {
     public String[] getRegistrationKeys() {
         return new String[] {"BOOLEAN"};
     }
+
+    @Override
+    public String getTypeName(Schema schema) {
+        return DorisType.BOOLEAN;
+    }
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
index 6c2701c..fbd07a9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
@@ -19,6 +19,8 @@
 package org.apache.doris.kafka.connector.converter.type.connect;
 
 import java.nio.ByteBuffer;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
 
 public class ConnectBytesType extends AbstractConnectSchemaType {
 
@@ -37,6 +39,11 @@ public class ConnectBytesType extends AbstractConnectSchemaType {
         return bytesToHexString(getByteArrayFromValue(sourceValue));
     }
 
+    @Override
+    public String getTypeName(Schema schema) {
+        return DorisType.STRING;
+    }
+
     private byte[] getByteArrayFromValue(Object value) {
         byte[] byteArray = null;
         if (value instanceof ByteBuffer) {
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
index 4106f8d..acac4af 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
@@ -18,8 +18,8 @@
  */
 package org.apache.doris.kafka.connector.converter.type.connect;
 
+import io.debezium.connector.jdbc.util.DateTimeUtils;
 import org.apache.doris.kafka.connector.converter.type.AbstractDateType;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
 import org.apache.kafka.connect.data.Date;
 import org.apache.kafka.connect.errors.ConnectException;
 
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
index 8625883..a0ab7c9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
@@ -19,7 +19,9 @@
 package org.apache.doris.kafka.connector.converter.type.connect;
 
 import org.apache.doris.kafka.connector.converter.type.AbstractType;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
 import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +36,17 @@ public class ConnectDecimalType extends AbstractType {
         return new String[] {Decimal.LOGICAL_NAME};
     }
 
+    @Override
+    public String getTypeName(Schema schema) {
+        int scale = Integer.parseInt(getSchemaParameter(schema, "scale").orElse("0"));
+        int precision =
+                Integer.parseInt(
+                        getSchemaParameter(schema, "connect.decimal.precision").orElse("0"));
+        return precision <= 38
+                ? String.format("%s(%s,%s)", DorisType.DECIMAL, precision, Math.max(scale, 0))
+                : DorisType.STRING;
+    }
+
     @Override
     public boolean isNumber() {
         return true;
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
index 98b6936..fc75ba9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
@@ -18,6 +18,9 @@
  */
 package org.apache.doris.kafka.connector.converter.type.connect;
 
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
 public class ConnectFloat32Type extends AbstractConnectSchemaType {
 
     public static final ConnectFloat32Type INSTANCE = new ConnectFloat32Type();
@@ -27,6 +30,11 @@ public class ConnectFloat32Type extends AbstractConnectSchemaType {
         return new String[] {"FLOAT32"};
     }
 
+    @Override
+    public String getTypeName(Schema schema) {
+        return DorisType.FLOAT;
+    }
+
     @Override
     public boolean isNumber() {
         return true;
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
index f050c15..3a74391 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
@@ -18,6 +18,9 @@
  */
 package org.apache.doris.kafka.connector.converter.type.connect;
 
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
 public class ConnectFloat64Type extends AbstractConnectSchemaType {
 
     public static final ConnectFloat64Type INSTANCE = new ConnectFloat64Type();
@@ -27,6 +30,11 @@ public class ConnectFloat64Type extends AbstractConnectSchemaType {
         return new String[] {"FLOAT64"};
     }
 
+    @Override
+    public String getTypeName(Schema schema) {
+        return DorisType.DOUBLE;
+    }
+
     @Override
     public boolean isNumber() {
         return true;
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
index 573813b..6a61c77 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
@@ -18,6 +18,9 @@
  */
 package org.apache.doris.kafka.connector.converter.type.connect;
 
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
 public class ConnectInt16Type extends AbstractConnectSchemaType {
 
     public static final ConnectInt16Type INSTANCE = new ConnectInt16Type();
@@ -27,6 +30,11 @@ public class ConnectInt16Type extends AbstractConnectSchemaType {
         return new String[] {"INT16"};
     }
 
+    @Override
+    public String getTypeName(Schema schema) {
+        return DorisType.SMALLINT;
+    }
+
     @Override
     public boolean isNumber() {
         return true;
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
index 50dd6c7..e11ad5f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
@@ -18,6 +18,9 @@
  */
 package org.apache.doris.kafka.connector.converter.type.connect;
 
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
 public class ConnectInt32Type extends AbstractConnectSchemaType {
 
     public static final ConnectInt32Type INSTANCE = new ConnectInt32Type();
@@ -27,6 +30,11 @@ public class ConnectInt32Type extends AbstractConnectSchemaType {
         return new String[] {"INT32"};
     }
 
+    @Override
+    public String getTypeName(Schema schema) {
+        return DorisType.INT;
+    }
+
     @Override
     public boolean isNumber() {
         return true;
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
index c08abb6..a322da6 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
@@ -18,6 +18,9 @@
  */
 package org.apache.doris.kafka.connector.converter.type.connect;
 
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
 public class ConnectInt64Type extends AbstractConnectSchemaType {
 
     public static final ConnectInt64Type INSTANCE = new ConnectInt64Type();
@@ -27,6 +30,11 @@ public class ConnectInt64Type extends AbstractConnectSchemaType {
         return new String[] {"INT64"};
     }
 
+    @Override
+    public String getTypeName(Schema schema) {
+        return DorisType.BIGINT;
+    }
+
     @Override
     public boolean isNumber() {
         return true;
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
index 55c82cf..5c3fae6 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
@@ -18,6 +18,9 @@
  */
 package org.apache.doris.kafka.connector.converter.type.connect;
 
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
 public class ConnectInt8Type extends AbstractConnectSchemaType {
 
     public static final ConnectInt8Type INSTANCE = new ConnectInt8Type();
@@ -27,6 +30,11 @@ public class ConnectInt8Type extends AbstractConnectSchemaType {
         return new String[] {"INT8"};
     }
 
+    @Override
+    public String getTypeName(Schema schema) {
+        return DorisType.TINYINT;
+    }
+
     @Override
     public boolean isNumber() {
         return true;
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
index cac2624..707dd1c 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
@@ -19,12 +19,18 @@
 package org.apache.doris.kafka.connector.converter.type.connect;
 
 import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
 
 public class ConnectMapToConnectStringType extends AbstractConnectMapType {
 
     public static final ConnectMapToConnectStringType INSTANCE =
             new ConnectMapToConnectStringType();
 
+    @Override
+    public String getTypeName(Schema schema) {
+        return ConnectStringType.INSTANCE.getTypeName(schema);
+    }
+
     @Override
     public Object getValue(Object sourceValue) {
         if (sourceValue instanceof Map) {
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
index 0353020..bda5478 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
@@ -18,6 +18,10 @@
  */
 package org.apache.doris.kafka.connector.converter.type.connect;
 
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisTypeProperties;
+import org.apache.kafka.connect.data.Schema;
+
 /**
 * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} that supports
  * {@code STRING} connect schema types.
@@ -26,8 +30,23 @@ public class ConnectStringType extends AbstractConnectSchemaType {
 
     public static final ConnectStringType INSTANCE = new ConnectStringType();
 
+    @Override
+    public String getTypeName(Schema schema) {
+        int columnLength = getColumnLength(schema);
+        if (columnLength > 0) {
+            return columnLength * 3 > DorisTypeProperties.MAX_VARCHAR_SIZE
+                    ? DorisType.STRING
+                    : String.format("%s(%s)", DorisType.VARCHAR, columnLength * 3);
+        }
+        return DorisType.STRING;
+    }
+
     @Override
     public String[] getRegistrationKeys() {
         return new String[] {"STRING"};
     }
+
+    private int getColumnLength(Schema schema) {
+        return Integer.parseInt(getSourceColumnLength(schema).orElse("0"));
+    }
 }
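
For reference only (this note and the snippet below are not part of the patch): the new getTypeName above sizes Doris string columns from the propagated source column length. The sketch assumes getSourceColumnLength reads Debezium's "__debezium.source.column.length" schema parameter (attached when column.propagate.source.type is enabled); the factor of 3 budgets for UTF-8 bytes, capped at MAX_VARCHAR_SIZE (65533).

    import org.apache.doris.kafka.connector.converter.type.connect.ConnectStringType;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    public class StringTypeMappingSketch {
        public static void main(String[] args) {
            // Source VARCHAR(50): 50 * 3 = 150 <= 65533, so the column maps to VARCHAR(150).
            Schema varchar50 =
                    SchemaBuilder.string()
                            .parameter("__debezium.source.column.length", "50")
                            .build();
            // A long TEXT-like column: 30000 * 3 = 90000 > 65533, so it falls back to STRING.
            Schema text30000 =
                    SchemaBuilder.string()
                            .parameter("__debezium.source.column.length", "30000")
                            .build();
            System.out.println(ConnectStringType.INSTANCE.getTypeName(varchar50)); // VARCHAR(150)
            System.out.println(ConnectStringType.INSTANCE.getTypeName(text30000)); // STRING
        }
    }
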
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
index de3be44..c2e1698 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
@@ -18,12 +18,12 @@
  */
 package org.apache.doris.kafka.connector.converter.type.connect;
 
+import io.debezium.connector.jdbc.util.DateTimeUtils;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.Date;
 import org.apache.doris.kafka.connector.converter.type.AbstractTimeType;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
 import org.apache.kafka.connect.data.Time;
 import org.apache.kafka.connect.errors.ConnectException;
 
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
index 8af71b9..2de8c42 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
@@ -18,8 +18,8 @@
  */
 package org.apache.doris.kafka.connector.converter.type.connect;
 
+import io.debezium.connector.jdbc.util.DateTimeUtils;
 import org.apache.doris.kafka.connector.converter.type.AbstractTimestampType;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
 import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.errors.ConnectException;
 
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
index 912f0a4..a5589f3 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
@@ -18,9 +18,9 @@
  */
 package org.apache.doris.kafka.connector.converter.type.debezium;
 
+import io.debezium.connector.jdbc.util.DateTimeUtils;
 import io.debezium.time.Date;
 import org.apache.doris.kafka.connector.converter.type.AbstractDateType;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
 import org.apache.kafka.connect.errors.ConnectException;
 
 public class DateType extends AbstractDateType {
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
index 36eeceb..b2a1381 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
@@ -18,9 +18,9 @@
  */
 package org.apache.doris.kafka.connector.converter.type.debezium;
 
+import io.debezium.connector.jdbc.util.DateTimeUtils;
 import io.debezium.time.MicroTime;
 import java.time.LocalTime;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
 
 public class MicroTimeType extends AbstractDebeziumTimeType {
 
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
index b8c71a2..cb8e3c9 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
@@ -18,9 +18,9 @@
  */
 package org.apache.doris.kafka.connector.converter.type.debezium;
 
+import io.debezium.connector.jdbc.util.DateTimeUtils;
 import io.debezium.time.MicroTimestamp;
 import java.time.LocalDateTime;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
 
 public class MicroTimestampType extends AbstractDebeziumTimestampType {
 
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
index 9519e64..abcc05e 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
@@ -18,9 +18,9 @@
  */
 package org.apache.doris.kafka.connector.converter.type.debezium;
 
+import io.debezium.connector.jdbc.util.DateTimeUtils;
 import io.debezium.time.NanoTime;
 import java.time.LocalTime;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
 
 public class NanoTimeType extends AbstractDebeziumTimeType {
 
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
index eec06c8..a7c08d0 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
@@ -18,10 +18,10 @@
  */
 package org.apache.doris.kafka.connector.converter.type.debezium;
 
+import io.debezium.connector.jdbc.util.DateTimeUtils;
 import io.debezium.time.MicroTimestamp;
 import io.debezium.time.NanoTimestamp;
 import java.time.LocalDateTime;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
 
 /**
  * An implementation of {@link 
org.apache.doris.kafka.connector.converter.type.Type} for {@link
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
index 83e95d9..be1d329 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
@@ -18,9 +18,9 @@
  */
 package org.apache.doris.kafka.connector.converter.type.debezium;
 
+import io.debezium.connector.jdbc.util.DateTimeUtils;
 import io.debezium.time.Time;
 import java.time.LocalTime;
-import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
 
 public class TimeType extends AbstractDebeziumTimeType {
 
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
index e38fe41..1cbff47 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
@@ -22,6 +22,8 @@ import io.debezium.data.VariableScaleDecimal;
 import java.math.BigDecimal;
 import java.util.Optional;
 import org.apache.doris.kafka.connector.converter.type.AbstractType;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 
@@ -51,6 +53,16 @@ public class VariableScaleDecimalType extends AbstractType {
                         getClass().getSimpleName(), sourceValue, 
sourceValue.getClass().getName()));
     }
 
+    @Override
+    public String getTypeName(Schema schema) {
+        // The struct passed for VariableScaleDecimal values does not carry enough
+        // information to resolve a fixed precision and scale for the column, so we
+        // default to the widest floating-point type in Doris, DOUBLE.
+        return DorisType.DOUBLE;
+    }
+
     @Override
     public boolean isNumber() {
         return true;
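
For background (not part of the patch): Debezium's VariableScaleDecimal carries the scale with every value rather than in the schema, so a fixed DECIMAL(p, s) column cannot be derived when the Doris table schema is created or altered, which is why the mapping above falls back to DOUBLE. A minimal decoding sketch, assuming the usual {scale: int32, value: bytes} struct layout:

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class VariableScaleDecimalSketch {
        // Assumed wire form: the scale is a per-row field, not a schema parameter.
        static final Schema VSD =
                SchemaBuilder.struct()
                        .name("io.debezium.data.VariableScaleDecimal")
                        .field("scale", Schema.INT32_SCHEMA)
                        .field("value", Schema.BYTES_SCHEMA)
                        .build();

        public static void main(String[] args) {
            Struct row =
                    new Struct(VSD)
                            .put("scale", 3)
                            .put("value", new BigInteger("12345").toByteArray());
            // 12345 with scale 3 decodes to 12.345; the next row may use a different scale.
            BigDecimal decoded =
                    new BigDecimal(new BigInteger(row.getBytes("value")), row.getInt32("scale"));
            System.out.println(decoded); // 12.345
        }
    }
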
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisType.java
similarity index 89%
rename from 
src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java
rename to 
src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisType.java
index 89b416a..c89be76 100644
--- a/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisType.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.doris.kafka.connector.dialect;
+package org.apache.doris.kafka.connector.converter.type.doris;
 
 public class DorisType {
     public static final String BOOLEAN = "BOOLEAN";
@@ -29,11 +29,8 @@ public class DorisType {
     public static final String FLOAT = "FLOAT";
     public static final String DOUBLE = "DOUBLE";
     public static final String DECIMAL = "DECIMAL";
-    public static final String DECIMAL_V3 = "DECIMALV3";
     public static final String DATE = "DATE";
-    public static final String DATE_V2 = "DATEV2";
     public static final String DATETIME = "DATETIME";
-    public static final String DATETIME_V2 = "DATETIMEV2";
     public static final String CHAR = "CHAR";
     public static final String VARCHAR = "VARCHAR";
     public static final String STRING = "STRING";
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisTypeProperties.java
similarity index 83%
rename from 
src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java
rename to 
src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisTypeProperties.java
index 62e4cab..b20ae8e 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisTypeProperties.java
@@ -17,12 +17,16 @@
  * under the License.
  */
 
-package org.apache.doris.kafka.connector.dialect;
+package org.apache.doris.kafka.connector.converter.type.doris;
 
-public class DialectProperties {
+public class DorisTypeProperties {
 
     /* Max precision of datetime type of Doris. */
     public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;
 
     public static final int TIMESTAMP_TYPE_MAX_PRECISION = 9;
+
+    public static final int MAX_VARCHAR_SIZE = 65533;
+
+    public static final int MAX_CHAR_SIZE = 255;
 }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java 
b/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java
deleted file mode 100644
index 663aadf..0000000
--- 
a/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.doris.kafka.connector.dialect.mysql;
-
-import static 
org.apache.doris.kafka.connector.dialect.DialectProperties.MAX_SUPPORTED_DATE_TIME_PRECISION;
-import static 
org.apache.doris.kafka.connector.dialect.DialectProperties.TIMESTAMP_TYPE_MAX_PRECISION;
-
-import com.google.common.base.Preconditions;
-import org.apache.doris.kafka.connector.dialect.DorisType;
-
-public class MysqlType {
-
-    // MySQL driver returns width of timestamp types instead of precision.
-    // 19 characters are used for zero-precision timestamps while others
-    // require 19 + precision + 1 characters with the additional character
-    // required for the decimal separator.
-    private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19;
-    private static final String BIT = "BIT";
-    private static final String BOOLEAN = "BOOLEAN";
-    private static final String BOOL = "BOOL";
-    private static final String TINYINT = "TINYINT";
-    private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
-    private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED 
ZEROFILL";
-    private static final String SMALLINT = "SMALLINT";
-    private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
-    private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT 
UNSIGNED ZEROFILL";
-    private static final String MEDIUMINT = "MEDIUMINT";
-    private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
-    private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT 
UNSIGNED ZEROFILL";
-    private static final String INT = "INT";
-    private static final String INT_UNSIGNED = "INT UNSIGNED";
-    private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED 
ZEROFILL";
-    private static final String BIGINT = "BIGINT";
-    private static final String SERIAL = "SERIAL";
-    private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
-    private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED 
ZEROFILL";
-    private static final String REAL = "REAL";
-    private static final String REAL_UNSIGNED = "REAL UNSIGNED";
-    private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED 
ZEROFILL";
-    private static final String FLOAT = "FLOAT";
-    private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED";
-    private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED 
ZEROFILL";
-    private static final String DOUBLE = "DOUBLE";
-    private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
-    private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED 
ZEROFILL";
-    private static final String DOUBLE_PRECISION = "DOUBLE PRECISION";
-    private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION 
UNSIGNED";
-    private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL =
-            "DOUBLE PRECISION UNSIGNED ZEROFILL";
-    private static final String NUMERIC = "NUMERIC";
-    private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
-    private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED 
ZEROFILL";
-    private static final String FIXED = "FIXED";
-    private static final String FIXED_UNSIGNED = "FIXED UNSIGNED";
-    private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED 
ZEROFILL";
-    private static final String DECIMAL = "DECIMAL";
-    private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
-    private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED 
ZEROFILL";
-    private static final String CHAR = "CHAR";
-    private static final String VARCHAR = "VARCHAR";
-    private static final String TINYTEXT = "TINYTEXT";
-    private static final String MEDIUMTEXT = "MEDIUMTEXT";
-    private static final String TEXT = "TEXT";
-    private static final String LONGTEXT = "LONGTEXT";
-    private static final String DATE = "DATE";
-    private static final String TIME = "TIME";
-    private static final String DATETIME = "DATETIME";
-    private static final String TIMESTAMP = "TIMESTAMP";
-    private static final String YEAR = "YEAR";
-    private static final String BINARY = "BINARY";
-    private static final String VARBINARY = "VARBINARY";
-    private static final String TINYBLOB = "TINYBLOB";
-    private static final String MEDIUMBLOB = "MEDIUMBLOB";
-    private static final String BLOB = "BLOB";
-    private static final String LONGBLOB = "LONGBLOB";
-    private static final String JSON = "JSON";
-    private static final String ENUM = "ENUM";
-    private static final String SET = "SET";
-
-    public static String toDorisType(String type, Integer length, Integer 
scale) {
-        switch (type.toUpperCase()) {
-            case BIT:
-            case BOOLEAN:
-            case BOOL:
-                return DorisType.BOOLEAN;
-            case TINYINT:
-                return DorisType.TINYINT;
-            case TINYINT_UNSIGNED:
-            case TINYINT_UNSIGNED_ZEROFILL:
-            case SMALLINT:
-                return DorisType.SMALLINT;
-            case SMALLINT_UNSIGNED:
-            case SMALLINT_UNSIGNED_ZEROFILL:
-            case INT:
-            case MEDIUMINT:
-            case YEAR:
-                return DorisType.INT;
-            case INT_UNSIGNED:
-            case INT_UNSIGNED_ZEROFILL:
-            case MEDIUMINT_UNSIGNED:
-            case MEDIUMINT_UNSIGNED_ZEROFILL:
-            case BIGINT:
-                return DorisType.BIGINT;
-            case BIGINT_UNSIGNED:
-            case BIGINT_UNSIGNED_ZEROFILL:
-                return DorisType.LARGEINT;
-            case FLOAT:
-            case FLOAT_UNSIGNED:
-            case FLOAT_UNSIGNED_ZEROFILL:
-                return DorisType.FLOAT;
-            case REAL:
-            case REAL_UNSIGNED:
-            case REAL_UNSIGNED_ZEROFILL:
-            case DOUBLE:
-            case DOUBLE_UNSIGNED:
-            case DOUBLE_UNSIGNED_ZEROFILL:
-            case DOUBLE_PRECISION:
-            case DOUBLE_PRECISION_UNSIGNED:
-            case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
-                return DorisType.DOUBLE;
-            case NUMERIC:
-            case NUMERIC_UNSIGNED:
-            case NUMERIC_UNSIGNED_ZEROFILL:
-            case FIXED:
-            case FIXED_UNSIGNED:
-            case FIXED_UNSIGNED_ZEROFILL:
-            case DECIMAL:
-            case DECIMAL_UNSIGNED:
-            case DECIMAL_UNSIGNED_ZEROFILL:
-                return length != null && length <= 38
-                        ? String.format(
-                                "%s(%s,%s)",
-                                DorisType.DECIMAL_V3,
-                                length,
-                                scale != null && scale >= 0 ? scale : 0)
-                        : DorisType.STRING;
-            case DATE:
-                return DorisType.DATE_V2;
-            case DATETIME:
-            case TIMESTAMP:
-                // default precision is 0
-                // see 
https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
-                if (length == null
-                        || length <= 0
-                        || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
-                    return String.format("%s(%s)", DorisType.DATETIME_V2, 0);
-                } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) {
-                    // Timestamp with a fraction of seconds.
-                    // For example, 2024-01-01 01:01:01.1
-                    // The decimal point will occupy 1 character.
-                    // Thus,the length of the timestamp is 21.
-                    return String.format(
-                            "%s(%s)",
-                            DorisType.DATETIME_V2,
-                            Math.min(
-                                    length - 
ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1,
-                                    MAX_SUPPORTED_DATE_TIME_PRECISION));
-                } else if (length <= TIMESTAMP_TYPE_MAX_PRECISION) {
-                    // For Debezium JSON data, the timestamp/datetime length 
ranges from 0 to 9.
-                    return String.format(
-                            "%s(%s)",
-                            DorisType.DATETIME_V2,
-                            Math.min(length, 
MAX_SUPPORTED_DATE_TIME_PRECISION));
-                } else {
-                    throw new UnsupportedOperationException(
-                            "Unsupported length: "
-                                    + length
-                                    + " for MySQL TIMESTAMP/DATETIME types");
-                }
-            case CHAR:
-            case VARCHAR:
-                Preconditions.checkNotNull(length);
-                return length * 3 > 65533
-                        ? DorisType.STRING
-                        : String.format("%s(%s)", DorisType.VARCHAR, length * 
3);
-            case TINYTEXT:
-            case TEXT:
-            case MEDIUMTEXT:
-            case LONGTEXT:
-            case ENUM:
-            case TIME:
-            case TINYBLOB:
-            case BLOB:
-            case MEDIUMBLOB:
-            case LONGBLOB:
-            case BINARY:
-            case VARBINARY:
-            case SET:
-                return DorisType.STRING;
-            case JSON:
-                return DorisType.JSONB;
-            default:
-                throw new UnsupportedOperationException("Unsupported MySQL 
Type: " + type);
-        }
-    }
-}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 29810e4..1022344 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -23,7 +23,6 @@ import com.codahale.metrics.MetricRegistry;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
 import org.apache.doris.kafka.connector.DorisSinkTask;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.connection.ConnectionProvider;
@@ -35,7 +34,6 @@ import org.apache.doris.kafka.connector.writer.CopyIntoWriter;
 import org.apache.doris.kafka.connector.writer.DorisWriter;
 import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
-import org.apache.doris.kafka.connector.writer.schema.DebeziumSchemaChange;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -88,23 +86,15 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
         if (writer.containsKey(nameIndex)) {
             LOG.info("already start task");
         } else {
-            DorisWriter dorisWriter;
             String topic = topicPartition.topic();
             int partition = topicPartition.partition();
-            String schemaChangeTopic = dorisOptions.getSchemaTopic();
-            if (Objects.nonNull(schemaChangeTopic) && 
schemaChangeTopic.equals(topic)) {
-                dorisWriter =
-                        new DebeziumSchemaChange(
-                                topic, partition, dorisOptions, conn, 
connectMonitor);
-            } else {
-                LoadModel loadModel = dorisOptions.getLoadModel();
-                dorisWriter =
-                        LoadModel.COPY_INTO.equals(loadModel)
-                                ? new CopyIntoWriter(
-                                        topic, partition, dorisOptions, conn, 
connectMonitor)
-                                : new StreamLoadWriter(
-                                        topic, partition, dorisOptions, conn, 
connectMonitor);
-            }
+            LoadModel loadModel = dorisOptions.getLoadModel();
+            DorisWriter dorisWriter =
+                    LoadModel.COPY_INTO.equals(loadModel)
+                            ? new CopyIntoWriter(
+                                    topic, partition, dorisOptions, conn, 
connectMonitor)
+                            : new StreamLoadWriter(
+                                    topic, partition, dorisOptions, conn, 
connectMonitor);
             writer.put(nameIndex, dorisWriter);
             metricsJmxReporter.start();
         }
@@ -129,7 +119,7 @@ public class DorisDefaultSinkService implements 
DorisSinkService {
         // check all sink writer to see if they need to be flushed
         for (DorisWriter writer : writer.values()) {
             // Time based flushing
-            if (!(writer instanceof DebeziumSchemaChange) && 
writer.shouldFlush()) {
+            if (writer.shouldFlush()) {
                 writer.flushBuffer();
             }
         }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java 
b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index 84d3f90..2356f7d 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -19,13 +19,9 @@
 
 package org.apache.doris.kafka.connector.utils;
 
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.exception.ArgumentsException;
 import org.apache.doris.kafka.connector.exception.DorisException;
@@ -77,31 +73,6 @@ public class ConfigCheckUtils {
             configIsValid = false;
         }
 
-        String schemaTopic = config.get(DorisSinkConnectorConfig.SCHEMA_TOPIC);
-        if (StringUtils.isNotEmpty(schemaTopic)) {
-            schemaTopic = schemaTopic.trim();
-            if (!topics.isEmpty()) {
-                List<String> topicList =
-                        
Arrays.stream(topics.split(",")).collect(Collectors.toList());
-                if (!topicList.contains(schemaTopic)) {
-                    LOG.error(
-                            "schema.topic is not included in topics list, 
please add! "
-                                    + " schema.topic={}, topics={}",
-                            schemaTopic,
-                            topics);
-                    configIsValid = false;
-                }
-            }
-            if (!topicsRegex.isEmpty() && !topicsRegex.equals(schemaTopic)) {
-                LOG.error(
-                        "topics.regex must equals schema.topic. please check 
again!"
-                                + " topics.regex={}, schema.topic={}",
-                        topicsRegex,
-                        schemaTopic);
-                configIsValid = false;
-            }
-        }
-
         if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)
                 && 
parseTopicToTableMap(config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP))
                         == null) {
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
 
b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
deleted file mode 100644
index 1eeab0e..0000000
--- 
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.doris.kafka.connector.writer.schema;
-
-import com.google.common.annotations.VisibleForTesting;
-import io.debezium.data.Envelope;
-import io.debezium.util.Strings;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.doris.kafka.connector.cfg.DorisOptions;
-import org.apache.doris.kafka.connector.connection.ConnectionProvider;
-import org.apache.doris.kafka.connector.converter.RecordDescriptor;
-import org.apache.doris.kafka.connector.exception.SchemaChangeException;
-import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
-import org.apache.doris.kafka.connector.model.ColumnDescriptor;
-import org.apache.doris.kafka.connector.model.TableDescriptor;
-import org.apache.doris.kafka.connector.model.doris.Schema;
-import org.apache.doris.kafka.connector.service.DorisSystemService;
-import org.apache.doris.kafka.connector.service.RestService;
-import org.apache.doris.kafka.connector.writer.DorisWriter;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DebeziumSchemaChange extends DorisWriter {
-    private static final Logger LOG = 
LoggerFactory.getLogger(DebeziumSchemaChange.class);
-    public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";
-    public static final String TABLE_CHANGES = "tableChanges";
-    public static final String TABLE_CHANGES_TYPE = "type";
-    private final Map<String, String> topic2TableMap;
-    private SchemaChangeManager schemaChangeManager;
-    private DorisSystemService dorisSystemService;
-    private Set<String> sinkTableSet;
-    private List<String> ddlSqlList;
-
-    public DebeziumSchemaChange(
-            String topic,
-            int partition,
-            DorisOptions dorisOptions,
-            ConnectionProvider connectionProvider,
-            DorisConnectMonitor connectMonitor) {
-        super(topic, partition, dorisOptions, connectionProvider, 
connectMonitor);
-        this.schemaChange = true;
-        this.sinkTableSet = new HashSet<>();
-        this.dorisSystemService = new DorisSystemService(dorisOptions);
-        this.topic2TableMap = dorisOptions.getTopicMap();
-        this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
-        init();
-    }
-
-    @Override
-    public void fetchOffset() {
-        // do nothing
-    }
-
-    private void init() {
-        Set<Map.Entry<String, String>> entrySet = topic2TableMap.entrySet();
-        for (Map.Entry<String, String> entry : entrySet) {
-            sinkTableSet.add(entry.getValue());
-        }
-    }
-
-    @Override
-    public void insert(SinkRecord record) {
-        if (!validate(record)) {
-            processedOffset.set(record.kafkaOffset());
-            return;
-        }
-        schemaChange(record);
-    }
-
-    private boolean validate(final SinkRecord record) {
-        if (!isSchemaChange(record)) {
-            LOG.warn(
-                    "Current topic={}, the message does not contain schema 
change change information, please check schema.topic",
-                    dorisOptions.getSchemaTopic());
-            throw new SchemaChangeException(
-                    "The message does not contain schema change change 
information, please check schema.topic");
-        }
-
-        tableName = resolveTableName(record);
-        if (tableName == null) {
-            LOG.warn(
-                    "Ignored to write record from topic '{}' partition '{}' 
offset '{}'. No resolvable table name",
-                    record.topic(),
-                    record.kafkaPartition(),
-                    record.kafkaOffset());
-            return false;
-        }
-
-        if (!sinkTableSet.contains(tableName)) {
-            LOG.warn(
-                    "The "
-                            + tableName
-                            + " is not defined and requires synchronized data. 
If you need to synchronize the table data, please configure it in 
'doris.topic2table.map'");
-            return false;
-        }
-
-        Struct recordStruct = (Struct) (record.value());
-        if (isTruncate(recordStruct)) {
-            LOG.warn("Truncate {} table is not supported", tableName);
-            return false;
-        }
-
-        List<Object> tableChanges = recordStruct.getArray(TABLE_CHANGES);
-        Struct tableChange = (Struct) tableChanges.get(0);
-        if ("DROP".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))
-                || 
"CREATE".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))) {
-            LOG.warn(
-                    "CREATE and DROP {} tables are currently not supported. 
Please create or drop them manually.",
-                    tableName);
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public void commit(int partition) {
-        // do nothing
-    }
-
-    private void schemaChange(final SinkRecord record) {
-        Struct recordStruct = (Struct) (record.value());
-        List<Object> tableChanges = recordStruct.getArray(TABLE_CHANGES);
-        Struct tableChange = (Struct) tableChanges.get(0);
-        RecordDescriptor recordDescriptor =
-                RecordDescriptor.builder()
-                        .withSinkRecord(record)
-                        .withTableChange(tableChange)
-                        .build();
-        tableChange(tableName, recordDescriptor);
-    }
-
-    private boolean isTruncate(final Struct record) {
-        // Generally the truncate corresponding tableChanges is empty
-        return record.getArray(TABLE_CHANGES).isEmpty();
-    }
-
-    private static boolean isSchemaChange(SinkRecord record) {
-        return record.valueSchema() != null
-                && !Strings.isNullOrEmpty(record.valueSchema().name())
-                && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
-    }
-
-    private String resolveTableName(SinkRecord record) {
-        if (isTombstone(record)) {
-            LOG.warn(
-                    "Ignore this record because it seems to be a tombstone 
that doesn't have source field, then cannot resolve table name in topic '{}', 
partition '{}', offset '{}'",
-                    record.topic(),
-                    record.kafkaPartition(),
-                    record.kafkaOffset());
-            return null;
-        }
-        Struct source = ((Struct) 
record.value()).getStruct(Envelope.FieldName.SOURCE);
-        return source.getString("table");
-    }
-
-    private void alterTableIfNeeded(String tableName, RecordDescriptor record) 
{
-        LOG.debug("Attempting to alter table '{}'.", tableName);
-        if (!hasTable(tableName)) {
-            LOG.error("Table '{}' does not exist and cannot be altered.", 
tableName);
-            throw new SchemaChangeException("Could not find table: " + 
tableName);
-        }
-        final TableDescriptor dorisTableDescriptor = 
obtainTableSchema(tableName);
-        SchemaChangeHelper.compareSchema(dorisTableDescriptor, 
record.getFields());
-        ddlSqlList = 
SchemaChangeHelper.generateDDLSql(dorisOptions.getDatabase(), tableName);
-        doSchemaChange(dorisOptions.getDatabase(), tableName);
-    }
-
-    /** Obtain table schema from doris. */
-    private TableDescriptor obtainTableSchema(String tableName) {
-        Schema schema = RestService.getSchema(dorisOptions, dbName, tableName, 
LOG);
-        List<ColumnDescriptor> columnDescriptors = new ArrayList<>();
-        schema.getProperties()
-                .forEach(
-                        column -> {
-                            ColumnDescriptor columnDescriptor =
-                                    ColumnDescriptor.builder()
-                                            .columnName(column.getName())
-                                            .typeName(column.getType())
-                                            .comment(column.getComment())
-                                            .build();
-                            columnDescriptors.add(columnDescriptor);
-                        });
-        return TableDescriptor.builder()
-                .tableName(tableName)
-                .type(schema.getKeysType())
-                .columns(columnDescriptors)
-                .build();
-    }
-
-    private boolean hasTable(String tableName) {
-        return dorisSystemService.tableExists(dbName, tableName);
-    }
-
-    private void tableChange(String tableName, RecordDescriptor 
recordDescriptor) {
-        if (!hasTable(tableName)) {
-            // TODO Table does not exist, automatically created it.
-            LOG.error("{} Table does not exist, please create manually.", 
tableName);
-        } else {
-            // Table exists, lets attempt to alter it if necessary.
-            alterTableIfNeeded(tableName, recordDescriptor);
-        }
-        processedOffset.set(recordDescriptor.getOffset());
-    }
-
-    private boolean doSchemaChange(String database, String tableName) {
-        boolean status = false;
-        if (ddlSqlList.isEmpty()) {
-            LOG.info("Schema change ddl is empty, not need do schema change.");
-            return false;
-        }
-        try {
-            List<SchemaChangeHelper.DDLSchema> ddlSchemas = 
SchemaChangeHelper.getDdlSchemas();
-            for (int i = 0; i < ddlSqlList.size(); i++) {
-                SchemaChangeHelper.DDLSchema ddlSchema = ddlSchemas.get(i);
-                String ddlSql = ddlSqlList.get(i);
-                boolean doSchemaChange = checkSchemaChange(database, 
tableName, ddlSchema);
-                status =
-                        doSchemaChange
-                                && schemaChangeManager.execute(ddlSql, 
dorisOptions.getDatabase());
-                LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
-            }
-        } catch (Exception e) {
-            LOG.warn("schema change error :", e);
-        }
-        return status;
-    }
-
-    private boolean checkSchemaChange(
-            String database, String table, SchemaChangeHelper.DDLSchema 
ddlSchema)
-            throws IllegalArgumentException, IOException {
-        Map<String, Object> param =
-                SchemaChangeManager.buildRequestParam(
-                        ddlSchema.isDropColumn(), ddlSchema.getColumnName());
-        return schemaChangeManager.checkSchemaChange(database, table, param);
-    }
-
-    public long getOffset() {
-        committedOffset.set(processedOffset.get());
-        return committedOffset.get() + 1;
-    }
-
-    private boolean isTombstone(SinkRecord record) {
-        return record.value() == null;
-    }
-
-    @VisibleForTesting
-    public void setSinkTableSet(Set<String> sinkTableSet) {
-        this.sinkTableSet = sinkTableSet;
-    }
-
-    @VisibleForTesting
-    public void setDorisSystemService(DorisSystemService dorisSystemService) {
-        this.dorisSystemService = dorisSystemService;
-    }
-
-    @VisibleForTesting
-    public List<String> getDdlSqlList() {
-        return ddlSqlList;
-    }
-
-    @VisibleForTesting
-    public void setSchemaChangeManager(SchemaChangeManager 
schemaChangeManager) {
-        this.schemaChangeManager = schemaChangeManager;
-    }
-}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java
 
b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java
deleted file mode 100644
index abf424c..0000000
--- 
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.doris.kafka.connector.writer.schema;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.commons.compress.utils.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.doris.kafka.connector.converter.RecordDescriptor;
-import org.apache.doris.kafka.connector.model.ColumnDescriptor;
-import org.apache.doris.kafka.connector.model.TableDescriptor;
-
-public class SchemaChangeHelper {
-    private static final List<ColumnDescriptor> addColumnDescriptors = 
Lists.newArrayList();
-    // Used to determine whether the column in the doris table can undergo 
schema change
-    private static final List<DDLSchema> ddlSchemas = Lists.newArrayList();
-    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
-
-    // TODO support drop column
-    // Dropping a column is a dangerous behavior and may result in an 
accidental deletion.
-    // There are some problems in the current implementation: each alter 
column operation will read
-    // the table structure
-    // in doris and compare the schema with the topic message.
-    // When there are more columns in the doris table than in the upstream 
table,
-    // these redundant columns in doris will be dropped, regardless of these 
redundant columns, is
-    // what you need.
-    // Therefore, the operation of dropping a column behavior currently 
requires the user to do it
-    // himself.
-    private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
-
-    /**
-     * Compare kafka upstream table structure with doris table structure. If 
kafka field does not
-     * contain the structure of dorisTable, then need to add this field.
-     *
-     * @param dorisTable read from the table schema of doris.
-     * @param fields table structure from kafka upstream data source.
-     */
-    public static void compareSchema(
-            TableDescriptor dorisTable, Map<String, 
RecordDescriptor.FieldDescriptor> fields) {
-        // Determine whether fields need to be added to doris table
-        addColumnDescriptors.clear();
-        Collection<ColumnDescriptor> dorisTableColumns = 
dorisTable.getColumns();
-        Set<String> dorisTableColumnNames =
-                dorisTableColumns.stream()
-                        .map(ColumnDescriptor::getColumnName)
-                        .collect(Collectors.toSet());
-        Set<Map.Entry<String, RecordDescriptor.FieldDescriptor>> fieldsEntries 
= fields.entrySet();
-        for (Map.Entry<String, RecordDescriptor.FieldDescriptor> fieldEntry : 
fieldsEntries) {
-            String fieldName = fieldEntry.getKey();
-            if (!dorisTableColumnNames.contains(fieldName)) {
-                RecordDescriptor.FieldDescriptor fieldDescriptor = 
fieldEntry.getValue();
-                ColumnDescriptor columnDescriptor =
-                        new ColumnDescriptor.Builder()
-                                .columnName(fieldDescriptor.getName())
-                                .typeName(fieldDescriptor.getSchemaTypeName())
-                                
.defaultValue(fieldDescriptor.getDefaultValue())
-                                .comment(fieldDescriptor.getComment())
-                                .build();
-                addColumnDescriptors.add(columnDescriptor);
-            }
-        }
-    }
-
-    public static List<String> generateDDLSql(String database, String table) {
-        ddlSchemas.clear();
-        List<String> ddlList = Lists.newArrayList();
-        for (ColumnDescriptor columnDescriptor : addColumnDescriptors) {
-            ddlList.add(buildAddColumnDDL(database, table, columnDescriptor));
-            ddlSchemas.add(new DDLSchema(columnDescriptor.getColumnName(), 
false));
-        }
-        return ddlList;
-    }
-
-    public static List<DDLSchema> getDdlSchemas() {
-        return ddlSchemas;
-    }
-
-    private static String buildDropColumnDDL(String database, String 
tableName, String columName) {
-        return String.format(
-                DROP_DDL,
-                identifier(database) + "." + identifier(tableName),
-                identifier(columName));
-    }
-
-    private static String buildAddColumnDDL(
-            String database, String tableName, ColumnDescriptor 
columnDescriptor) {
-        String columnName = columnDescriptor.getColumnName();
-        String columnType = columnDescriptor.getTypeName();
-        String defaultValue = columnDescriptor.getDefaultValue();
-        String comment = columnDescriptor.getComment();
-        String addDDL =
-                String.format(
-                        ADD_DDL,
-                        identifier(database) + "." + identifier(tableName),
-                        identifier(columnName),
-                        columnType);
-        if (defaultValue != null) {
-            addDDL = addDDL + " DEFAULT " + quoteDefaultValue(defaultValue);
-        }
-        if (StringUtils.isNotEmpty(comment)) {
-            addDDL = addDDL + " COMMENT '" + quoteComment(comment) + "'";
-        }
-        return addDDL;
-    }
-
-    private static String identifier(String name) {
-        return "`" + name + "`";
-    }
-
-    private static String quoteDefaultValue(String defaultValue) {
-        // DEFAULT current_timestamp not need quote
-        if (defaultValue.equalsIgnoreCase("current_timestamp")) {
-            return defaultValue;
-        }
-        return "'" + defaultValue + "'";
-    }
-
-    private static String quoteComment(String comment) {
-        return comment.replaceAll("'", "\\\\'");
-    }
-
-    public static class DDLSchema {
-        private final String columnName;
-        private final boolean isDropColumn;
-
-        public DDLSchema(String columnName, boolean isDropColumn) {
-            this.columnName = columnName;
-            this.isDropColumn = isDropColumn;
-        }
-
-        public String getColumnName() {
-            return columnName;
-        }
-
-        public boolean isDropColumn() {
-            return isDropColumn;
-        }
-    }
-}
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
 
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index 3cff4fa..f80a737 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -19,6 +19,13 @@
 
 package org.apache.doris.kafka.connector.converter;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -30,20 +37,31 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.schema.SchemaChangeManager;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.model.doris.Schema;
+import org.apache.doris.kafka.connector.service.DorisSystemService;
+import org.apache.doris.kafka.connector.service.RestService;
 import org.apache.doris.kafka.connector.writer.TestRecordBuffer;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 public class TestRecordService {
 
+    private final ObjectMapper objectMapper = new ObjectMapper();
     private RecordService recordService;
     private Properties props = new Properties();
     private JsonConverter jsonConverter = new JsonConverter();
+    private MockedStatic<RestService> mockRestService;
 
     @Before
     public void init() throws IOException {
@@ -54,9 +72,21 @@ public class TestRecordService {
         props.load(stream);
         props.put("task_id", "1");
         props.put("converter.mode", "debezium_ingestion");
+        props.put("schema.evolution", "basic");
+        props.put(
+                "doris.topic2table.map",
+                
"avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal");
         recordService = new RecordService(new DorisOptions((Map) props));
         HashMap<String, String> config = new HashMap<>();
         jsonConverter.configure(config, false);
+        mockRestService = mockStatic(RestService.class);
+
+        SchemaChangeManager mockSchemaChangeManager = 
Mockito.mock(SchemaChangeManager.class);
+        DorisSystemService mockDorisSystemService = 
mock(DorisSystemService.class);
+        doNothing().when(mockSchemaChangeManager).addColumnDDL(anyString(), 
any());
+        when(mockDorisSystemService.tableExists(anyString(), 
anyString())).thenReturn(true);
+        recordService.setDorisSystemService(mockDorisSystemService);
+        recordService.setSchemaChangeManager(mockSchemaChangeManager);
     }
 
     /**
@@ -70,7 +100,19 @@ public class TestRecordService {
      */
     @Test
     public void processMysqlDebeziumStructRecord() throws IOException {
-        String topic = "normal.wdl_test.example_table";
+        String schemaStr =
+                
"{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"BIGINT\"},{\"name\":\"name\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"},{\"name\":\"age\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"email\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"},{\"name\":\"birth_date\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATEV
 [...]
+        Schema schema = null;
+        try {
+            schema = objectMapper.readValue(schemaStr, Schema.class);
+        } catch (JsonProcessingException e) {
+            throw new DorisException(e);
+        }
+        mockRestService
+                .when(() -> RestService.getSchema(any(), any(), any(), any()))
+                .thenReturn(schema);
+
+        String topic = "avro_schema.wdl_test.example_table";
         // no delete value
         String noDeleteValue =
                 
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i
 [...]
@@ -86,13 +128,48 @@ public class TestRecordService {
         buildProcessStructRecord(topic, deleteValue, expectedDeleteValue);
     }
 
+    @Test
+    public void processMysqlDebeziumStructRecordAlter() throws IOException {
+        String schemaStr =
+                
"{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"BIGINT\"},{\"name\":\"name\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"},{\"name\":\"age\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"email\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"},{\"name\":\"birth_date\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATEV
 [...]
+        Schema schema = null;
+        try {
+            schema = objectMapper.readValue(schemaStr, Schema.class);
+        } catch (JsonProcessingException e) {
+            throw new DorisException(e);
+        }
+        mockRestService
+                .when(() -> RestService.getSchema(any(), any(), any(), any()))
+                .thenReturn(schema);
+
+        String topic = "avro_schema.wdl_test.example_table";
+        String topicMsg =
+                
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i
 [...]
+        SchemaAndValue schemaValue =
+                jsonConverter.toConnectData(topic, 
topicMsg.getBytes(StandardCharsets.UTF_8));
+        SinkRecord noDeleteSinkRecord =
+                TestRecordBuffer.newSinkRecord(topic, schemaValue.value(), 8, 
schemaValue.schema());
+        recordService.processStructRecord(noDeleteSinkRecord);
+
+        // Compare the results of schema change
+        Map<String, String> resultFields = new HashMap<>();
+        resultFields.put("time_column", "DATETIME(0)");
+        resultFields.put("blob_column", "STRING");
+        Set<RecordDescriptor.FieldDescriptor> missingFields = 
recordService.getMissingFields();
+        for (RecordDescriptor.FieldDescriptor missingField : missingFields) {
+            
Assert.assertTrue(resultFields.containsKey(missingField.getName()));
+            Assert.assertEquals(
+                    resultFields.get(missingField.getName()), 
missingField.getTypeName());
+        }
+    }
+
     private void buildProcessStructRecord(String topic, String sourceValue, 
String target)
             throws IOException {
         SchemaAndValue noDeleteSchemaValue =
                 jsonConverter.toConnectData(topic, 
sourceValue.getBytes(StandardCharsets.UTF_8));
         SinkRecord noDeleteSinkRecord =
                 TestRecordBuffer.newSinkRecord(
-                        noDeleteSchemaValue.value(), 8, 
noDeleteSchemaValue.schema());
+                        topic, noDeleteSchemaValue.value(), 8, 
noDeleteSchemaValue.schema());
         String processResult = 
recordService.processStructRecord(noDeleteSinkRecord);
         Assert.assertEquals(target, processResult);
     }
@@ -113,6 +190,18 @@ public class TestRecordService {
 
     @Test
     public void processStructRecordWithDebeziumSchema() throws IOException {
+        String schemaStr =
+                
"{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"name\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"}],\"status\":200}";
+        Schema schema = null;
+        try {
+            schema = objectMapper.readValue(schemaStr, Schema.class);
+        } catch (JsonProcessingException e) {
+            throw new DorisException(e);
+        }
+        mockRestService
+                .when(() -> RestService.getSchema(any(), any(), any(), any()))
+                .thenReturn(schema);
+
         String topic = "normal.wdl_test.test_sink_normal";
 
         // no delete value
@@ -171,4 +260,10 @@ public class TestRecordService {
         String s = recordService.processStringRecord(record);
         Assert.assertEquals("doris", s);
     }
+
+    @After
+    public void close() {
+        mockRestService.close();
+    }
 }
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java
deleted file mode 100644
index c95af44..0000000
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.doris.kafka.connector.writer;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.when;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.doris.kafka.connector.cfg.DorisOptions;
-import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
-import org.apache.doris.kafka.connector.exception.DorisException;
-import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
-import org.apache.doris.kafka.connector.model.doris.Schema;
-import org.apache.doris.kafka.connector.service.DorisSystemService;
-import org.apache.doris.kafka.connector.service.RestService;
-import org.apache.doris.kafka.connector.writer.schema.DebeziumSchemaChange;
-import org.apache.doris.kafka.connector.writer.schema.SchemaChangeManager;
-import org.apache.kafka.connect.data.SchemaAndValue;
-import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.MockedStatic;
-import org.mockito.Mockito;
-
-public class TestDebeziumSchemaChange {
-    private final ObjectMapper objectMapper = new ObjectMapper();
-    private final JsonConverter jsonConverter = new JsonConverter();
-    private final HashSet<String> sinkTableSet = new HashSet<>();
-    private DebeziumSchemaChange debeziumSchemaChange;
-    private DorisOptions dorisOptions;
-    private String topic;
-    private MockedStatic<RestService> mockRestService;
-
-    @Before
-    public void init() throws IOException {
-        InputStream stream =
-                this.getClass()
-                        .getClassLoader()
-                        .getResourceAsStream("doris-connector-sink.properties");
-        Properties props = new Properties();
-        props.load(stream);
-        props.put("task_id", "1");
-        props.put("name", "sink-connector-test");
-        topic = "normal";
-        dorisOptions = new DorisOptions((Map) props);
-        DorisConnectMonitor dorisConnectMonitor = mock(DorisConnectMonitor.class);
-        DorisSystemService mockDorisSystemService = mock(DorisSystemService.class);
-        jsonConverter.configure(new HashMap<>(), false);
-        mockRestService = mockStatic(RestService.class);
-        SchemaChangeManager mockSchemaChangeManager = Mockito.mock(SchemaChangeManager.class);
-        Mockito.when(
-                        mockSchemaChangeManager.checkSchemaChange(
-                                Mockito.any(), Mockito.any(), Mockito.any()))
-                .thenReturn(true);
-        debeziumSchemaChange =
-                new DebeziumSchemaChange(
-                        topic,
-                        0,
-                        dorisOptions,
-                        new JdbcConnectionProvider(dorisOptions),
-                        dorisConnectMonitor);
-        when(mockDorisSystemService.tableExists(anyString(), anyString())).thenReturn(true);
-
-        sinkTableSet.add("normal_time");
-        debeziumSchemaChange.setSchemaChangeManager(mockSchemaChangeManager);
-        debeziumSchemaChange.setSinkTableSet(sinkTableSet);
-        debeziumSchemaChange.setDorisSystemService(mockDorisSystemService);
-    }
-
-    @Test
-    public void testAlterSchemaChange() {
-        String alterTopicMsg =
-                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},
 [...]
-        SchemaAndValue schemaAndValue =
-                jsonConverter.toConnectData(topic, alterTopicMsg.getBytes(StandardCharsets.UTF_8));
-        SinkRecord sinkRecord =
-                TestRecordBuffer.newSinkRecord(schemaAndValue.value(), 8, schemaAndValue.schema());
-        String normalTimeSchemaStr =
-                "{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"timestamp_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATETIMEV2\"},{\"name\":\"datetime_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATETIMEV2\"},{\"name\":\"date_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATEV2\"}],\"status\":200}";
-        Schema normalTimeSchema = null;
-        try {
-            normalTimeSchema = objectMapper.readValue(normalTimeSchemaStr, Schema.class);
-        } catch (JsonProcessingException e) {
-            throw new DorisException(e);
-        }
-        mockRestService
-                .when(() -> RestService.getSchema(any(), any(), any(), any()))
-                .thenReturn(normalTimeSchema);
-
-        debeziumSchemaChange.insert(sinkRecord);
-        List<String> ddlSqlList = debeziumSchemaChange.getDdlSqlList();
-        Assert.assertEquals(
-                ddlSqlList.get(0),
-                "ALTER TABLE `test_db`.`normal_time` ADD COLUMN `time_test` STRING DEFAULT '12:00'");
-    }
-
-    @After
-    public void close() {
-        mockRestService.close();
-    }
-}
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestRecordBuffer.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestRecordBuffer.java
index 3dd2292..de2e654 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestRecordBuffer.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestRecordBuffer.java
@@ -52,16 +52,11 @@ public class TestRecordBuffer {
         return record;
     }
 
-    public static SinkRecord newSinkRecord(Object value, long offset, Schema valueSchema) {
+    public static SinkRecord newSinkRecord(
+            String topic, Object value, long offset, Schema valueSchema) {
         SinkRecord record =
                 new SinkRecord(
-                        "topic",
-                        0,
-                        Schema.OPTIONAL_STRING_SCHEMA,
-                        "key",
-                        valueSchema,
-                        value,
-                        offset);
+                        topic, 0, Schema.OPTIONAL_STRING_SCHEMA, "key", valueSchema, value, offset);
         return record;
     }
 }

