This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 919a91032a [Improve][Oracle-CDC] Clean unused code (#6212)
919a91032a is described below
commit 919a91032ac13222079c1f4b7df8fe5d687ef6eb
Author: hailin0 <[email protected]>
AuthorDate: Tue Jan 16 11:10:44 2024 +0800
[Improve][Oracle-CDC] Clean unused code (#6212)
---
.../cdc/oracle/source/OracleIncrementalSource.java | 28 +---------------------
.../cdc/oracle/utils/OracleTypeUtils.java | 16 -------------
2 files changed, 1 insertion(+), 43 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
index 5c942f8b50..933c8cdc37 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
@@ -37,7 +37,6 @@ import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchem
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import
org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
-import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffsetFactory;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
@@ -45,24 +44,18 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
import org.apache.kafka.connect.data.Struct;
import com.google.auto.service.AutoService;
-import io.debezium.connector.oracle.OracleConnection;
import io.debezium.jdbc.JdbcConnection;
-import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.relational.history.TableChanges;
import lombok.NoArgsConstructor;
-import java.sql.SQLException;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleConnectionUtils.createOracleConnection;
-import static
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleTypeUtils.convertFromTable;
-
@NoArgsConstructor
@AutoService(SeaTunnelSource.class)
public class OracleIncrementalSource<T> extends IncrementalSource<T,
JdbcSourceConfig>
@@ -116,26 +109,7 @@ public class OracleIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceC
config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
}
- SeaTunnelDataType<SeaTunnelRow> physicalRowType;
- if (dataType == null) {
- OracleSourceConfig oracleSourceConfig =
- (OracleSourceConfig) this.configFactory.create(0);
- TableId tableId =
-
this.dataSourceDialect.discoverDataCollections(oracleSourceConfig).get(0);
- Table table;
- try (OracleConnection oracleConnection =
-
createOracleConnection(oracleSourceConfig.getDbzConfiguration())) {
- table =
- ((OracleDialect) dataSourceDialect)
- .queryTableSchema(oracleConnection, tableId)
- .getTable();
- } catch (SQLException e) {
- throw new SeaTunnelException(e);
- }
- physicalRowType = convertFromTable(table);
- } else {
- physicalRowType = dataType;
- }
+ SeaTunnelDataType<SeaTunnelRow> physicalRowType = dataType;
String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
return (DebeziumDeserializationSchema<T>)
SeaTunnelRowDebeziumDeserializeSchema.builder()
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleTypeUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleTypeUtils.java
index 8a7ddc91d8..7e23fbf3a1 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleTypeUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleTypeUtils.java
@@ -22,14 +22,11 @@ import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import io.debezium.relational.Column;
-import io.debezium.relational.Table;
import oracle.jdbc.OracleTypes;
import java.sql.Types;
-import java.util.List;
/** Utilities for converting from oracle types to SeaTunnel types. */
public class OracleTypeUtils {
@@ -78,17 +75,4 @@ public class OracleTypeUtils {
column.typeName(), column.jdbcType()));
}
}
-
- public static SeaTunnelRowType convertFromTable(Table table) {
-
- List<Column> columns = table.columns();
- String[] fieldNames =
columns.stream().map(Column::name).toArray(String[]::new);
-
- SeaTunnelDataType<?>[] fieldTypes =
- columns.stream()
- .map(OracleTypeUtils::convertFromColumn)
- .toArray(SeaTunnelDataType[]::new);
-
- return new SeaTunnelRowType(fieldNames, fieldTypes);
- }
}