This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fb89033273 [Feature][Connector-V2] SqlServer support user-defined type 
(#7706)
fb89033273 is described below

commit fb890332732f44ce16fa9c721851cc07b4a70865
Author: Jia Fan <[email protected]>
AuthorDate: Sun Sep 29 10:05:28 2024 +0800

    [Feature][Connector-V2] SqlServer support user-defined type (#7706)
---
 .../connector/sqlserver/SqlServerConnection.java   | 763 +++++++++++++++++++++
 .../jdbc/catalog/sqlserver/SqlServerCatalog.java   |   4 +-
 .../connector/cdc/sqlserver/SqlServerCDCIT.java    |  10 +-
 .../src/test/resources/ddl/column_type_test.sql    |  32 +-
 .../connectors/seatunnel/jdbc/JdbcSqlServerIT.java |  18 +-
 5 files changed, 807 insertions(+), 20 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
new file mode 100644
index 0000000000..19c0cc0cbd
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.sqlserver;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.sqlserver.jdbc.SQLServerDriver;
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.config.Configuration;
+import io.debezium.data.Envelope;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.schema.DatabaseSchema;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerCatalog.SELECT_COLUMNS_SQL_TEMPLATE;
+
+/**
+ * {@link JdbcConnection} extension to be used with Microsoft SQL Server
+ *
+ * @author Horia Chiorean ([email protected]), Jiri Pechanec
+ */
public class SqlServerConnection extends JdbcConnection {

    /**
     * @deprecated The connector will determine the database server timezone offset automatically.
     */
    @Deprecated public static final String SERVER_TIMEZONE_PROP_NAME = "server.timezone";

    public static final String INSTANCE_NAME = "instance";

    private static final String GET_DATABASE_NAME = "SELECT name FROM sys.databases WHERE name = ?";

    private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerConnection.class);

    // Generic single-character placeholder substituted per statement (capture instance,
    // table name, or an extra SQL fragment depending on the template).
    private static final String STATEMENTS_PLACEHOLDER = "#";
    // Placeholder for the database name; expanded by replaceDatabaseNamePlaceholder(...).
    private static final String DATABASE_NAME_PLACEHOLDER = "#db";
    private static final String GET_MAX_LSN = "SELECT [#db].sys.fn_cdc_get_max_lsn()";
    private static final String GET_MAX_TRANSACTION_LSN =
            "SELECT MAX(start_lsn) FROM [#db].cdc.lsn_time_mapping WHERE tran_id <> 0x00";
    private static final String GET_NTH_TRANSACTION_LSN_FROM_BEGINNING =
            "SELECT MAX(start_lsn) FROM (SELECT TOP (?) start_lsn FROM [#db].cdc.lsn_time_mapping WHERE tran_id <> 0x00 ORDER BY start_lsn) as next_lsns";
    private static final String GET_NTH_TRANSACTION_LSN_FROM_LAST =
            "SELECT MAX(start_lsn) FROM (SELECT TOP (? + 1) start_lsn FROM [#db].cdc.lsn_time_mapping WHERE start_lsn >= ? AND tran_id <> 0x00 ORDER BY start_lsn) as next_lsns";

    private static final String GET_MIN_LSN = "SELECT [#db].sys.fn_cdc_get_min_lsn('#')";
    private static final String LOCK_TABLE = "SELECT * FROM [#] WITH (TABLOCKX)";
    private static final String INCREMENT_LSN = "SELECT [#db].sys.fn_cdc_increment_lsn(?)";
    private static final String GET_ALL_CHANGES_FOR_TABLE =
            "SELECT *# FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
    // Either GET_ALL_CHANGES_FOR_TABLE or a variant with a "__$operation NOT IN (...)" filter,
    // chosen in the constructor based on the skipped operations.
    private final String get_all_changes_for_table;
    protected static final String LSN_TIMESTAMP_SELECT_STATEMENT =
            "TODATETIMEOFFSET([#db].sys.fn_cdc_map_lsn_to_time([__$start_lsn]), DATEPART(TZOFFSET, SYSDATETIMEOFFSET()))";

    /**
     * Queries the list of captured column names and their change table identifiers in the given
     * database.
     */
    private static final String GET_CAPTURED_COLUMNS =
            "SELECT object_id, column_name"
                    + " FROM [#db].cdc.captured_columns"
                    + " ORDER BY object_id, column_id";

    /**
     * Queries the list of capture instances in the given database.
     *
     * <p>If two or more capture instances with the same start LSN are available for a given source
     * table, only the newest one will be returned.
     *
     * <p>We use a query instead of {@code sys.sp_cdc_help_change_data_capture} because: 1. The
     * stored procedure doesn't allow filtering capture instances by start LSN. 2. There is no way
     * to use the result returned by a stored procedure in a query.
     */
    private static final String GET_CHANGE_TABLES =
            "WITH ordered_change_tables"
                    + " AS (SELECT ROW_NUMBER() OVER (PARTITION BY ct.source_object_id, ct.start_lsn ORDER BY ct.create_date DESC) AS ct_sequence,"
                    + " ct.*"
                    + " FROM [#db].cdc.change_tables AS ct#)"
                    + " SELECT OBJECT_SCHEMA_NAME(source_object_id, DB_ID(?)),"
                    + " OBJECT_NAME(source_object_id, DB_ID(?)),"
                    + " capture_instance,"
                    + " object_id,"
                    + " start_lsn"
                    + " FROM ordered_change_tables WHERE ct_sequence = 1";

    private static final String GET_NEW_CHANGE_TABLES =
            "SELECT * FROM [#db].cdc.change_tables WHERE start_lsn BETWEEN ? AND ?";
    private static final String OPENING_QUOTING_CHARACTER = "[";
    private static final String CLOSING_QUOTING_CHARACTER = "]";

    private static final String URL_PATTERN =
            "jdbc:sqlserver://${"
                    + JdbcConfiguration.HOSTNAME
                    + "}:${"
                    + JdbcConfiguration.PORT
                    + "}";

    // True when one connection serves several databases; the JDBC URL then omits databaseName.
    private final boolean multiPartitionMode;
    // get_all_changes_for_table with the source-timestamp select substituted for the first '#'.
    private final String getAllChangesForTable;
    private final int queryFetchSize;

    private final SqlServerDefaultValueConverter defaultValueConverter;

    // When true, incremental-snapshot chunk queries append OPTION(RECOMPILE).
    private boolean optionRecompile;
+
    /**
     * Creates a new connection using the supplied configuration.
     *
     * @param config {@link Configuration} instance, may not be null.
     * @param sourceTimestampMode strategy for populating {@code source.ts_ms}.
     * @param valueConverters {@link SqlServerValueConverters} instance
     * @param classLoaderSupplier class loader supplier
     * @param skippedOperations a set of {@link Envelope.Operation} to skip in streaming
     */
    public SqlServerConnection(
            JdbcConfiguration config,
            SourceTimestampMode sourceTimestampMode,
            SqlServerValueConverters valueConverters,
            Supplier<ClassLoader> classLoaderSupplier,
            Set<Envelope.Operation> skippedOperations,
            boolean multiPartitionMode) {
        super(
                config,
                createConnectionFactory(multiPartitionMode),
                classLoaderSupplier,
                OPENING_QUOTING_CHARACTER,
                CLOSING_QUOTING_CHARACTER);

        if (config().hasKey(SERVER_TIMEZONE_PROP_NAME)) {
            LOGGER.warn(
                    "The '{}' option is deprecated and is not taken into account",
                    SERVER_TIMEZONE_PROP_NAME);
        }

        defaultValueConverter =
                new SqlServerDefaultValueConverter(this::connection, valueConverters);
        this.queryFetchSize = config().getInteger(CommonConnectorConfig.QUERY_FETCH_SIZE);

        // When operations are skipped, bake a "__$operation NOT IN (...)" filter into the
        // change-fetching statement instead of filtering rows on the client side.
        if (!skippedOperations.isEmpty()) {
            Set<String> skippedOps = new HashSet<>();
            StringBuilder getAllChangesForTableStatement =
                    new StringBuilder(
                            "SELECT *# FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') WHERE __$operation NOT IN (");
            skippedOperations.forEach(
                    (Envelope.Operation operation) -> {
                        // These numbers are the __$operation codes reported by SQL Server CDC:
                        // https://docs.microsoft.com/en-us/sql/relational-databases/system-functions/cdc-fn-cdc-get-all-changes-capture-instance-transact-sql?view=sql-server-ver15#table-returned
                        switch (operation) {
                            case CREATE:
                                skippedOps.add("2");
                                break;
                            case UPDATE:
                                // An update produces two rows: before (3) and after (4).
                                skippedOps.add("3");
                                skippedOps.add("4");
                                break;
                            case DELETE:
                                skippedOps.add("1");
                                break;
                        }
                    });
            getAllChangesForTableStatement.append(String.join(",", skippedOps));
            getAllChangesForTableStatement.append(
                    ") order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC");
            get_all_changes_for_table = getAllChangesForTableStatement.toString();
        } else {
            get_all_changes_for_table = GET_ALL_CHANGES_FOR_TABLE;
        }

        // Substitute the first '#' with the timestamp select of the configured mode;
        // quoteReplacement guards against '$' and '\' in the replacement string.
        getAllChangesForTable =
                get_all_changes_for_table.replaceFirst(
                        STATEMENTS_PLACEHOLDER,
                        Matcher.quoteReplacement(
                                sourceTimestampMode.lsnTimestampSelectStatement()));
        this.multiPartitionMode = multiPartitionMode;

        this.optionRecompile = false;
    }
+
    /**
     * Creates a new connection using the supplied configuration.
     *
     * @param config {@link Configuration} instance, may not be null.
     * @param sourceTimestampMode strategy for populating {@code source.ts_ms}.
     * @param valueConverters {@link SqlServerValueConverters} instance
     * @param classLoaderSupplier class loader supplier
     * @param skippedOperations a set of {@link Envelope.Operation} to skip in streaming
     * @param optionRecompile Includes query option RECOMPILE on incremental snapshots
     */
    public SqlServerConnection(
            JdbcConfiguration config,
            SourceTimestampMode sourceTimestampMode,
            SqlServerValueConverters valueConverters,
            Supplier<ClassLoader> classLoaderSupplier,
            Set<Envelope.Operation> skippedOperations,
            boolean multiPartitionMode,
            boolean optionRecompile) {
        this(
                config,
                sourceTimestampMode,
                valueConverters,
                classLoaderSupplier,
                skippedOperations,
                multiPartitionMode);

        this.optionRecompile = optionRecompile;
    }
+
+    private static String createUrlPattern(boolean multiPartitionMode) {
+        String pattern = URL_PATTERN;
+        if (!multiPartitionMode) {
+            pattern += ";databaseName=${" + JdbcConfiguration.DATABASE + "}";
+        }
+
+        return pattern;
+    }
+
    /**
     * Creates the pattern-based connection factory for the SQL Server JDBC driver, defaulting the
     * port from the connector configuration.
     */
    private static ConnectionFactory createConnectionFactory(boolean multiPartitionMode) {
        return JdbcConnection.patternBasedFactory(
                createUrlPattern(multiPartitionMode),
                SQLServerDriver.class.getName(),
                SqlServerConnection.class.getClassLoader(),
                JdbcConfiguration.PORT.withDefault(
                        SqlServerConnectorConfig.PORT.defaultValueAsString()));
    }
+
    /**
     * Returns a JDBC connection string for the current configuration.
     *
     * @return a {@code String} where the variables in {@code urlPattern} are replaced with values
     *     from the configuration
     */
    public String connectionString() {
        return connectionString(createUrlPattern(multiPartitionMode));
    }
+
+    @Override
+    public synchronized Connection connection(boolean executeOnConnect) throws 
SQLException {
+        boolean connected = isConnected();
+        Connection connection = super.connection(executeOnConnect);
+
+        if (!connected) {
+            connection.setAutoCommit(false);
+        }
+
+        return connection;
+    }
+
    /** @return the current largest log sequence number */
    public Lsn getMaxLsn(String databaseName) throws SQLException {
        return queryAndMap(
                replaceDatabaseNamePlaceholder(GET_MAX_LSN, databaseName),
                singleResultMapper(
                        rs -> {
                            final Lsn ret = Lsn.valueOf(rs.getBytes(1));
                            LOGGER.trace("Current maximum lsn is {}", ret);
                            return ret;
                        },
                        "Maximum LSN query must return exactly one value"));
    }
+
    /**
     * @return the log sequence number of the most recent transaction that isn't further than {@code
     *     maxOffset} from the beginning.
     */
    public Lsn getNthTransactionLsnFromBeginning(String databaseName, int maxOffset)
            throws SQLException {
        return prepareQueryAndMap(
                replaceDatabaseNamePlaceholder(
                        GET_NTH_TRANSACTION_LSN_FROM_BEGINNING, databaseName),
                statement -> {
                    // TOP (?) bound of the lsn_time_mapping scan.
                    statement.setInt(1, maxOffset);
                },
                singleResultMapper(
                        rs -> {
                            final Lsn ret = Lsn.valueOf(rs.getBytes(1));
                            LOGGER.trace("Nth lsn from beginning is {}", ret);
                            return ret;
                        },
                        "Nth LSN query must return exactly one value"));
    }
+
    /**
     * @return the log sequence number of the most recent transaction that isn't further than {@code
     *     maxOffset} from {@code lastLsn}.
     */
    public Lsn getNthTransactionLsnFromLast(String databaseName, Lsn lastLsn, int maxOffset)
            throws SQLException {
        return prepareQueryAndMap(
                replaceDatabaseNamePlaceholder(GET_NTH_TRANSACTION_LSN_FROM_LAST, databaseName),
                statement -> {
                    // TOP (? + 1) bound, counting from lastLsn inclusive.
                    statement.setInt(1, maxOffset);
                    statement.setBytes(2, lastLsn.getBinary());
                },
                singleResultMapper(
                        rs -> {
                            final Lsn ret = Lsn.valueOf(rs.getBytes(1));
                            LOGGER.trace("Nth lsn from last is {}", ret);
                            return ret;
                        },
                        "Nth LSN query must return exactly one value"));
    }
+
    /** @return the log sequence number of the most recent transaction. */
    public Lsn getMaxTransactionLsn(String databaseName) throws SQLException {
        return queryAndMap(
                replaceDatabaseNamePlaceholder(GET_MAX_TRANSACTION_LSN, databaseName),
                singleResultMapper(
                        rs -> {
                            final Lsn ret = Lsn.valueOf(rs.getBytes(1));
                            LOGGER.trace("Max transaction lsn is {}", ret);
                            return ret;
                        },
                        "Max transaction LSN query must return exactly one value"));
    }
+
    /** @return the smallest log sequence number of table */
    public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLException {
        // The '#' placeholder in GET_MIN_LSN is the capture instance name.
        String query =
                replaceDatabaseNamePlaceholder(GET_MIN_LSN, databaseName)
                        .replace(STATEMENTS_PLACEHOLDER, changeTableName);
        return queryAndMap(
                query,
                singleResultMapper(
                        rs -> {
                            final Lsn ret = Lsn.valueOf(rs.getBytes(1));
                            LOGGER.trace("Current minimum lsn is {}", ret);
                            return ret;
                        },
                        "Minimum LSN query must return exactly one value"));
    }
+
    /**
     * {@inheritDoc}
     *
     * <p>Delegates to {@link #doReadTableColumn} which additionally consults the SeaTunnel catalog
     * query so user-defined types resolve to a concrete type name.
     */
    @Override
    protected Optional<ColumnEditor> readTableColumn(
            ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter)
            throws SQLException {
        return doReadTableColumn(columnMetadata, tableId, columnFilter);
    }
+
+    private Optional<ColumnEditor> doReadTableColumn(
+            ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter 
columnFilter)
+            throws SQLException {
+        // Oracle drivers require this for LONG/LONGRAW to be fetched first.
+        final String defaultValue = columnMetadata.getString(13);
+        String tableSql =
+                StringUtils.isNotEmpty(tableId.table())
+                        ? "AND tbl.name = '" + tableId.table() + "'"
+                        : "";
+
+        Map<String, String> columnTypeMapping = new HashMap<>();
+
+        // Support user-defined types (UDTs)
+        try (PreparedStatement ps =
+                        connection()
+                                .prepareStatement(
+                                        String.format(
+                                                SELECT_COLUMNS_SQL_TEMPLATE,
+                                                tableId.schema(),
+                                                tableSql));
+                ResultSet resultSet = ps.executeQuery()) {
+            while (resultSet.next()) {
+                String columnName = resultSet.getString("column_name");
+                String dataType = resultSet.getString("type");
+                columnTypeMapping.put(columnName, dataType);
+            }
+        }
+        final String columnName = columnMetadata.getString(4);
+        if (columnFilter == null
+                || columnFilter.matches(
+                        tableId.catalog(), tableId.schema(), tableId.table(), 
columnName)) {
+            ColumnEditor column = Column.editor().name(columnName);
+            column.type(
+                    columnTypeMapping.containsKey(columnName)
+                            ? columnTypeMapping.get(columnName)
+                            : columnMetadata.getString(6));
+            column.length(columnMetadata.getInt(7));
+            if (columnMetadata.getObject(9) != null) {
+                column.scale(columnMetadata.getInt(9));
+            }
+            column.optional(isNullable(columnMetadata.getInt(11)));
+            column.position(columnMetadata.getInt(17));
+            
column.autoIncremented("YES".equalsIgnoreCase(columnMetadata.getString(23)));
+            String autogenerated = null;
+            try {
+                autogenerated = columnMetadata.getString(24);
+            } catch (SQLException e) {
+                // ignore, some drivers don't have this index - e.g. Postgres
+            }
+            column.generated("YES".equalsIgnoreCase(autogenerated));
+
+            column.nativeType(resolveNativeType(column.typeName()));
+            column.jdbcType(resolveJdbcType(columnMetadata.getInt(5), 
column.nativeType()));
+
+            // Allow implementation to make column changes if required before 
being added to table
+            column = overrideColumn(column);
+
+            if (defaultValue != null) {
+                column.defaultValueExpression(defaultValue);
+            }
+            return Optional.of(column);
+        }
+
+        return Optional.empty();
+    }
+
    /**
     * Provides all changes recorded by the SQL Server CDC capture process for a set of tables.
     *
     * @param databaseName - the name of the database to query
     * @param changeTables - the requested tables to obtain changes for
     * @param intervalFromLsn - closed lower bound of interval of changes to be provided
     * @param intervalToLsn - closed upper bound of interval of changes to be provided
     * @param consumer - the change processor
     * @throws SQLException
     */
    public void getChangesForTables(
            String databaseName,
            SqlServerChangeTable[] changeTables,
            Lsn intervalFromLsn,
            Lsn intervalToLsn,
            BlockingMultiResultSetConsumer consumer)
            throws SQLException, InterruptedException {
        final String[] queries = new String[changeTables.length];
        final StatementPreparer[] preparers = new StatementPreparer[changeTables.length];

        int idx = 0;
        for (SqlServerChangeTable changeTable : changeTables) {
            // The remaining '#' placeholders expand to the capture instance name.
            final String query =
                    replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
                            .replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance());
            queries[idx] = query;
            // If the table was added in the middle of queried buffer we need
            // to adjust from to the first LSN available
            final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn);
            LOGGER.trace(
                    "Getting changes for table {} in range[{}, {}]",
                    changeTable,
                    fromLsn,
                    intervalToLsn);
            preparers[idx] =
                    statement -> {
                        if (queryFetchSize > 0) {
                            statement.setFetchSize(queryFetchSize);
                        }
                        statement.setBytes(1, fromLsn.getBinary());
                        statement.setBytes(2, intervalToLsn.getBinary());
                    };

            idx++;
        }
        prepareQuery(queries, preparers, consumer);
    }
