This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch revert-4540-refactor-catalog in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
commit 4c9d4bb3b5a70f0e5493b27ccce1addf168660ee Author: hailin0 <[email protected]> AuthorDate: Thu Apr 20 15:03:00 2023 +0800 Revert "[Improve][Catalog] refactor catalog (#4540)" This reverts commit b0a701cb83a2353da3b1f93a483c19cddd55747c. --- .../jdbc/catalog/AbstractJdbcCatalog.java | 234 +-------------------- .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 219 ++++++++++++++++++- .../catalog/mysql/MysqlCreateTableSqlBuilder.java | 2 +- .../jdbc/catalog/sqlserver/SqlServerCatalog.java | 228 +++++++++++++++++++- .../jdbc/catalog/sqlserver/SqlServerType.java | 2 +- .../jdbc/internal/dialect/JdbcDialect.java | 38 ---- .../jdbc/internal/dialect/mysql/MysqlDialect.java | 8 - .../dialect/sqlserver/SqlServerDialect.java | 30 --- .../connector-starrocks/pom.xml | 5 - .../starrocks/catalog/StarRocksCatalog.java | 177 +++++++++++++--- .../starrocks/catalog/StarRocksDialect.java | 27 --- 11 files changed, 605 insertions(+), 365 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java index 4fb68775f..bd516e325 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java @@ -21,20 +21,14 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.ConstraintKey; -import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.catalog.PrimaryKey; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.utils.JdbcUrlUtil; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -46,19 +40,14 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; -import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; @@ -66,6 +55,7 @@ import static com.google.common.base.Preconditions.checkNotNull; public abstract class AbstractJdbcCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class); + protected final String catalogName; protected final String defaultDatabase; protected final String username; @@ -73,8 +63,6 @@ public abstract class AbstractJdbcCatalog implements Catalog { protected final String baseUrl; protected final String suffix; protected final String defaultUrl; - protected final JdbcDialect jdbcDialect; - protected static final Set<String> SYS_DATABASES = new HashSet<>(); public AbstractJdbcCatalog( String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) { @@ -92,7 +80,6 @@ public abstract class AbstractJdbcCatalog implements Catalog { this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; this.defaultUrl = urlInfo.getOrigin(); this.suffix = urlInfo.getSuffix(); - this.jdbcDialect = JdbcDialectLoader.load(this.baseUrl); } @Override @@ -120,7 +107,6 @@ public abstract class AbstractJdbcCatalog implements Catalog { public void open() throws CatalogException { try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { // test connection, fail early if we cannot connect to database - conn.getCatalog(); } catch (SQLException e) { throw new CatalogException( String.format("Failed connecting to %s via JDBC.", defaultUrl), e); @@ -134,56 +120,6 @@ public abstract class AbstractJdbcCatalog implements Catalog { LOG.info("Catalog {} closing", catalogName); } - @Override - public List<String> listDatabases() throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { - - PreparedStatement ps = conn.prepareStatement(jdbcDialect.listDatabases()); - - List<String> databases = new ArrayList<>(); - ResultSet rs = ps.executeQuery(); - - while (rs.next()) { - String databaseName = rs.getString(1); - if (!getSysDatabases().contains(databaseName)) { - databases.add(rs.getString(1)); - } - } - - return databases; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", this.catalogName), e); - } - } - - @Override - public List<String> listTables(String databaseName) - throws CatalogException, DatabaseNotExistException { - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(this.catalogName, databaseName); - } - - String dbUrl = jdbcDialect.getUrlFromDatabaseName(baseUrl, databaseName, suffix); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement(jdbcDialect.listTableSql(databaseName))) { - - ResultSet rs = ps.executeQuery(); - - List<String> tables = new ArrayList<>(); - - while (rs.next()) { - tables.add(jdbcDialect.getTableName(rs)); - } - - return tables; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", catalogName), e); - } - } - protected Optional<PrimaryKey> getPrimaryKey( DatabaseMetaData metaData, String database, String table) throws SQLException { return getPrimaryKey(metaData, database, table, table); @@ -290,8 +226,7 @@ public abstract class AbstractJdbcCatalog implements Catalog { public boolean tableExists(TablePath tablePath) throws CatalogException { try { return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(jdbcDialect.getTableName(tablePath)); + && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName()); } catch (DatabaseNotExistException e) { return false; } @@ -310,86 +245,8 @@ public abstract class AbstractJdbcCatalog implements Catalog { } } - public CatalogTable getTable(TablePath tablePath) - throws CatalogException, TableNotExistException { - if (!tableExists(tablePath)) { - throw new TableNotExistException(catalogName, tablePath); - } - - String dbUrl = - jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { - DatabaseMetaData metaData = conn.getMetaData(); - Optional<PrimaryKey> primaryKey = - getPrimaryKey( - metaData, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - List<ConstraintKey> constraintKeys = - getConstraintKeys( - metaData, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - - try (PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM %s WHERE 1 = 0;", - tablePath.getFullNameWithQuoted("\"")))) { - ResultSetMetaData tableMetaData = ps.getMetaData(); - TableSchema.Builder builder = TableSchema.builder(); - // add column - for (int i = 1; i <= tableMetaData.getColumnCount(); i++) { - String columnName = tableMetaData.getColumnName(i); - SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i); - int columnDisplaySize = tableMetaData.getColumnDisplaySize(i); - String comment = tableMetaData.getColumnLabel(i); - boolean isNullable = - tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable; - Object defaultValue = - getColumnDefaultValue( - metaData, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName(), - columnName) - .orElse(null); - - PhysicalColumn physicalColumn = - PhysicalColumn.of( - columnName, - type, - columnDisplaySize, - isNullable, - defaultValue, - comment); - builder.column(physicalColumn); - } - // add primary key - primaryKey.ifPresent(builder::primaryKey); - // add constraint key - constraintKeys.forEach(builder::constraintKey); - TableIdentifier tableIdentifier = - TableIdentifier.of( - catalogName, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - return CatalogTable.of( - tableIdentifier, - builder.build(), - buildConnectorOptions(tablePath), - Collections.emptyList(), - ""); - } - - } catch (Exception e) { - throw new CatalogException( - String.format("Failed getting table %s", tablePath.getFullName()), e); - } - } + protected abstract boolean createTableInternal(TablePath tablePath, CatalogTable table) + throws CatalogException; @Override public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) @@ -400,20 +257,7 @@ public abstract class AbstractJdbcCatalog implements Catalog { } } - protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { - String dbUrl = - jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement( - jdbcDialect.getDropTableSql(tablePath.getFullName()))) { - // Will there exist concurrent drop for one table? - return ps.execute(); - } catch (SQLException e) { - throw new CatalogException( - String.format("Failed dropping table %s", tablePath.getFullName()), e); - } - } + protected abstract boolean dropTableInternal(TablePath tablePath) throws CatalogException; @Override public void createDatabase(TablePath tablePath, boolean ignoreIfExists) @@ -429,6 +273,8 @@ public abstract class AbstractJdbcCatalog implements Catalog { } } + protected abstract boolean createDatabaseInternal(String databaseName); + @Override public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException { @@ -440,69 +286,5 @@ public abstract class AbstractJdbcCatalog implements Catalog { } } - protected SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) - throws SQLException { - return null; - } - - protected Set<String> getSysDatabases() { - return SYS_DATABASES; - } - - protected Map<String, String> buildConnectorOptions(TablePath tablePath) { - Map<String, String> options = new HashMap<>(8); - options.put("connector", "jdbc"); - options.put( - "url", - jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix)); - options.put("table-name", tablePath.getFullName()); - options.put("username", username); - options.put("password", pwd); - return options; - } - - protected boolean createDatabaseInternal(String databaseName) { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement( - String.format(jdbcDialect.createDatabaseSql(databaseName)))) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed creating database %s in catalog %s", - databaseName, this.catalogName), - e); - } - } - - protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement(jdbcDialect.dropDatabaseSql(databaseName))) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed dropping database %s in catalog %s", - databaseName, this.catalogName), - e); - } - } - - // todo: If the origin source is mysql, we can directly use create table like to create the - // target table? - protected boolean createTableInternal(TablePath tablePath, CatalogTable table) - throws CatalogException { - String dbUrl = - jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix); - String createTableSql = jdbcDialect.createTableSql(tablePath, table); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); - PreparedStatement ps = conn.prepareStatement(createTableSql)) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format("Failed creating table %s", tablePath.getFullName()), e); - } - } + protected abstract boolean dropDatabaseInternal(String databaseName) throws CatalogException; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java index f6c9cc679..d8534d5df 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java @@ -18,6 +18,16 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -25,13 +35,26 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalo import com.mysql.cj.MysqlType; import com.mysql.cj.jdbc.result.ResultSetImpl; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; public class MySqlCatalog extends AbstractJdbcCatalog { + protected static final Set<String> SYS_DATABASES = new HashSet<>(4); + static { SYS_DATABASES.add("information_schema"); SYS_DATABASES.add("mysql"); @@ -44,12 +67,189 @@ public class MySqlCatalog extends AbstractJdbcCatalog { super(catalogName, username, pwd, urlInfo); } + @Override + public List<String> listDatabases() throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); + PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;")) { + + List<String> databases = new ArrayList<>(); + ResultSet rs = ps.executeQuery(); + + while (rs.next()) { + String databaseName = rs.getString(1); + if (!SYS_DATABASES.contains(databaseName)) { + databases.add(rs.getString(1)); + } + } + + return databases; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", this.catalogName), e); + } + } + + @Override + public List<String> listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(this.catalogName, databaseName); + } + + String dbUrl = getUrlFromDatabaseName(databaseName); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); + PreparedStatement ps = conn.prepareStatement("SHOW TABLES;")) { + + ResultSet rs = ps.executeQuery(); + + List<String> tables = new ArrayList<>(); + + while (rs.next()) { + tables.add(rs.getString(1)); + } + + return tables; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", catalogName), e); + } + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(catalogName, tablePath); + } + + String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { + DatabaseMetaData metaData = conn.getMetaData(); + Optional<PrimaryKey> primaryKey = + getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName()); + List<ConstraintKey> constraintKeys = + getConstraintKeys( + metaData, tablePath.getDatabaseName(), tablePath.getTableName()); + + try (PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM %s WHERE 1 = 0;", + tablePath.getFullNameWithQuoted()))) { + ResultSetMetaData tableMetaData = ps.getMetaData(); + TableSchema.Builder builder = TableSchema.builder(); + // add column + for (int i = 1; i <= tableMetaData.getColumnCount(); i++) { + String columnName = tableMetaData.getColumnName(i); + SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i); + int columnDisplaySize = tableMetaData.getColumnDisplaySize(i); + String comment = tableMetaData.getColumnLabel(i); + boolean isNullable = + tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable; + Object defaultValue = + getColumnDefaultValue(metaData, tablePath.getTableName(), columnName) + .orElse(null); + + PhysicalColumn physicalColumn = + PhysicalColumn.of( + columnName, + type, + columnDisplaySize, + isNullable, + defaultValue, + comment); + builder.column(physicalColumn); + } + // add primary key + primaryKey.ifPresent(builder::primaryKey); + // add constraint key + constraintKeys.forEach(builder::constraintKey); + TableIdentifier tableIdentifier = + TableIdentifier.of( + catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); + return CatalogTable.of( + tableIdentifier, + builder.build(), + buildConnectorOptions(tablePath), + Collections.emptyList(), + ""); + } + + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting table %s", tablePath.getFullName()), e); + } + } + + // todo: If the origin source is mysql, we can directly use create table like to create the + // target table? + @Override + protected boolean createTableInternal(TablePath tablePath, CatalogTable table) + throws CatalogException { + String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); + String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, table).build(); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); + PreparedStatement ps = conn.prepareStatement(createTableSql)) { + return ps.execute(); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed creating table %s", tablePath.getFullName()), e); + } + } + + @Override + protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { + String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "DROP TABLE %s IF EXIST;", tablePath.getFullName()))) { + // Will there exist concurrent drop for one table? + return ps.execute(); + } catch (SQLException e) { + throw new CatalogException( + String.format("Failed dropping table %s", tablePath.getFullName()), e); + } + } + + @Override + protected boolean createDatabaseInternal(String databaseName) throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement( + String.format("CREATE DATABASE `%s`;", databaseName))) { + return ps.execute(); + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed creating database %s in catalog %s", + databaseName, this.catalogName), + e); + } + } + + @Override + protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement(String.format("DROP DATABASE `%s`;", databaseName))) { + return ps.execute(); + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed dropping database %s in catalog %s", + databaseName, this.catalogName), + e); + } + } + /** * @see com.mysql.cj.MysqlType * @see ResultSetImpl#getObjectStoredProc(int, int) */ - @Override - public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) + @SuppressWarnings("unchecked") + private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) throws SQLException { MysqlType mysqlType = MysqlType.getByName(metadata.getColumnTypeName(colIndex)); Map<String, Object> dataTypeProperties = new HashMap<>(); @@ -57,4 +257,19 @@ public class MySqlCatalog extends AbstractJdbcCatalog { dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, metadata.getScale(colIndex)); return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType, dataTypeProperties); } + + @SuppressWarnings("MagicNumber") + private Map<String, String> buildConnectorOptions(TablePath tablePath) { + Map<String, String> options = new HashMap<>(8); + options.put("connector", "jdbc"); + options.put("url", baseUrl + tablePath.getDatabaseName()); + options.put("table-name", tablePath.getFullName()); + options.put("username", username); + options.put("password", pwd); + return options; + } + + private String getUrlFromDatabaseName(String databaseName) { + return baseUrl + databaseName + suffix; + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java index c2e300516..9a015ca73 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java @@ -49,7 +49,7 @@ public class MysqlCreateTableSqlBuilder { private List<ConstraintKey> constraintKeys; - private final MysqlDataTypeConvertor mysqlDataTypeConvertor; + private MysqlDataTypeConvertor mysqlDataTypeConvertor; private MysqlCreateTableSqlBuilder(String tableName) { checkNotNull(tableName, "tableName must not be null"); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java index f15a0e746..25c02e6b1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java @@ -19,21 +19,41 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; import org.apache.commons.lang3.tuple.Pair; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; public class SqlServerCatalog extends AbstractJdbcCatalog { + private static final Set<String> SYS_DATABASES = new HashSet<>(4); + static { SYS_DATABASES.add("master"); SYS_DATABASES.add("tempdb"); @@ -46,6 +66,150 @@ public class SqlServerCatalog extends AbstractJdbcCatalog { super(catalogName, username, pwd, urlInfo); } + @Override + public List<String> listDatabases() throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); + PreparedStatement ps = conn.prepareStatement("SELECT NAME FROM SYS.DATABASES")) { + + List<String> databases = new ArrayList<>(); + ResultSet rs = ps.executeQuery(); + + while (rs.next()) { + String databaseName = rs.getString(1); + if (!SYS_DATABASES.contains(databaseName)) { + databases.add(databaseName); + } + } + + return databases; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", this.catalogName), e); + } + } + + @Override + public List<String> listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(this.catalogName, databaseName); + } + + String dbUrl = getUrlFromDatabaseName(databaseName); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement( + "SELECT TABLE_SCHEMA, TABLE_NAME FROM " + + databaseName + + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")) { + + ResultSet rs = ps.executeQuery(); + + List<String> tables = new ArrayList<>(); + + while (rs.next()) { + tables.add(rs.getString(1) + "." + rs.getString(2)); + } + + return tables; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", catalogName), e); + } + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + try { + return databaseExists(tablePath.getDatabaseName()) + && listTables(tablePath.getDatabaseName()) + .contains(tablePath.getSchemaAndTableName()); + } catch (DatabaseNotExistException e) { + return false; + } + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(catalogName, tablePath); + } + + String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { + DatabaseMetaData metaData = conn.getMetaData(); + Optional<PrimaryKey> primaryKey = + getPrimaryKey( + metaData, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()); + List<ConstraintKey> constraintKeys = + getConstraintKeys( + metaData, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()); + + try (PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM %s WHERE 1 = 0;", + tablePath.getFullNameWithQuoted("\"")))) { + ResultSetMetaData tableMetaData = ps.getMetaData(); + TableSchema.Builder builder = TableSchema.builder(); + // add column + for (int i = 1; i <= tableMetaData.getColumnCount(); i++) { + String columnName = tableMetaData.getColumnName(i); + SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i); + int columnDisplaySize = tableMetaData.getColumnDisplaySize(i); + String comment = tableMetaData.getColumnLabel(i); + boolean isNullable = + tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable; + Object defaultValue = + getColumnDefaultValue( + metaData, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName(), + columnName) + .orElse(null); + + PhysicalColumn physicalColumn = + PhysicalColumn.of( + columnName, + type, + columnDisplaySize, + isNullable, + defaultValue, + comment); + builder.column(physicalColumn); + } + // add primary key + primaryKey.ifPresent(builder::primaryKey); + // add constraint key + constraintKeys.forEach(builder::constraintKey); + TableIdentifier tableIdentifier = + TableIdentifier.of( + catalogName, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()); + return CatalogTable.of( + tableIdentifier, + builder.build(), + buildConnectorOptions(tablePath), + Collections.emptyList(), + ""); + } + + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting table %s", tablePath.getFullName()), e); + } + } + @Override protected boolean createTableInternal(TablePath tablePath, CatalogTable table) throws CatalogException { @@ -53,7 +217,54 @@ public class SqlServerCatalog extends AbstractJdbcCatalog { } @Override - public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) + protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { + String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement( + String.format("DROP TABLE IF EXIST %s", tablePath.getFullName()))) { + // Will there exist concurrent drop for one table? + return ps.execute(); + } catch (SQLException e) { + throw new CatalogException( + String.format("Failed dropping table %s", tablePath.getFullName()), e); + } + } + + @Override + protected boolean createDatabaseInternal(String databaseName) throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement( + String.format("CREATE DATABASE `%s`", databaseName))) { + return ps.execute(); + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed creating database %s in catalog %s", + databaseName, this.catalogName), + e); + } + } + + @Override + protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement( + String.format("DROP DATABASE IF EXISTS `%s`;", databaseName))) { + return ps.execute(); + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed dropping database %s in catalog %s", + databaseName, this.catalogName), + e); + } + } + + @SuppressWarnings("unchecked") + private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) throws SQLException { Pair<SqlServerType, Map<String, Object>> pair = SqlServerType.parse(metadata.getColumnTypeName(colIndex)); @@ -63,4 +274,19 @@ public class SqlServerCatalog extends AbstractJdbcCatalog { dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, metadata.getScale(colIndex)); return new SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(), dataTypeProperties); } + + @SuppressWarnings("MagicNumber") + private Map<String, String> buildConnectorOptions(TablePath tablePath) { + Map<String, String> options = new HashMap<>(8); + options.put("connector", "jdbc"); + options.put("url", getUrlFromDatabaseName(tablePath.getDatabaseName())); + options.put("table-name", tablePath.getFullName()); + options.put("username", username); + options.put("password", pwd); + return options; + } + + private String getUrlFromDatabaseName(String databaseName) { + return baseUrl + ";databaseName=" + databaseName + ";" + suffix; + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java index 0b0fa91f2..e848498c9 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java @@ -140,7 +140,7 @@ public enum SqlServerType implements SQLType { public static Pair<SqlServerType, Map<String, Object>> parse(String fullTypeName) { Map<String, Object> params = new HashMap<>(); String typeName = fullTypeName; - if (fullTypeName.contains("(")) { + if (fullTypeName.indexOf("(") != -1) { typeName = fullTypeName.substring(0, fullTypeName.indexOf("(")).trim(); String paramsStr = fullTypeName.substring( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index 36e47eda1..64ed388df 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect; -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; @@ -195,40 +193,4 @@ public interface JdbcDialect extends Serializable { PreparedStatement ps = conn.prepareStatement(jdbcSourceConfig.getQuery()); return ps.getMetaData(); } - - default String listDatabases() { - return "SHOW DATABASES;"; - } - - default String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) { - return baseUrl + databaseName + suffix; - } - - default String createDatabaseSql(String databaseName) { - return String.format("CREATE DATABASE IF NOT EXISTS %s;", quoteIdentifier(databaseName)); - } - - default String dropDatabaseSql(String databaseName) { - return String.format("DROP DATABASE IF EXISTS %s;", quoteIdentifier(databaseName)); - } - - default String getTableName(ResultSet rs) throws SQLException { - return rs.getString(1); - } - - default String getTableName(TablePath tablePath) { - return tablePath.getTableName(); - } - - default String listTableSql(String databaseName) { - return "SHOW TABLES;"; - } - - default String getDropTableSql(String tableName) { - return String.format("DROP TABLE %s IF EXIST;", tableName); - } - - default String createTableSql(TablePath tablePath, CatalogTable catalogTable) { - return ""; - } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index 63584d8ab..128b8ae4b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -17,9 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql; -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; @@ -81,9 +78,4 @@ public class MysqlDialect implements JdbcDialect { statement.setFetchSize(Integer.MIN_VALUE); return statement; } - - @Override - public String createTableSql(TablePath tablePath, CatalogTable catalogTable) { - return MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build(); - } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java index 90b376624..697d2d2dc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java @@ -17,13 +17,10 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver; -import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -103,31 +100,4 @@ public class SqlServerDialect implements JdbcDialect { return Optional.of(upsertSQL); } - - @Override - public String listDatabases() { - return "SELECT NAME FROM SYS.DATABASES"; - } - - @Override - public String listTableSql(String databaseName) { - return "SELECT TABLE_SCHEMA, TABLE_NAME FROM " - + databaseName - + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'"; - } - - @Override - public String getTableName(ResultSet rs) throws SQLException { - return rs.getString(1) + "." + rs.getString(2); - } - - @Override - public String getTableName(TablePath tablePath) { - return tablePath.getSchemaName() + "." + tablePath.getTableName(); - } - - @Override - public String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) { - return baseUrl + ";databaseName=" + databaseName + ";" + suffix; - } } diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml b/seatunnel-connectors-v2/connector-starrocks/pom.xml index 44b899885..08e49bc0f 100644 --- a/seatunnel-connectors-v2/connector-starrocks/pom.xml +++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml @@ -49,11 +49,6 @@ <artifactId>connector-common</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.seatunnel</groupId> - <artifactId>connector-jdbc</artifactId> - <version>${project.version}</version> - </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java index 79d9bf901..7bf308b1c 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java @@ -17,9 +17,13 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog; +import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; @@ -32,7 +36,6 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.common.utils.JdbcUrlUtil; -import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException; import org.apache.commons.lang3.StringUtils; @@ -44,18 +47,22 @@ import com.mysql.cj.MysqlType; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; -public class StarRocksCatalog extends AbstractJdbcCatalog { +public class StarRocksCatalog implements Catalog { protected final String catalogName; protected String defaultDatabase = "information_schema"; @@ -64,6 +71,8 @@ public class StarRocksCatalog extends AbstractJdbcCatalog { protected final String baseUrl; protected String defaultUrl; private final JdbcUrlUtil.UrlInfo urlInfo; + + private static final Set<String> SYS_DATABASES = new HashSet<>(); private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class); static { @@ -73,8 +82,6 @@ public class StarRocksCatalog extends AbstractJdbcCatalog { public StarRocksCatalog(String catalogName, String username, String pwd, String defaultUrl) { - super(catalogName, username, pwd, JdbcUrlUtil.getUrlInfo(defaultUrl)); - checkArgument(StringUtils.isNotBlank(username)); checkArgument(StringUtils.isNotBlank(pwd)); checkArgument(StringUtils.isNotBlank(defaultUrl)); @@ -90,9 +97,104 @@ public class StarRocksCatalog extends AbstractJdbcCatalog { } @Override - protected boolean createTableInternal(TablePath tablePath, CatalogTable table) - throws CatalogException { - throw new UnsupportedOperationException("Unsupported create table"); + public List<String> listDatabases() throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { + + PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;"); + + List<String> databases = new ArrayList<>(); + ResultSet rs = ps.executeQuery(); + + while (rs.next()) { + String databaseName = rs.getString(1); + if (!SYS_DATABASES.contains(databaseName)) { + databases.add(rs.getString(1)); + } + } + + return databases; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", this.catalogName), e); + } + } + + @Override + public List<String> listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(this.catalogName, databaseName); + } + + try (Connection conn = + DriverManager.getConnection( + urlInfo.getUrlWithDatabase(databaseName), username, pwd)) { + PreparedStatement ps = conn.prepareStatement("SHOW TABLES;"); + + ResultSet rs = ps.executeQuery(); + + List<String> tables = new ArrayList<>(); + + while (rs.next()) { + tables.add(rs.getString(1)); + } + + return tables; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", catalogName), e); + } + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(catalogName, tablePath); + } + + String dbUrl = urlInfo.getUrlWithDatabase(tablePath.getDatabaseName()); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { + Optional<PrimaryKey> primaryKey = + getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName()); + + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM %s WHERE 1 = 0;", + tablePath.getFullNameWithQuoted())); + + ResultSetMetaData tableMetaData = ps.getMetaData(); + + TableSchema.Builder builder = TableSchema.builder(); + for (int i = 1; i <= tableMetaData.getColumnCount(); i++) { + SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i); + // TODO add default value and test it + builder.column( + PhysicalColumn.of( + tableMetaData.getColumnName(i), + type, + tableMetaData.getColumnDisplaySize(i), + tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable, + null, + tableMetaData.getColumnLabel(i))); + } + + primaryKey.ifPresent(builder::primaryKey); + + TableIdentifier tableIdentifier = + TableIdentifier.of( + catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); + return CatalogTable.of( + tableIdentifier, + builder.build(), + buildConnectorOptions(tablePath), + Collections.emptyList(), + ""); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting table %s", tablePath.getFullName()), e); + } } @Override @@ -107,11 +209,6 @@ public class StarRocksCatalog extends AbstractJdbcCatalog { throw new UnsupportedOperationException(); } - @Override - protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { - throw new UnsupportedOperationException(); - } - @Override public void createDatabase(TablePath tablePath, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException { @@ -132,11 +229,6 @@ public class StarRocksCatalog extends AbstractJdbcCatalog { } } - @Override - protected boolean createDatabaseInternal(String databaseName) { - throw new UnsupportedOperationException(); - } - @Override public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException { @@ -154,14 +246,8 @@ public class StarRocksCatalog extends AbstractJdbcCatalog { } } - @Override - protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { - throw new UnsupportedOperationException(); - } - /** @see com.mysql.cj.MysqlType */ - @Override - public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) + private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) throws SQLException { MysqlType starrocksType = MysqlType.getByName(metadata.getColumnTypeName(colIndex)); switch (starrocksType) { @@ -229,8 +315,7 @@ public class StarRocksCatalog extends AbstractJdbcCatalog { } @SuppressWarnings("MagicNumber") - @Override - public Map<String, String> buildConnectorOptions(TablePath tablePath) { + private Map<String, String> buildConnectorOptions(TablePath tablePath) { Map<String, String> options = new HashMap<>(8); options.put("connector", "starrocks"); options.put("url", baseUrl + tablePath.getDatabaseName()); @@ -284,6 +369,29 @@ public class StarRocksCatalog extends AbstractJdbcCatalog { return res; } + @Override + public String getDefaultDatabase() { + return defaultDatabase; + } + + @Override + public void open() throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { + // test connection, fail early if we cannot connect to database + conn.getCatalog(); + } catch (SQLException e) { + throw new CatalogException( + String.format("Failed connecting to %s via JDBC.", defaultUrl), e); + } + + LOG.info("Catalog {} established connection to {}", catalogName, defaultUrl); + } + + @Override + public void close() throws CatalogException { + LOG.info("Catalog {} closing", catalogName); + } + protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) throws SQLException { List<String> pkFields = new ArrayList<>(); @@ -306,4 +414,21 @@ public class StarRocksCatalog extends AbstractJdbcCatalog { } return Optional.empty(); } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + checkArgument(StringUtils.isNotBlank(databaseName)); + + return listDatabases().contains(databaseName); + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + try { + return databaseExists(tablePath.getDatabaseName()) + && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName()); + } catch (DatabaseNotExistException e) { + return false; + } + } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java deleted file mode 100644 index 9286e628f..000000000 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog; - -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect; - -public class StarRocksDialect extends MysqlDialect { - @Override - public String dialectName() { - return "StarRocks"; - } -}
