This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 75c9adb280 [Fix][Connector-xugu] Fix several bugs in the xugu 
connector (#9820)
75c9adb280 is described below

commit 75c9adb280ea3e4e37cc0a935cf3a00aba5e9271
Author: Leon Yoah <[email protected]>
AuthorDate: Tue Sep 9 17:18:21 2025 +0800

    [Fix][Connector-xugu] Fix several bugs in the xugu connector (#9820)
---
 .../seatunnel/jdbc/catalog/xugu/XuguCatalog.java   |  97 ++++++++++++--
 .../catalog/xugu/XuguCreateTableSqlBuilder.java    |   2 +-
 .../jdbc/internal/dialect/xugu/XuguDialect.java    |  11 +-
 .../internal/dialect/xugu/XuguTypeConverter.java   |   2 +-
 .../connectors/seatunnel/jdbc/JdbcXuguIT.java      | 141 ++++++++++++++++++++-
 5 files changed, 231 insertions(+), 22 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
index fab09c1852..0401e2d1ba 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
@@ -21,13 +21,18 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper;
 
+import org.apache.commons.lang3.StringUtils;
+
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
@@ -36,6 +41,10 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_METHOD;
 
 @Slf4j
 public class XuguCatalog extends AbstractJdbcCatalog {
@@ -98,11 +107,11 @@ public class XuguCatalog extends AbstractJdbcCatalog {
                     + "        c.DEF_VAL AS DEFAULT_VALUE,\n"
                     + "        c.NOT_NULl AS IS_NULLABLE\n"
                     + "    FROM\n"
-                    + "        dba_columns c\n"
-                    + "    LEFT JOIN dba_tables tab ON\n"
+                    + "        all_columns c\n"
+                    + "    LEFT JOIN all_tables tab ON\n"
                     + "        c.db_id = tab.db_id\n"
                     + "        AND c.table_id = tab.table_id\n"
-                    + "    LEFT JOIN dba_schemas sc ON\n"
+                    + "    LEFT JOIN all_schemas sc ON\n"
                     + "        tab.schema_id = sc.schema_id\n"
                     + "        AND tab.db_id = sc.db_id\n"
                     + "    WHERE\n"
@@ -122,21 +131,44 @@ public class XuguCatalog extends AbstractJdbcCatalog {
 
     @Override
     protected String getDatabaseWithConditionSql(String databaseName) {
-        return String.format(getListDatabaseSql() + "  where DB_NAME = '%s'", 
databaseName);
+        return String.format(getListDatabaseSql() + "  where UPPER(DB_NAME) = 
'%s'", databaseName);
     }
 
     @Override
     protected String getTableWithConditionSql(TablePath tablePath) {
         return String.format(
                 getListTableSql(tablePath.getDatabaseName())
-                        + "  where user_name = '%s' and table_name = '%s'",
+                        + "  and s.schema_name = '%s' and t.table_name = '%s'",
                 tablePath.getSchemaName(),
                 tablePath.getTableName());
     }
 
+    // "Test" and "TEST" are the same database
     @Override
     protected String getListDatabaseSql() {
-        return "SELECT DB_NAME FROM dba_databases";
+        return "SELECT UPPER(DB_NAME) FROM all_databases";
+    }
+
+    // Rewrite the databaseExists method, and xugu will force the conversion 
to uppercase
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        if (StringUtils.isBlank(databaseName)) {
+            return false;
+        }
+        try {
+            return querySQLResultExists(
+                    defaultUrl, 
getDatabaseWithConditionSql(databaseName.toUpperCase()));
+        } catch (SeaTunnelRuntimeException e) {
+            if 
(e.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) {
+                log.warn(
+                        "The catalog: {} is not supported the 
getDatabaseWithConditionSql for databaseExists",
+                        this.catalogName);
+                return listDatabases().contains(databaseName.toUpperCase());
+            }
+            throw e;
+        } catch (SQLException e) {
+            throw new SeaTunnelException("Failed to querySQLResult", e);
+        }
     }
 
     @Override
@@ -162,8 +194,10 @@ public class XuguCatalog extends AbstractJdbcCatalog {
 
     @Override
     protected String getListTableSql(String databaseName) {
-        return "SELECT user_name ,table_name FROM all_users au \n"
-                + "INNER JOIN all_tables at ON au.user_id=at.user_id AND 
au.db_id=at.db_id";
+        return "select s.schema_name,t.table_name \n"
+                + "from all_schemas s,all_tables t\n"
+                + "where\n"
+                + "s.schema_id=t.schema_id";
     }
 
     @Override
@@ -243,11 +277,48 @@ public class XuguCatalog extends AbstractJdbcCatalog {
     protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, 
TablePath tablePath)
             throws SQLException {
         try {
-            return getConstraintKeys(
-                    metaData,
-                    tablePath.getDatabaseName(),
-                    tablePath.getSchemaName(),
-                    tablePath.getTableName());
+            List<ConstraintKey> constraintKeys =
+                    getConstraintKeys(
+                            metaData,
+                            tablePath.getDatabaseName(),
+                            tablePath.getSchemaName(),
+                            tablePath.getTableName());
+            // Block the unique constraint field name because all returned by 
xugu are enclosed in
+            // double quotes
+            if (constraintKeys != null && !constraintKeys.isEmpty()) {
+                constraintKeys =
+                        constraintKeys.stream()
+                                .filter(Objects::nonNull)
+                                .map(
+                                        constraintKey ->
+                                                ConstraintKey.of(
+                                                        
constraintKey.getConstraintType(),
+                                                        
constraintKey.getConstraintName(),
+                                                        
constraintKey.getColumnNames() != null
+                                                                ? 
constraintKey.getColumnNames()
+                                                                        
.stream()
+                                                                        
.filter(Objects::nonNull)
+                                                                        .map(
+                                                                               
 column ->
+                                                                               
         ConstraintKey
+                                                                               
                 .ConstraintKeyColumn
+                                                                               
                 .of(
+                                                                               
                         column
+                                                                               
                                                 .getColumnName()
+                                                                               
                                         != null
+                                                                               
                                 ? column.getColumnName()
+                                                                               
                                         .replace(
+                                                                               
                                                 "\"",
+                                                                               
                                                 "")
+                                                                               
                                 : null,
+                                                                               
                         column
+                                                                               
                                 .getSortType()))
+                                                                        
.collect(
+                                                                               
 Collectors.toList())
+                                                                : null))
+                                .collect(Collectors.toList());
+            }
+            return constraintKeys;
         } catch (SQLException e) {
             log.info("Obtain constraint failure", e);
             return new ArrayList<>();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
index 170d013eeb..372b3868e9 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCreateTableSqlBuilder.java
@@ -143,7 +143,7 @@ public class XuguCreateTableSqlBuilder {
         columnCommentSql
                 .append(CatalogUtils.quoteIdentifier(column.getName(), 
fieldIde, "\""))
                 .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
-                .append(column.getComment())
+                .append(column.getComment().replace("'", "''"))
                 .append("'");
         return columnCommentSql.toString();
     }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
index adf2cf2190..3ee6093574 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
 
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -85,11 +86,6 @@ public class XuguDialect implements JdbcDialect {
         return quoteIdentifier(tableName);
     }
 
-    @Override
-    public String extractTableName(TablePath tablePath) {
-        return tablePath.getSchemaAndTableName();
-    }
-
     @Override
     public TablePath parse(String tablePath) {
         return TablePath.of(tablePath, true);
@@ -107,6 +103,10 @@ public class XuguDialect implements JdbcDialect {
                 Arrays.stream(fieldNames)
                         .filter(fieldName -> 
!Arrays.asList(uniqueKeyFields).contains(fieldName))
                         .collect(Collectors.toList());
+        if (nonUniqueKeyFields.isEmpty()) {
+            throw new SeaTunnelException(
+                    "The non-primary key field cannot be empty. Please set 
other fields");
+        }
         String valuesBinding =
                 Arrays.stream(fieldNames)
                         .map(fieldName -> ":" + fieldName + " " + 
quoteIdentifier(fieldName))
@@ -139,7 +139,6 @@ public class XuguDialect implements JdbcDialect {
                 Arrays.stream(fieldNames)
                         .map(fieldName -> "SOURCE." + 
quoteIdentifier(fieldName))
                         .collect(Collectors.joining(", "));
-
         String upsertSQL =
                 String.format(
                         " MERGE INTO %s TARGET"
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
index 54a8805f3b..38b0553124 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguTypeConverter.java
@@ -33,7 +33,7 @@ import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
 // reference
-// 
https://docs.xugudb.com/%E8%99%9A%E8%B0%B7%E6%95%B0%E6%8D%AE%E5%BA%93%E5%AF%B9%E5%A4%96%E5%8F%91%E5%B8%83/06%E5%8F%82%E8%80%83%E6%8C%87%E5%8D%97/SQL%E8%AF%AD%E6%B3%95%E5%8F%82%E8%80%83/%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B/%E6%A6%82%E8%BF%B0/
+// https://docs.xugudb.com/content/reference/sql/datatype/numerical
 @Slf4j
 @AutoService(TypeConverter.class)
 public class XuguTypeConverter implements TypeConverter<BasicTypeDefine> {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
index ab3d0450bd..23915ecc40 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcXuguIT.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
 import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu.XuguCatalog;
@@ -26,6 +28,8 @@ import 
org.apache.seatunnel.e2e.common.container.TestContainer;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -245,7 +249,142 @@ public class JdbcXuguIT extends AbstractJdbcIT {
                         jdbcCase.getPassword(),
                         JdbcUrlUtil.getUrlInfo(jdbcUrl),
                         XUGU_SCHEMA,
-                        null);
+                        DRIVER_CLASS);
         catalog.open();
     }
+
+    // Catalog test methods transferred from XuguCatalogTest
+    @Test
+    void testListDatabases() {
+        // Test listing databases functionality
+        List<String> databases = catalog.listDatabases();
+        Assertions.assertNotNull(databases, "Database list should not be 
null");
+        Assertions.assertFalse(databases.isEmpty(), "Database list should not 
be empty");
+    }
+
+    @Test
+    void testDatabaseExists() {
+        // Test specific database existence with case sensitivity
+        Assertions.assertTrue(
+                catalog.databaseExists(XUGU_DATABASE), "SYSTEM database should 
exist");
+        Assertions.assertTrue(
+                catalog.databaseExists(XUGU_DATABASE.toUpperCase()),
+                "Database existence check should be case-insensitive 
(uppercase)");
+
+        // Test mixed case scenarios for SYSTEM database
+        Assertions.assertTrue(catalog.databaseExists("system"), "system should 
exist (lowercase)");
+        Assertions.assertTrue(catalog.databaseExists("System"), "System should 
exist (mixed case)");
+
+        // Test non-existent database
+        Assertions.assertFalse(
+                catalog.databaseExists("NON_EXISTENT_DB"),
+                "Non-existent database should return false");
+    }
+
+    @Test
+    void testTableExists() {
+        // Test specific table existence
+        TablePath testTablePath = TablePath.of(XUGU_DATABASE, XUGU_SCHEMA, 
XUGU_SOURCE);
+        Assertions.assertTrue(
+                catalog.tableExists(testTablePath),
+                "e2e_table_source should exist in SYSDBA schema");
+
+        // Test case-insensitive database name handling
+        TablePath lowerCaseDatabasePath =
+                TablePath.of(XUGU_DATABASE.toLowerCase(), XUGU_SCHEMA, 
XUGU_SOURCE);
+        Assertions.assertTrue(
+                catalog.tableExists(lowerCaseDatabasePath),
+                "Table existence check should be case-insensitive for database 
name");
+
+        // Test non-existent table
+        TablePath nonExistentTable = TablePath.of(XUGU_DATABASE, XUGU_SCHEMA, 
"NON_EXISTENT_TABLE");
+        Assertions.assertFalse(
+                catalog.tableExists(nonExistentTable), "Non-existent table 
should return false");
+    }
+
+    @Test
+    void testGetTable() {
+        // Test getting specific table metadata
+        TablePath testTablePath = TablePath.of(XUGU_DATABASE, XUGU_SCHEMA, 
XUGU_SOURCE);
+        CatalogTable table = catalog.getTable(testTablePath);
+
+        Assertions.assertNotNull(table, "Table metadata should not be null");
+        Assertions.assertNotNull(table.getTableSchema(), "Table schema should 
not be null");
+        Assertions.assertEquals(
+                XUGU_SOURCE, table.getTableId().getTableName(), "Table name 
should match");
+        Assertions.assertEquals(
+                XUGU_SCHEMA, table.getTableId().getSchemaName(), "Schema name 
should match");
+        Assertions.assertEquals(
+                XUGU_DATABASE, table.getTableId().getDatabaseName(), "Database 
name should match");
+
+        // Test that table has columns
+        Assertions.assertNotNull(table.getTableSchema().getColumns(), "Table 
should have columns");
+        Assertions.assertFalse(
+                table.getTableSchema().getColumns().isEmpty(),
+                "e2e_table_source should have columns");
+    }
+
+    @Test
+    void testGetConstraintKeys() {
+        // Test constraint keys for specific table
+        TablePath testTablePath = TablePath.of(XUGU_DATABASE, XUGU_SCHEMA, 
XUGU_SOURCE);
+        CatalogTable table = catalog.getTable(testTablePath);
+
+        Assertions.assertNotNull(table, "Table should not be null");
+        Assertions.assertNotNull(table.getTableSchema(), "Table schema should 
not be null");
+        Assertions.assertNotNull(
+                table.getTableSchema().getConstraintKeys(), "Constraint keys 
should not be null");
+
+        // Test Xugu-specific constraint key processing (removes double quotes)
+        table.getTableSchema()
+                .getConstraintKeys()
+                .forEach(
+                        constraintKey -> {
+                            if (constraintKey.getColumnNames() != null) {
+                                constraintKey
+                                        .getColumnNames()
+                                        .forEach(
+                                                column -> {
+                                                    if (column.getColumnName() 
!= null) {
+                                                        Assertions.assertFalse(
+                                                                
column.getColumnName()
+                                                                        
.contains("\""),
+                                                                "Column names 
should not contain double quotes after Xugu processing");
+                                                    }
+                                                });
+                            }
+                        });
+    }
+
+    @Test
+    void testXuguCaseInsensitiveDatabaseHandling() {
+        // Test Xugu's specific case-insensitive database name handling
+        // Xugu forces database names to uppercase internally
+        List<String> databases = catalog.listDatabases();
+        if (!databases.isEmpty()) {
+            String firstDatabase = databases.get(0);
+
+            // Test that all returned database names are uppercase (Xugu 
behavior)
+            Assertions.assertEquals(
+                    firstDatabase.toUpperCase(),
+                    firstDatabase,
+                    "Xugu should return database names in uppercase");
+
+            // Test various case combinations all resolve to the same database
+            String[] testCases = {
+                firstDatabase,
+                firstDatabase.toLowerCase(),
+                firstDatabase.toUpperCase(),
+                firstDatabase.substring(0, 1).toLowerCase() + 
firstDatabase.substring(1),
+                firstDatabase.substring(0, 1).toUpperCase()
+                        + firstDatabase.substring(1).toLowerCase()
+            };
+
+            for (String testCase : testCases) {
+                Assertions.assertTrue(
+                        catalog.databaseExists(testCase),
+                        "Database existence check should work for case 
variant: " + testCase);
+            }
+        }
+    }
 }

Reply via email to