+
+    private Lsn getFromLsn(
+            String databaseName, SqlServerChangeTable changeTable, Lsn 
intervalFromLsn)
+            throws SQLException {
+        Lsn fromLsn =
+                changeTable.getStartLsn().compareTo(intervalFromLsn) > 0
+                        ? changeTable.getStartLsn()
+                        : intervalFromLsn;
+        return fromLsn.getBinary() != null
+                ? fromLsn
+                : getMinLsn(databaseName, changeTable.getCaptureInstance());
+    }
+
    /**
     * Obtain the next available position in the database log.
     *
     * @param databaseName - the name of the database that the LSN belongs to
     * @param lsn - LSN of the current position
     * @return LSN of the next position in the database
     * @throws SQLException
     */
    public Lsn incrementLsn(String databaseName, Lsn lsn) throws SQLException {
        return prepareQueryAndMap(
                replaceDatabaseNamePlaceholder(INCREMENT_LSN, databaseName),
                statement -> {
                    statement.setBytes(1, lsn.getBinary());
                },
                singleResultMapper(
                        rs -> {
                            final Lsn ret = Lsn.valueOf(rs.getBytes(1));
                            LOGGER.trace("Increasing lsn from {} to {}", lsn, ret);
                            return ret;
                        },
                        "Increment LSN query must return exactly one value"));
    }
