This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8ee129d20f [Improve][Connector-V2] Reuse connection in StarRocksCatalog (#7342)
8ee129d20f is described below

commit 8ee129d20f3f200ba04d02623469e3f3008b8c96
Author: Jia Fan <[email protected]>
AuthorDate: Thu Aug 22 10:03:19 2024 +0800

    [Improve][Connector-V2] Reuse connection in StarRocksCatalog (#7342)
---
 .../starrocks/catalog/StarRocksCatalog.java        | 89 ++++++++++++----------
 1 file changed, 50 insertions(+), 39 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 0aee20aa94..9b1875374b 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -78,6 +78,8 @@ public class StarRocksCatalog implements Catalog {
     protected String defaultUrl;
     private final JdbcUrlUtil.UrlInfo urlInfo;
     private final String template;
+    private Connection conn;
+
     private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class);
 
     public StarRocksCatalog(
@@ -99,8 +101,7 @@ public class StarRocksCatalog implements Catalog {
 
     @Override
     public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
+        try (PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
                 ResultSet rs = ps.executeQuery()) {
             List<String> databases = new ArrayList<>();
 
@@ -122,20 +123,19 @@ public class StarRocksCatalog implements Catalog {
             throw new DatabaseNotExistException(this.catalogName, databaseName);
         }
 
-        try (Connection conn =
-                        DriverManager.getConnection(
-                                urlInfo.getUrlWithDatabase(databaseName), username, pwd);
-                PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
-                ResultSet rs = ps.executeQuery()) {
-
-            List<String> tables = new ArrayList<>();
-
-            while (rs.next()) {
-                tables.add(rs.getString(1));
+        try (PreparedStatement ps =
+                conn.prepareStatement(
+                        "SELECT TABLE_NAME FROM information_schema.tables "
+                                + "WHERE TABLE_SCHEMA = ? ORDER BY TABLE_NAME")) {
+            ps.setString(1, databaseName);
+            try (ResultSet rs = ps.executeQuery()) {
+                List<String> tables = new ArrayList<>();
+                while (rs.next()) {
+                    tables.add(rs.getString(1));
+                }
+                return tables;
             }
-
-            return tables;
-        } catch (Exception e) {
+        } catch (SQLException e) {
             throw new CatalogException(
                     String.format("Failed listing database in catalog %s", catalogName), e);
         }
@@ -148,8 +148,7 @@ public class StarRocksCatalog implements Catalog {
             throw new TableNotExistException(catalogName, tablePath);
         }
 
-        String dbUrl = urlInfo.getUrlWithDatabase(tablePath.getDatabaseName());
-        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+        try {
             Optional<PrimaryKey> primaryKey =
                     getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName());
 
@@ -213,7 +212,7 @@ public class StarRocksCatalog implements Catalog {
     @Override
     public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+        try {
             conn.createStatement()
                     .execute(StarRocksSaveModeUtil.getDropTableSql(tablePath, ignoreIfNotExists));
         } catch (Exception e) {
@@ -224,7 +223,7 @@ public class StarRocksCatalog implements Catalog {
 
     public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+        try {
             if (ignoreIfNotExists) {
                 conn.createStatement()
                         .execute(StarRocksSaveModeUtil.getTruncateTableSql(tablePath));
@@ -237,8 +236,8 @@ public class StarRocksCatalog implements Catalog {
     }
 
     public void executeSql(TablePath tablePath, String sql) {
-        try (Connection connection = DriverManager.getConnection(defaultUrl, username, pwd)) {
-            connection.createStatement().execute(sql);
+        try {
+            conn.createStatement().execute(sql);
         } catch (Exception e) {
             throw new CatalogException(String.format("Failed EXECUTE SQL in catalog %s", sql), e);
         }
@@ -246,8 +245,7 @@ public class StarRocksCatalog implements Catalog {
 
     public boolean isExistsData(TablePath tablePath) {
         String sql = String.format("select * from %s limit 1", tablePath.getFullName());
-        try (Connection connection = DriverManager.getConnection(defaultUrl, username, pwd);
-                Statement statement = connection.createStatement();
+        try (Statement statement = conn.createStatement();
                 ResultSet resultSet = statement.executeQuery(sql)) {
             if (resultSet == null) {
                 return false;
@@ -262,7 +260,7 @@ public class StarRocksCatalog implements Catalog {
     @Override
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
             throws DatabaseAlreadyExistException, CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+        try {
             conn.createStatement()
                     .execute(
                             StarRocksSaveModeUtil.getCreateDatabaseSql(
@@ -276,7 +274,7 @@ public class StarRocksCatalog implements Catalog {
     @Override
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+        try {
             conn.createStatement()
                     .execute(
                             StarRocksSaveModeUtil.getDropDatabaseSql(
@@ -368,7 +366,7 @@ public class StarRocksCatalog implements Catalog {
 
     public void createTable(String sql)
             throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+        try {
             log.info("create table sql is :{}", sql);
             conn.createStatement().execute(sql);
         } catch (Exception e) {
@@ -418,7 +416,8 @@ public class StarRocksCatalog implements Catalog {
 
     @Override
     public void open() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+        try {
+            conn = DriverManager.getConnection(defaultUrl, username, pwd);
             // test connection, fail early if we cannot connect to database
             conn.getCatalog();
         } catch (SQLException e) {
@@ -432,6 +431,11 @@ public class StarRocksCatalog implements Catalog {
     @Override
     public void close() throws CatalogException {
         LOG.info("Catalog {} closing", catalogName);
+        try {
+            conn.close();
+        } catch (SQLException e) {
+            throw new CatalogException("close doris catalog failed", e);
+        }
     }
 
     @Override
@@ -442,13 +446,12 @@ public class StarRocksCatalog implements Catalog {
     protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) throws SQLException {
 
         List<String> pkFields = new ArrayList<>();
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
-                ResultSet rs =
-                        conn.createStatement()
-                                .executeQuery(
-                                        String.format(
-                                                "SELECT COLUMN_NAME FROM information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION",
-                                                schema, table))) {
+        try (ResultSet rs =
+                conn.createStatement()
+                        .executeQuery(
+                                String.format(
+                                        "SELECT COLUMN_NAME FROM information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION",
+                                        schema, table))) {
             while (rs.next()) {
                 String columnName = rs.getString("COLUMN_NAME");
                 pkFields.add(columnName);
@@ -471,11 +474,19 @@ public class StarRocksCatalog implements Catalog {
 
     @Override
     public boolean tableExists(TablePath tablePath) throws CatalogException {
-        try {
-            return databaseExists(tablePath.getDatabaseName())
-                    && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
-        } catch (DatabaseNotExistException e) {
-            return false;
+        try (PreparedStatement ps =
+                conn.prepareStatement(
+                        "SELECT TABLE_NAME FROM information_schema.tables "
+                                + "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
+                                + "ORDER BY TABLE_NAME")) {
+            ps.setString(1, tablePath.getDatabaseName());
+            ps.setString(2, tablePath.getTableName());
+            try (ResultSet rs = ps.executeQuery()) {
+                return rs.next();
+            }
+        } catch (SQLException e) {
+            throw new CatalogException(
+                    String.format("check table [%s] exists failed", tablePath.getFullName()), e);
         }
     }
 

Reply via email to