This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 696f2948fa [Feature][Connector-V2] Add `decimal_type_narrowing` option
in jdbc (#7461)
696f2948fa is described below
commit 696f2948fa618e84b0c715f44ea7cfdab1d74b5b
Author: Jia Fan <[email protected]>
AuthorDate: Wed Aug 28 14:28:26 2024 +0800
[Feature][Connector-V2] Add `decimal_type_narrowing` option in jdbc (#7461)
---
docs/en/connector-v2/source/Jdbc.md | 25 ++-
docs/en/connector-v2/source/PostgreSQL.md | 15 --
.../seatunnel/jdbc/catalog/JdbcCatalogOptions.java | 6 +-
.../jdbc/catalog/oracle/OracleCatalog.java | 25 ++-
.../jdbc/catalog/oracle/OracleCatalogFactory.java | 4 +-
.../jdbc/config/JdbcConnectionConfig.java | 15 ++
.../seatunnel/jdbc/config/JdbcOptions.java | 7 +
.../seatunnel/jdbc/config/JdbcSourceConfig.java | 5 +-
.../dialect/oracle/OracleTypeConverter.java | 24 ++-
.../internal/dialect/oracle/OracleTypeMapper.java | 13 +-
.../seatunnel/jdbc/utils/JdbcCatalogUtils.java | 3 +
.../dialect/oracle/OracleTypeConverterTest.java | 239 ++++++++++++++++-----
.../connectors/seatunnel/jdbc/JdbcOracleIT.java | 24 +++
.../test/resources/jdbc_oracle_source_to_sink.conf | 4 +-
.../jdbc_oracle_source_to_sink_use_select1.conf | 4 +-
.../jdbc_oracle_source_to_sink_use_select2.conf | 2 +-
.../jdbc_oracle_source_to_sink_use_select3.conf | 2 +-
...rce_to_sink_without_decimal_type_narrowing.conf | 82 +++++++
18 files changed, 407 insertions(+), 92 deletions(-)
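
The narrowing rule that this commit gates behind `decimal_type_narrowing` (see the `OracleTypeConverter.java` hunk further down) can be sketched roughly as below. This is a simplified illustration, not the connector's code. The class `DecimalNarrowingSketch`, the method `mapNumber`, and the string type names are hypothetical. The cap at precision 38 is inferred from the `number(39,0)` case in `OracleTypeConverterTest`. Columns with a positive scale or an unspecified precision are left out of the sketch.

```java
// Hypothetical, simplified sketch of the narrowing decision; not the actual OracleTypeConverter.
class DecimalNarrowingSketch {

    /**
     * Maps an Oracle NUMBER(precision, scale) column with scale <= 0 to a
     * SeaTunnel-style type name (plain strings are used here for illustration).
     */
    static String mapNumber(int precision, int scale, boolean decimalTypeNarrowing) {
        if (scale > 0) {
            throw new IllegalArgumentException("fractional NUMBER columns are outside this sketch");
        }
        // A negative scale widens the integer part, e.g. NUMBER(5, -2) needs 7 digits.
        int newPrecision = precision - scale;
        if (newPrecision <= 18 && decimalTypeNarrowing) {
            if (newPrecision == 1) {
                return "BOOLEAN";
            } else if (newPrecision <= 9) {
                return "INT";
            } else {
                return "BIGINT";
            }
        }
        // Narrowing disabled, or more than 18 digits: keep a decimal, capped at precision 38.
        return "DECIMAL(" + Math.min(newPrecision, 38) + ", 0)";
    }

    public static void main(String[] args) {
        System.out.println(mapNumber(6, 0, true));  // INT
        System.out.println(mapNumber(6, 0, false)); // DECIMAL(6, 0)
    }
}
```

With the flag set to `false`, every integer-like `NUMBER` column keeps a decimal type, which matches the second table added to `Jdbc.md` in this commit.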
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 31257d85b1..44a8a7f3df 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -39,7 +39,7 @@ supports query SQL and can achieve projection effect.
## Options
-| name | type | required | default
value |
description
[...]
+| name | type | required | default
value | description
[...]
|--------------------------------------------|---------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| url | String | Yes | -
| The URL of the JDBC connection. Refer to a case:
jdbc:postgresql://localhost/test
[...]
| driver | String | Yes | -
| The jdbc class name used to connect to the remote data source, if you
use MySQL the value is `com.mysql.cj.jdbc.Driver`.
[...]
@@ -52,6 +52,7 @@ supports query SQL and can achieve projection effect.
| partition_upper_bound | Long | No | -
| The partition_column max value for scan, if not set SeaTunnel will query
database get max value.
[...]
| partition_lower_bound | Long | No | -
| The partition_column min value for scan, if not set SeaTunnel will query
database get min value.
[...]
| partition_num | Int | No | job
parallelism | Not recommended for use, The correct approach is to control the
number of split through `split.size`<br/> How many splits do we need to split
into, only support positive integer. default value is job parallelism.
[...]
+| decimal_type_narrowing | Boolean | No | true
| Decimal type narrowing. If true, the decimal type is narrowed to
the int or long type when this can be done without loss of precision. Currently
only supported for Oracle. See `decimal_type_narrowing` below
[...]
| use_select_count | Boolean | No | false
| Use select count for table count rather then other methods in dynamic
chunk split stage. This is currently only available for jdbc-oracle.In this
scenario, select count directly is used when it is faster to update statistics
using sql from analysis table
[...]
| skip_analyze | Boolean | No | false
| Skip the analysis of table count in dynamic chunk split stage. This is
currently only available for jdbc-oracle.In this scenario, you schedule
analysis table sql to update related table statistics periodically or your
table data does not change frequently
[...]
| fetch_size | Int | No | 0
| For queries that return a large number of objects, you can configure the
row fetch size used in the query to improve performance by reducing the number
database hits required to satisfy the selection criteria. Zero means use jdbc
default value.
[...]
@@ -66,6 +67,28 @@ supports query SQL and can achieve projection effect.
| split.inverse-sampling.rate | Int | No | 1000
| The inverse of the sampling rate used in the sample sharding strategy.
For example, if this value is set to 1000, it means a 1/1000 sampling rate is
applied during the sampling process. This option provides flexibility in
controlling the granularity of the sampling, thus affecting the final number of
shards. It's especially useful when dealing with very large datasets where a
lower sampling rate is pref [...]
| common-options | | No | -
| Source plugin common parameters, please refer to [Source Common
Options](../source-common-options.md) for details.
[...]
+### decimal_type_narrowing
+
+Decimal type narrowing. If true, the decimal type is narrowed to the int
or long type when this can be done without loss of precision. Currently only supported for Oracle.
+
+For example:
+
+decimal_type_narrowing = true
+
+| Oracle | SeaTunnel |
+|---------------|-----------|
+| NUMBER(1, 0) | Boolean |
+| NUMBER(6, 0) | INT |
+| NUMBER(10, 0) | BIGINT |
+
+decimal_type_narrowing = false
+
+| Oracle | SeaTunnel |
+|---------------|----------------|
+| NUMBER(1, 0) | Decimal(1, 0) |
+| NUMBER(6, 0) | Decimal(6, 0) |
+| NUMBER(10, 0) | Decimal(10, 0) |
+
## Parallel Reader
The JDBC Source connector supports parallel reading of data from tables.
SeaTunnel will use certain rules to split the data in the table, which will be
handed over to readers for reading. The number of readers is determined by the
`parallelism` option.
diff --git a/docs/en/connector-v2/source/PostgreSQL.md
b/docs/en/connector-v2/source/PostgreSQL.md
index b6e95c8ad7..5dd922b76f 100644
--- a/docs/en/connector-v2/source/PostgreSQL.md
+++ b/docs/en/connector-v2/source/PostgreSQL.md
@@ -74,21 +74,6 @@ Read external data source data through JDBC.
## Options
-| Name | Type | Required | Default |
Description
|
-|------------------------------|------------|----------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| url | String | Yes | - | The
URL of the JDBC connection. Refer to a case:
jdbc:postgresql://localhost:5432/test
|
-| driver | String | Yes | - | The
jdbc class name used to connect to the remote data source,<br/> if you use
PostgreSQL the value is `org.postgresql.Driver`.
|
-| user | String | No | - |
Connection instance user name
|
-| password | String | No | - |
Connection instance password
|
-| query | String | Yes | - |
Query statement
|
-| connection_check_timeout_sec | Int | No | 30 | The
time in seconds to wait for the database operation used to validate the
connection to complete
|
-| partition_column | String | No | - | The
column name for parallelism's partition, only support numeric type,Only support
numeric type primary key, and only can config one column.
|
-| partition_lower_bound | BigDecimal | No | - | The
partition_column min value for scan, if not set SeaTunnel will query database
get min value.
|
-| partition_upper_bound | BigDecimal | No | - | The
partition_column max value for scan, if not set SeaTunnel will query database
get max value.
|
-| partition_num | Int | No | job parallelism | The
number of partition count, only support positive integer. default value is job
parallelism
|
-| fetch_size | Int | No | 0 | For
queries that return a large number of objects,you can configure<br/> the row
fetch size used in the query toimprove performance by<br/> reducing the number
database hits required to satisfy the selection criteria.<br/> Zero means use
jdbc default value. |
-| properties | Map | No | - |
Additional connection configuration parameters,when properties and URL have the
same parameters, the priority is determined by the <br/>specific implementation
of the driver. For example, in MySQL, properties take precedence over the URL.
|
-
| Name | Type | Required |
Default |
Description
[...]
|--------------------------------------------|------------|----------|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| url | String | Yes | -
| The URL of the JDBC connection. Refer to a case:
jdbc:mysql://localhost:3306:3306/test
[...]
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
index c2f2405ee0..c412ca9218 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
public interface JdbcCatalogOptions {
Option<String> BASE_URL =
@@ -59,7 +60,10 @@ public interface JdbcCatalogOptions {
+ "For example, when using OceanBase
database, you need to set it to 'mysql' or 'oracle'.");
OptionRule.Builder BASE_RULE =
- OptionRule.builder().required(BASE_URL).required(USERNAME,
PASSWORD).optional(SCHEMA);
+ OptionRule.builder()
+ .required(BASE_URL)
+ .required(USERNAME, PASSWORD)
+ .optional(SCHEMA, JdbcOptions.DECIMAL_TYPE_NARROWING);
Option<String> TABLE_PREFIX =
Options.key("tablePrefix")
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index 5aa6dcd874..ccbbfb509d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -25,6 +25,7 @@ import
org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeMapper;
@@ -71,13 +72,32 @@ public class OracleCatalog extends AbstractJdbcCatalog {
+ "ORDER BY \n"
+ " cols.column_id \n";
+ private boolean decimalTypeNarrowing;
+
public OracleCatalog(
String catalogName,
String username,
String pwd,
JdbcUrlUtil.UrlInfo urlInfo,
String defaultSchema) {
+ this(
+ catalogName,
+ username,
+ pwd,
+ urlInfo,
+ defaultSchema,
+ JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue());
+ }
+
+ public OracleCatalog(
+ String catalogName,
+ String username,
+ String pwd,
+ JdbcUrlUtil.UrlInfo urlInfo,
+ String defaultSchema,
+ boolean decimalTypeNarrowing) {
super(catalogName, username, pwd, urlInfo, defaultSchema);
+ this.decimalTypeNarrowing = decimalTypeNarrowing;
}
@Override
@@ -162,7 +182,7 @@ public class OracleCatalog extends AbstractJdbcCatalog {
.defaultValue(defaultValue)
.comment(columnComment)
.build();
- return OracleTypeConverter.INSTANCE.convert(typeDefine);
+ return new
OracleTypeConverter(decimalTypeNarrowing).convert(typeDefine);
}
@Override
@@ -183,7 +203,8 @@ public class OracleCatalog extends AbstractJdbcCatalog {
@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
- return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new
OracleTypeMapper());
+ return CatalogUtils.getCatalogTable(
+ defaultConnection, sqlQuery, new
OracleTypeMapper(decimalTypeNarrowing));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java
index 7c90c79347..2b51d97621 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogFactory.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import com.google.auto.service.AutoService;
@@ -52,7 +53,8 @@ public class OracleCatalogFactory implements CatalogFactory {
options.get(JdbcCatalogOptions.USERNAME),
options.get(JdbcCatalogOptions.PASSWORD),
urlInfo,
- options.get(JdbcCatalogOptions.SCHEMA));
+ options.get(JdbcCatalogOptions.SCHEMA),
+ options.get(JdbcOptions.DECIMAL_TYPE_NARROWING));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
index dc379bb38a..053ab71a41 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
@@ -43,6 +43,8 @@ public class JdbcConnectionConfig implements Serializable {
public String xaDataSourceClassName;
+ public boolean decimalTypeNarrowing =
JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue();
+
public int maxCommitAttempts =
JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
public int transactionTimeoutSec =
JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
@@ -81,6 +83,8 @@ public class JdbcConnectionConfig implements Serializable {
config.getOptional(JdbcOptions.USER).ifPresent(builder::username);
config.getOptional(JdbcOptions.PASSWORD).ifPresent(builder::password);
config.getOptional(JdbcOptions.PROPERTIES).ifPresent(builder::properties);
+ config.getOptional(JdbcOptions.DECIMAL_TYPE_NARROWING)
+ .ifPresent(builder::decimalTypeNarrowing);
return builder.build();
}
@@ -108,6 +112,10 @@ public class JdbcConnectionConfig implements Serializable {
return maxRetries;
}
+ public boolean isDecimalTypeNarrowing() {
+ return decimalTypeNarrowing;
+ }
+
public Optional<String> getUsername() {
return Optional.ofNullable(username);
}
@@ -153,6 +161,7 @@ public class JdbcConnectionConfig implements Serializable {
private boolean autoCommit = JdbcOptions.AUTO_COMMIT.defaultValue();
private int batchSize = JdbcOptions.BATCH_SIZE.defaultValue();
private String xaDataSourceClassName;
+ private boolean decimalTypeNarrowing =
JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue();
private int maxCommitAttempts =
JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
private int transactionTimeoutSec =
JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
private Map<String, String> properties;
@@ -183,6 +192,11 @@ public class JdbcConnectionConfig implements Serializable {
return this;
}
+ public Builder decimalTypeNarrowing(boolean decimalTypeNarrowing) {
+ this.decimalTypeNarrowing = decimalTypeNarrowing;
+ return this;
+ }
+
public Builder maxRetries(int maxRetries) {
this.maxRetries = maxRetries;
return this;
@@ -267,6 +281,7 @@ public class JdbcConnectionConfig implements Serializable {
jdbcConnectionConfig.transactionTimeoutSec =
this.transactionTimeoutSec;
jdbcConnectionConfig.maxCommitAttempts = this.maxCommitAttempts;
jdbcConnectionConfig.xaDataSourceClassName =
this.xaDataSourceClassName;
+ jdbcConnectionConfig.decimalTypeNarrowing =
this.decimalTypeNarrowing;
jdbcConnectionConfig.useKerberos = this.useKerberos;
jdbcConnectionConfig.kerberosPrincipal = this.kerberosPrincipal;
jdbcConnectionConfig.kerberosKeytabPath = this.kerberosKeytabPath;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index 7f0ec48f36..976650456b 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -101,6 +101,13 @@ public interface JdbcOptions {
.defaultValue(false)
.withDescription("generate sql using the database table");
+ Option<Boolean> DECIMAL_TYPE_NARROWING =
+ Options.key("decimal_type_narrowing")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "decimal type narrowing, if true, the decimal type
will be narrowed to the int or long type if without loss of precision. Only
support for Oracle at now.");
+
Option<String> XA_DATA_SOURCE_CLASS_NAME =
Options.key("xa_data_source_class_name")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
index 74c7801318..09cc92d70e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
@@ -42,6 +42,7 @@ public class JdbcSourceConfig implements Serializable {
private double splitEvenDistributionFactorLowerBound;
private int splitSampleShardingThreshold;
private int splitInverseSamplingRate;
+ private boolean decimalTypeNarrowing;
public static JdbcSourceConfig of(ReadonlyConfig config) {
JdbcSourceConfig.Builder builder = JdbcSourceConfig.builder();
@@ -53,7 +54,7 @@ public class JdbcSourceConfig implements Serializable {
boolean isOldVersion =
config.getOptional(JdbcOptions.QUERY).isPresent()
&&
config.getOptional(JdbcOptions.PARTITION_COLUMN).isPresent();
- builder.useDynamicSplitter(isOldVersion ? false : true);
+ builder.useDynamicSplitter(!isOldVersion);
builder.splitSize(config.get(JdbcSourceOptions.SPLIT_SIZE));
builder.splitEvenDistributionFactorUpperBound(
@@ -64,6 +65,8 @@ public class JdbcSourceConfig implements Serializable {
config.get(JdbcSourceOptions.SPLIT_SAMPLE_SHARDING_THRESHOLD));
builder.splitInverseSamplingRate(config.get(JdbcSourceOptions.SPLIT_INVERSE_SAMPLING_RATE));
+
builder.decimalTypeNarrowing(config.get(JdbcOptions.DECIMAL_TYPE_NARROWING));
+
config.getOptional(JdbcSourceOptions.WHERE_CONDITION)
.ifPresent(
whereConditionClause -> {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverter.java
index d359f3fef0..023fa949cf 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverter.java
@@ -86,6 +86,16 @@ public class OracleTypeConverter implements
TypeConverter<BasicTypeDefine> {
public static final long BYTES_4GB = (long) Math.pow(2, 32);
public static final OracleTypeConverter INSTANCE = new
OracleTypeConverter();
+ private final boolean decimalTypeNarrowing;
+
+ public OracleTypeConverter() {
+ this(true);
+ }
+
+ public OracleTypeConverter(boolean decimalTypeNarrowing) {
+ this.decimalTypeNarrowing = decimalTypeNarrowing;
+ }
+
@Override
public String identifier() {
return DatabaseIdentifier.ORACLE;
@@ -119,12 +129,14 @@ public class OracleTypeConverter implements
TypeConverter<BasicTypeDefine> {
if (scale <= 0) {
int newPrecision = (int) (precision - scale);
- if (newPrecision == 1) {
- builder.dataType(BasicType.BOOLEAN_TYPE);
- } else if (newPrecision <= 9) {
- builder.dataType(BasicType.INT_TYPE);
- } else if (newPrecision <= 18) {
- builder.dataType(BasicType.LONG_TYPE);
+ if (newPrecision <= 18 && decimalTypeNarrowing) {
+ if (newPrecision == 1) {
+ builder.dataType(BasicType.BOOLEAN_TYPE);
+ } else if (newPrecision <= 9) {
+ builder.dataType(BasicType.INT_TYPE);
+ } else {
+ builder.dataType(BasicType.LONG_TYPE);
+ }
} else if (newPrecision < 38) {
builder.dataType(new DecimalType(newPrecision, 0));
builder.columnLength((long) newPrecision);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
index ce5ef8af88..bbdd19af8e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import lombok.extern.slf4j.Slf4j;
@@ -31,9 +32,19 @@ import java.util.Arrays;
@Slf4j
public class OracleTypeMapper implements JdbcDialectTypeMapper {
+ private final boolean decimalTypeNarrowing;
+
+ public OracleTypeMapper() {
+ this(JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue());
+ }
+
+ public OracleTypeMapper(boolean decimalTypeNarrowing) {
+ this.decimalTypeNarrowing = decimalTypeNarrowing;
+ }
+
@Override
public Column mappingColumn(BasicTypeDefine typeDefine) {
- return OracleTypeConverter.INSTANCE.convert(typeDefine);
+ return new
OracleTypeConverter(decimalTypeNarrowing).convert(typeDefine);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index 860131041a..1fa379acb4 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -34,6 +34,7 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalo
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceTableConfig;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -395,6 +396,8 @@ public class JdbcCatalogUtils {
.ifPresent(val ->
catalogConfig.put(JdbcCatalogOptions.PASSWORD.key(), val));
Optional.ofNullable(config.getCompatibleMode())
.ifPresent(val ->
catalogConfig.put(JdbcCatalogOptions.COMPATIBLE_MODE.key(), val));
+ catalogConfig.put(
+ JdbcOptions.DECIMAL_TYPE_NARROWING.key(),
config.isDecimalTypeNarrowing());
return ReadonlyConfig.fromMap(catalogConfig);
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverterTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverterTest.java
index 26238bad30..d4a8defdda 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverterTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverterTest.java
@@ -36,12 +36,14 @@ import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.or
public class OracleTypeConverterTest {
+ private static final OracleTypeConverter INSTANCE = new
OracleTypeConverter();
+
@Test
public void testConvertUnsupported() {
BasicTypeDefine<Object> typeDefine =
BasicTypeDefine.builder().name("test").columnType("aaa").dataType("aaa").build();
try {
- OracleTypeConverter.INSTANCE.convert(typeDefine);
+ INSTANCE.convert(typeDefine);
Assertions.fail();
} catch (SeaTunnelRuntimeException e) {
// ignore
@@ -50,6 +52,113 @@ public class OracleTypeConverterTest {
}
}
+ @Test
+ public void testConvertNumberWithoutDecimalTypeNarrowing() {
+ OracleTypeConverter converter = new OracleTypeConverter(false);
+
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("number")
+ .dataType("number")
+ .build();
+ Column column = converter.convert(typeDefine);
+
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(new DecimalType(38, 18), column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("number(38,127)")
+ .dataType("number")
+ .precision(38L)
+ .scale(127)
+ .build();
+ column = converter.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(new DecimalType(38, 18), column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("number")
+ .dataType("number")
+ .scale(0)
+ .build();
+ column = converter.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("number(1,0)")
+ .dataType("number")
+ .precision(1L)
+ .scale(0)
+ .build();
+ column = converter.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(new DecimalType(1, 0), column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("number(8,0)")
+ .dataType("number")
+ .precision(8L)
+ .scale(0)
+ .build();
+ column = converter.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(new DecimalType(8, 0), column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("number(18,0)")
+ .dataType("number")
+ .precision(18L)
+ .scale(0)
+ .build();
+ column = converter.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(new DecimalType(18, 0), column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("number(38,0)")
+ .dataType("number")
+ .precision(38L)
+ .scale(0)
+ .build();
+ column = converter.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("number(39,0)")
+ .dataType("number")
+ .precision(39L)
+ .scale(0)
+ .build();
+ column = converter.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+ }
+
@Test
public void testConvertInteger() {
BasicTypeDefine<Object> typeDefine =
@@ -58,10 +167,24 @@ public class OracleTypeConverterTest {
.columnType("integer")
.dataType("integer")
.build();
- Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ Column column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ // generated by int/smallint type in oracle create table sql
+ BasicTypeDefine<Object> numberTypeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("number")
+ .dataType("number")
+ .precision(null)
+ .scale(0)
+ .build();
+ column = INSTANCE.convert(numberTypeDefine);
+ Assertions.assertEquals(numberTypeDefine.getName(), column.getName());
+ Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
+ Assertions.assertEquals(numberTypeDefine.getColumnType(),
column.getSourceType());
}
@Test
@@ -72,7 +195,7 @@ public class OracleTypeConverterTest {
.columnType("number")
.dataType("number")
.build();
- Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ Column column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(38, 18), column.getDataType());
@@ -86,7 +209,7 @@ public class OracleTypeConverterTest {
.precision(38L)
.scale(127)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(38, 18), column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
@@ -98,7 +221,7 @@ public class OracleTypeConverterTest {
.dataType("number")
.scale(0)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
@@ -111,7 +234,7 @@ public class OracleTypeConverterTest {
.precision(1L)
.scale(0)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.BOOLEAN_TYPE, column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
@@ -124,7 +247,7 @@ public class OracleTypeConverterTest {
.precision(8L)
.scale(0)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
@@ -137,7 +260,7 @@ public class OracleTypeConverterTest {
.precision(18L)
.scale(0)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
@@ -150,7 +273,7 @@ public class OracleTypeConverterTest {
.precision(38L)
.scale(0)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
@@ -163,7 +286,7 @@ public class OracleTypeConverterTest {
.precision(39L)
.scale(0)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
@@ -177,7 +300,7 @@ public class OracleTypeConverterTest {
.columnType("float")
.dataType("float")
.build();
- Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ Column column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(38, 18), column.getDataType());
@@ -189,7 +312,7 @@ public class OracleTypeConverterTest {
.columnType("binary_float")
.dataType("binary_float")
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.FLOAT_TYPE, column.getDataType());
@@ -197,7 +320,7 @@ public class OracleTypeConverterTest {
typeDefine =
BasicTypeDefine.builder().name("test").columnType("real").dataType("real").build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.FLOAT_TYPE, column.getDataType());
@@ -212,7 +335,7 @@ public class OracleTypeConverterTest {
.columnType("binary_double")
.dataType("binary_double")
.build();
- Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ Column column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.DOUBLE_TYPE, column.getDataType());
@@ -228,7 +351,7 @@ public class OracleTypeConverterTest {
.dataType("char")
.length(1L)
.build();
- Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ Column column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -242,7 +365,7 @@ public class OracleTypeConverterTest {
.dataType("nchar")
.length(1L)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -256,7 +379,7 @@ public class OracleTypeConverterTest {
.dataType("varchar")
.length(1L)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -270,7 +393,7 @@ public class OracleTypeConverterTest {
.dataType("varchar2")
.length(1L)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -284,7 +407,7 @@ public class OracleTypeConverterTest {
.dataType("nvarchar2")
.length(1L)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -297,7 +420,7 @@ public class OracleTypeConverterTest {
.columnType("rowid")
.dataType("rowid")
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -311,7 +434,7 @@ public class OracleTypeConverterTest {
.dataType("xmltype")
.length(1L)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -325,7 +448,7 @@ public class OracleTypeConverterTest {
.dataType("sys.xmltype")
.length(1L)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -339,7 +462,7 @@ public class OracleTypeConverterTest {
.dataType("long")
.length(1L)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -348,7 +471,7 @@ public class OracleTypeConverterTest {
typeDefine =
BasicTypeDefine.builder().name("test").columnType("clob").dataType("clob").build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -361,7 +484,7 @@ public class OracleTypeConverterTest {
.columnType("nclob")
.dataType("nclob")
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
@@ -373,7 +496,7 @@ public class OracleTypeConverterTest {
public void testConvertBytes() {
BasicTypeDefine<Object> typeDefine =
BasicTypeDefine.builder().name("test").columnType("blob").dataType("blob").build();
- Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ Column column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, column.getDataType());
@@ -382,7 +505,7 @@ public class OracleTypeConverterTest {
typeDefine =
BasicTypeDefine.builder().name("test").columnType("raw").dataType("raw").build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, column.getDataType());
@@ -396,7 +519,7 @@ public class OracleTypeConverterTest {
.dataType("raw")
.length(10L)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, column.getDataType());
@@ -409,7 +532,7 @@ public class OracleTypeConverterTest {
.columnType("long raw")
.dataType("long raw")
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, column.getDataType());
@@ -421,7 +544,7 @@ public class OracleTypeConverterTest {
public void testConvertDatetime() {
BasicTypeDefine<Object> typeDefine =
BasicTypeDefine.builder().name("test").columnType("date").dataType("date").build();
- Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ Column column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, column.getDataType());
@@ -435,7 +558,7 @@ public class OracleTypeConverterTest {
.dataType("timestamp")
.scale(6)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, column.getDataType());
@@ -449,7 +572,7 @@ public class OracleTypeConverterTest {
.dataType("timestamp with time zone")
.scale(6)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, column.getDataType());
@@ -463,7 +586,7 @@ public class OracleTypeConverterTest {
.dataType("timestamp with local time zone")
.scale(6)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, column.getDataType());
@@ -482,7 +605,7 @@ public class OracleTypeConverterTest {
null,
null);
try {
- OracleTypeConverter.INSTANCE.reconvert(column);
+ INSTANCE.reconvert(column);
Assertions.fail();
} catch (SeaTunnelRuntimeException e) {
// ignore
@@ -496,7 +619,7 @@ public class OracleTypeConverterTest {
Column column =
PhysicalColumn.builder().name("test").dataType(BasicType.BOOLEAN_TYPE).build();
- BasicTypeDefine typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(
String.format("%s(%s)", OracleTypeConverter.ORACLE_NUMBER, 1),
@@ -509,7 +632,7 @@ public class OracleTypeConverterTest {
public void testReconvertByte() {
Column column =
PhysicalColumn.builder().name("test").dataType(BasicType.BYTE_TYPE).build();
- BasicTypeDefine typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, typeDefine.getColumnType());
Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, typeDefine.getDataType());
@@ -520,7 +643,7 @@ public class OracleTypeConverterTest {
Column column =
PhysicalColumn.builder().name("test").dataType(BasicType.SHORT_TYPE).build();
- BasicTypeDefine typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, typeDefine.getColumnType());
Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, typeDefine.getDataType());
@@ -530,7 +653,7 @@ public class OracleTypeConverterTest {
public void testReconvertInt() {
Column column =
PhysicalColumn.builder().name("test").dataType(BasicType.INT_TYPE).build();
- BasicTypeDefine typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, typeDefine.getColumnType());
Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, typeDefine.getDataType());
@@ -540,7 +663,7 @@ public class OracleTypeConverterTest {
public void testReconvertLong() {
Column column =
PhysicalColumn.builder().name("test").dataType(BasicType.LONG_TYPE).build();
- BasicTypeDefine typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, typeDefine.getColumnType());
Assertions.assertEquals(OracleTypeConverter.ORACLE_INTEGER, typeDefine.getDataType());
@@ -551,7 +674,7 @@ public class OracleTypeConverterTest {
Column column =
PhysicalColumn.builder().name("test").dataType(BasicType.FLOAT_TYPE).build();
- BasicTypeDefine typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(
OracleTypeConverter.ORACLE_BINARY_FLOAT, typeDefine.getColumnType());
@@ -563,7 +686,7 @@ public class OracleTypeConverterTest {
Column column =
PhysicalColumn.builder().name("test").dataType(BasicType.DOUBLE_TYPE).build();
- BasicTypeDefine typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(
OracleTypeConverter.ORACLE_BINARY_DOUBLE, typeDefine.getColumnType());
@@ -575,7 +698,7 @@ public class OracleTypeConverterTest {
Column column =
PhysicalColumn.builder().name("test").dataType(new DecimalType(0, 0)).build();
- BasicTypeDefine typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(
String.format(
@@ -588,7 +711,7 @@ public class OracleTypeConverterTest {
column = PhysicalColumn.builder().name("test").dataType(new DecimalType(10, 2)).build();
- typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(
String.format("%s(%s,%s)", OracleTypeConverter.ORACLE_NUMBER, 10, 2),
@@ -605,7 +728,7 @@ public class OracleTypeConverterTest {
.columnLength(null)
.build();
- BasicTypeDefine typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(OracleTypeConverter.ORACLE_BLOB, typeDefine.getColumnType());
Assertions.assertEquals(OracleTypeConverter.ORACLE_BLOB, typeDefine.getDataType());
@@ -617,7 +740,7 @@ public class OracleTypeConverterTest {
.columnLength(2000L)
.build();
- typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(
String.format("%s(%s)", OracleTypeConverter.ORACLE_RAW, column.getColumnLength()),
@@ -631,7 +754,7 @@ public class OracleTypeConverterTest {
.columnLength(BYTES_2GB)
.build();
- typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(OracleTypeConverter.ORACLE_BLOB, typeDefine.getColumnType());
Assertions.assertEquals(OracleTypeConverter.ORACLE_BLOB, typeDefine.getDataType());
@@ -643,7 +766,7 @@ public class OracleTypeConverterTest {
.columnLength(BYTES_2GB + 1)
.build();
- typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(OracleTypeConverter.ORACLE_BLOB, typeDefine.getColumnType());
Assertions.assertEquals(OracleTypeConverter.ORACLE_BLOB, typeDefine.getDataType());
@@ -658,7 +781,7 @@ public class OracleTypeConverterTest {
.columnLength(null)
.build();
- BasicTypeDefine typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals("VARCHAR2(4000)", typeDefine.getColumnType());
Assertions.assertEquals(OracleTypeConverter.ORACLE_VARCHAR2, typeDefine.getDataType());
@@ -670,7 +793,7 @@ public class OracleTypeConverterTest {
.columnLength(2000L)
.build();
- typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(
String.format(
@@ -685,7 +808,7 @@ public class OracleTypeConverterTest {
.columnLength(4000L)
.build();
- typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(
String.format(
@@ -700,7 +823,7 @@ public class OracleTypeConverterTest {
.columnLength(40001L)
.build();
- typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(OracleTypeConverter.ORACLE_CLOB, typeDefine.getColumnType());
Assertions.assertEquals(OracleTypeConverter.ORACLE_CLOB, typeDefine.getDataType());
@@ -714,7 +837,7 @@ public class OracleTypeConverterTest {
.dataType(LocalTimeType.LOCAL_DATE_TYPE)
.build();
- BasicTypeDefine typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(OracleTypeConverter.ORACLE_DATE, typeDefine.getColumnType());
Assertions.assertEquals(OracleTypeConverter.ORACLE_DATE, typeDefine.getDataType());
@@ -728,7 +851,7 @@ public class OracleTypeConverterTest {
.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE)
.build();
- BasicTypeDefine typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ BasicTypeDefine typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(
OracleTypeConverter.ORACLE_TIMESTAMP_WITH_LOCAL_TIME_ZONE,
@@ -744,7 +867,7 @@ public class OracleTypeConverterTest {
.scale(3)
.build();
- typeDefine = OracleTypeConverter.INSTANCE.reconvert(column);
+ typeDefine = INSTANCE.reconvert(column);
Assertions.assertEquals(column.getName(), typeDefine.getName());
Assertions.assertEquals(
String.format("TIMESTAMP(%s) WITH LOCAL TIME ZONE", column.getScale()),
@@ -765,7 +888,7 @@ public class OracleTypeConverterTest {
.precision(38L)
.scale(-1)
.build();
- Column column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ Column column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
@@ -778,7 +901,7 @@ public class OracleTypeConverterTest {
.precision(5L)
.scale(-2)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
@@ -791,7 +914,7 @@ public class OracleTypeConverterTest {
.precision(9L)
.scale(-2)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
@@ -804,7 +927,7 @@ public class OracleTypeConverterTest {
.precision(14L)
.scale(-11)
.build();
- column = OracleTypeConverter.INSTANCE.convert(typeDefine);
+ column = INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(25, 0), column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index b7c4a54b59..9cd130219a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -25,11 +25,15 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleURLPa
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -77,6 +81,9 @@ public class JdbcOracleIT extends AbstractJdbcIT {
+ " VARCHAR_10_COL varchar2(10),\n"
+ " CHAR_10_COL char(10),\n"
+ " CLOB_COL clob,\n"
+ + " NUMBER_1 number(1),\n"
+ + " NUMBER_6 number(6),\n"
+ + " NUMBER_10 number(10),\n"
+ " NUMBER_3_SF_2_DP number(3, 2),\n"
+ " NUMBER_7_SF_N2_DP number(7, -2),\n"
+ " INTEGER_COL integer,\n"
@@ -97,6 +104,9 @@ public class JdbcOracleIT extends AbstractJdbcIT {
+ " VARCHAR_10_COL varchar2(10),\n"
+ " CHAR_10_COL char(10),\n"
+ " CLOB_COL clob,\n"
+ + " NUMBER_1 number(1),\n"
+ + " NUMBER_6 number(6),\n"
+ + " NUMBER_10 number(10),\n"
+ " NUMBER_3_SF_2_DP number(3, 2),\n"
+ " NUMBER_7_SF_N2_DP number(7, -2),\n"
+ " INTEGER_COL integer,\n"
@@ -115,6 +125,9 @@ public class JdbcOracleIT extends AbstractJdbcIT {
"VARCHAR_10_COL",
"CHAR_10_COL",
"CLOB_COL",
+ "NUMBER_1",
+ "NUMBER_6",
+ "NUMBER_10",
"NUMBER_3_SF_2_DP",
"NUMBER_7_SF_N2_DP",
"INTEGER_COL",
@@ -148,6 +161,14 @@ public class JdbcOracleIT extends AbstractJdbcIT {
dialect.sampleDataFromColumn(connection, table, "INTEGER_COL", 1, 1024);
}
+ @TestTemplate
+ public void testOracleWithoutDecimalTypeNarrowing(TestContainer container) throws Exception {
+ Container.ExecResult execResult =
+ container.executeJob(
+ "/jdbc_oracle_source_to_sink_without_decimal_type_narrowing.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
@Override
JdbcCase getJdbcCase() {
Map<String, String> containerEnv = new HashMap<>();
@@ -207,6 +228,9 @@ public class JdbcOracleIT extends AbstractJdbcIT {
String.format("f%s", i),
String.format("f%s", i),
String.format("f%s", i),
+ 1,
+ i * 10,
+ i * 1000,
BigDecimal.valueOf(1.1),
BigDecimal.valueOf(2400),
i,
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
index d956894c34..4df8c7b993 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
@@ -30,7 +30,7 @@ source {
url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
user = testUser
password = testPassword
- query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL FROM E2E_TABLE_SOURCE"
+ query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL FROM E2E_TABLE_SOURCE"
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}
@@ -46,7 +46,7 @@ sink {
url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
user = testUser
password = testPassword
- query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
index 8a0c831044..1988b48872 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
@@ -31,7 +31,7 @@ source {
user = testUser
password = testPassword
use_select_count = true
- query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL FROM E2E_TABLE_SOURCE"
+ query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL FROM E2E_TABLE_SOURCE"
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}
@@ -47,7 +47,7 @@ sink {
url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
user = testUser
password = testPassword
- query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
index ebebdb5505..4d01da5c72 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
@@ -47,7 +47,7 @@ sink {
url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
user = testUser
password = testPassword
- query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
index d00ce9b643..94a850fdd0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
@@ -48,7 +48,7 @@ sink {
url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
user = testUser
password = testPassword
- query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_without_decimal_type_narrowing.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_without_decimal_type_narrowing.conf
new file mode 100644
index 0000000000..58e98f5def
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_without_decimal_type_narrowing.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ driver = oracle.jdbc.driver.OracleDriver
+ url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
+ user = testUser
+ password = testPassword
+ decimal_type_narrowing = false
+ query = "SELECT NUMBER_1,NUMBER_6,NUMBER_10 FROM E2E_TABLE_SOURCE"
+ properties {
+ database.oracle.jdbc.timezoneAsRegion = "false"
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 20000
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 20000
+ }
+ ],
+ field_rules = [
+ {
+ field_name = NUMBER_1
+ field_type = "decimal(1, 0)"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = NUMBER_6
+ field_type = "decimal(6, 0)"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = NUMBER_10
+ field_type = "decimal(10, 0)"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+
+}