+
    /**
     * Creates an exclusive lock for a given table.
     *
     * @param tableId to be locked
     * @throws SQLException
     */
    public void lockTable(TableId tableId) throws SQLException {
        // TABLOCKX takes an exclusive table lock for the duration of the transaction.
        final String lockTableStmt = LOCK_TABLE.replace(STATEMENTS_PLACEHOLDER, tableId.table());
        execute(lockTableStmt);
    }
+
+    private String cdcNameForTable(TableId tableId) {
+        return tableId.schema() + '_' + tableId.table();
+    }
+
    /** Immutable holder pairing a table id with its capture name and starting LSN. */
    public static class CdcEnabledTable {
        private final String tableId;
        private final String captureName;
        private final Lsn fromLsn;

        private CdcEnabledTable(String tableId, String captureName, Lsn fromLsn) {
            this.tableId = tableId;
            this.captureName = captureName;
            this.fromLsn = fromLsn;
        }

        public String getTableId() {
            return tableId;
        }

        public String getCaptureName() {
            return captureName;
        }

        public Lsn getFromLsn() {
            return fromLsn;
        }
    }
+
    /** Returns every change table in {@code databaseName} with no upper LSN bound. */
    public List<SqlServerChangeTable> getChangeTables(String databaseName) throws SQLException {
        return getChangeTables(databaseName, Lsn.NULL);
    }
