This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new 42e1761e [Bug][Seatunnel-web][DB2] DB2 Datasource fails due to
improper use of database and schema names. (#220)
42e1761e is described below
commit 42e1761e594acc21bda98790fff00023e87aea9c
Author: Mohammad Arshad <[email protected]>
AuthorDate: Sun Sep 29 07:29:21 2024 +0530
[Bug][Seatunnel-web][DB2] DB2 Datasource fails due to improper use of
database and schema names. (#220)
---
.../plugin/db2/jdbc/Db2DataSourceConfig.java | 11 ++++-
.../plugin/db2/jdbc/Db2JdbcDataSourceChannel.java | 56 +++++++++++++++-------
.../datasource/plugin/db2/jdbc/Db2OptionRule.java | 4 +-
.../seatunnel/app/bean/engine/EngineDataType.java | 6 ++-
.../impl/Db2DataSourceConfigSwitcher.java | 6 ++-
5 files changed, 59 insertions(+), 24 deletions(-)
diff --git
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java
index 90673587..fee9fb07 100644
---
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java
+++
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java
@@ -38,7 +38,16 @@ public class Db2DataSourceConfig {
.build();
public static final Set<String> DB2_SYSTEM_DATABASES =
- Sets.newHashSet("information_schema", "mysql",
"performance_schema", "sys");
+ Sets.newHashSet(
+ "SYSTOOLS",
+ "SYSCAT",
+ "SYSIBM",
+ "SYSIBMADM",
+ "SYSSTAT",
+ "SYSPROC",
+ "SYSFUN",
+ "SYSPUBLIC",
+ "SYSIBMTS");
public static final OptionRule OPTION_RULE =
OptionRule.builder()
diff --git
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java
index 844a4756..724d428e 100644
---
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java
+++
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java
@@ -25,14 +25,15 @@ import
org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -40,8 +41,8 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
+@Slf4j
public class Db2JdbcDataSourceChannel implements DataSourceChannel {
-
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return Db2DataSourceConfig.OPTION_RULE;
@@ -65,13 +66,13 @@ public class Db2JdbcDataSourceChannel implements
DataSourceChannel {
if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) {
filterName = "%" + filterName + "%";
} else if (StringUtils.equals(filterName, "")) {
- filterName = null;
+ filterName = "%";
}
try (Connection connection = getConnection(requestParams);
ResultSet resultSet =
connection
.getMetaData()
- .getTables(null, null, "%", new String[]
{"TABLE"})) {
+ .getTables(null, database, filterName, new
String[] {"TABLE"})) {
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) {
@@ -90,11 +91,30 @@ public class Db2JdbcDataSourceChannel implements
DataSourceChannel {
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String>
requestParams) {
- // Hardcoded list of example database names
- List<String> dbNames = Arrays.asList("default");
+ List<String> dbNames = new ArrayList<>();
+ try (Connection connection = getConnection(requestParams);
+ Statement statement = connection.createStatement()) {
+ ResultSet resultSet = statement.executeQuery("SELECT SCHEMANAME
FROM SYSCAT.SCHEMATA");
+ while (resultSet.next()) {
+ String dbName = resultSet.getString("SCHEMANAME");
+ if (StringUtils.isBlank(dbName)) {
+ continue;
+ }
+ dbName = dbName.trim();
+ if (isNotSystemDatabase(dbName)) {
+ dbNames.add(dbName);
+ }
+ }
+ } catch (SQLException | ClassNotFoundException e) {
+ throw new DataSourcePluginException("Failed to get database
names", e);
+ }
return dbNames;
}
+ private boolean isNotSystemDatabase(String dbName) {
+ return
!Db2DataSourceConfig.DB2_SYSTEM_DATABASES.contains(dbName.toUpperCase());
+ }
+
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String>
requestParams) {
@@ -119,7 +139,7 @@ public class Db2JdbcDataSourceChannel implements
DataSourceChannel {
String primaryKey = getPrimaryKey(metaData, database, table);
// Retrieve column information
- try (ResultSet resultSet = metaData.getColumns(null, null, table,
null)) {
+ try (ResultSet resultSet = metaData.getColumns(null, database,
table, null)) {
while (resultSet.next()) {
TableField tableField = new TableField();
@@ -140,14 +160,9 @@ public class Db2JdbcDataSourceChannel implements
DataSourceChannel {
tableFields.add(tableField);
}
}
- } catch (SQLException e) {
- // Log the exception and rethrow as DataSourcePluginException
- System.out.println("Error while retrieving table fields: " + e);
+ } catch (SQLException | ClassNotFoundException e) {
+ log.error("Error while retrieving table fields", e);
throw new DataSourcePluginException("Failed to get table fields",
e);
- } catch (ClassNotFoundException e) {
- // Log the exception and rethrow as DataSourcePluginException
- System.out.println("JDBC driver class not found" + e);
- throw new DataSourcePluginException("JDBC driver class not found",
e);
}
return tableFields;
}
@@ -170,7 +185,7 @@ public class Db2JdbcDataSourceChannel implements
DataSourceChannel {
private String getPrimaryKey(DatabaseMetaData metaData, String dbName,
String tableName)
throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(null, dbName,
tableName);
- while (primaryKeysInfo.next()) {
+ if (primaryKeysInfo.next()) {
return primaryKeysInfo.getString("COLUMN_NAME");
}
return null;
@@ -178,8 +193,15 @@ public class Db2JdbcDataSourceChannel implements
DataSourceChannel {
private Connection getConnection(Map<String, String> requestParams)
throws SQLException, ClassNotFoundException {
- // Ensure the DB2 JDBC driver is loaded
- Class.forName("com.ibm.db2.jcc.DB2Driver");
+ String driverClass =
+ requestParams.getOrDefault(
+ Db2OptionRule.DRIVER.key(),
+ Db2OptionRule.DriverType.DB2.getDriverClassName());
+ try {
+ Class.forName(driverClass);
+ } catch (ClassNotFoundException e) {
+ throw new DataSourcePluginException("DB2 jdbc driver " +
driverClass + " not found", e);
+ }
checkNotNull(requestParams.get(Db2OptionRule.URL.key()), "Jdbc url
cannot be null");
String url = requestParams.get(Db2OptionRule.URL.key());
if (requestParams.containsKey(Db2OptionRule.USER.key())) {
diff --git
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java
index e5e2b9b1..4947dffd 100644
---
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java
+++
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java
@@ -26,9 +26,7 @@ public class Db2OptionRule {
Options.key("url")
.stringType()
.noDefaultValue()
- .withDescription(
- "jdbc url, eg:"
- + "
jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8");
+ .withDescription("jdbc url, eg:" + "
jdbc:db2://localhost:50000/databaseName");
public static final Option<String> USER =
Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java
index f7fae731..8d56224f 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java
@@ -69,7 +69,11 @@ public class EngineDataType {
T_INT_ARRAY("array<int>", ArrayType.INT_ARRAY_TYPE),
T_LONG_ARRAY("array<bigint>", ArrayType.LONG_ARRAY_TYPE),
T_FLOAT_ARRAY("array<float>", ArrayType.FLOAT_ARRAY_TYPE),
- T_DOUBLE_ARRAY("array<double>", ArrayType.DOUBLE_ARRAY_TYPE);
+ T_DOUBLE_ARRAY("array<double>", ArrayType.DOUBLE_ARRAY_TYPE),
+ T_VARCHAR("varchar", BasicType.STRING_TYPE),
+ T_CHAR("char", BasicType.STRING_TYPE),
+ T_INTEGER("integer", BasicType.INT_TYPE),
+ T_DECIMAL_DEFAULT("decimal", BasicType.DOUBLE_TYPE);
@Getter private final String name;
@Getter private final SeaTunnelDataType<?> RawType;
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java
index 327f9829..022148d6 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java
@@ -199,7 +199,7 @@ public class Db2DataSourceConfigSwitcher extends
BaseJdbcDataSourceConfigSwitche
sb.append(", ");
}
}
- sb.append(" FROM ").append(quoteIdentifier(table));
+ sb.append(" FROM ").append(quoteIdentifier(database) + "." +
quoteIdentifier(table));
return sb.toString();
}
@@ -211,7 +211,9 @@ public class Db2DataSourceConfigSwitcher extends
BaseJdbcDataSourceConfigSwitche
protected String generateSinkSql(
List<String> tableFields, String database, String schema, String
table) {
StringBuilder sb = new StringBuilder();
- sb.append("INSERT INTO ").append(quoteIdentifier(table)).append(" (");
+ sb.append("INSERT INTO ")
+ .append(quoteIdentifier(database) + "." +
quoteIdentifier(table))
+ .append(" (");
// Append column names
for (int i = 0; i < tableFields.size(); i++) {