This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3a8f4afb1b [Improve][seatunnel-connectors-v2][connector-jdbc] Remove some obsolete code in JdbcSinkFactory (#10026)
3a8f4afb1b is described below
commit 3a8f4afb1b63887e1f502ef74584e6153273bdf9
Author: 老王 <[email protected]>
AuthorDate: Mon Dec 29 20:15:03 2025 +0800
[Improve][seatunnel-connectors-v2][connector-jdbc] Remove some obsolete code in JdbcSinkFactory (#10026)
---
.../api/sink/SinkReplaceNameConstant.java | 29 ---------
.../seatunnel/jdbc/sink/JdbcSinkFactory.java | 73 +++++++---------------
2 files changed, 21 insertions(+), 81 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java
deleted file mode 100644
index 0291c2760c..0000000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.sink;
-
-/** @deprecated instead by {@link TablePlaceholder} todo remove this class */
-@Deprecated
-public final class SinkReplaceNameConstant {
-
- public static final String REPLACE_TABLE_NAME_KEY = "${table_name}";
-
- public static final String REPLACE_SCHEMA_NAME_KEY = "${schema_name}";
-
- public static final String REPLACE_DATABASE_NAME_KEY = "${database_name}";
-}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 4a692ea3f5..78a57cb6a9 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
-import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
@@ -75,18 +74,13 @@ public class JdbcSinkFactory implements TableSinkFactory {
ReadonlyConfig catalogOptions = getCatalogOptions(context);
Optional<String> optionalTable =
config.getOptional(JdbcSinkOptions.TABLE);
Optional<String> optionalDatabase =
config.getOptional(JdbcSinkOptions.DATABASE);
- if (!optionalTable.isPresent()) {
- optionalTable =
Optional.of(SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY);
- }
- // get source table relevant information
+ // source table info
TableIdentifier tableId = catalogTable.getTableId();
- String sourceDatabaseName = tableId.getDatabaseName();
- String sourceSchemaName = tableId.getSchemaName();
- String pluginInputIdentifier = tableId.getTableName();
- // get sink table relevant information
+ // sink table info
String sinkDatabaseName =
-
optionalDatabase.orElse(SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY);
- String sinkTableNameBefore = optionalTable.get();
+
optionalDatabase.orElse(catalogTable.getTablePath().getDatabaseName());
+ String sinkTableNameBefore =
+
optionalTable.orElse(catalogTable.getTablePath().getTableName());
String[] sinkTableSplitArray = sinkTableNameBefore.split("\\.");
String sinkTableName = sinkTableSplitArray[sinkTableSplitArray.length
- 1];
String sinkSchemaName;
@@ -98,49 +92,24 @@ public class JdbcSinkFactory implements TableSinkFactory {
if
(StringUtils.isNotBlank(catalogOptions.get(JdbcSinkOptions.SCHEMA))) {
sinkSchemaName = catalogOptions.get(JdbcSinkOptions.SCHEMA);
}
- // to add tablePrefix and tableSuffix
+ // prefix / suffix
String tempTableName;
String prefix = catalogOptions.get(JdbcSinkOptions.TABLE_PREFIX);
String suffix = catalogOptions.get(JdbcSinkOptions.TABLE_SUFFIX);
if (StringUtils.isNotEmpty(prefix) || StringUtils.isNotEmpty(suffix)) {
tempTableName = StringUtils.isNotEmpty(prefix) ? prefix +
sinkTableName : sinkTableName;
tempTableName = StringUtils.isNotEmpty(suffix) ? tempTableName +
suffix : tempTableName;
-
} else {
tempTableName = sinkTableName;
}
- // to replace
- String finalDatabaseName = sinkDatabaseName;
- if (StringUtils.isNotEmpty(sourceDatabaseName)) {
- finalDatabaseName =
- sinkDatabaseName.replace(
- SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY,
sourceDatabaseName);
- }
-
- String finalSchemaName;
- if (sinkSchemaName != null) {
- if (sourceSchemaName == null) {
- finalSchemaName = sinkSchemaName;
- } else {
- finalSchemaName =
- sinkSchemaName.replace(
-
SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY, sourceSchemaName);
- }
- } else {
- finalSchemaName = null;
- }
- String finalTableName = sinkTableName;
- if (StringUtils.isNotEmpty(pluginInputIdentifier)) {
- finalTableName =
- tempTableName.replace(
- SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY,
pluginInputIdentifier);
- }
-
- // rebuild TableIdentifier and catalogTable
+ // without replace, keep original directly
+ String finalSchemaName = sinkSchemaName;
+ String finalTableName = tempTableName;
+ // rebuild identifier
TableIdentifier newTableId =
TableIdentifier.of(
tableId.getCatalogName(),
- finalDatabaseName,
+ sinkDatabaseName,
finalSchemaName,
finalTableName);
catalogTable =
@@ -151,6 +120,7 @@ public class JdbcSinkFactory implements TableSinkFactory {
catalogTable.getPartitionKeys(),
catalogTable.getComment(),
catalogTable.getCatalogName());
+
Map<String, String> map = config.toMap();
if (catalogTable.getTableId().getSchemaName() != null) {
map.put(
@@ -176,16 +146,17 @@ public class JdbcSinkFactory implements TableSinkFactory {
ConstraintKey.ConstraintType.UNIQUE_KEY.equals(
key.getConstraintType()))
.findFirst();
- if (keyOptional.isPresent()) {
- map.put(
- JdbcSinkOptions.PRIMARY_KEYS.key(),
- keyOptional.get().getColumnNames().stream()
- .map(key -> key.getColumnName())
- .collect(Collectors.joining(",")));
- }
+ keyOptional.ifPresent(
+ constraintKey ->
+ map.put(
+ JdbcSinkOptions.PRIMARY_KEYS.key(),
+ constraintKey.getColumnNames().stream()
+ .map(
+
ConstraintKey.ConstraintKeyColumn
+
::getColumnName)
+
.collect(Collectors.joining(","))));
}
} else {
- // replace primary key to config
PrimaryKey configPk =
PrimaryKey.of(
catalogTable.getTablePath().getTableName() +
"_config_pk",
@@ -205,7 +176,6 @@ public class JdbcSinkFactory implements TableSinkFactory {
catalogTable.getCatalogName());
}
config = ReadonlyConfig.fromMap(new HashMap<>(map));
- // always execute
final ReadonlyConfig options = config;
JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
FieldIdeEnum fieldIdeEnum = config.get(JdbcSinkOptions.FIELD_IDE);
@@ -223,7 +193,6 @@ public class JdbcSinkFactory implements TableSinkFactory {
sinkConfig.getJdbcConnectionConfig().getProperties(),
dialect.defaultParameter());
CatalogTable finalCatalogTable = catalogTable;
- // get saveMode
DataSaveMode dataSaveMode = config.get(JdbcSinkOptions.DATA_SAVE_MODE);
SchemaSaveMode schemaSaveMode =
config.get(JdbcSinkOptions.SCHEMA_SAVE_MODE);
return () ->