+
+    public List<SqlServerChangeTable> getChangeTables(String databaseName, Lsn 
toLsn)
+            throws SQLException {
+        Map<Integer, List<String>> columns =
+                queryAndMap(
+                        replaceDatabaseNamePlaceholder(GET_CAPTURED_COLUMNS, 
databaseName),
+                        rs -> {
+                            Map<Integer, List<String>> result = new 
HashMap<>();
+                            while (rs.next()) {
+                                int changeTableObjectId = rs.getInt(1);
+                                if (!result.containsKey(changeTableObjectId)) {
+                                    result.put(changeTableObjectId, new 
LinkedList<>());
+                                }
+
+                                
result.get(changeTableObjectId).add(rs.getString(2));
+                            }
+                            return result;
+                        });
+        final ResultSetMapper<List<SqlServerChangeTable>> mapper =
+                rs -> {
+                    final List<SqlServerChangeTable> changeTables = new 
ArrayList<>();
+                    while (rs.next()) {
+                        int changeTableObjectId = rs.getInt(4);
+                        changeTables.add(
+                                new SqlServerChangeTable(
+                                        new TableId(databaseName, 
rs.getString(1), rs.getString(2)),
+                                        rs.getString(3),
+                                        changeTableObjectId,
+                                        Lsn.valueOf(rs.getBytes(5)),
+                                        columns.get(changeTableObjectId)));
+                    }
+                    return changeTables;
+                };
+
+        String query = replaceDatabaseNamePlaceholder(GET_CHANGE_TABLES, 
databaseName);
+
+        if (toLsn.isAvailable()) {
+            return prepareQueryAndMap(
+                    query.replace(STATEMENTS_PLACEHOLDER, " WHERE ct.start_lsn 
<= ?"),
+                    ps -> {
+                        ps.setBytes(1, toLsn.getBinary());
+                        ps.setString(2, databaseName);
+                        ps.setString(3, databaseName);
+                    },
+                    mapper);
+        } else {
+            return prepareQueryAndMap(
+                    query.replace(STATEMENTS_PLACEHOLDER, ""),
+                    ps -> {
+                        ps.setString(1, databaseName);
+                        ps.setString(2, databaseName);
+                    },
+                    mapper);
+        }
+    }
+
    /**
     * Returns change tables whose capture start LSN lies in the closed range [{@code fromLsn},
     * {@code toLsn}], i.e. capture instances enabled since the last processed position.
     */
    public List<SqlServerChangeTable> getNewChangeTables(
            String databaseName, Lsn fromLsn, Lsn toLsn) throws SQLException {
        final String query = replaceDatabaseNamePlaceholder(GET_NEW_CHANGE_TABLES, databaseName);

        return prepareQueryAndMap(
                query,
                ps -> {
                    ps.setBytes(1, fromLsn.getBinary());
                    ps.setBytes(2, toLsn.getBinary());
                },
                rs -> {
                    final List<SqlServerChangeTable> changeTables = new ArrayList<>();
                    while (rs.next()) {
                        changeTables.add(
                                new SqlServerChangeTable(
                                        rs.getString(4),
                                        rs.getInt(1),
                                        Lsn.valueOf(rs.getBytes(5))));
                    }
                    return changeTables;
                });
    }
+
    /**
     * Builds the relational schema of the source table behind {@code changeTable}, restricted to
     * the columns actually captured by CDC.
     *
     * @param databaseName database the source table lives in
     * @param changeTable CDC change table describing the capture instance
     * @return the table definition with captured columns and key column names
     * @throws SQLException if the JDBC metadata cannot be read
     */
    public Table getTableSchemaFromTable(String databaseName, SqlServerChangeTable changeTable)
            throws SQLException {
        final DatabaseMetaData metadata = connection().getMetaData();

        List<Column> columns = new ArrayList<>();
        try (ResultSet rs =
                metadata.getColumns(
                        databaseName,
                        changeTable.getSourceTableId().schema(),
                        changeTable.getSourceTableId().table(),
                        null)) {
            while (rs.next()) {
                readTableColumn(rs, changeTable.getSourceTableId(), null)
                        .ifPresent(
                                ce -> {
                                    // Filter out columns not included in the change table.
                                    if (changeTable.getCapturedColumns().contains(ce.name())) {
                                        columns.add(ce.create());
                                    }
                                });
            }
        }

        // Keep only key columns that are also captured by the change table.
        final List<String> pkColumnNames =
                readPrimaryKeyOrUniqueIndexNames(metadata, changeTable.getSourceTableId()).stream()
                        .filter(column -> changeTable.getCapturedColumns().contains(column))
                        .collect(Collectors.toList());
        // Sort columns into their natural order (presumably by position) — confirm against
        // Debezium's Column.compareTo.
        Collections.sort(columns);
        return Table.editor()
                .tableId(changeTable.getSourceTableId())
                .addColumns(columns)
                .setPrimaryKeyNames(pkColumnNames)
                .create();
    }
+
+    public String getNameOfChangeTable(String captureName) {
+        return captureName + "_CT";
+    }
+
    /**
     * Retrieve the name of the database in the original case as it's defined on the server.
     *
     * <p>Although SQL Server supports case-insensitive collations, the connector uses the database
     * name to build the produced records' source info and, subsequently, the keys of its committed
     * offset messages. This value must remain the same during the lifetime of the connector
     * regardless of the case used in the connector configuration.
     */
    public String retrieveRealDatabaseName(String databaseName) {
        try {
            return prepareQueryAndMap(
                    GET_DATABASE_NAME,
                    ps -> ps.setString(1, databaseName),
                    singleResultMapper(
                            rs -> rs.getString(1), "Could not retrieve exactly one database name"));
        } catch (SQLException e) {
            throw new RuntimeException("Couldn't obtain database name", e);
        }
    }
+
    /**
     * {@inheritDoc}
     *
     * @return {@code true} only for named indices, since unnamed ones cannot be used here
     */
    @Override
    protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) {
        // SQL Server provides indices also without index name
        // so we need to ignore them
        return indexName != null;
    }
