This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f012b2a6f0 [Improve][Connector-v2] Optimize the way of databases and
tables are checked for existence (#7261)
f012b2a6f0 is described below
commit f012b2a6f093bcad49fbd59946e75ce6d6a97838
Author: dailai <[email protected]>
AuthorDate: Mon Jul 29 21:51:59 2024 +0800
[Improve][Connector-v2] Optimize the way of databases and tables are
checked for existence (#7261)
---
.../seatunnel/common/exception/CommonError.java | 8 +++
.../common/exception/CommonErrorCode.java | 1 +
.../jdbc/catalog/AbstractJdbcCatalog.java | 70 +++++++++++++++++++---
.../seatunnel/jdbc/catalog/dm/DamengCatalog.java | 30 +++++-----
.../seatunnel/jdbc/catalog/iris/IrisCatalog.java | 29 +++++----
.../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 17 ++++++
.../catalog/oceanbase/OceanBaseOracleCatalog.java | 33 +++++-----
.../jdbc/catalog/oracle/OracleCatalog.java | 39 ++++++------
.../jdbc/catalog/psql/PostgresCatalog.java | 35 +++++------
.../jdbc/catalog/redshift/RedshiftCatalog.java | 40 +++++--------
.../jdbc/catalog/saphana/SapHanaCatalog.java | 30 ++++------
.../jdbc/catalog/sqlserver/SqlServerCatalog.java | 29 +++++----
.../seatunnel/jdbc/catalog/xugu/XuguCatalog.java | 32 +++++-----
.../jdbc/catalog/mysql/MySqlCatalogTest.java | 15 ++++-
.../jdbc/catalog/oracle/OracleCatalogTest.java | 33 ++++++++--
.../jdbc/catalog/psql/PostgresCatalogTest.java | 32 ++++++++--
.../catalog/sqlserver/SqlServerCatalogTest.java | 14 +++--
..._mysql_source_and_sink_with_multiple_tables.sql | 4 +-
18 files changed, 309 insertions(+), 182 deletions(-)
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
index 782a071d01..e9adf4d70a 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
@@ -285,4 +285,12 @@ public class CommonError {
params.put("field", field);
return new
SeaTunnelRuntimeException(CommonErrorCode.FORMAT_DATE_ERROR, params);
}
+
+ public static SeaTunnelRuntimeException unsupportedMethod(
+ String identifier, String methodName) {
+ Map<String, String> params = new HashMap<>();
+ params.put("identifier", identifier);
+ params.put("methodName", methodName);
+ return new
SeaTunnelRuntimeException(CommonErrorCode.UNSUPPORTED_METHOD, params);
+ }
}
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 5893924848..79621c4216 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -77,6 +77,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
FORMAT_DATETIME_ERROR(
"COMMON-33",
"The datetime format '<datetime>' of field '<field>' is not
supported. Please check the datetime format."),
+ UNSUPPORTED_METHOD("COMMON-34", "'<identifier>' unsupported the method
'<methodName>'"),
;
private final String code;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index a033d0eaac..8d0301b492 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -44,6 +44,8 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
@@ -63,11 +65,14 @@ import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_METHOD;
+@Slf4j
public abstract class AbstractJdbcCatalog implements Catalog {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractJdbcCatalog.class);
protected static final Set<String> SYS_DATABASES = new HashSet<>();
+ protected static final Set<String> EXCLUDED_SCHEMAS = new HashSet<>();
protected final String catalogName;
protected final String defaultDatabase;
@@ -259,6 +264,10 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
throw new UnsupportedOperationException();
}
+ protected String getDatabaseWithConditionSql(String databaseName) {
+ throw CommonError.unsupportedMethod(this.catalogName,
"getDatabaseWithConditionSql");
+ }
+
@Override
public List<String> listDatabases() throws CatalogException {
try {
@@ -277,15 +286,35 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
@Override
public boolean databaseExists(String databaseName) throws CatalogException
{
- checkArgument(StringUtils.isNotBlank(databaseName));
-
- return listDatabases().contains(databaseName);
+ if (StringUtils.isBlank(databaseName)) {
+ return false;
+ }
+ if (SYS_DATABASES.contains(databaseName)) {
+ return false;
+ }
+ try {
+ return querySQLResultExists(
+ getUrlFromDatabaseName(databaseName),
+ getDatabaseWithConditionSql(databaseName));
+ } catch (SeaTunnelRuntimeException e) {
+ if
(e.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) {
+ log.warn(
+ "The catalog: {} is not supported the
getDatabaseWithConditionSql for databaseExists",
+ this.catalogName);
+ return listDatabases().contains(databaseName);
+ }
+ throw e;
+ }
}
protected String getListTableSql(String databaseName) {
throw new UnsupportedOperationException();
}
+ protected String getTableWithConditionSql(TablePath tablePath) {
+ throw CommonError.unsupportedMethod(this.catalogName,
"getTableWithConditionSql");
+ }
+
protected String getTableName(ResultSet rs) throws SQLException {
String schemaName = rs.getString(1);
String tableName = rs.getString(2);
@@ -317,12 +346,28 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- return databaseExists(tablePath.getDatabaseName())
- &&
listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
- } catch (DatabaseNotExistException e) {
+ String databaseName = tablePath.getDatabaseName();
+ if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
return false;
}
+ try {
+ return querySQLResultExists(
+ this.getUrlFromDatabaseName(databaseName),
getTableWithConditionSql(tablePath));
+ } catch (SeaTunnelRuntimeException e1) {
+ if
(e1.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) {
+ log.warn(
+ "The catalog: {} is not supported the
getTableWithConditionSql for tableExists ",
+ this.catalogName);
+ try {
+ return databaseExists(tablePath.getDatabaseName())
+ && listTables(tablePath.getDatabaseName())
+ .contains(getTableName(tablePath));
+ } catch (DatabaseNotExistException e2) {
+ return false;
+ }
+ }
+ throw e1;
+ }
}
@Override
@@ -528,6 +573,17 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
}
}
+ protected boolean querySQLResultExists(String dbUrl, String sql) {
+ try (PreparedStatement stmt =
getConnection(dbUrl).prepareStatement(sql)) {
+ try (ResultSet rs = stmt.executeQuery()) {
+ return rs.next();
+ }
+ } catch (Exception e) {
+ log.info("query exists error", e);
+ return false;
+ }
+ }
+
// If sql is DDL, the execute() method always returns false, so the return
value
// should not be used to determine whether changes were made in database.
protected boolean executeInternal(String url, String sql) throws
SQLException {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
index 3796a76025..ede65bc8a0 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
@@ -30,8 +30,6 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeMapper;
-import org.apache.commons.lang3.StringUtils;
-
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
@@ -70,6 +68,20 @@ public class DamengCatalog extends AbstractJdbcCatalog {
super(catalogName, username, pwd, urlInfo, defaultSchema);
}
+ @Override
+ protected String getDatabaseWithConditionSql(String databaseName) {
+ return String.format(getListDatabaseSql() + " where name = '%s'",
databaseName);
+ }
+
+ @Override
+ protected String getTableWithConditionSql(TablePath tablePath) {
+ return String.format(
+ getListTableSql(tablePath.getDatabaseName())
+ + " where OWNER = '%s' and TABLE_NAME = '%s'",
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
+ }
+
@Override
protected String getListDatabaseSql() {
return "SELECT name FROM v$database";
@@ -145,20 +157,6 @@ public class DamengCatalog extends AbstractJdbcCatalog {
return tablePath.getSchemaAndTableName();
}
- @Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
- return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName())
- .contains(tablePath.getSchemaAndTableName());
- }
- return listTables().contains(tablePath.getSchemaAndTableName());
- } catch (DatabaseNotExistException e) {
- return false;
- }
- }
-
private List<String> listTables() {
List<String> databases = listDatabases();
return listTables(databases.get(0));
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java
index 40f08dc50b..02e58ea857 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java
@@ -57,7 +57,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
public class IrisCatalog extends AbstractJdbcCatalog {
private static final String LIST_TABLES_SQL_TEMPLATE =
- "SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables
WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE !=
'SYSTEM VIEW';";
+ "SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables
WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE !=
'SYSTEM VIEW'";
public IrisCatalog(
String catalogName, String username, String password,
JdbcUrlUtil.UrlInfo urlInfo) {
@@ -101,13 +101,6 @@ public class IrisCatalog extends AbstractJdbcCatalog {
return schemaName + "." + tableName;
}
- // @Override
- // protected String getSelectColumnsSql(TablePath tablePath) {
- // return String.format(
- // SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(),
- // tablePath.getTableName());
- // }
-
@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
@@ -144,12 +137,24 @@ public class IrisCatalog extends AbstractJdbcCatalog {
@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- return listTables(tablePath.getSchemaName())
- .contains(tablePath.getSchemaAndTableName());
- } catch (DatabaseNotExistException e) {
+ if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
return false;
}
+ return querySQLResultExists(
+ this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
+ getTableWithConditionSql(tablePath));
+ }
+
+ @Override
+ protected String getTableWithConditionSql(TablePath tablePath) {
+ return String.format(
+ getListTableSql(tablePath.getSchemaName()) + " and TABLE_NAME
= '%s'",
+ tablePath.getTableName());
+ }
+
+ @Override
+ protected String getUrlFromDatabaseName(String databaseName) {
+ return defaultUrl;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index 6b263b0fd4..e2df8ab24b 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -51,6 +51,12 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
private static final String SELECT_COLUMNS_SQL_TEMPLATE =
"SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA =
'%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC";
+ private static final String SELECT_DATABASE_EXISTS =
+ "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE
SCHEMA_NAME = '%s'";
+
+ private static final String SELECT_TABLE_EXISTS =
+ "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables
WHERE table_schema = '%s' AND table_name = '%s'";
+
static {
SYS_DATABASES.add("information_schema");
SYS_DATABASES.add("mysql");
@@ -68,6 +74,17 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
this.typeConverter = new MySqlTypeConverter(version);
}
+ @Override
+ protected String getDatabaseWithConditionSql(String databaseName) {
+ return String.format(SELECT_DATABASE_EXISTS, databaseName);
+ }
+
+ @Override
+ protected String getTableWithConditionSql(TablePath tablePath) {
+ return String.format(
+ SELECT_TABLE_EXISTS, tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
@Override
protected String getListDatabaseSql() {
return "SHOW DATABASES;";
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
index b4ece7db9c..b98f4c4c2b 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
@@ -25,8 +25,6 @@ import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistExcepti
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
@@ -34,9 +32,10 @@ import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
public class OceanBaseOracleCatalog extends OracleCatalog {
static {
- EXCLUDED_SCHEMAS =
- Collections.unmodifiableList(
- Arrays.asList("oceanbase", "LBACSYS", "ORAAUDITOR",
"SYS"));
+ EXCLUDED_SCHEMAS.add("oceanbase");
+ EXCLUDED_SCHEMAS.add("LBACSYS");
+ EXCLUDED_SCHEMAS.add("ORAAUDITOR");
+ EXCLUDED_SCHEMAS.add("SYS");
}
public OceanBaseOracleCatalog(
@@ -53,6 +52,21 @@ public class OceanBaseOracleCatalog extends OracleCatalog {
throw new UnsupportedOperationException();
}
+ @Override
+ protected String getDatabaseWithConditionSql(String databaseName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
+ return false;
+ }
+ return querySQLResultExists(
+ this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
+ getTableWithConditionSql(tablePath));
+ }
+
@Override
public List<String> listTables(String databaseName)
throws CatalogException, DatabaseNotExistException {
@@ -65,15 +79,6 @@ public class OceanBaseOracleCatalog extends OracleCatalog {
}
}
- @Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- return
listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
- } catch (DatabaseNotExistException e) {
- return false;
- }
- }
-
@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index b51369e3f5..1430cb387a 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -21,8 +21,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -30,8 +28,6 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeMapper;
-import org.apache.commons.lang3.StringUtils;
-
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
@@ -46,7 +42,7 @@ import java.util.List;
@Slf4j
public class OracleCatalog extends AbstractJdbcCatalog {
- protected static List<String> EXCLUDED_SCHEMAS =
+ protected static List<String> EXCLUDED_SCHEMAS_ALL =
Collections.unmodifiableList(
Arrays.asList(
"APPQOSSYS",
@@ -101,6 +97,10 @@ public class OracleCatalog extends AbstractJdbcCatalog {
+ "ORDER BY \n"
+ " cols.column_id \n";
+ static {
+ EXCLUDED_SCHEMAS.addAll(EXCLUDED_SCHEMAS_ALL);
+ }
+
public OracleCatalog(
String catalogName,
String username,
@@ -110,6 +110,21 @@ public class OracleCatalog extends AbstractJdbcCatalog {
super(catalogName, username, pwd, urlInfo, defaultSchema);
}
+ @Override
+ protected String getDatabaseWithConditionSql(String databaseName) {
+ return String.format(getListDatabaseSql() + " where name = '%s'",
databaseName);
+ }
+
+ @Override
+ protected String getTableWithConditionSql(TablePath tablePath) {
+ return getListTableSql(tablePath.getDatabaseName())
+ + " and OWNER = '"
+ + tablePath.getSchemaName()
+ + "' and table_name = '"
+ + tablePath.getTableName()
+ + "'";
+ }
+
@Override
protected String getListDatabaseSql() {
return "SELECT name FROM v$database";
@@ -191,20 +206,6 @@ public class OracleCatalog extends AbstractJdbcCatalog {
return tablePath.getSchemaAndTableName();
}
- @Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
- return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName())
- .contains(tablePath.getSchemaAndTableName());
- }
- return listTables().contains(tablePath.getSchemaAndTableName());
- } catch (DatabaseNotExistException e) {
- return false;
- }
- }
-
private List<String> listTables() {
List<String> databases = listDatabases();
return listTables(databases.get(0));
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
index 4697d1999e..d5261e16d5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -30,7 +29,6 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.Post
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeMapper;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import lombok.extern.slf4j.Slf4j;
@@ -104,14 +102,28 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
super(catalogName, username, pwd, urlInfo, defaultSchema);
}
+ @Override
+ protected String getDatabaseWithConditionSql(String databaseName) {
+ return String.format(getListDatabaseSql() + " where datname = '%s'",
databaseName);
+ }
+
+ @Override
+ protected String getTableWithConditionSql(TablePath tablePath) {
+ return String.format(
+ getListTableSql(tablePath.getDatabaseName())
+ + " where table_schema = '%s' and table_name= '%s'",
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
+ }
+
@Override
protected String getListDatabaseSql() {
- return "select datname from pg_database;";
+ return "select datname from pg_database";
}
@Override
protected String getListTableSql(String databaseName) {
- return "SELECT table_schema, table_name FROM
information_schema.tables;";
+ return "SELECT table_schema, table_name FROM
information_schema.tables";
}
@Override
@@ -231,21 +243,6 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
super.dropDatabaseInternal(databaseName);
}
- @Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
- return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName())
- .contains(tablePath.getSchemaAndTableName());
- }
-
- return
listTables(defaultDatabase).contains(tablePath.getSchemaAndTableName());
- } catch (DatabaseNotExistException e) {
- return false;
- }
- }
-
@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
index 7b29bbb8ea..064b247337 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -31,23 +30,17 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeMapper;
-import org.apache.commons.lang3.StringUtils;
-
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class RedshiftCatalog extends AbstractJdbcCatalog {
- protected static final Set<String> EXCLUDED_SCHEMAS = new HashSet<>(4);
-
private final String SELECT_COLUMNS =
"SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA =
'%s' AND TABLE_NAME ='%s' ORDER BY ordinal_position ASC";
@@ -80,6 +73,20 @@ public class RedshiftCatalog extends AbstractJdbcCatalog {
this.connectionMap = new ConcurrentHashMap<>();
}
+ @Override
+ protected String getDatabaseWithConditionSql(String databaseName) {
+ return String.format(getListDatabaseSql() + " where datname = '%s'",
databaseName);
+ }
+
+ @Override
+ protected String getTableWithConditionSql(TablePath tablePath) {
+ return String.format(
+ getListTableSql(tablePath.getDatabaseName())
+ + " where table_schema = '%s' and table_name = '%s'",
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
+ }
+
@Override
public void close() throws CatalogException {
for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) {
@@ -95,12 +102,12 @@ public class RedshiftCatalog extends AbstractJdbcCatalog {
@Override
protected String getListDatabaseSql() {
- return "select datname from pg_database;";
+ return "select datname from pg_database";
}
@Override
protected String getListTableSql(String databaseName) {
- return "SELECT table_schema, table_name FROM
information_schema.tables;";
+ return "SELECT table_schema, table_name FROM
information_schema.tables";
}
@Override
@@ -144,21 +151,6 @@ public class RedshiftCatalog extends AbstractJdbcCatalog {
return String.format("DROP DATABASE `%s`;", databaseName);
}
- @Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
- return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName())
-
.contains(tablePath.getSchemaAndTableName().toLowerCase());
- }
- return listTables(defaultDatabase)
- .contains(tablePath.getSchemaAndTableName().toLowerCase());
- } catch (DatabaseNotExistException e) {
- return false;
- }
- }
-
@Override
protected String getSelectColumnsSql(TablePath tablePath) {
return String.format(SELECT_COLUMNS, tablePath.getSchemaName(),
tablePath.getTableName());
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
index df6f4b3c24..19b8f668af 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java
@@ -22,8 +22,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -31,8 +29,6 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper;
-import org.apache.commons.lang3.StringUtils;
-
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
@@ -113,6 +109,18 @@ public class SapHanaCatalog extends AbstractJdbcCatalog {
super(catalogName, username, pwd, urlInfo, defaultSchema);
}
+ @Override
+ protected String getDatabaseWithConditionSql(String databaseName) {
+ return String.format(getListDatabaseSql() + " where SCHEMA_NAME =
'%s'", databaseName);
+ }
+
+ @Override
+ protected String getTableWithConditionSql(TablePath tablePath) {
+ return String.format(
+ getListTableSql(tablePath.getDatabaseName()) + " and
TABLE_NAME = '%s'",
+ tablePath.getTableName());
+ }
+
@Override
protected String getListDatabaseSql() {
return "SELECT SCHEMA_NAME FROM SCHEMAS";
@@ -203,20 +211,6 @@ public class SapHanaCatalog extends AbstractJdbcCatalog {
return tablePath.getTableName();
}
- @Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
- return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName())
- .contains(tablePath.getTableName());
- }
- return listTables().contains(tablePath.getSchemaAndTableName());
- } catch (DatabaseNotExistException e) {
- return false;
- }
- }
-
private List<String> listTables() {
List<String> databases = listDatabases();
return listTables(databases.get(0));
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 55660b36a2..e4c6351522 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -69,6 +68,20 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
super(catalogName, username, pwd, urlInfo, defaultSchema);
}
+ @Override
+ protected String getDatabaseWithConditionSql(String databaseName) {
+ return String.format(getListDatabaseSql() + " where name = '%s'", databaseName);
+ }
+
+ @Override
+ protected String getTableWithConditionSql(TablePath tablePath) {
+ return String.format(
+ getListTableSql(tablePath.getDatabaseName())
+ + " and TABLE_SCHEMA = '%s' and TABLE_NAME = '%s'",
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
+ }
+
@Override
protected String getListDatabaseSql() {
return "SELECT NAME FROM sys.databases";
@@ -147,20 +160,6 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
return baseUrl + ";databaseName=" + databaseName + ";" + suffix;
}
- @Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
- return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName())
- .contains(tablePath.getSchemaAndTableName());
- }
- return
listTables(defaultDatabase).contains(tablePath.getSchemaAndTableName());
- } catch (DatabaseNotExistException e) {
- return false;
- }
- }
-
@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
index 462e109c76..a0b28e49ab 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
@@ -21,8 +21,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
@@ -30,8 +28,6 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper;
-import org.apache.commons.lang3.StringUtils;
-
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
@@ -128,6 +124,20 @@ public class XuguCatalog extends AbstractJdbcCatalog {
super(catalogName, username, pwd, urlInfo, defaultSchema);
}
+ @Override
+ protected String getDatabaseWithConditionSql(String databaseName) {
+ return String.format(getListDatabaseSql() + " where DB_NAME = '%s'", databaseName);
+ }
+
+ @Override
+ protected String getTableWithConditionSql(TablePath tablePath) {
+ return String.format(
+ getListTableSql(tablePath.getDatabaseName())
+ + " where user_name = '%s' and table_name = '%s'",
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
+ }
+
@Override
protected String getListDatabaseSql() {
return "SELECT DB_NAME FROM dba_databases";
@@ -210,20 +220,6 @@ public class XuguCatalog extends AbstractJdbcCatalog {
return tablePath.getSchemaAndTableName();
}
- @Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
- return databaseExists(tablePath.getDatabaseName())
- && listTables(tablePath.getDatabaseName())
- .contains(tablePath.getSchemaAndTableName());
- }
- return listTables().contains(tablePath.getSchemaAndTableName());
- } catch (DatabaseNotExistException e) {
- return false;
- }
- }
-
private List<String> listTables() {
List<String> databases = listDatabases();
return listTables(databases.get(0));
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java
index daf87b3693..bc89d4c8c3 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java
@@ -25,6 +25,7 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServe
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer;
@@ -39,7 +40,8 @@ class MySqlCatalogTest {
static JdbcUrlUtil.UrlInfo sqlParse =
SqlServerURLParser.parse("jdbc:sqlserver://127.0.0.1:1434;database=TestDB");
static JdbcUrlUtil.UrlInfo MysqlUrlInfo =
-
JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33061/liuliTest?useSSL=false");
+ JdbcUrlUtil.getUrlInfo(
+ "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&allowPublicKeyRetrieval=true");
static JdbcUrlUtil.UrlInfo pg =
JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/liulitest");
static TablePath tablePathSQL;
@@ -74,13 +76,22 @@ class MySqlCatalogTest {
tablePathPG = TablePath.of(databaseName, "pg_to_mysql");
tablePathOracle = TablePath.of(databaseName, "oracle_to_mysql");
sqlServerCatalog = new SqlServerCatalog("sqlserver", "sa", "root@123",
sqlParse, null);
- mySqlCatalog = new MySqlCatalog("mysql", "root", "root@123",
MysqlUrlInfo);
+ mySqlCatalog = new MySqlCatalog("mysql", "root", "123456",
MysqlUrlInfo);
postgresCatalog = new PostgresCatalog("postgres", "postgres",
"postgres", pg, null);
mySqlCatalog.open();
sqlServerCatalog.open();
postgresCatalog.open();
}
+ @Test
+ void exists() {
+ Assertions.assertTrue(mySqlCatalog.databaseExists("test"));
+ Assertions.assertTrue(mySqlCatalog.tableExists(TablePath.of("test",
"MY_TABLE")));
+ Assertions.assertTrue(mySqlCatalog.tableExists(TablePath.of("test",
"my_table")));
+ Assertions.assertFalse(mySqlCatalog.tableExists(TablePath.of("test",
"test")));
+ Assertions.assertFalse(mySqlCatalog.databaseExists("mysql"));
+ }
+
@Test
@Order(1)
void getTable() {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
index 1c5fb5a2b2..75b22ec24d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java
@@ -20,6 +20,8 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -27,17 +29,24 @@ import java.util.List;
@Disabled("Please Test it in your local environment")
class OracleCatalogTest {
- @Test
- void testCatalog() {
- OracleCatalog catalog =
+
+ static OracleCatalog catalog;
+
+ @BeforeAll
+ static void before() {
+ catalog =
new OracleCatalog(
"oracle",
- "test",
- "oracle",
-
OracleURLParser.parse("jdbc:oracle:thin:@127.0.0.1:1521:xe"),
+ "c##gguser",
+ "testdb",
+
OracleURLParser.parse("jdbc:oracle:thin:@127.0.0.1:1521/CDC_PDB"),
null);
catalog.open();
+ }
+
+ @Test
+ void testCatalog() {
List<String> strings = catalog.listDatabases();
@@ -45,4 +54,16 @@ class OracleCatalogTest {
catalog.createTable(new TablePath("XE", "TEST", "TEST003"), table,
false);
}
+
+ @Test
+ void exist() {
+ Assertions.assertTrue(catalog.databaseExists("ORCLCDB"));
+ Assertions.assertTrue(catalog.tableExists(TablePath.of("ORCLCDB",
"C##GGUSER", "myTable")));
+ Assertions.assertFalse(catalog.databaseExists("ORCL"));
+ Assertions.assertTrue(
+ catalog.tableExists(
+ TablePath.of("ORCLCDB", "CDC_PDB",
"ads_index_public_health_data")));
+ Assertions.assertTrue(
+ catalog.tableExists(TablePath.of("ORCLCDB", "CDC_PDB",
"ADS_INDEX_DISEASE_DATA")));
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
index c04c1941b0..05a013ef69 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -31,15 +33,23 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
class PostgresCatalogTest {
- @Test
- void testCatalog() {
- JdbcUrlUtil.UrlInfo urlInfo =
-
JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/liulitest");
- PostgresCatalog catalog =
- new PostgresCatalog("postgres", "postgres", "postgres",
urlInfo, null);
+ static PostgresCatalog catalog;
+
+ @BeforeAll
+ static void before() {
+ catalog =
+ new PostgresCatalog(
+ "postgres",
+ "pg",
+ "pg#2024",
+
JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/postgres"),
+ null);
catalog.open();
+ }
+ @Test
+ void testCatalog() {
MySqlCatalog mySqlCatalog =
new MySqlCatalog(
"mysql",
@@ -59,4 +69,14 @@ class PostgresCatalogTest {
catalog.createTable(
new TablePath("liulitest", "public", "all_types_table_02"),
table, false);
}
+
+ @Test
+ void exists() {
+ Assertions.assertFalse(catalog.databaseExists("postgres"));
+ Assertions.assertFalse(
+ catalog.tableExists(TablePath.of("postgres", "pg_catalog",
"pg_aggregate")));
+ Assertions.assertTrue(catalog.databaseExists("zdykdb"));
+ Assertions.assertTrue(
+ catalog.tableExists(TablePath.of("zdykdb", "pg_catalog",
"pg_class")));
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java
index ea305ca0c1..a18cc4abd9 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer;
@@ -38,7 +39,7 @@ import java.util.List;
class SqlServerCatalogTest {
static JdbcUrlUtil.UrlInfo sqlParse =
-
SqlServerURLParser.parse("jdbc:sqlserver://127.0.0.1:1434;database=TestDB");
+
SqlServerURLParser.parse("jdbc:sqlserver://127.0.0.1:1433;database=master");
static JdbcUrlUtil.UrlInfo MysqlUrlInfo =
JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33061/liuliTest?useSSL=false");
static JdbcUrlUtil.UrlInfo pg =
@@ -84,9 +85,14 @@ class SqlServerCatalogTest {
}
@Test
- void tableExists() {
-
- // boolean b = sqlServerCatalog.tableExists(tablePath);
+ void exists() {
+ Assertions.assertTrue(sqlServerCatalog.databaseExists("master"));
+ Assertions.assertTrue(
+ sqlServerCatalog.tableExists(
+ TablePath.of("master", "dbo",
"MSreplication_options")));
+ Assertions.assertTrue(
+ sqlServerCatalog.tableExists(TablePath.of("master", "dbo",
"spt_fallback_db")));
+ Assertions.assertFalse(sqlServerCatalog.tableExists(TablePath.of("master", "dbo", "xxx")));
}
@Test
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql
index a9b02e2ae3..8c624959f8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql
@@ -55,10 +55,10 @@ CREATE TABLE sink_table WITH (
'user' = 'root',
'password' = 'Abc!@#135_seatunnel',
'generate_sink_sql' = 'true',
- 'database' = 'sink'
+ 'database' = 'sink',
'table' = '${table_name}'
);
-- If it's multi-table synchronization, there's no need to set select columns.
-- You can directly use the syntax 'INSERT INTO sink_table SELECT source_table'.
-INSERT INTO sink_table SELECT source_table;
\ No newline at end of file
+INSERT INTO sink_table SELECT source_table;