This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 75c9adb280 [Fix][Connector-xugu] Fix several bugs in the xugu
connector (#9820)
75c9adb280 is described below
commit 75c9adb280ea3e4e37cc0a935cf3a00aba5e9271
Author: Leon Yoah <[email protected]>
AuthorDate: Tue Sep 9 17:18:21 2025 +0800
[Fix][Connector-xugu] Fix several bugs in the xugu connector (#9820)
---
.../seatunnel/jdbc/catalog/xugu/XuguCatalog.java | 97 ++++++++++++--
.../catalog/xugu/XuguCreateTableSqlBuilder.java | 2 +-
.../jdbc/internal/dialect/xugu/XuguDialect.java | 11 +-
.../internal/dialect/xugu/XuguTypeConverter.java | 2 +-
.../connectors/seatunnel/jdbc/JdbcXuguIT.java | 141 ++++++++++++++++++++-
5 files changed, 231 insertions(+), 22 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
index fab09c1852..0401e2d1ba 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
@@ -21,13 +21,18 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper;
+import org.apache.commons.lang3.StringUtils;
+
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
@@ -36,6 +41,10 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_METHOD;
@Slf4j
public class XuguCatalog extends AbstractJdbcCatalog {
@@ -98,11 +107,11 @@ public class XuguCatalog extends AbstractJdbcCatalog {
+ " c.DEF_VAL AS DEFAULT_VALUE,\n"
+ " c.NOT_NULl AS IS_NULLABLE\n"
+ " FROM\n"
- + " dba_columns c\n"
- + " LEFT JOIN dba_tables tab ON\n"
+ + " all_columns c\n"
+ + " LEFT JOIN all_tables tab ON\n"
+ " c.db_id = tab.db_id\n"
+ " AND c.table_id = tab.table_id\n"
- + " LEFT JOIN dba_schemas sc ON\n"
+ + " LEFT JOIN all_schemas sc ON\n"
+ " tab.schema_id = sc.schema_id\n"
+ " AND tab.db_id = sc.db_id\n"
+ " WHERE\n"
@@ -122,21 +131,44 @@ public class XuguCatalog extends AbstractJdbcCatalog {
@Override
protected String getDatabaseWithConditionSql(String databaseName) {
- return String.format(getListDatabaseSql() + " where DB_NAME = '%s'",
databaseName);
+ return String.format(getListDatabaseSql() + " where UPPER(DB_NAME) =
'%s'", databaseName);
}
@Override
protected String getTableWithConditionSql(TablePath tablePath) {
return String.format(
getListTableSql(tablePath.getDatabaseName())
- + " where user_name = '%s' and table_name = '%s'",
+ + " and s.schema_name = '%s' and t.table_name = '%s'",
tablePath.getSchemaName(),
tablePath.getTableName());
}
+ // "Test" and "TEST" are the same database
@Override
protected String getListDatabaseSql() {
- return "SELECT DB_NAME FROM dba_databases";
+ return "SELECT UPPER(DB_NAME) FROM all_databases";
+ }
+
+ // Rewrite the databaseExists method, and xugu will force the conversion
to uppercase
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ if (StringUtils.isBlank(databaseName)) {
+ return false;
+ }
+ try {
+ return querySQLResultExists(
+ defaultUrl,
getDatabaseWithConditionSql(databaseName.toUpperCase()));
+ } catch (SeaTunnelRuntimeException e) {
+ if
(e.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) {
+ log.warn(
+ "The catalog: {} is not supported the
getDatabaseWithConditionSql for databaseExists",
+ this.catalogName);
+ return listDatabases().contains(databaseName.toUpperCase());
+ }
+ throw e;
+ } catch (SQLException e) {
+ throw new SeaTunnelException("Failed to querySQLResult", e);
+ }
}
@Override
@@ -162,8 +194,10 @@ public class XuguCatalog extends AbstractJdbcCatalog {
@Override
protected String getListTableSql(String databaseName) {
- return "SELECT user_name ,table_name FROM all_users au \n"
- + "INNER JOIN all_tables at ON au.user_id=at.user_id AND
au.db_id=at.db_id";
+ return "select s.schema_name,t.table_name \n"
+ + "from all_schemas s,all_tables t\n"
+ + "where\n"
+ + "s.schema_id=t.schema_id";
}
@Override
@@ -243,11 +277,48 @@ public class XuguCatalog extends AbstractJdbcCatalog {
protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData,
TablePath tablePath)
throws SQLException {
try {
- return getConstraintKeys(
- metaData,
- tablePath.getDatabaseName(),
- tablePath.getSchemaName(),
- tablePath.getTableName());
+ List<ConstraintKey> constraintKeys =
+ getConstraintKeys(
+ metaData,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
+ // Block the unique constraint field name because all returned by
xugu are enclosed in
+ // double quotes
+ if (constraintKeys != null && !constraintKeys.isEmpty()) {
+ constraintKeys =
+ constraintKeys.stream()
+ .filter(Objects::nonNull)
+ .map(
+ constraintKey ->
+ ConstraintKey.of(
+
constraintKey.getConstraintType(),
+
constraintKey.getConstraintName(),
+
constraintKey.getColumnNames() != null
+ ?
constraintKey.getColumnNames()
+
.stream()
+
.filter(Objects::nonNull)
+ .map(
+
column ->
+
ConstraintKey
+
.ConstraintKeyColumn
+
.of(
+
column
+
.getColumnName()
+
!= null
+
? column.getColumnName()
+
.replace(
+
"\"",
+
"")
+
: null,
+
column
+
.getSortType()))
+
.collect(
+
Collectors.toList())
+ : null))
+ .collect(Collectors.toList());
+ }
+ return constraintKeys;
} catch (SQLException e) {
log.info("Obtain constraint failure", e);
return new ArrayList<>();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
index 170d013eeb..372b3868e9 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
@@ -143,7 +143,7 @@ public class XuguCreateTableSqlBuilder {
columnCommentSql
.append(CatalogUtils.quoteIdentifier(column.getName(),
fieldIde, "\""))
.append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
- .append(column.getComment())
+ .append(column.getComment().replace("'", "''"))
.append("'");
return columnCommentSql.toString();
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
index adf2cf2190..3ee6093574 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -85,11 +86,6 @@ public class XuguDialect implements JdbcDialect {
return quoteIdentifier(tableName);
}
- @Override
- public String extractTableName(TablePath tablePath) {
- return tablePath.getSchemaAndTableName();
- }
-
@Override
public TablePath parse(String tablePath) {
return TablePath.of(tablePath, true);
@@ -107,6 +103,10 @@ public class XuguDialect implements JdbcDialect {
Arrays.stream(fieldNames)
.filter(fieldName ->
!Arrays.asList(uniqueKeyFields).contains(fieldName))
.collect(Collectors.toList());
+ if (nonUniqueKeyFields.isEmpty()) {
+ throw new SeaTunnelException(
+ "The non-primary key field cannot be empty. Please set
other fields");
+ }
String valuesBinding =
Arrays.stream(fieldNames)
.map(fieldName -> ":" + fieldName + " " +
quoteIdentifier(fieldName))
@@ -139,7 +139,6 @@ public class XuguDialect implements JdbcDialect {
Arrays.stream(fieldNames)
.map(fieldName -> "SOURCE." +
quoteIdentifier(fieldName))
.collect(Collectors.joining(", "));
-
String upsertSQL =
String.format(
" MERGE INTO %s TARGET"
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
index 54a8805f3b..38b0553124 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
@@ -33,7 +33,7 @@ import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
// reference
-//
https://docs.xugudb.com/%E8%99%9A%E8%B0%B7%E6%95%B0%E6%8D%AE%E5%BA%93%E5%AF%B9%E5%A4%96%E5%8F%91%E5%B8%83/06%E5%8F%82%E8%80%83%E6%8C%87%E5%8D%97/SQL%E8%AF%AD%E6%B3%95%E5%8F%82%E8%80%83/%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B/%E6%A6%82%E8%BF%B0/
+// https://docs.xugudb.com/content/reference/sql/datatype/numerical
@Slf4j
@AutoService(TypeConverter.class)
public class XuguTypeConverter implements TypeConverter<BasicTypeDefine> {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
index ab3d0450bd..23915ecc40 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu.XuguCatalog;
@@ -26,6 +28,8 @@ import
org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -245,7 +249,142 @@ public class JdbcXuguIT extends AbstractJdbcIT {
jdbcCase.getPassword(),
JdbcUrlUtil.getUrlInfo(jdbcUrl),
XUGU_SCHEMA,
- null);
+ DRIVER_CLASS);
catalog.open();
}
+
+ // Catalog test methods transferred from XuguCatalogTest
+ @Test
+ void testListDatabases() {
+ // Test listing databases functionality
+ List<String> databases = catalog.listDatabases();
+ Assertions.assertNotNull(databases, "Database list should not be
null");
+ Assertions.assertFalse(databases.isEmpty(), "Database list should not
be empty");
+ }
+
+ @Test
+ void testDatabaseExists() {
+ // Test specific database existence with case sensitivity
+ Assertions.assertTrue(
+ catalog.databaseExists(XUGU_DATABASE), "SYSTEM database should
exist");
+ Assertions.assertTrue(
+ catalog.databaseExists(XUGU_DATABASE.toUpperCase()),
+ "Database existence check should be case-insensitive
(uppercase)");
+
+ // Test mixed case scenarios for SYSTEM database
+ Assertions.assertTrue(catalog.databaseExists("system"), "system should
exist (lowercase)");
+ Assertions.assertTrue(catalog.databaseExists("System"), "System should
exist (mixed case)");
+
+ // Test non-existent database
+ Assertions.assertFalse(
+ catalog.databaseExists("NON_EXISTENT_DB"),
+ "Non-existent database should return false");
+ }
+
+ @Test
+ void testTableExists() {
+ // Test specific table existence
+ TablePath testTablePath = TablePath.of(XUGU_DATABASE, XUGU_SCHEMA,
XUGU_SOURCE);
+ Assertions.assertTrue(
+ catalog.tableExists(testTablePath),
+ "e2e_table_source should exist in SYSDBA schema");
+
+ // Test case-insensitive database name handling
+ TablePath lowerCaseDatabasePath =
+ TablePath.of(XUGU_DATABASE.toLowerCase(), XUGU_SCHEMA,
XUGU_SOURCE);
+ Assertions.assertTrue(
+ catalog.tableExists(lowerCaseDatabasePath),
+ "Table existence check should be case-insensitive for database
name");
+
+ // Test non-existent table
+ TablePath nonExistentTable = TablePath.of(XUGU_DATABASE, XUGU_SCHEMA,
"NON_EXISTENT_TABLE");
+ Assertions.assertFalse(
+ catalog.tableExists(nonExistentTable), "Non-existent table
should return false");
+ }
+
+ @Test
+ void testGetTable() {
+ // Test getting specific table metadata
+ TablePath testTablePath = TablePath.of(XUGU_DATABASE, XUGU_SCHEMA,
XUGU_SOURCE);
+ CatalogTable table = catalog.getTable(testTablePath);
+
+ Assertions.assertNotNull(table, "Table metadata should not be null");
+ Assertions.assertNotNull(table.getTableSchema(), "Table schema should
not be null");
+ Assertions.assertEquals(
+ XUGU_SOURCE, table.getTableId().getTableName(), "Table name
should match");
+ Assertions.assertEquals(
+ XUGU_SCHEMA, table.getTableId().getSchemaName(), "Schema name
should match");
+ Assertions.assertEquals(
+ XUGU_DATABASE, table.getTableId().getDatabaseName(), "Database
name should match");
+
+ // Test that table has columns
+ Assertions.assertNotNull(table.getTableSchema().getColumns(), "Table
should have columns");
+ Assertions.assertFalse(
+ table.getTableSchema().getColumns().isEmpty(),
+ "e2e_table_source should have columns");
+ }
+
+ @Test
+ void testGetConstraintKeys() {
+ // Test constraint keys for specific table
+ TablePath testTablePath = TablePath.of(XUGU_DATABASE, XUGU_SCHEMA,
XUGU_SOURCE);
+ CatalogTable table = catalog.getTable(testTablePath);
+
+ Assertions.assertNotNull(table, "Table should not be null");
+ Assertions.assertNotNull(table.getTableSchema(), "Table schema should
not be null");
+ Assertions.assertNotNull(
+ table.getTableSchema().getConstraintKeys(), "Constraint keys
should not be null");
+
+ // Test Xugu-specific constraint key processing (removes double quotes)
+ table.getTableSchema()
+ .getConstraintKeys()
+ .forEach(
+ constraintKey -> {
+ if (constraintKey.getColumnNames() != null) {
+ constraintKey
+ .getColumnNames()
+ .forEach(
+ column -> {
+ if (column.getColumnName()
!= null) {
+ Assertions.assertFalse(
+
column.getColumnName()
+
.contains("\""),
+ "Column names
should not contain double quotes after Xugu processing");
+ }
+ });
+ }
+ });
+ }
+
+ @Test
+ void testXuguCaseInsensitiveDatabaseHandling() {
+ // Test Xugu's specific case-insensitive database name handling
+ // Xugu forces database names to uppercase internally
+ List<String> databases = catalog.listDatabases();
+ if (!databases.isEmpty()) {
+ String firstDatabase = databases.get(0);
+
+ // Test that all returned database names are uppercase (Xugu
behavior)
+ Assertions.assertEquals(
+ firstDatabase.toUpperCase(),
+ firstDatabase,
+ "Xugu should return database names in uppercase");
+
+ // Test various case combinations all resolve to the same database
+ String[] testCases = {
+ firstDatabase,
+ firstDatabase.toLowerCase(),
+ firstDatabase.toUpperCase(),
+ firstDatabase.substring(0, 1).toLowerCase() +
firstDatabase.substring(1),
+ firstDatabase.substring(0, 1).toUpperCase()
+ + firstDatabase.substring(1).toLowerCase()
+ };
+
+ for (String testCase : testCases) {
+ Assertions.assertTrue(
+ catalog.databaseExists(testCase),
+ "Database existence check should work for case
variant: " + testCase);
+ }
+ }
+ }
}