+
+    @Override
+    public <T extends DatabaseSchema<TableId>> Object getColumnValue(
+            ResultSet rs, int columnIndex, Column column, Table table, T 
schema)
+            throws SQLException {
+        final ResultSetMetaData metaData = rs.getMetaData();
+        final int columnType = metaData.getColumnType(columnIndex);
+
+        if (columnType == Types.TIME) {
+            return rs.getTimestamp(columnIndex);
+        } else {
+            return super.getColumnValue(rs, columnIndex, column, table, 
schema);
+        }
+    }
+
+    @Override
+    public String buildSelectWithRowLimits(
+            TableId tableId,
+            int limit,
+            String projection,
+            Optional<String> condition,
+            String orderBy) {
+        final StringBuilder sql = new StringBuilder("SELECT TOP ");
+        sql.append(limit).append(' ').append(projection).append(" FROM ");
+        sql.append(quotedTableIdString(tableId));
+        if (condition.isPresent()) {
+            sql.append(" WHERE ").append(condition.get());
+        }
+        sql.append(" ORDER BY ").append(orderBy);
+        if (this.optionRecompile) {
+            sql.append(" OPTION(RECOMPILE)");
+        }
+        return sql.toString();
+    }
+
+    @Override
+    public String quotedTableIdString(TableId tableId) {
+        return "[" + tableId.catalog() + "].[" + tableId.schema() + "].[" + 
tableId.table() + "]";
+    }
+
    /** Substitutes every "#db" placeholder in {@code sql} with the given database name. */
    private String replaceDatabaseNamePlaceholder(String sql, String databaseName) {
        return sql.replace(DATABASE_NAME_PLACEHOLDER, databaseName);
    }
