This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d56bb1ba1 [Improve][Connector-V2][Jdbc-Source] Support for Decimal
types as splict keys (#4634)
d56bb1ba1 is described below
commit d56bb1ba1c5b3bdb7cab8689d9c0cb2fcc0c3e0f
Author: ZhilinLi <[email protected]>
AuthorDate: Wed Jun 7 21:16:43 2023 +0800
[Improve][Connector-V2][Jdbc-Source] Support for Decimal types as splict
keys (#4634)
* [Improve][Connector-V2][Jdbc-Source]Support Compatible Mysql bigint(20)
used as a partition_column #4634
Co-authored-by: zhilinli <[email protected]>
---
.../seatunnel/api/configuration/Options.java | 5 ++
.../seatunnel/jdbc/config/JdbcOptions.java | 9 +-
.../seatunnel/jdbc/config/JdbcSourceConfig.java | 9 +-
.../JdbcNumericBetweenParametersProvider.java | 61 +++++++++-----
.../seatunnel/jdbc/source/JdbcSourceFactory.java | 23 +++--
.../seatunnel/jdbc/source/PartitionParameter.java | 5 +-
.../connectors/seatunnel/jdbc/JdbcMysqlIT.java | 97 ++++++++++++----------
.../test/resources/jdbc_mysql_source_and_sink.conf | 4 +-
.../jdbc_mysql_source_and_sink_parallel.conf | 6 +-
...mysql_source_and_sink_parallel_upper_lower.conf | 10 +--
10 files changed, 141 insertions(+), 88 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
index 02aa50c00..432e931c2 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
@@ -25,6 +25,7 @@ import lombok.NonNull;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
+import java.math.BigDecimal;
import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -74,6 +75,10 @@ public class Options {
public TypedOptionBuilder<Long> longType() {
return new TypedOptionBuilder<>(key, new TypeReference<Long>() {});
}
+ /** Defines that the value of the option should be of {@link
BigDecimal} type. */
+ public TypedOptionBuilder<BigDecimal> bigDecimalType() {
+ return new TypedOptionBuilder<>(key, new
TypeReference<BigDecimal>() {});
+ }
/** Defines that the value of the option should be of {@link Float}
type. */
public TypedOptionBuilder<Float> floatType() {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index d297fbc9e..87b2a7b46 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import java.math.BigDecimal;
import java.util.List;
@SuppressWarnings("checkstyle:MagicNumber")
@@ -122,14 +123,14 @@ public interface JdbcOptions {
.noDefaultValue()
.withDescription("partition column");
- Option<Long> PARTITION_UPPER_BOUND =
+ Option<BigDecimal> PARTITION_UPPER_BOUND =
Options.key("partition_upper_bound")
- .longType()
+ .bigDecimalType()
.noDefaultValue()
.withDescription("partition upper bound");
- Option<Long> PARTITION_LOWER_BOUND =
+ Option<BigDecimal> PARTITION_LOWER_BOUND =
Options.key("partition_lower_bound")
- .longType()
+ .bigDecimalType()
.noDefaultValue()
.withDescription("partition lower bound");
Option<Integer> PARTITION_NUM =
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
index 7ff4da1f1..4c6221549 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
@@ -23,6 +23,7 @@ import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.util.Optional;
@Data
@@ -33,8 +34,8 @@ public class JdbcSourceConfig implements Serializable {
private JdbcConnectionConfig jdbcConnectionConfig;
public String query;
private String partitionColumn;
- private Long partitionUpperBound;
- private Long partitionLowerBound;
+ private BigDecimal partitionUpperBound;
+ private BigDecimal partitionLowerBound;
private int fetchSize;
private Integer partitionNumber;
@@ -60,11 +61,11 @@ public class JdbcSourceConfig implements Serializable {
return Optional.ofNullable(partitionColumn);
}
- public Optional<Long> getPartitionUpperBound() {
+ public Optional<BigDecimal> getPartitionUpperBound() {
return Optional.ofNullable(partitionUpperBound);
}
- public Optional<Long> getPartitionLowerBound() {
+ public Optional<BigDecimal> getPartitionLowerBound() {
return Optional.ofNullable(partitionLowerBound);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
index b35ffab23..ced1d2831 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split;
import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -39,8 +41,8 @@ import static com.google.common.base.Preconditions.checkState;
*/
public class JdbcNumericBetweenParametersProvider implements
JdbcParameterValuesProvider {
- private final long minVal;
- private final long maxVal;
+ private final BigDecimal minVal;
+ private final BigDecimal maxVal;
private long batchSize;
private int batchNum;
@@ -51,8 +53,8 @@ public class JdbcNumericBetweenParametersProvider implements
JdbcParameterValues
* @param minVal the lower bound of the produced "from" values
* @param maxVal the upper bound of the produced "to" values
*/
- public JdbcNumericBetweenParametersProvider(long minVal, long maxVal) {
- checkArgument(minVal <= maxVal, "minVal must not be larger than
maxVal");
+ public JdbcNumericBetweenParametersProvider(BigDecimal minVal, BigDecimal
maxVal) {
+ checkArgument(minVal.compareTo(maxVal) <= 0, "minVal must not be
larger than maxVal");
this.minVal = minVal;
this.maxVal = maxVal;
}
@@ -64,8 +66,9 @@ public class JdbcNumericBetweenParametersProvider implements
JdbcParameterValues
* @param minVal the lower bound of the produced "from" values
* @param maxVal the upper bound of the produced "to" values
*/
- public JdbcNumericBetweenParametersProvider(long fetchSize, long minVal,
long maxVal) {
- checkArgument(minVal <= maxVal, "minVal must not be larger than
maxVal");
+ public JdbcNumericBetweenParametersProvider(
+ long fetchSize, BigDecimal minVal, BigDecimal maxVal) {
+ checkArgument(minVal.compareTo(maxVal) <= 0, "minVal must not be
larger than maxVal");
this.minVal = minVal;
this.maxVal = maxVal;
ofBatchSize(fetchSize);
@@ -74,24 +77,33 @@ public class JdbcNumericBetweenParametersProvider
implements JdbcParameterValues
public JdbcNumericBetweenParametersProvider ofBatchSize(long batchSize) {
checkArgument(batchSize > 0, "Batch size must be positive");
- long maxElemCount = (maxVal - minVal) + 1;
- if (batchSize > maxElemCount) {
- batchSize = maxElemCount;
+ BigDecimal maxElemCount =
(maxVal.subtract(minVal)).add(BigDecimal.valueOf(1));
+ if (BigDecimal.valueOf(batchSize).compareTo(maxElemCount) > 0) {
+ batchSize = maxElemCount.longValue();
}
this.batchSize = batchSize;
- this.batchNum = new Double(Math.ceil((double) maxElemCount /
batchSize)).intValue();
+ this.batchNum =
+ new Double(
+ Math.ceil(
+
(maxElemCount.divide(BigDecimal.valueOf(batchSize)))
+ .doubleValue()))
+ .intValue();
return this;
}
public JdbcNumericBetweenParametersProvider ofBatchNum(int batchNum) {
checkArgument(batchNum > 0, "Batch number must be positive");
- long maxElemCount = (maxVal - minVal) + 1;
- if (batchNum > maxElemCount) {
- batchNum = (int) maxElemCount;
+ BigDecimal maxElemCount =
(maxVal.subtract(minVal)).add(BigDecimal.valueOf(1));
+ if (BigDecimal.valueOf(batchNum).compareTo(maxElemCount) > 0) {
+ batchNum = maxElemCount.intValue();
}
this.batchNum = batchNum;
- this.batchSize = new Double(Math.ceil((double) maxElemCount /
batchNum)).longValue();
+ // For the presence of a decimal we take the integer up
+ this.batchSize =
+ (maxElemCount.divide(BigDecimal.valueOf(batchNum), 2,
RoundingMode.HALF_UP))
+ .setScale(0, RoundingMode.CEILING)
+ .longValue();
return this;
}
@@ -101,15 +113,24 @@ public class JdbcNumericBetweenParametersProvider
implements JdbcParameterValues
batchSize > 0,
"Batch size and batch number must be positive. Have you called
`ofBatchSize` or `ofBatchNum`?");
- long maxElemCount = (maxVal - minVal) + 1;
- long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
+ BigDecimal maxElemCount =
(maxVal.subtract(minVal)).add(BigDecimal.valueOf(1));
+ BigDecimal bigBatchNum =
+ maxElemCount
+ .subtract(BigDecimal.valueOf(batchSize - 1))
+ .multiply(BigDecimal.valueOf(batchNum));
Serializable[][] parameters = new Serializable[batchNum][2];
- long start = minVal;
+ BigDecimal start = minVal;
for (int i = 0; i < batchNum; i++) {
- long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
- parameters[i] = new Long[] {start, end};
- start = end + 1;
+ BigDecimal end =
+ start.add(BigDecimal.valueOf(batchSize))
+ .subtract(BigDecimal.valueOf(1))
+ .subtract(
+
BigDecimal.valueOf(i).compareTo(bigBatchNum) >= 0
+ ? BigDecimal.ONE
+ : BigDecimal.ZERO);
+ parameters[i] = new BigDecimal[] {start, end};
+ start = end.add(BigDecimal.valueOf(1));
}
return parameters;
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
index ee41f2e85..8f9605182 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
@@ -45,6 +46,7 @@ import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -135,8 +137,8 @@ public class JdbcSourceFactory implements
TableSourceFactory {
static PartitionParameter createPartitionParameter(
JdbcSourceConfig config, String columnName, Connection connection)
{
- long max = Long.MAX_VALUE;
- long min = Long.MIN_VALUE;
+ BigDecimal max = null;
+ BigDecimal min = null;
if (config.getPartitionLowerBound().isPresent()
&& config.getPartitionUpperBound().isPresent()) {
max = config.getPartitionUpperBound().get();
@@ -155,11 +157,11 @@ public class JdbcSourceFactory implements
TableSourceFactory {
max =
config.getPartitionUpperBound().isPresent()
? config.getPartitionUpperBound().get()
- : rs.getLong(1);
+ : rs.getBigDecimal(1);
min =
config.getPartitionLowerBound().isPresent()
? config.getPartitionLowerBound().get()
- : rs.getLong(2);
+ : rs.getBigDecimal(2);
}
} catch (SQLException e) {
throw new PrepareFailException("jdbc", PluginType.SOURCE,
e.toString());
@@ -200,7 +202,18 @@ public class JdbcSourceFactory implements
TableSourceFactory {
}
private static boolean isNumericType(SeaTunnelDataType<?> type) {
- return type.equals(BasicType.INT_TYPE) ||
type.equals(BasicType.LONG_TYPE);
+ int scale = 1;
+ if (type instanceof DecimalType) {
+ scale = ((DecimalType) type).getScale() == 0 ? 0 : ((DecimalType)
type).getScale();
+ if (scale != 0) {
+ throw new JdbcConnectorException(
+ CommonErrorCode.ILLEGAL_ARGUMENT,
+ String.format(
+ "The current field is DecimalType containing
decimals: %d Unable to support",
+ scale));
+ }
+ }
+ return type.equals(BasicType.INT_TYPE) ||
type.equals(BasicType.LONG_TYPE) || scale == 0;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
index 14ba9f864..61677079e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
@@ -21,13 +21,14 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
+import java.math.BigDecimal;
@Data
@AllArgsConstructor
public class PartitionParameter implements Serializable {
String partitionColumnName;
- long minValue;
- long maxValue;
+ BigDecimal minValue;
+ BigDecimal maxValue;
Integer partitionNumber;
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
index 51bb36a2d..f4b1338b1 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
@@ -64,47 +64,50 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
private static final String CREATE_SQL =
"CREATE TABLE IF NOT EXISTS %s\n"
+ "(\n"
- + " `c_bit_1` bit(1)
DEFAULT NULL,\n"
- + " `c_bit_8` bit(8)
DEFAULT NULL,\n"
- + " `c_bit_16` bit(16)
DEFAULT NULL,\n"
- + " `c_bit_32` bit(32)
DEFAULT NULL,\n"
- + " `c_bit_64` bit(64)
DEFAULT NULL,\n"
- + " `c_boolean` tinyint(1)
DEFAULT NULL,\n"
- + " `c_tinyint` tinyint(4)
DEFAULT NULL,\n"
- + " `c_tinyint_unsigned` tinyint(3) unsigned
DEFAULT NULL,\n"
- + " `c_smallint` smallint(6)
DEFAULT NULL,\n"
- + " `c_smallint_unsigned` smallint(5) unsigned
DEFAULT NULL,\n"
- + " `c_mediumint` mediumint(9)
DEFAULT NULL,\n"
- + " `c_mediumint_unsigned` mediumint(8) unsigned
DEFAULT NULL,\n"
- + " `c_int` int(11)
DEFAULT NULL,\n"
- + " `c_integer` int(11)
DEFAULT NULL,\n"
- + " `c_bigint` bigint(20)
DEFAULT NULL,\n"
- + " `c_bigint_unsigned` bigint(20) unsigned
DEFAULT NULL,\n"
- + " `c_decimal` decimal(20, 0)
DEFAULT NULL,\n"
- + " `c_decimal_unsigned` decimal(38, 18)
DEFAULT NULL,\n"
- + " `c_float` float
DEFAULT NULL,\n"
- + " `c_float_unsigned` float unsigned
DEFAULT NULL,\n"
- + " `c_double` double
DEFAULT NULL,\n"
- + " `c_double_unsigned` double unsigned
DEFAULT NULL,\n"
- + " `c_char` char(1)
DEFAULT NULL,\n"
- + " `c_tinytext` tinytext,\n"
- + " `c_mediumtext` mediumtext,\n"
- + " `c_text` text,\n"
- + " `c_varchar` varchar(255)
DEFAULT NULL,\n"
- + " `c_json` json
DEFAULT NULL,\n"
- + " `c_longtext` longtext,\n"
- + " `c_date` date
DEFAULT NULL,\n"
- + " `c_datetime` datetime
DEFAULT NULL,\n"
- + " `c_timestamp` timestamp NULL
DEFAULT NULL,\n"
- + " `c_tinyblob` tinyblob,\n"
- + " `c_mediumblob` mediumblob,\n"
- + " `c_blob` blob,\n"
- + " `c_longblob` longblob,\n"
- + " `c_varbinary` varbinary(255)
DEFAULT NULL,\n"
- + " `c_binary` binary(1)
DEFAULT NULL,\n"
- + " `c_year` year(4)
DEFAULT NULL,\n"
- + " `c_int_unsigned` int(10) unsigned
DEFAULT NULL,\n"
- + " `c_integer_unsigned` int(10) unsigned
DEFAULT NULL\n"
+ + " `c_bit_1` bit(1)
DEFAULT NULL,\n"
+ + " `c_bit_8` bit(8)
DEFAULT NULL,\n"
+ + " `c_bit_16` bit(16)
DEFAULT NULL,\n"
+ + " `c_bit_32` bit(32)
DEFAULT NULL,\n"
+ + " `c_bit_64` bit(64)
DEFAULT NULL,\n"
+ + " `c_boolean` tinyint(1)
DEFAULT NULL,\n"
+ + " `c_tinyint` tinyint(4)
DEFAULT NULL,\n"
+ + " `c_tinyint_unsigned` tinyint(3) unsigned
DEFAULT NULL,\n"
+ + " `c_smallint` smallint(6)
DEFAULT NULL,\n"
+ + " `c_smallint_unsigned` smallint(5) unsigned
DEFAULT NULL,\n"
+ + " `c_mediumint` mediumint(9)
DEFAULT NULL,\n"
+ + " `c_mediumint_unsigned` mediumint(8) unsigned
DEFAULT NULL,\n"
+ + " `c_int` int(11)
DEFAULT NULL,\n"
+ + " `c_integer` int(11)
DEFAULT NULL,\n"
+ + " `c_bigint` bigint(20)
DEFAULT NULL,\n"
+ + " `c_bigint_unsigned` bigint(20) unsigned
DEFAULT NULL,\n"
+ + " `c_decimal` decimal(20, 0)
DEFAULT NULL,\n"
+ + " `c_decimal_unsigned` decimal(38, 18)
DEFAULT NULL,\n"
+ + " `c_float` float
DEFAULT NULL,\n"
+ + " `c_float_unsigned` float unsigned
DEFAULT NULL,\n"
+ + " `c_double` double
DEFAULT NULL,\n"
+ + " `c_double_unsigned` double unsigned
DEFAULT NULL,\n"
+ + " `c_char` char(1)
DEFAULT NULL,\n"
+ + " `c_tinytext` tinytext,\n"
+ + " `c_mediumtext` mediumtext,\n"
+ + " `c_text` text,\n"
+ + " `c_varchar` varchar(255)
DEFAULT NULL,\n"
+ + " `c_json` json
DEFAULT NULL,\n"
+ + " `c_longtext` longtext,\n"
+ + " `c_date` date
DEFAULT NULL,\n"
+ + " `c_datetime` datetime
DEFAULT NULL,\n"
+ + " `c_timestamp` timestamp NULL
DEFAULT NULL,\n"
+ + " `c_tinyblob` tinyblob,\n"
+ + " `c_mediumblob` mediumblob,\n"
+ + " `c_blob` blob,\n"
+ + " `c_longblob` longblob,\n"
+ + " `c_varbinary` varbinary(255)
DEFAULT NULL,\n"
+ + " `c_binary` binary(1)
DEFAULT NULL,\n"
+ + " `c_year` year(4)
DEFAULT NULL,\n"
+ + " `c_int_unsigned` int(10) unsigned
DEFAULT NULL,\n"
+ + " `c_integer_unsigned` int(10) unsigned
DEFAULT NULL,\n"
+ + " `c_bigint_30` BIGINT(40) unsigned
DEFAULT NULL,\n"
+ + " `c_decimal_unsigned_30` DECIMAL(30) unsigned
DEFAULT NULL,\n"
+ + " `c_decimal_30` DECIMAL(30)
DEFAULT NULL\n"
+ ");";
@Override
@@ -190,10 +193,15 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
"c_blob",
"c_longblob",
"c_varbinary",
- "c_binary"
+ "c_binary",
+ "c_bigint_30",
+ "c_decimal_unsigned_30",
+ "c_decimal_30",
};
List<SeaTunnelRow> rows = new ArrayList<>();
+ BigDecimal bigintValue = new BigDecimal("2844674407371055000");
+ BigDecimal decimalValue = new
BigDecimal("999999999999999999999999999899");
for (int i = 0; i < 100; i++) {
byte byteArr = Integer.valueOf(i).byteValue();
SeaTunnelRow row =
@@ -242,7 +250,10 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
"test".getBytes(),
"test".getBytes(),
"test".getBytes(),
- "f".getBytes()
+ "f".getBytes(),
+ bigintValue.add(BigDecimal.valueOf(i)),
+ decimalValue.add(BigDecimal.valueOf(i)),
+ decimalValue.add(BigDecimal.valueOf(i)),
});
rows.add(row);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
index 00871f0f0..b91ee9d31 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
@@ -47,7 +47,7 @@ sink {
c_decimal, c_decimal_unsigned,
c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext,
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp,
c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
- c_binary, c_year,
c_int_unsigned, c_integer_unsigned)
- values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
+ c_binary, c_year,
c_int_unsigned,
c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
+ values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?);"""
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
index bd1ad4f54..c393e69ce 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
@@ -27,7 +27,7 @@ source {
user = "root"
password = "Abc!@#135_seatunnel"
query = "select * from source"
- partition_column = "c_integer"
+ partition_column = "c_decimal_unsigned_30"
partition_num = 3
result_table_name = "jdbc"
@@ -50,7 +50,7 @@ sink {
c_decimal, c_decimal_unsigned,
c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext,
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp,
c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
- c_binary, c_year,
c_int_unsigned, c_integer_unsigned)
- values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
+ c_binary, c_year,
c_int_unsigned,
c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
+ values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?);"""
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
index a9e54d7ac..0460ccdf3 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -27,10 +27,10 @@ source {
user = "root"
password = "Abc!@#135_seatunnel"
query = "select * from source"
- partition_column = "c_int"
+ partition_column = "c_bigint_30"
result_table_name = "jdbc"
- partition_lower_bound = 1
- partition_upper_bound = 50
+ partition_lower_bound = 2844674407371055160
+ partition_upper_bound = 2844674407371055259
partition_num = 5
}
}
@@ -51,7 +51,7 @@ sink {
c_decimal, c_decimal_unsigned,
c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext,
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp,
c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
- c_binary, c_year,
c_int_unsigned, c_integer_unsigned)
- values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
+ c_binary, c_year,
c_int_unsigned,
c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
+ values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?);"""
}
}
\ No newline at end of file