This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 840c3e5eb Add DataTypeConvertor in Catalog (#4094)
840c3e5eb is described below

commit 840c3e5eb47d222c82dfe054182790b47c4fca30
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Feb 10 22:14:41 2023 +0800

    Add DataTypeConvertor in Catalog (#4094)
---
 .../seatunnel/api/table/catalog/Catalog.java       |  5 +++
 .../table/catalog/DataTypeConvertException.java    | 52 ++++++++++++++++++++++
 .../api/table/catalog/DataTypeConvertor.java       | 50 +++++++++++++++++++++
 .../seatunnel/jdbc/catalog/MySqlCatalog.java       | 15 +++++--
 .../MysqlDataTypeConvertor.java}                   | 41 +++++++++++++----
 .../catalog/sql/MysqlCreateTableSqlBuilder.java    |  4 +-
 .../jdbc/catalog/MysqlDataTypeConvertorTest.java   | 37 +++++++++++++++
 7 files changed, 189 insertions(+), 15 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 90a443d03..42e74ff8d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -146,4 +146,9 @@ public interface Catalog {
 
     // todo: Support for update table metadata
 
+
+    /**
+     * Return a {@link DataTypeConvertor} used to convert the data type 
between SeaTunnel and the connector.
+     */
+    DataTypeConvertor<?> getDataTypeConvertor();
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
new file mode 100644
index 000000000..a4e26dd06
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class DataTypeConvertException extends SeaTunnelRuntimeException {
+    private static final String CONVERT_TO_SEA_TUNNEL_ERROR_MSG = "Convert 
type: %s to SeaTunnel data type error.";
+
+    private static final String CONVERT_TO_CONNECTOR_DATA_TYPE_ERROR_MSG = 
"Convert SeaTunnel data type: %s to connector data type error.";
+
+    public DataTypeConvertException(String message) {
+        this(message, null);
+    }
+
+    public DataTypeConvertException(String message, Throwable cause) {
+        super(CommonErrorCode.UNSUPPORTED_DATA_TYPE, message, cause);
+    }
+
+    public static DataTypeConvertException 
convertToSeaTunnelDataTypeException(Object dataType) {
+        return new 
DataTypeConvertException(String.format(CONVERT_TO_SEA_TUNNEL_ERROR_MSG, 
dataType));
+    }
+
+    public static DataTypeConvertException 
convertToSeaTunnelDataTypeException(Object dataType, Throwable cause) {
+        return new 
DataTypeConvertException(String.format(CONVERT_TO_SEA_TUNNEL_ERROR_MSG, 
dataType), cause);
+    }
+
+    public static DataTypeConvertException 
convertToConnectorDataTypeException(SeaTunnelDataType<?> seaTunnelDataType) {
+        return new 
DataTypeConvertException(String.format(CONVERT_TO_CONNECTOR_DATA_TYPE_ERROR_MSG,
 seaTunnelDataType));
+    }
+
+    public static DataTypeConvertException 
convertToConnectorDataTypeException(SeaTunnelDataType<?> seaTunnelDataType, 
Throwable cause) {
+        return new 
DataTypeConvertException(String.format(CONVERT_TO_CONNECTOR_DATA_TYPE_ERROR_MSG,
 seaTunnelDataType), cause);
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
new file mode 100644
index 000000000..fd51ce001
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.util.Map;
+
+/**
+ * DataTypeConvertor is used to convert the data type between connector and 
SeaTunnel.
+ */
+public interface DataTypeConvertor<T> {
+
+    /**
+     * Transfer the data type from connector to SeaTunnel.
+     *
+     * @param t                  origin data type
+     * @param dataTypeProperties origin data type properties, e.g. precision, 
scale, length
+     * @return SeaTunnel data type
+     */
+    // todo: If the origin data type contains the properties, we can remove 
the dataTypeProperties.
+    SeaTunnelDataType<?> toSeaTunnelType(T t, Map<String, Object> 
dataTypeProperties) throws DataTypeConvertException;
+
+    /**
+     * Transfer the data type from SeaTunnel to connector.
+     *
+     * @param seaTunnelDataType  seaTunnel data type
+     * @param dataTypeProperties seaTunnel data type properties, e.g. 
precision, scale, length
+     * @return origin data type
+     */
+    // todo: If the SeaTunnel data type contains the properties, we can remove 
the dataTypeProperties.
+    T toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, 
Object> dataTypeProperties) throws DataTypeConvertException;
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
index 8683d571f..7456f5f9c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
@@ -30,7 +31,6 @@ import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistExceptio
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql.MysqlCreateTableSqlBuilder;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.DataTypeUtils;
 
 import com.mysql.cj.MysqlType;
 import com.mysql.cj.jdbc.result.ResultSetImpl;
@@ -156,6 +156,11 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         }
     }
 
+    @Override
+    public DataTypeConvertor<MysqlType> getDataTypeConvertor() {
+        return MysqlDataTypeConvertor.getInstance();
+    }
+
     // todo: If the origin source is mysql, we can directly use create table 
like to create the target table?
     @Override
     protected boolean createTableInternal(TablePath tablePath, CatalogTable 
table) throws CatalogException {
@@ -208,11 +213,13 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
      * @see com.mysql.cj.MysqlType
      * @see ResultSetImpl#getObjectStoredProc(int, int)
      */
+    @SuppressWarnings("unchecked")
     private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int 
colIndex) throws SQLException {
-        int precision = metadata.getPrecision(colIndex);
-        int scale = metadata.getScale(colIndex);
         MysqlType mysqlType = 
MysqlType.getByName(metadata.getColumnTypeName(colIndex));
-        return DataTypeUtils.toSeaTunnelDataType(mysqlType, precision, scale);
+        Map<String, Object> dataTypeProperties = new HashMap<>();
+        dataTypeProperties.put(MysqlDataTypeConvertor.PRECISION, 
metadata.getPrecision(colIndex));
+        dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, 
metadata.getScale(colIndex));
+        return getDataTypeConvertor().toSeaTunnelType(mysqlType, 
dataTypeProperties);
     }
 
     @SuppressWarnings("MagicNumber")
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/DataTypeUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
similarity index 73%
rename from 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/DataTypeUtils.java
rename to 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
index 83af004f5..247177033 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/DataTypeUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
@@ -15,8 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
@@ -27,14 +29,24 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 
 import com.mysql.cj.MysqlType;
-import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.MapUtils;
 
-@Slf4j
-@UtilityClass
-public class DataTypeUtils {
+import java.util.Map;
 
-    public static SeaTunnelDataType<?> toSeaTunnelDataType(MysqlType 
mysqlType, int precision, int scale) {
+public class MysqlDataTypeConvertor implements DataTypeConvertor<MysqlType> {
+
+    private MysqlDataTypeConvertor() {
+
+    }
+
+    private static final MysqlDataTypeConvertor INSTANCE = new 
MysqlDataTypeConvertor();
+
+    public static final String PRECISION = "precision";
+    public static final String SCALE = "scale";
+
+    // todo: It's better to wrapper MysqlType to a pojo in ST, since MysqlType 
doesn't contains properties.
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(MysqlType mysqlType, 
Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
         switch (mysqlType) {
             case NULL:
                 return BasicType.VOID_TYPE;
@@ -88,14 +100,21 @@ public class DataTypeUtils {
             case BIGINT_UNSIGNED:
             case DECIMAL:
             case DECIMAL_UNSIGNED:
+                Integer precision = MapUtils.getInteger(dataTypeProperties, 
PRECISION);
+                Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE);
+                if (precision == null || scale == null) {
+                    throw 
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType,
+                        new IllegalArgumentException("Decimal type must have 
precision and scale"));
+                }
                 return new DecimalType(precision, scale);
             // TODO: support 'SET' & 'YEAR' type
             default:
-                throw new 
JdbcConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
String.format("Doesn't support MySQL type '%s' yet", mysqlType.getName()));
+                throw 
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType);
         }
     }
 
-    public MysqlType toMysqlType(SeaTunnelDataType<?> seaTunnelDataType) {
+    @Override
+    public MysqlType toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, 
Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
         SqlType sqlType = seaTunnelDataType.getSqlType();
         // todo: verify
         switch (sqlType) {
@@ -135,4 +154,8 @@ public class DataTypeUtils {
 
         }
     }
+
+    public static MysqlDataTypeConvertor getInstance() {
+        return INSTANCE;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
index c06b93160..af54696dc 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.DataTypeUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MysqlDataTypeConvertor;
 
 import org.apache.commons.collections4.CollectionUtils;
 
@@ -147,7 +147,7 @@ public class MysqlCreateTableSqlBuilder {
         // Column name
         columnSqls.add(column.getName());
         // Column type
-        
columnSqls.add(DataTypeUtils.toMysqlType(column.getDataType()).getName());
+        
columnSqls.add(MysqlDataTypeConvertor.getInstance().toConnectorType(column.getDataType(),
 null).getName());
         // Column length
         if (column.getColumnLength() != null) {
             columnSqls.add("(" + column.getColumnLength() + ")");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
new file mode 100644
index 000000000..936e7ea12
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import com.mysql.cj.MysqlType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+public class MysqlDataTypeConvertorTest {
+
+    private MysqlDataTypeConvertor mysqlDataTypeConvertor = 
MysqlDataTypeConvertor.getInstance();
+
+    @Test
+    public void from() {
+        Assertions.assertEquals(BasicType.VOID_TYPE, 
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.NULL, Collections.emptyMap()));
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.VARCHAR, 
Collections.emptyMap()));
+    }
+}

Reply via email to