+
    /** Returns the converter used to translate column default values. */
    public SqlServerDefaultValueConverter getDefaultValueConverter() {
        return defaultValueConverter;
    }
}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index dc6b42a456..7c759e2eda 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -40,7 +40,7 @@ import java.sql.SQLException;
 @Slf4j
 public class SqlServerCatalog extends AbstractJdbcCatalog {
 
-    private static final String SELECT_COLUMNS_SQL_TEMPLATE =
+    public static final String SELECT_COLUMNS_SQL_TEMPLATE =
             "SELECT tbl.name AS table_name,\n"
                     + "       col.name AS column_name,\n"
                     + "       ext.value AS comment,\n"
@@ -53,7 +53,7 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
                     + "       def.definition AS default_value\n"
                     + "FROM sys.tables tbl\n"
                     + "    INNER JOIN sys.columns col ON tbl.object_id = 
col.object_id\n"
-                    + "    LEFT JOIN sys.types types ON col.user_type_id = 
types.user_type_id\n"
+                    + "    LEFT JOIN sys.types types ON col.system_type_id = 
types.user_type_id\n"
                     + "    LEFT JOIN sys.extended_properties ext ON 
ext.major_id = col.object_id AND ext.minor_id = col.column_id\n"
                     + "    LEFT JOIN sys.default_constraints def ON 
col.default_object_id = def.object_id AND ext.minor_id = col.column_id AND 
ext.name = 'MS_Description'\n"
                     + "WHERE schema_name(tbl.schema_id) = '%s' %s\n"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 8a1814e6ae..eb891be771 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -113,7 +113,8 @@ public class SqlServerCDCIT extends TestSuiteBase 
implements TestResource {
                     + "  val_smalldatetime,\n"
                     + "  val_xml,\n"
                     + "  val_datetimeoffset,\n"
-                    + "  CONVERT(varchar(100), val_varbinary) as 
val_varbinary\n"
+                    + "  CONVERT(varchar(100), val_varbinary) as 
val_varbinary,\n"
+                    + "  val_udtdecimal\n"
                     + "from %s order by id asc";
     private static final String SELECT_SINK_SQL =
             "select\n"
@@ -142,7 +143,8 @@ public class SqlServerCDCIT extends TestSuiteBase 
implements TestResource {
                     + "  val_smalldatetime,\n"
                     + "  val_xml,\n"
                     + "  val_datetimeoffset,\n"
-                    + "  CONVERT(varchar(100), val_varbinary) as 
val_varbinary\n"
+                    + "  CONVERT(varchar(100), val_varbinary) as 
val_varbinary,\n"
+                    + "  val_udtdecimal\n"
                     + "from %s order by id asc";
 
     public static final MSSQLServerContainer MSSQL_SERVER_CONTAINER =
@@ -347,7 +349,7 @@ public class SqlServerCDCIT extends TestSuiteBase 
implements TestResource {
                         + "                               1.123, 2, 3.323, 
4.323, 5.323, 6.323,\n"
                         + "                               1, 22, 333, 4444, 
55555,\n"
                         + "                               '2018-07-13', 
'10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 
14:23:45',\n"
-                        + "                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));");
+                        + "                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)), 
5.32);");
         executeSql(
                 "INSERT INTO "
                         + table
@@ -356,7 +358,7 @@ public class SqlServerCDCIT extends TestSuiteBase 
implements TestResource {
                         + "                               1.123, 2, 3.323, 
4.323, 5.323, 6.323,\n"
                         + "                               1, 22, 333, 4444, 
55555,\n"
                         + "                               '2018-07-13', 
'10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 
14:23:45',\n"
-                        + "                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));");
+                        + "                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)), 
5.32);");
 
         executeSql("DELETE FROM " + table + " where id = 2");
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
index 0c6aebe4fd..64ceebcaa8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
@@ -23,6 +23,8 @@ CREATE DATABASE column_type_test;
 USE column_type_test;
 EXEC sys.sp_cdc_enable_db;
 
+CREATE TYPE UDTDECIMAL FROM decimal(12, 2);
+
 CREATE TABLE full_types (
     id int NOT NULL,
     val_char char(3),
@@ -49,7 +51,8 @@ CREATE TABLE full_types (
     val_smalldatetime smalldatetime,
     val_xml xml,
     val_datetimeoffset DATETIMEOFFSET(4),
-    val_varbinary  varbinary(100)
+    val_varbinary  varbinary(100),
+    val_udtdecimal UDTDECIMAL,
     PRIMARY KEY (id)
 );
 INSERT INTO full_types VALUES (0,
@@ -57,19 +60,19 @@ INSERT INTO full_types VALUES (0,
                                1.123, 2, 3.323, 4.323, 5.323, 6.323,
                                1, 22, 333, 4444, 55555,
                                '2018-07-13', '10:23:45', '2018-07-13 
11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
-                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)), 5.32);
 INSERT INTO full_types VALUES (1,
                                'cč1', 'vcč', 'tč', N'cč', N'vcč', N'tč',
                                1.123, 2, 3.323, 4.323, 5.323, 6.323,
                                1, 22, 333, 4444, 55555,
                                '2018-07-13', '10:23:45', '2018-07-13 
11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
-                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)), 5.32);
 INSERT INTO full_types VALUES (2,
                                'cč2', 'vcč', 'tč', N'cč', N'vcč', N'tč',
                                1.123, 2, 3.323, 4.323, 5.323, 6.323,
                                1, 22, 333, 4444, 55555,
                                '2018-07-13', '10:23:45', '2018-07-13 
11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
-                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)), 5.32);
 EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 
