This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d56bb1ba1 [Improve][Connector-V2][Jdbc-Source] Support for Decimal 
types as splict keys  (#4634)
d56bb1ba1 is described below

commit d56bb1ba1c5b3bdb7cab8689d9c0cb2fcc0c3e0f
Author: ZhilinLi <[email protected]>
AuthorDate: Wed Jun 7 21:16:43 2023 +0800

    [Improve][Connector-V2][Jdbc-Source] Support for Decimal types as splict 
keys  (#4634)
    
    * [Improve][Connector-V2][Jdbc-Source]Support Compatible Mysql bigint(20) 
used as a partition_column #4634
    
    Co-authored-by: zhilinli <[email protected]>
---
 .../seatunnel/api/configuration/Options.java       |  5 ++
 .../seatunnel/jdbc/config/JdbcOptions.java         |  9 +-
 .../seatunnel/jdbc/config/JdbcSourceConfig.java    |  9 +-
 .../JdbcNumericBetweenParametersProvider.java      | 61 +++++++++-----
 .../seatunnel/jdbc/source/JdbcSourceFactory.java   | 23 +++--
 .../seatunnel/jdbc/source/PartitionParameter.java  |  5 +-
 .../connectors/seatunnel/jdbc/JdbcMysqlIT.java     | 97 ++++++++++++----------
 .../test/resources/jdbc_mysql_source_and_sink.conf |  4 +-
 .../jdbc_mysql_source_and_sink_parallel.conf       |  6 +-
 ...mysql_source_and_sink_parallel_upper_lower.conf | 10 +--
 10 files changed, 141 insertions(+), 88 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
index 02aa50c00..432e931c2 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
@@ -25,6 +25,7 @@ import lombok.NonNull;
 
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.math.BigDecimal;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
@@ -74,6 +75,10 @@ public class Options {
         public TypedOptionBuilder<Long> longType() {
             return new TypedOptionBuilder<>(key, new TypeReference<Long>() {});
         }
+        /** Defines that the value of the option should be of {@link 
BigDecimal} type. */
+        public TypedOptionBuilder<BigDecimal> bigDecimalType() {
+            return new TypedOptionBuilder<>(key, new 
TypeReference<BigDecimal>() {});
+        }
 
         /** Defines that the value of the option should be of {@link Float} 
type. */
         public TypedOptionBuilder<Float> floatType() {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index d297fbc9e..87b2a7b46 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
+import java.math.BigDecimal;
 import java.util.List;
 
 @SuppressWarnings("checkstyle:MagicNumber")
@@ -122,14 +123,14 @@ public interface JdbcOptions {
                     .noDefaultValue()
                     .withDescription("partition column");
 
-    Option<Long> PARTITION_UPPER_BOUND =
+    Option<BigDecimal> PARTITION_UPPER_BOUND =
             Options.key("partition_upper_bound")
-                    .longType()
+                    .bigDecimalType()
                     .noDefaultValue()
                     .withDescription("partition upper bound");
-    Option<Long> PARTITION_LOWER_BOUND =
+    Option<BigDecimal> PARTITION_LOWER_BOUND =
             Options.key("partition_lower_bound")
-                    .longType()
+                    .bigDecimalType()
                     .noDefaultValue()
                     .withDescription("partition lower bound");
     Option<Integer> PARTITION_NUM =
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
index 7ff4da1f1..4c6221549 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
@@ -23,6 +23,7 @@ import lombok.Builder;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.util.Optional;
 
 @Data
@@ -33,8 +34,8 @@ public class JdbcSourceConfig implements Serializable {
     private JdbcConnectionConfig jdbcConnectionConfig;
     public String query;
     private String partitionColumn;
-    private Long partitionUpperBound;
-    private Long partitionLowerBound;
+    private BigDecimal partitionUpperBound;
+    private BigDecimal partitionLowerBound;
     private int fetchSize;
     private Integer partitionNumber;
 
@@ -60,11 +61,11 @@ public class JdbcSourceConfig implements Serializable {
         return Optional.ofNullable(partitionColumn);
     }
 
-    public Optional<Long> getPartitionUpperBound() {
+    public Optional<BigDecimal> getPartitionUpperBound() {
         return Optional.ofNullable(partitionUpperBound);
     }
 
-    public Optional<Long> getPartitionLowerBound() {
+    public Optional<BigDecimal> getPartitionLowerBound() {
         return Optional.ofNullable(partitionLowerBound);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
index b35ffab23..ced1d2831 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
@@ -39,8 +41,8 @@ import static com.google.common.base.Preconditions.checkState;
  */
 public class JdbcNumericBetweenParametersProvider implements 
JdbcParameterValuesProvider {
 
-    private final long minVal;
-    private final long maxVal;
+    private final BigDecimal minVal;
+    private final BigDecimal maxVal;
 
     private long batchSize;
     private int batchNum;
@@ -51,8 +53,8 @@ public class JdbcNumericBetweenParametersProvider implements 
JdbcParameterValues
      * @param minVal the lower bound of the produced "from" values
      * @param maxVal the upper bound of the produced "to" values
      */
-    public JdbcNumericBetweenParametersProvider(long minVal, long maxVal) {
-        checkArgument(minVal <= maxVal, "minVal must not be larger than 
maxVal");
+    public JdbcNumericBetweenParametersProvider(BigDecimal minVal, BigDecimal 
maxVal) {
+        checkArgument(minVal.compareTo(maxVal) <= 0, "minVal must not be 
larger than maxVal");
         this.minVal = minVal;
         this.maxVal = maxVal;
     }
@@ -64,8 +66,9 @@ public class JdbcNumericBetweenParametersProvider implements 
JdbcParameterValues
      * @param minVal the lower bound of the produced "from" values
      * @param maxVal the upper bound of the produced "to" values
      */
-    public JdbcNumericBetweenParametersProvider(long fetchSize, long minVal, 
long maxVal) {
-        checkArgument(minVal <= maxVal, "minVal must not be larger than 
maxVal");
+    public JdbcNumericBetweenParametersProvider(
+            long fetchSize, BigDecimal minVal, BigDecimal maxVal) {
+        checkArgument(minVal.compareTo(maxVal) <= 0, "minVal must not be 
larger than maxVal");
         this.minVal = minVal;
         this.maxVal = maxVal;
         ofBatchSize(fetchSize);
@@ -74,24 +77,33 @@ public class JdbcNumericBetweenParametersProvider 
implements JdbcParameterValues
     public JdbcNumericBetweenParametersProvider ofBatchSize(long batchSize) {
         checkArgument(batchSize > 0, "Batch size must be positive");
 
-        long maxElemCount = (maxVal - minVal) + 1;
-        if (batchSize > maxElemCount) {
-            batchSize = maxElemCount;
+        BigDecimal maxElemCount = 
(maxVal.subtract(minVal)).add(BigDecimal.valueOf(1));
+        if (BigDecimal.valueOf(batchSize).compareTo(maxElemCount) > 0) {
+            batchSize = maxElemCount.longValue();
         }
         this.batchSize = batchSize;
-        this.batchNum = new Double(Math.ceil((double) maxElemCount / 
batchSize)).intValue();
+        this.batchNum =
+                new Double(
+                                Math.ceil(
+                                        
(maxElemCount.divide(BigDecimal.valueOf(batchSize)))
+                                                .doubleValue()))
+                        .intValue();
         return this;
     }
 
     public JdbcNumericBetweenParametersProvider ofBatchNum(int batchNum) {
         checkArgument(batchNum > 0, "Batch number must be positive");
 
-        long maxElemCount = (maxVal - minVal) + 1;
-        if (batchNum > maxElemCount) {
-            batchNum = (int) maxElemCount;
+        BigDecimal maxElemCount = 
(maxVal.subtract(minVal)).add(BigDecimal.valueOf(1));
+        if (BigDecimal.valueOf(batchNum).compareTo(maxElemCount) > 0) {
+            batchNum = maxElemCount.intValue();
         }
         this.batchNum = batchNum;
-        this.batchSize = new Double(Math.ceil((double) maxElemCount / 
batchNum)).longValue();
+        // For the presence of a decimal we take the integer up
+        this.batchSize =
+                (maxElemCount.divide(BigDecimal.valueOf(batchNum), 2, 
RoundingMode.HALF_UP))
+                        .setScale(0, RoundingMode.CEILING)
+                        .longValue();
         return this;
     }
 
@@ -101,15 +113,24 @@ public class JdbcNumericBetweenParametersProvider 
implements JdbcParameterValues
                 batchSize > 0,
                 "Batch size and batch number must be positive. Have you called 
`ofBatchSize` or `ofBatchNum`?");
 
-        long maxElemCount = (maxVal - minVal) + 1;
-        long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
+        BigDecimal maxElemCount = 
(maxVal.subtract(minVal)).add(BigDecimal.valueOf(1));
+        BigDecimal bigBatchNum =
+                maxElemCount
+                        .subtract(BigDecimal.valueOf(batchSize - 1))
+                        .multiply(BigDecimal.valueOf(batchNum));
 
         Serializable[][] parameters = new Serializable[batchNum][2];
-        long start = minVal;
+        BigDecimal start = minVal;
         for (int i = 0; i < batchNum; i++) {
-            long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
-            parameters[i] = new Long[] {start, end};
-            start = end + 1;
+            BigDecimal end =
+                    start.add(BigDecimal.valueOf(batchSize))
+                            .subtract(BigDecimal.valueOf(1))
+                            .subtract(
+                                    
BigDecimal.valueOf(i).compareTo(bigBatchNum) >= 0
+                                            ? BigDecimal.ONE
+                                            : BigDecimal.ZERO);
+            parameters[i] = new BigDecimal[] {start, end};
+            start = end.add(BigDecimal.valueOf(1));
         }
         return parameters;
     }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
index ee41f2e85..8f9605182 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -45,6 +46,7 @@ import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -135,8 +137,8 @@ public class JdbcSourceFactory implements 
TableSourceFactory {
 
     static PartitionParameter createPartitionParameter(
             JdbcSourceConfig config, String columnName, Connection connection) 
{
-        long max = Long.MAX_VALUE;
-        long min = Long.MIN_VALUE;
+        BigDecimal max = null;
+        BigDecimal min = null;
         if (config.getPartitionLowerBound().isPresent()
                 && config.getPartitionUpperBound().isPresent()) {
             max = config.getPartitionUpperBound().get();
@@ -155,11 +157,11 @@ public class JdbcSourceFactory implements 
TableSourceFactory {
                 max =
                         config.getPartitionUpperBound().isPresent()
                                 ? config.getPartitionUpperBound().get()
-                                : rs.getLong(1);
+                                : rs.getBigDecimal(1);
                 min =
                         config.getPartitionLowerBound().isPresent()
                                 ? config.getPartitionLowerBound().get()
-                                : rs.getLong(2);
+                                : rs.getBigDecimal(2);
             }
         } catch (SQLException e) {
             throw new PrepareFailException("jdbc", PluginType.SOURCE, 
e.toString());
@@ -200,7 +202,18 @@ public class JdbcSourceFactory implements 
TableSourceFactory {
     }
 
     private static boolean isNumericType(SeaTunnelDataType<?> type) {
-        return type.equals(BasicType.INT_TYPE) || 
type.equals(BasicType.LONG_TYPE);
+        int scale = 1;
+        if (type instanceof DecimalType) {
+            scale = ((DecimalType) type).getScale() == 0 ? 0 : ((DecimalType) 
type).getScale();
+            if (scale != 0) {
+                throw new JdbcConnectorException(
+                        CommonErrorCode.ILLEGAL_ARGUMENT,
+                        String.format(
+                                "The current field is DecimalType containing 
decimals: %d Unable to support",
+                                scale));
+            }
+        }
+        return type.equals(BasicType.INT_TYPE) || 
type.equals(BasicType.LONG_TYPE) || scale == 0;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
index 14ba9f864..61677079e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
@@ -21,13 +21,14 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 
 @Data
 @AllArgsConstructor
 public class PartitionParameter implements Serializable {
 
     String partitionColumnName;
-    long minValue;
-    long maxValue;
+    BigDecimal minValue;
+    BigDecimal maxValue;
     Integer partitionNumber;
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
index 51bb36a2d..f4b1338b1 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
@@ -64,47 +64,50 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
     private static final String CREATE_SQL =
             "CREATE TABLE IF NOT EXISTS %s\n"
                     + "(\n"
-                    + "    `c_bit_1`              bit(1)                
DEFAULT NULL,\n"
-                    + "    `c_bit_8`              bit(8)                
DEFAULT NULL,\n"
-                    + "    `c_bit_16`             bit(16)               
DEFAULT NULL,\n"
-                    + "    `c_bit_32`             bit(32)               
DEFAULT NULL,\n"
-                    + "    `c_bit_64`             bit(64)               
DEFAULT NULL,\n"
-                    + "    `c_boolean`            tinyint(1)            
DEFAULT NULL,\n"
-                    + "    `c_tinyint`            tinyint(4)            
DEFAULT NULL,\n"
-                    + "    `c_tinyint_unsigned`   tinyint(3) unsigned   
DEFAULT NULL,\n"
-                    + "    `c_smallint`           smallint(6)           
DEFAULT NULL,\n"
-                    + "    `c_smallint_unsigned`  smallint(5) unsigned  
DEFAULT NULL,\n"
-                    + "    `c_mediumint`          mediumint(9)          
DEFAULT NULL,\n"
-                    + "    `c_mediumint_unsigned` mediumint(8) unsigned 
DEFAULT NULL,\n"
-                    + "    `c_int`                int(11)               
DEFAULT NULL,\n"
-                    + "    `c_integer`            int(11)               
DEFAULT NULL,\n"
-                    + "    `c_bigint`             bigint(20)            
DEFAULT NULL,\n"
-                    + "    `c_bigint_unsigned`    bigint(20) unsigned   
DEFAULT NULL,\n"
-                    + "    `c_decimal`            decimal(20, 0)        
DEFAULT NULL,\n"
-                    + "    `c_decimal_unsigned`   decimal(38, 18)       
DEFAULT NULL,\n"
-                    + "    `c_float`              float                 
DEFAULT NULL,\n"
-                    + "    `c_float_unsigned`     float unsigned        
DEFAULT NULL,\n"
-                    + "    `c_double`             double                
DEFAULT NULL,\n"
-                    + "    `c_double_unsigned`    double unsigned       
DEFAULT NULL,\n"
-                    + "    `c_char`               char(1)               
DEFAULT NULL,\n"
-                    + "    `c_tinytext`           tinytext,\n"
-                    + "    `c_mediumtext`         mediumtext,\n"
-                    + "    `c_text`               text,\n"
-                    + "    `c_varchar`            varchar(255)          
DEFAULT NULL,\n"
-                    + "    `c_json`               json                  
DEFAULT NULL,\n"
-                    + "    `c_longtext`           longtext,\n"
-                    + "    `c_date`               date                  
DEFAULT NULL,\n"
-                    + "    `c_datetime`           datetime              
DEFAULT NULL,\n"
-                    + "    `c_timestamp`          timestamp NULL        
DEFAULT NULL,\n"
-                    + "    `c_tinyblob`           tinyblob,\n"
-                    + "    `c_mediumblob`         mediumblob,\n"
-                    + "    `c_blob`               blob,\n"
-                    + "    `c_longblob`           longblob,\n"
-                    + "    `c_varbinary`          varbinary(255)        
DEFAULT NULL,\n"
-                    + "    `c_binary`             binary(1)             
DEFAULT NULL,\n"
-                    + "    `c_year`               year(4)               
DEFAULT NULL,\n"
-                    + "    `c_int_unsigned`       int(10) unsigned      
DEFAULT NULL,\n"
-                    + "    `c_integer_unsigned`   int(10) unsigned      
DEFAULT NULL\n"
+                    + "    `c_bit_1`                bit(1)                
DEFAULT NULL,\n"
+                    + "    `c_bit_8`                bit(8)                
DEFAULT NULL,\n"
+                    + "    `c_bit_16`               bit(16)               
DEFAULT NULL,\n"
+                    + "    `c_bit_32`               bit(32)               
DEFAULT NULL,\n"
+                    + "    `c_bit_64`               bit(64)               
DEFAULT NULL,\n"
+                    + "    `c_boolean`              tinyint(1)            
DEFAULT NULL,\n"
+                    + "    `c_tinyint`              tinyint(4)            
DEFAULT NULL,\n"
+                    + "    `c_tinyint_unsigned`     tinyint(3) unsigned   
DEFAULT NULL,\n"
+                    + "    `c_smallint`             smallint(6)           
DEFAULT NULL,\n"
+                    + "    `c_smallint_unsigned`    smallint(5) unsigned  
DEFAULT NULL,\n"
+                    + "    `c_mediumint`            mediumint(9)          
DEFAULT NULL,\n"
+                    + "    `c_mediumint_unsigned`   mediumint(8) unsigned 
DEFAULT NULL,\n"
+                    + "    `c_int`                  int(11)               
DEFAULT NULL,\n"
+                    + "    `c_integer`              int(11)               
DEFAULT NULL,\n"
+                    + "    `c_bigint`               bigint(20)            
DEFAULT NULL,\n"
+                    + "    `c_bigint_unsigned`      bigint(20) unsigned   
DEFAULT NULL,\n"
+                    + "    `c_decimal`              decimal(20, 0)        
DEFAULT NULL,\n"
+                    + "    `c_decimal_unsigned`     decimal(38, 18)       
DEFAULT NULL,\n"
+                    + "    `c_float`                float                 
DEFAULT NULL,\n"
+                    + "    `c_float_unsigned`       float unsigned        
DEFAULT NULL,\n"
+                    + "    `c_double`               double                
DEFAULT NULL,\n"
+                    + "    `c_double_unsigned`      double unsigned       
DEFAULT NULL,\n"
+                    + "    `c_char`                 char(1)               
DEFAULT NULL,\n"
+                    + "    `c_tinytext`             tinytext,\n"
+                    + "    `c_mediumtext`           mediumtext,\n"
+                    + "    `c_text`                 text,\n"
+                    + "    `c_varchar`              varchar(255)          
DEFAULT NULL,\n"
+                    + "    `c_json`                 json                  
DEFAULT NULL,\n"
+                    + "    `c_longtext`             longtext,\n"
+                    + "    `c_date`                 date                  
DEFAULT NULL,\n"
+                    + "    `c_datetime`             datetime              
DEFAULT NULL,\n"
+                    + "    `c_timestamp`            timestamp NULL        
DEFAULT NULL,\n"
+                    + "    `c_tinyblob`             tinyblob,\n"
+                    + "    `c_mediumblob`           mediumblob,\n"
+                    + "    `c_blob`                 blob,\n"
+                    + "    `c_longblob`             longblob,\n"
+                    + "    `c_varbinary`            varbinary(255)        
DEFAULT NULL,\n"
+                    + "    `c_binary`               binary(1)             
DEFAULT NULL,\n"
+                    + "    `c_year`                 year(4)               
DEFAULT NULL,\n"
+                    + "    `c_int_unsigned`         int(10) unsigned      
DEFAULT NULL,\n"
+                    + "    `c_integer_unsigned`     int(10) unsigned      
DEFAULT NULL,\n"
+                    + "    `c_bigint_30`            BIGINT(40)  unsigned  
DEFAULT NULL,\n"
+                    + "    `c_decimal_unsigned_30`  DECIMAL(30) unsigned  
DEFAULT NULL,\n"
+                    + "    `c_decimal_30`           DECIMAL(30)           
DEFAULT NULL\n"
                     + ");";
 
     @Override
@@ -190,10 +193,15 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
                     "c_blob",
                     "c_longblob",
                     "c_varbinary",
-                    "c_binary"
+                    "c_binary",
+                    "c_bigint_30",
+                    "c_decimal_unsigned_30",
+                    "c_decimal_30",
                 };
 
         List<SeaTunnelRow> rows = new ArrayList<>();
+        BigDecimal bigintValue = new BigDecimal("2844674407371055000");
+        BigDecimal decimalValue = new 
BigDecimal("999999999999999999999999999899");
         for (int i = 0; i < 100; i++) {
             byte byteArr = Integer.valueOf(i).byteValue();
             SeaTunnelRow row =
@@ -242,7 +250,10 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
                                 "test".getBytes(),
                                 "test".getBytes(),
                                 "test".getBytes(),
-                                "f".getBytes()
+                                "f".getBytes(),
+                                bigintValue.add(BigDecimal.valueOf(i)),
+                                decimalValue.add(BigDecimal.valueOf(i)),
+                                decimalValue.add(BigDecimal.valueOf(i)),
                             });
             rows.add(row);
         }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
index 00871f0f0..b91ee9d31 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
@@ -47,7 +47,7 @@ sink {
                                                 c_decimal, c_decimal_unsigned, 
c_float, c_float_unsigned, c_double, c_double_unsigned,
                                                 c_char, c_tinytext, 
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
                                                 c_datetime, c_timestamp, 
c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
-                                                c_binary, c_year, 
c_int_unsigned, c_integer_unsigned)
-                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
+                                                c_binary, c_year, 
c_int_unsigned, 
c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
+                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?);"""
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
index bd1ad4f54..c393e69ce 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
@@ -27,7 +27,7 @@ source {
     user = "root"
     password = "Abc!@#135_seatunnel"
     query = "select * from source"
-    partition_column = "c_integer"
+    partition_column = "c_decimal_unsigned_30"
     partition_num = 3
 
     result_table_name = "jdbc"
@@ -50,7 +50,7 @@ sink {
                                                 c_decimal, c_decimal_unsigned, 
c_float, c_float_unsigned, c_double, c_double_unsigned,
                                                 c_char, c_tinytext, 
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
                                                 c_datetime, c_timestamp, 
c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
-                                                c_binary, c_year, 
c_int_unsigned, c_integer_unsigned)
-                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
+                                                c_binary, c_year, 
c_int_unsigned, 
c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
+                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?);"""
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
index a9e54d7ac..0460ccdf3 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -27,10 +27,10 @@ source {
     user = "root"
     password = "Abc!@#135_seatunnel"
     query = "select * from source"
-    partition_column = "c_int"
+    partition_column = "c_bigint_30"
     result_table_name = "jdbc"
-    partition_lower_bound = 1
-    partition_upper_bound = 50
+    partition_lower_bound = 2844674407371055160
+    partition_upper_bound = 2844674407371055259
     partition_num = 5
   }
 }
@@ -51,7 +51,7 @@ sink {
                                                 c_decimal, c_decimal_unsigned, 
c_float, c_float_unsigned, c_double, c_double_unsigned,
                                                 c_char, c_tinytext, 
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
                                                 c_datetime, c_timestamp, 
c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
-                                                c_binary, c_year, 
c_int_unsigned, c_integer_unsigned)
-                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
+                                                c_binary, c_year, 
c_int_unsigned, 
c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
+                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?);"""
   }
 }
\ No newline at end of file

Reply via email to