This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3a8f4afb1b [Improve][seatunnel-connectors-v2][connector-jdbc] Remove some obsolete code in JdbcSinkFactory (#10026)
3a8f4afb1b is described below

commit 3a8f4afb1b63887e1f502ef74584e6153273bdf9
Author: 老王 <[email protected]>
AuthorDate: Mon Dec 29 20:15:03 2025 +0800

    [Improve][seatunnel-connectors-v2][connector-jdbc] Remove some obsolete code in JdbcSinkFactory (#10026)
---
 .../api/sink/SinkReplaceNameConstant.java          | 29 ---------
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       | 73 +++++++---------------
 2 files changed, 21 insertions(+), 81 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java
deleted file mode 100644
index 0291c2760c..0000000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.sink;
-
-/** @deprecated instead by {@link TablePlaceholder} todo remove this class */
-@Deprecated
-public final class SinkReplaceNameConstant {
-
-    public static final String REPLACE_TABLE_NAME_KEY = "${table_name}";
-
-    public static final String REPLACE_SCHEMA_NAME_KEY = "${schema_name}";
-
-    public static final String REPLACE_DATABASE_NAME_KEY = "${database_name}";
-}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 4a692ea3f5..78a57cb6a9 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -25,7 +25,6 @@ import 
org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
-import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
@@ -75,18 +74,13 @@ public class JdbcSinkFactory implements TableSinkFactory {
         ReadonlyConfig catalogOptions = getCatalogOptions(context);
         Optional<String> optionalTable = 
config.getOptional(JdbcSinkOptions.TABLE);
         Optional<String> optionalDatabase = 
config.getOptional(JdbcSinkOptions.DATABASE);
-        if (!optionalTable.isPresent()) {
-            optionalTable = 
Optional.of(SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY);
-        }
-        // get source table relevant information
+        // source table info
         TableIdentifier tableId = catalogTable.getTableId();
-        String sourceDatabaseName = tableId.getDatabaseName();
-        String sourceSchemaName = tableId.getSchemaName();
-        String pluginInputIdentifier = tableId.getTableName();
-        // get sink table relevant information
+        // sink table info
         String sinkDatabaseName =
-                
optionalDatabase.orElse(SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY);
-        String sinkTableNameBefore = optionalTable.get();
+                
optionalDatabase.orElse(catalogTable.getTablePath().getDatabaseName());
+        String sinkTableNameBefore =
+                
optionalTable.orElse(catalogTable.getTablePath().getTableName());
         String[] sinkTableSplitArray = sinkTableNameBefore.split("\\.");
         String sinkTableName = sinkTableSplitArray[sinkTableSplitArray.length 
- 1];
         String sinkSchemaName;
@@ -98,49 +92,24 @@ public class JdbcSinkFactory implements TableSinkFactory {
         if 
(StringUtils.isNotBlank(catalogOptions.get(JdbcSinkOptions.SCHEMA))) {
             sinkSchemaName = catalogOptions.get(JdbcSinkOptions.SCHEMA);
         }
-        // to add tablePrefix and tableSuffix
+        // prefix / suffix
         String tempTableName;
         String prefix = catalogOptions.get(JdbcSinkOptions.TABLE_PREFIX);
         String suffix = catalogOptions.get(JdbcSinkOptions.TABLE_SUFFIX);
         if (StringUtils.isNotEmpty(prefix) || StringUtils.isNotEmpty(suffix)) {
             tempTableName = StringUtils.isNotEmpty(prefix) ? prefix + 
sinkTableName : sinkTableName;
             tempTableName = StringUtils.isNotEmpty(suffix) ? tempTableName + 
suffix : tempTableName;
-
         } else {
             tempTableName = sinkTableName;
         }
-        // to replace
-        String finalDatabaseName = sinkDatabaseName;
-        if (StringUtils.isNotEmpty(sourceDatabaseName)) {
-            finalDatabaseName =
-                    sinkDatabaseName.replace(
-                            SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY, 
sourceDatabaseName);
-        }
-
-        String finalSchemaName;
-        if (sinkSchemaName != null) {
-            if (sourceSchemaName == null) {
-                finalSchemaName = sinkSchemaName;
-            } else {
-                finalSchemaName =
-                        sinkSchemaName.replace(
-                                
SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY, sourceSchemaName);
-            }
-        } else {
-            finalSchemaName = null;
-        }
-        String finalTableName = sinkTableName;
-        if (StringUtils.isNotEmpty(pluginInputIdentifier)) {
-            finalTableName =
-                    tempTableName.replace(
-                            SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY, 
pluginInputIdentifier);
-        }
-
-        // rebuild TableIdentifier and catalogTable
+        // without replace, keep original directly
+        String finalSchemaName = sinkSchemaName;
+        String finalTableName = tempTableName;
+        // rebuild identifier
         TableIdentifier newTableId =
                 TableIdentifier.of(
                         tableId.getCatalogName(),
-                        finalDatabaseName,
+                        sinkDatabaseName,
                         finalSchemaName,
                         finalTableName);
         catalogTable =
@@ -151,6 +120,7 @@ public class JdbcSinkFactory implements TableSinkFactory {
                         catalogTable.getPartitionKeys(),
                         catalogTable.getComment(),
                         catalogTable.getCatalogName());
+
         Map<String, String> map = config.toMap();
         if (catalogTable.getTableId().getSchemaName() != null) {
             map.put(
@@ -176,16 +146,17 @@ public class JdbcSinkFactory implements TableSinkFactory {
                                                 
ConstraintKey.ConstraintType.UNIQUE_KEY.equals(
                                                         
key.getConstraintType()))
                                 .findFirst();
-                if (keyOptional.isPresent()) {
-                    map.put(
-                            JdbcSinkOptions.PRIMARY_KEYS.key(),
-                            keyOptional.get().getColumnNames().stream()
-                                    .map(key -> key.getColumnName())
-                                    .collect(Collectors.joining(",")));
-                }
+                keyOptional.ifPresent(
+                        constraintKey ->
+                                map.put(
+                                        JdbcSinkOptions.PRIMARY_KEYS.key(),
+                                        constraintKey.getColumnNames().stream()
+                                                .map(
+                                                        
ConstraintKey.ConstraintKeyColumn
+                                                                
::getColumnName)
+                                                
.collect(Collectors.joining(","))));
             }
         } else {
-            // replace primary key to config
             PrimaryKey configPk =
                     PrimaryKey.of(
                             catalogTable.getTablePath().getTableName() + 
"_config_pk",
@@ -205,7 +176,6 @@ public class JdbcSinkFactory implements TableSinkFactory {
                             catalogTable.getCatalogName());
         }
         config = ReadonlyConfig.fromMap(new HashMap<>(map));
-        // always execute
         final ReadonlyConfig options = config;
         JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
         FieldIdeEnum fieldIdeEnum = config.get(JdbcSinkOptions.FIELD_IDE);
@@ -223,7 +193,6 @@ public class JdbcSinkFactory implements TableSinkFactory {
                 sinkConfig.getJdbcConnectionConfig().getProperties(),
                 dialect.defaultParameter());
         CatalogTable finalCatalogTable = catalogTable;
-        // get saveMode
         DataSaveMode dataSaveMode = config.get(JdbcSinkOptions.DATA_SAVE_MODE);
         SchemaSaveMode schemaSaveMode = 
config.get(JdbcSinkOptions.SCHEMA_SAVE_MODE);
         return () ->

Reply via email to