'full_types', @role_name = NULL, @supports_net_changes = 0;
 
 CREATE TABLE full_types_no_primary_key (
@@ -98,26 +101,27 @@ CREATE TABLE full_types_no_primary_key (
                             val_smalldatetime smalldatetime,
                             val_xml xml,
                             val_datetimeoffset DATETIMEOFFSET(4),
-                            val_varbinary  varbinary(100)
+                            val_varbinary  varbinary(100),
+                            val_udtdecimal UDTDECIMAL
 );
 INSERT INTO full_types_no_primary_key VALUES (0,
                                'cč0', 'vcč', 'tč', N'cč', N'vcč', N'tč',
                                1.123, 2, 3.323, 4.323, 5.323, 6.323,
                                1, 22, 333, 4444, 55555,
                                '2018-07-13', '10:23:45', '2018-07-13 
11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
-                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)), 5.32);
 INSERT INTO full_types_no_primary_key VALUES (1,
                                'cč1', 'vcč', 'tč', N'cč', N'vcč', N'tč',
                                1.123, 2, 3.323, 4.323, 5.323, 6.323,
                                1, 22, 333, 4444, 55555,
                                '2018-07-13', '10:23:45', '2018-07-13 
11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
-                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)), 5.32);
 INSERT INTO full_types_no_primary_key VALUES (2,
                                'cč2', 'vcč', 'tč', N'cč', N'vcč', N'tč',
                                1.123, 2, 3.323, 4.323, 5.323, 6.323,
                                1, 22, 333, 4444, 55555,
                                '2018-07-13', '10:23:45', '2018-07-13 
11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
-                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)), 5.32);
 EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 
'full_types_no_primary_key', @role_name = NULL, @supports_net_changes = 0;
 
 CREATE TABLE full_types_custom_primary_key (
@@ -146,26 +150,27 @@ CREATE TABLE full_types_custom_primary_key (
                                            val_smalldatetime smalldatetime,
                                            val_xml xml,
                                            val_datetimeoffset 
DATETIMEOFFSET(4),
-                                           val_varbinary  varbinary(100)
+                                           val_varbinary  varbinary(100),
+                                           val_udtdecimal UDTDECIMAL
 );
 INSERT INTO full_types_custom_primary_key VALUES (0,
                                               'cč0', 'vcč', 'tč', N'cč', 
N'vcč', N'tč',
                                               1.123, 2, 3.323, 4.323, 5.323, 
6.323,
                                               1, 22, 333, 4444, 55555,
                                               '2018-07-13', '10:23:45', 
'2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
-                                              
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+                                              
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)), 5.32);
 INSERT INTO full_types_custom_primary_key VALUES (1,
                                               'cč1', 'vcč', 'tč', N'cč', 
N'vcč', N'tč',
                                               1.123, 2, 3.323, 4.323, 5.323, 
6.323,
                                               1, 22, 333, 4444, 55555,
                                               '2018-07-13', '10:23:45', 
'2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
-                                              
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+                                              
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)), 5.32);
 INSERT INTO full_types_custom_primary_key VALUES (2,
                                               'cč2', 'vcč', 'tč', N'cč', 
N'vcč', N'tč',
                                               1.123, 2, 3.323, 4.323, 5.323, 
6.323,
                                               1, 22, 333, 4444, 55555,
                                               '2018-07-13', '10:23:45', 
'2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
-                                              
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+                                              
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)), 5.32);
 EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 
'full_types_custom_primary_key', @role_name = NULL, @supports_net_changes = 0;
 
 CREATE TABLE full_types_sink (
@@ -194,6 +199,7 @@ CREATE TABLE full_types_sink (
                             val_smalldatetime smalldatetime,
                             val_xml xml,
                             val_datetimeoffset DATETIMEOFFSET(4),
-                            val_varbinary  varbinary(100)
+                            val_varbinary  varbinary(100),
+                            val_udtdecimal UDTDECIMAL,
                             PRIMARY KEY (id)
 );
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
index 5610f0963c..1b03044749 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
@@ -101,6 +101,7 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
                     + "\tVARCHAR_TEST varchar(16) COLLATE Chinese_PRC_CS_AS 
NULL,\n"
                     + "\tVARCHAR_MAX_TEST varchar(MAX) COLLATE 
Chinese_PRC_CS_AS DEFAULT NULL NULL,\n"
                     + "\tXML_TEST xml NULL,\n"
+                    + "\tUDT_TEST UDTDECIMAL NULL,\n"
                     + "\tCONSTRAINT PK_TEST_INDEX PRIMARY KEY 
(INT_IDENTITY_TEST)\n"
                     + ");";
 
@@ -138,7 +139,8 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
                     + "\tVARBINARY_MAX_TEST varbinary(MAX) NULL,\n"
                     + "\tVARCHAR_TEST varchar(16) COLLATE Chinese_PRC_CS_AS 
NULL,\n"
                     + "\tVARCHAR_MAX_TEST varchar(MAX) COLLATE 
Chinese_PRC_CS_AS DEFAULT NULL NULL,\n"
-                    + "\tXML_TEST xml NULL\n"
+                    + "\tXML_TEST xml NULL,\n"
+                    + "\tUDT_TEST UDTDECIMAL NULL\n"
                     + ");";
 
     private String username;
@@ -182,6 +184,18 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
                 .build();
     }
 
+    @Override
+    protected void createSchemaIfNeeded() {
+        // create user-defined type
+        String sql = "CREATE TYPE UDTDECIMAL FROM decimal(12, 2);";
+        try {
+            connection.prepareStatement(sql).executeUpdate();
+        } catch (Exception e) {
+            throw new SeaTunnelRuntimeException(
+                    JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql 
" + sql, e);
+        }
+    }
+
     @Override
     String driverUrl() {
         return 
"https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.4.1.jre8/mssql-jdbc-9.4.1.jre8.jar";
@@ -223,6 +237,7 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
                     "VARCHAR_TEST",
                     "VARCHAR_MAX_TEST",
                     "XML_TEST",
+                    "UDT_TEST"
                 };
 
         List<SeaTunnelRow> rows = new ArrayList<>();
@@ -262,6 +277,7 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
                                 "VarCharValue" + i, // VARCHAR_TEST
                                 "VarCharMaxValue" + i, // VARCHAR_MAX_TEST
                                 "<xml>Test" + i + "</xml>", // XML_TEST
+                                new BigDecimal("123.45") // UDT_TEST
                             });
             rows.add(row);
         }

Reply via email to